impala-user mailing list archives

From Philipp Krause <philippkrause.m...@googlemail.com>
Subject Re: Local join instead of data exchange - co-located blocks
Date Fri, 04 May 2018 00:51:47 GMT
Hello Alex,

I have tried out several configurations, but I still couldn't find a 
solution for my problem :( In the query summary (see attachment) it looks 
as if no rows are read. Do you have an idea what I need to change? 
Sorry for the trouble, and thank you once more for the great 
support in getting this working!


On 29.04.2018 21:21, Philipp Krause wrote:
>
> Hi Alex,
> I got the modified version working on my cluster. The query plan looks 
> exactly as wanted (see attachment). This is awesome! Unfortunately, the 
> result set is empty. As you can see in query_state.png, the scan 
> progress always shows 50% even though the query has finished.
>
> The only modification in the code is the if-statement you pointed me to 
> (I set it to true). Maybe I have to give Impala the information about 
> the lhs/rhs join partition since there are no exchange nodes 
> now (like in the following lines)? The corresponding partitions/blocks 
> of each table are on the same node.
>
> I think we are very close to the final result and I hope you can help 
> me once more. Thank you so much!
>
> Best regards
> Philipp
>
>
> On 24.04.2018 18:00, Alexander Behm wrote:
>> On Tue, Apr 24, 2018 at 5:31 AM, Philipp Krause 
>> <philippkrause.mail@googlemail.com> wrote:
>>
>>     To prevent the broadcast join I could simply use the shuffle
>>     operator in the query:
>>
>>     SELECT * FROM business_partition_1 INNER JOIN [SHUFFLE]
>>     business_partition_2 WHERE
>>     business_partition_1.businessid=business_partition_2.businessid
>>
>>
>> Not sure what version of Impala you are using, and whether hints 
>> override any changes you might make. I suggest you make the code work 
>> as you wish without requiring hints.
>>
>>
>>     I think the broadcast is currently only used because of my very
>>     small test tables.
>>
>>     This gives me the plan attached as partitioned_shuffle.png. Since
>>     my modified version isn't working yet, I partitioned both tables
>>     on businessid in Impala. The "hack" should only be needed to get
>>     into the if-condition if I partition the data manually, right? But
>>     in this case (if the partitioning is done by Impala itself) Impala
>>     should get into the if-condition anyway. Unfortunately I can't
>>     see a difference in the plan compared to my unpartitioned tables
>>     (unpartitioned.png) concerning the exchange nodes. My goal is
>>     actually to get rid of all exchange nodes since the corresponding
>>     data is already present on each node. The plan should then look
>>     the same except for the 03 and 04 exchanges.
>>
>>
>> I understand the picture and goal. I suggest you read, understand, 
>> and modify the code in DistributedPlanner.createHashJoinFragment() to 
>> create the plan shape that you want.
>> I don't know how you are producing these plans. Are you sure your 
>> code changes are taking effect?
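>>
>> For reference, the codepath in createHashJoinFragment() that handles this
>> has roughly the following shape (a paraphrased sketch, not the exact
>> source; names and details vary between versions):
>>
>>     // Sketch of the branch in DistributedPlanner.createHashJoinFragment().
>>     // It covers the case where both join inputs are already compatibly
>>     // partitioned on the join exprs, so no exchange nodes are added.
>>     if (lhsHasCompatPartition && rhsHasCompatPartition) { // force 'true' to experiment
>>       node.setChild(0, leftChildFragment.getPlanRoot());
>>       node.setChild(1, rightChildFragment.getPlanRoot());
>>       // Place the join in the left fragment and drop the right fragment;
>>       // the two inputs are now wired together without exchanges.
>>       leftChildFragment.setPlanRoot(node);
>>       fragments.remove(rightChildFragment);
>>       return leftChildFragment;
>>     }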
>>
>>
>>     Are there other changes necessary to just take
>>     Table1-partition/block X and Table2-partition/block Y on each
>>     node and join them without any data exchange? Actually each node
>>     should take all its local blocks for both tables, join them, and
>>     pass the results back to the coordinator where all results come
>>     together (please see explanation.png).
>>
>>
>> No. You just need to create the plan without exchange nodes, as we've 
>> already gone through earlier in this thread.
>>
>>
>>     I'm looking forward to hearing from you. I hope we can realise this.
>>
>>     Best regards
>>     Philipp
>>
>>
>>     On 24.04.2018 07:03, Alexander Behm wrote:
>>>     On Mon, Apr 23, 2018 at 7:24 PM, Philipp Krause
>>>     <philippkrause.mail@googlemail.com> wrote:
>>>
>>>         Hi Alex,
>>>         thanks for the information! I've compiled the cdh5.13.1
>>>         release and replaced impala-frontend-0.1-SNAPSHOT.jar (which
>>>         seems to include the changes in DistributedPlanner.java).
>>>         There still seems to be a method missing after replacing the
>>>         jar, but I'll try to figure that out.
>>>
>>>         I have two questions concerning the code fragment in
>>>         DistributedPlanner.java you pointed me to.
>>>
>>>         First:
>>>         The attached graphic shows the query plan for two tables
>>>         which are partitioned on the join attribute (standard Impala
>>>         version without any changes). If I change the if-condition to
>>>         true in my modified version, I expect to get the same result
>>>         for my "hand made" partitions (outside of Impala). But why is
>>>         there an exchange broadcast (even in the standard version)? I
>>>         mean, if I have a partition of Table 1 with ID=0 on Node X,
>>>         why is there a broadcast of the partition of Table 2 with
>>>         ID=0 on Node Y? Actually this partition only has to be sent
>>>         to Node X, where the matching partition of Table 1 is located
>>>         (instead of being sent to all nodes (broadcast)). Or is this
>>>         exactly the case and it's only shown as a broadcast here in
>>>         the graphic?
>>>
>>>
>>>     You are reading the plan correctly. Impala simply does not
>>>     implement that optimization.
>>>
>>>
>>>         Second:
>>>         All corresponding blocks of the partitioned tables are on
>>>         the same node (e.g. Table 1 partition with ID=0 and Table 2
>>>         partition with ID=0 => Node 1, etc.). This is what I did
>>>         manually. As already mentioned, I want to join these
>>>         partitions (blocks) locally on each node. But if I'm
>>>         correct, the modification in the DistributedPlanner will
>>>         also only lead to the plan in the attached graphic, so that
>>>         no further exchanges are created if I use my "hand made"
>>>         partitions. But there is still the broadcast exchange which
>>>         distributes the partitions across the cluster, which isn't
>>>         necessary because all needed blocks are already on the same
>>>         node and are ready to be joined locally. Is there a way to
>>>         realise that and get rid of the broadcast exchange?
>>>
>>>     You are right. That hack in DistributedPlanner only works for
>>>     partitioned hash joins. You can probably stitch the plan
>>>     together at an earlier place, e.g.:
>>>     https://github.com/apache/impala/blob/master/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java#L506
>>>
>>>         Please correct me if I'm wrong with my assumptions.
>>>
>>>         Thank you very much!
>>>
>>>         Best regards
>>>         Philipp
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>         On 18.04.2018 07:16, Alexander Behm wrote:
>>>>         Your CM-managed cluster must be running a "compatible"
>>>>         Impala version already for this trick to work. It looks
>>>>         like your catalog binary is trying to find a method which
>>>>         does not exist in the .jar, presumably because your .jar is
>>>>         built based on a different version of Impala where that
>>>>         method does not exist anymore.
>>>>
>>>>         It looks like you have CDH 5.13.3 installed. CDH 5.13.3 is
>>>>         based on Impala 2.10, see:
>>>>         https://www.cloudera.com/documentation/enterprise/release-notes/topics/cdh_vd_cdh_package_tarball_513.html#cm_vd_cdh_package_tarball_513
>>>>
>>>>         That means this binary copying trick will only work with a
>>>>         modified version of Impala 2.10, and very likely will not
>>>>         work with a different version.
>>>>
>>>>         It's probably easier to test with the mini cluster first.
>>>>         Alternatively, it "might" work if you replace all the
>>>>         binaries mentioned above, but it's quite possible that will
>>>>         not work.
>>>>
>>>>         On Sun, Apr 15, 2018 at 7:13 PM, Philipp Krause
>>>>         <philippkrause.mail@googlemail.com> wrote:
>>>>
>>>>             Hi Alex! Thank you for the list! The build of the
>>>>             modified cdh5-trunk branch (debug mode) was successful.
>>>>             After replacing "impala-frontend-0.1-SNAPSHOT.jar" in
>>>>             /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/jars/
>>>>             I got the following error in my existing cluster:
>>>>             F0416 01:16:45.402997 17897 catalog.cc:69]
>>>>             NoSuchMethodError: getCatalogObjects
>>>>             When I switch back to the original jar file, the error
>>>>             is gone. So something must be wrong with this file, I
>>>>             guess. But I wonder about the error in catalog.cc
>>>>             because I didn't touch any .cc files.
>>>>
>>>>             I also replaced
>>>>             "impala-data-source-api-1.0-SNAPSHOT.jar". The other
>>>>             jar files do not exist in my Impala installation
>>>>             (CDH-5.13.1).
>>>>
>>>>             What am I doing wrong?
>>>>
>>>>             Best regards
>>>>             Philipp
>>>>
>>>>
>>>>             On 13.04.2018 20:12, Alexander Behm wrote:
>>>>>             Here's the full list. It might not be minimal, but
>>>>>             copying/overwriting these should work.
>>>>>
>>>>>             debug/service/impalad
>>>>>             debug/service/libfesupport.so
>>>>>             debug/service/libService.a
>>>>>             release/service/impalad
>>>>>             release/service/libfesupport.so
>>>>>             release/service/libService.a
>>>>>             yarn-extras-0.1-SNAPSHOT.jar
>>>>>             impala-data-source-api-1.0-SNAPSHOT-sources.jar
>>>>>             impala-data-source-api-1.0-SNAPSHOT.jar
>>>>>             impala-frontend-0.1-SNAPSHOT-tests.jar
>>>>>             impala-frontend-0.1-SNAPSHOT.jar
>>>>>             libkudu_client.so.0.1.0
>>>>>             libstdc++.so.6.0.20
>>>>>             impala-no-sse.bc
>>>>>             impala-sse.bc
>>>>>             libimpalalzo.so
>>>>>
>>>>>             If you are only modifying the Java portion (like
>>>>>             DistributedPlanner), then only copying/replacing the
>>>>>             *.jar files should be sufficient.
>>>>>
>>>>>             On Fri, Apr 13, 2018 at 11:00 AM, Philipp Krause
>>>>>             <philippkrause.mail@googlemail.com> wrote:
>>>>>
>>>>>                 Yes, I have a running (virtual) cluster. I would
>>>>>                 try to follow your way with the custom impala
>>>>>                 build (DistributedPlanner.java is the only
>>>>>                 modified file at the moment). Thank you in advance
>>>>>                 for the file list!
>>>>>
>>>>>                 Best regards
>>>>>                 Philipp
>>>>>
>>>>>                 Alexander Behm <alex.behm@cloudera.com> wrote on
>>>>>                 Fri, Apr 13, 2018, 18:45:
>>>>>
>>>>>                     I'm not really following your
>>>>>                     installation/setup and am not an expert on
>>>>>                     Cloudera Manager installation/config. If you
>>>>>                     are going to build Impala anyway, it's
>>>>>                     probably easiest to test on Impala's
>>>>>                     minicluster first.
>>>>>
>>>>>                     In general, if you have a running Cloudera
>>>>>                     Managed cluster, you can deploy a custom
>>>>>                     Impala build by simply overwriting the
>>>>>                     existing Impala binaries and jars with the new
>>>>>                     build. If you want to go this route, I can
>>>>>                     give you a full list of files you need to
>>>>>                     replace.
>>>>>
>>>>>                     On Tue, Apr 10, 2018 at 11:44 AM, Philipp
>>>>>                     Krause <philippkrause.mail@googlemail.com> wrote:
>>>>>
>>>>>                         Thank you for the explanation! Yes, I'm
>>>>>                         using HDFS. The single-replica setup is
>>>>>                         only for test purposes at the moment. I
>>>>>                         think this makes it easier to gain some
>>>>>                         first results since fewer modifications
>>>>>                         (scheduler etc.) are necessary.
>>>>>                         I would like to test the
>>>>>                         DistributedPlanner modification in my
>>>>>                         virtual cluster. I used a customized
>>>>>                         Vagrant script to install Impala on
>>>>>                         multiple hosts (see attachment). It simply
>>>>>                         installs cloudera-manager-server-db,
>>>>>                         cloudera-manager-server and
>>>>>                         cloudera-manager-daemons via apt-get. What
>>>>>                         would be the simplest way to set up my
>>>>>                         modified version? Could I simply call
>>>>>                         ./buildall.sh and change the script to
>>>>>                         something like this?
>>>>>
>>>>>                         echo "Install java..."
>>>>>                         apt-get -q -y --force-yes install oracle-j2sdk1.7
>>>>>                         echo "Download impala..."
>>>>>                         wget https://...   # where I uploaded my modified version
>>>>>                         echo "Extract impala..."
>>>>>                         tar -xvzf Impala-cdh5-trunk.tar.gz
>>>>>                         cd Impala-cdh5-trunk
>>>>>                         echo "Build impala..."
>>>>>                         ./buildall.sh
>>>>>                         echo "Start impala instances..."
>>>>>                         service cloudera-scm-server-db initdb
>>>>>                         service cloudera-scm-server-db start
>>>>>                         service cloudera-scm-server start
>>>>>
>>>>>                         Or is there another, maybe even easier,
>>>>>                         method to test the code? Maybe via
>>>>>                         bootstrap_development.sh / the minicluster?
>>>>>
>>>>>                         Best regards
>>>>>                         Philipp
>>>>>
>>>>>                         2018-04-05 18:39 GMT+02:00 Alexander Behm
>>>>>                         <alex.behm@cloudera.com>:
>>>>>
>>>>>                             Apologies for the late response. Btw,
>>>>>                             your previous post was clear enough to
>>>>>                             me, so no worries :)
>>>>>
>>>>>
>>>>>                             On Wed, Apr 4, 2018 at 7:46 AM,
>>>>>                             Philipp Krause
>>>>>                             <philippkrause.mail@googlemail.com> wrote:
>>>>>
>>>>>                                 Hello Alex,
>>>>>
>>>>>                                 I think my previous post was too
>>>>>                                 long and confusing. I apologize
>>>>>                                 for that!
>>>>>
>>>>>                                 If replicas are completely
>>>>>                                 deactivated, all scan ranges of a
>>>>>                                 block are mapped to the one host
>>>>>                                 where the block is located. This
>>>>>                                 host is the "executor"/reader for
>>>>>                                 all the scan ranges of this block.
>>>>>                                 Is that correct?
>>>>>
>>>>>
>>>>>                             Yes, assuming you are using HDFS.
>>>>>
>>>>>
>>>>>                                 I tried to visualize my
>>>>>                                 understanding of the
>>>>>                                 scan-range-to-host mapping for my
>>>>>                                 use case (see attachment). Could
>>>>>                                 you please have a quick look at it
>>>>>                                 and tell me if this is correct?
>>>>>
>>>>>                                 "The existing scan range
>>>>>                                 assignment is scan-node centric.
>>>>>                                 For each scan node, we
>>>>>                                 independently decide which of its
>>>>>                                 scan ranges should be processed by
>>>>>                                 which host."
>>>>>                                 Without replicas, all scan ranges
>>>>>                                 of a block would be assigned to
>>>>>                                 the same host where this block is
>>>>>                                 located. Isn't everything local
>>>>>                                 here, so that Table_A - Block_0
>>>>>                                 and Table_B - Block_0 can be
>>>>>                                 joined locally, or are further
>>>>>                                 steps necessary? The condition in
>>>>>                                 the DistributedPlanner you pointed
>>>>>                                 me to is set to false (no exchange
>>>>>                                 nodes).
>>>>>
>>>>>                                 "You want it to be host-centric.
>>>>>                                 For each host, collect the local
>>>>>                                 scan ranges of *all* scan nodes,
>>>>>                                 and assign them to that host."
>>>>>                                 Wouldn't the standard setup from
>>>>>                                 above work? Wouldn't I assign all
>>>>>                                 (the same) scan ranges to each
>>>>>                                 host in this case?
>>>>>
>>>>>
>>>>>                             The standard setup only works if
>>>>>                             every block has exactly one replica.
>>>>>                             For our purposes, that is basically
>>>>>                             never the case (who would store
>>>>>                             production data without replication?),
>>>>>                             so the single-replica assumption was
>>>>>                             not clear to me.
>>>>>
>>>>>                             Does your current setup (only changing
>>>>>                             the planner and not the scheduler)
>>>>>                             produce the expected results?
>>>>>
>>>>>
>>>>>
>>>>>                                 Thank you very much!
>>>>>
>>>>>                                 Best regards
>>>>>                                 Philipp
>>>>>
>>>>>                                 2018-03-28 21:04 GMT+02:00 Philipp
>>>>>                                 Krause
>>>>>                                 <philippkrause.mail@googlemail.com>:
>>>>>
>>>>>                                     Thank you for your answer and
>>>>>                                     sorry for my delay!
>>>>>
>>>>>                                     If my understanding is
>>>>>                                     correct, the list of scan
>>>>>                                     nodes consists of all nodes
>>>>>                                     which contain a *local* block
>>>>>                                     from a table that is needed
>>>>>                                     for the query (Assumption: I
>>>>>                                     have no replicas in my first
>>>>>                                     tests). If TableA-Block0 is on
>>>>>                                     Node_0, isn't Node_0
>>>>>                                     automatically a scan node? And
>>>>>                                     wouldn't this scan node always
>>>>>                                     be the host for the complete
>>>>>                                     scan range(s) then?
>>>>>
>>>>>                                     "For each scan node, we
>>>>>                                     independently decide which of
>>>>>                                     its scan ranges should be
>>>>>                                     processed by which host."
>>>>>
>>>>>                                     https://github.com/cloudera/Impala/blob/cdh5-trunk/be/src/scheduling/scheduler.cc#L532
>>>>>
>>>>>                                     // Loop over all scan ranges,
>>>>>                                     // select an executor for those
>>>>>                                     // with local impalads and
>>>>>                                     // collect all others for later
>>>>>                                     // processing.
>>>>>
>>>>>                                     So in this whole block, scan
>>>>>                                     ranges are assigned to the
>>>>>                                     closest executor (=host?). But
>>>>>                                     isn't the closest executor
>>>>>                                     always the node the block is
>>>>>                                     located on (assuming impalad is
>>>>>                                     installed and I have no
>>>>>                                     replicas)? And isn't this node
>>>>>                                     always a scan node at the same
>>>>>                                     time? Otherwise a thread on a
>>>>>                                     remote host would have to read
>>>>>                                     the corresponding scan range,
>>>>>                                     which would be more expensive.
>>>>>                                     The only exception I can think
>>>>>                                     of is when all threads on the
>>>>>                                     local node are busy. Or, if I
>>>>>                                     use replicas and all threads of
>>>>>                                     my node with the "original"
>>>>>                                     block are busy, a thread on
>>>>>                                     another node which contains a
>>>>>                                     replica could read a particular
>>>>>                                     scan range of its local block.
>>>>>                                     Is my understanding correct
>>>>>                                     here?
>>>>>
>>>>>                                     Aren't all scan ranges read
>>>>>                                     locally by their scan nodes if
>>>>>                                     I have impalad installed on all
>>>>>                                     nodes? And am I right that a
>>>>>                                     scan range is only based on its
>>>>>                                     length, which refers to
>>>>>                                     maxScanRangeLength in
>>>>>                                     computeScanRangeLocations?
>>>>>                                     https://github.com/cloudera/Impala/blob/cdh5-trunk/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java#L721
>>>>>
>>>>>                                     I hope you can help me with
>>>>>                                     the scan node <-> scan range ->
>>>>>                                     host relationship. If I have
>>>>>                                     Table_A-Block_0 and
>>>>>                                     Table_B-Block_0 on the same
>>>>>                                     node (which I want to join
>>>>>                                     locally), I don't get the point
>>>>>                                     of why scan ranges could be
>>>>>                                     assigned to another host in my
>>>>>                                     scenario.
>>>>>
>>>>>                                     Best regards and thank you very
>>>>>                                     much!
>>>>>                                     Philipp Krause
>>>>>
>>>>>
>>>>> On 21.03.2018 05:21, Alexander Behm wrote:
>>>>>> Thanks for following up. I think I understand your setup.
>>>>>>
>>>>>> If you want to not think about scan ranges, then you can modify
>>>>>> HdfsScanNode.computeScanRangeLocations(). For example, you could change
>>>>>> it to produce one scan range per file or per HDFS block. That way you'd
>>>>>> know exactly what a scan range corresponds to.
>>>>>>
>>>>>> I think the easiest/fastest way for you to make progress is to
>>>>>> re-implement the existing scan range assignment logic in that place in
>>>>>> the code I had pointed you to. There is no quick fix to change the
>>>>>> existing behavior.
>>>>>> The existing scan range assignment is scan-node centric. For each scan
>>>>>> node, we independently decide which of its scan ranges should be
>>>>>> processed by which host.
>>>>>>
>>>>>> I believe an algorithm to achieve your goal would look completely
>>>>>> different. You want it to be host-centric. For each host, collect the
>>>>>> local scan ranges of *all* scan nodes, and assign them to that host.
>>>>>>
>>>>>> Does that make sense?
>>>>>>
>>>>>> Alex
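>>>>>>
>>>>>> In pseudo-Java, that host-centric idea would look roughly like this (an
>>>>>> illustrative sketch only, not the actual scheduler.cc code; the types
>>>>>> and getters here are made up):
>>>>>>
>>>>>>     // For each host, collect the local scan ranges of *all* scan nodes.
>>>>>>     Map<String, List<ScanRange>> assignment = new HashMap<>();
>>>>>>     for (ScanNode scanNode : scanNodes) {
>>>>>>       for (ScanRange range : scanNode.getScanRanges()) {
>>>>>>         // With replication disabled, each range has exactly one local host.
>>>>>>         String host = range.getLocalHost();
>>>>>>         assignment.computeIfAbsent(host, h -> new ArrayList<>()).add(range);
>>>>>>       }
>>>>>>     }
>>>>>>     // Each host then scans the local blocks of *both* tables and joins
>>>>>>     // them; only the join results travel to the coordinator.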
>>>>>>
>>>>>>
>>>>>> On Mon, Mar 19, 2018 at 1:02 PM, Philipp Krause
>>>>>> <philippkrause.mail@googlemail.com> wrote:
>>>>>>
>>>>>> I'd like to provide a small example for our purpose. The last post may
>>>>>> be a bit confusing, so here's a very simple example in the attached pdf
>>>>>> file. I hope it's understandable. Otherwise, please give me a short
>>>>>> feedback.
>>>>>>
>>>>>> Basically, I only want each data node to join all its local blocks. Is
>>>>>> there a range mapping needed, or is it possible to simply join all
>>>>>> local blocks (regardless of their content) since everything is already
>>>>>> "prepared"? Maybe you can clarify this for me.
>>>>>>
>>>>>> As you can see in the example, the tables are not partitioned by ID.
>>>>>> The files are manually prepared with the help of the modulo function.
>>>>>> So I don't have a range like [0,10], but something like 0,5,10,15 etc.
>>>>>>
>>>>>> I hope I didn't make it too complicated and confusing. I think the
>>>>>> actual idea behind this is really simple, and I hope you can help me to
>>>>>> get it working.
>>>>>>
>>>>>> Best regards and thank you very much for your time!
>>>>>> Philipp
>>>>>>
>>>>>>
>>>>>> On 18.03.2018 17:32, Philipp Krause wrote:
>>>>>>>
>>>>>>> Hi! At the moment the data-to-parquet-block mapping is based on a
>>>>>>> simple modulo function: Id % #data_nodes. So with 5 data nodes, all
>>>>>>> rows with Id's 0,5,10,... are written to Parquet_0, Id's 1,6,11,...
>>>>>>> are written to Parquet_1, etc. That's what I did manually. Since the
>>>>>>> parquet file size and the block size are both set to 64MB, each
>>>>>>> parquet file will result in one block when I transfer the parquet
>>>>>>> files to HDFS. By default, HDFS distributes the blocks randomly. For
>>>>>>> test purposes I transferred corresponding blocks from Table_A and
>>>>>>> Table_B to the same data node (Table_A - Block_X with Id's 0,5,10 and
>>>>>>> Table_B - Block_Y with Id's 0,5,10). In this case, they are
>>>>>>> transferred to data_node_0 because the modulo function (which I want
>>>>>>> to implement in the scheduler) returns 0 for these Id's. This is also
>>>>>>> done manually at the moment.
>>>>>>>
>>>>>>> 1.) DistributedPlanner: For first, upcoming tests I simply changed the
>>>>>>> first condition in the DistributedPlanner to true to avoid exchange
>>>>>>> nodes.
>>>>>>>
>>>>>>> 2.) The scheduler: That's the part I'm currently struggling with. For
>>>>>>> first tests, block replication is deactivated. I'm not sure how or
>>>>>>> where to implement the modulo function for the scan-range-to-host
>>>>>>> mapping. Without the modulo function, I would have to implement a
>>>>>>> hard-coded mapping (something like "range" 0-0, 5-5, 10-10 ->
>>>>>>> data_node_0 etc.). Is that correct? Instead I would like to use a
>>>>>>> slightly more flexible solution with the help of this modulo function
>>>>>>> for the host mapping.
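>>>>>>>
>>>>>>> To make the mapping concrete, here is a tiny standalone Java example
>>>>>>> of what I mean (hostForId is a hypothetical helper for illustration,
>>>>>>> not an Impala function):
>>>>>>>
>>>>>>>     public class ModuloMapping {
>>>>>>>       // Id % #data_nodes: with 5 data nodes, Id's 0,5,10,... map to
>>>>>>>       // data_node_0, Id's 1,6,11,... map to data_node_1, and so on.
>>>>>>>       static int hostForId(long id, int numDataNodes) {
>>>>>>>         return (int) (id % numDataNodes);
>>>>>>>       }
>>>>>>>
>>>>>>>       public static void main(String[] args) {
>>>>>>>         for (long id = 0; id < 12; id++) {
>>>>>>>           System.out.println("Id " + id + " -> data_node_" + hostForId(id, 5));
>>>>>>>         }
>>>>>>>       }
>>>>>>>     }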
>>>>>>>
>>>>>>> I would be really grateful if you could give me a hint for the
>>>>>>> scheduling implementation. Meanwhile, I'll try to dig deeper into the
>>>>>>> code.
>>>>>>>
>>>>>>> Best regards and thank you in advance
>>>>>>> Philipp
>>>>>>>
>>>>>>>
>>>>>>> On 14.03.2018 08:06, Philipp Krause wrote:
>>>>>>>> Thank you very much for this information! I'll try to implement these
>>>>>>>> two steps and post some updates within the next days!
>>>>>>>>
>>>>>>>> Best regards
>>>>>>>> Philipp
>>>>>>>>
>>>>>>>> 2018-03-13 5:38 GMT+01:00 Alexander Behm <alex.behm@cloudera.com>:
>>>>>>>>
>>>>>>>> Cool that you're working on a research project with Impala!
>>>>>>>>
>>>>>>>> Properly adding such a feature to Impala is a substantial effort, but
>>>>>>>> hacking the code for an experiment or two seems doable.
>>>>>>>>
>>>>>>>> I think you will need to modify two things: (1) the planner to not
>>>>>>>> add exchange nodes, and (2) the scheduler to assign the co-located
>>>>>>>> scan ranges to the same host.
>>>>>>>>
>>>>>>>> Here are a few starting points in the code:
>>>>>>>>
>>>>>>>> 1) DistributedPlanner
>>>>>>>> https://github.com/apache/impala/blob/master/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java#L318
>>>>>>>>
>>>>>>>> The first condition handles the case where no exchange nodes need to
>>>>>>>> be added because the join inputs are already suitably partitioned.
>>>>>>>> You could hack the code to always go into that codepath, so no
>>>>>>>> exchanges are added.
>>>>>>>>
>>>>>>>> 2) The scheduler
>>>>>>>> https://github.com/apache/impala/blob/master/be/src/scheduling/scheduler.cc#L226
>>>>>>>>
>>>>>>>> You'll need to dig through and understand that code so that you can
>>>>>>>> make the necessary changes. Change the scan-range-to-host mapping to
>>>>>>>> your liking. The rest of the code should just work.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>>
>>>>>>>> Alex
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Mar 12, 2018 at 6:55 PM, Philipp Krause
>>>>>>>> <philippkrause.mail@googlemail.com> wrote:
>>>>>>>>
>>>>>>>> Thank you very much for your quick answers! The intention behind this
>>>>>>>> is to improve the execution time and (primarily) to examine the
>>>>>>>> impact of block co-location (research project) for this particular
>>>>>>>> query (simplified):
>>>>>>>>
>>>>>>>> select A.x, B.y, A.z from tableA as A inner join tableB as B on
>>>>>>>> A.id=B.id
>>>>>>>>
>>>>>>>> The "real" query includes three joins and the data size is in the
>>>>>>>> pb-range. Therefore several nodes (5 in the test environment with
>>>>>>>> less data) are used (without any load balancer).
>>>>>>>>
>>>>>>>> Could you give me some hints about what code changes are required and
>>>>>>>> which files are affected? I don't know how to give Impala the
>>>>>>>> information that it should only join the local data blocks on each
>>>>>>>> node and then pass the results to the "final" node which receives all
>>>>>>>> intermediate results. I hope you can help me to get this working.
>>>>>>>> That would be awesome!
>>>>>>>>
>>>>>>>> Best regards
>>>>>>>> Philipp
>>>>>>>>
>>>>>>>> On 12.03.2018 18:38, Alexander Behm wrote:
>>>>>>>>> I suppose one exception is if your data lives only on a single node.
>>>>>>>>> Then you can set num_nodes=1 and make sure to send the query request
>>>>>>>>> to the impalad running on the same data node as the target data.
>>>>>>>>> Then you should get a local join.
>>>>>>>>>
>>>>>>>>> On Mon, Mar 12, 2018 at 9:30 AM, Alexander Behm
>>>>>>>>> <alex.behm@cloudera.com> wrote:
>>>>>>>>>
>>>>>>>>> Such a specific block arrangement is very uncommon for typical
>>>>>>>>> Impala setups, so we don't attempt to recognize and optimize this
>>>>>>>>> narrow case. In particular, such an arrangement tends to be
>>>>>>>>> short-lived if you have the HDFS balancer turned on.
>>>>>>>>>
>>>>>>>>> Without making code changes, there is no way today to remove the
>>>>>>>>> data exchanges and make sure that the scheduler assigns scan splits
>>>>>>>>> to nodes in the desired way (co-located, but with possible load
>>>>>>>>> imbalance).
>>>>>>>>>
>>>>>>>>> In what way is the current setup unacceptable to you? Is this
>>>>>>>>> premature optimization? If you have certain performance
>>>>>>>>> expectations/requirements for specific queries, we might be able to
>>>>>>>>> help you improve those. If you want to pursue this route, please
>>>>>>>>> help us by posting complete query profiles.
>>>>>>>>>
>>>>>>>>> Alex
>>>>>>>>>
>>>>>>>>> On Mon, Mar 12, 2018 at 6:29 AM, Philipp Krause
>>>>>>>>> <philippkrause.mail@googlemail.com> wrote:
>>>>>>>>>
>>>>>>>>>     Hello everyone!
>>>>>>>>>
>>>>>>>>>     In order to prevent network traffic, I'd like to perform
>>>>>>>>>     local joins on each node instead of exchanging the data
>>>>>>>>>     and performing a join over the complete data afterwards.
>>>>>>>>>     My query is basically a join over three tables on an ID
>>>>>>>>>     attribute. The blocks are perfectly distributed, so that
>>>>>>>>>     e.g. Table A - Block 0 and Table B - Block 0 are on the
>>>>>>>>>     same node. These blocks contain all data rows with an ID
>>>>>>>>>     range [0,1]. Table A - Block 1 and Table B - Block 1 with
>>>>>>>>>     an ID range [2,3] are on another node, etc. So I want to
>>>>>>>>>     perform a local join per node, because any data exchange
>>>>>>>>>     would be unnecessary (except for the last step, when the
>>>>>>>>>     final node receives all results of the other nodes). Is
>>>>>>>>>     this possible?
>>>>>>>>>
>>>>>>>>>     At the moment the query plan includes multiple data
>>>>>>>>>     exchanges, although the blocks are already perfectly
>>>>>>>>>     distributed (manually). I would be grateful for any help!
>>>>>>>>>
>>>>>>>>>     Best regards
>>>>>>>>>     Philipp Krause
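
The exact query text is not quoted in this part of the thread. As a
minimal sketch, assuming placeholder tables table_a, table_b, and
table_c that share an id column, the three-way join described above
would have this shape in Impala SQL:

    -- Sketch only: table and column names are placeholders, not
    -- taken from the original thread.
    SELECT *
    FROM table_a a
    INNER JOIN table_b b ON a.id = b.id
    INNER JOIN table_c c ON b.id = c.id;

With the block layout described above, every row pair that can match
lives on the same node, so each node could evaluate this join over its
local blocks and send only the join results to the coordinator.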


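The ID-to-block mapping described above (IDs 0-1 in block 0, IDs 2-3 in
block 1, with the matching blocks of both tables on the same node) can
be approximated on the table side with Impala partitioning. This is a
sketch under assumptions (placeholder names, two IDs per range); note
that partitioning alone does not pin partitions to particular nodes, so
the HDFS-level co-location still has to be arranged manually, as in the
thread:

    -- Sketch: derive a partition key so that each partition covers a
    -- two-value ID range (id_range 0 holds IDs 0-1, id_range 1 holds
    -- IDs 2-3, and so on).
    CREATE TABLE table_a_part PARTITIONED BY (id_range) AS
    SELECT *, id DIV 2 AS id_range FROM table_a;

    CREATE TABLE table_b_part PARTITIONED BY (id_range) AS
    SELECT *, id DIV 2 AS id_range FROM table_b;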