impala-user mailing list archives

From Philipp Krause <philippkrause.m...@googlemail.com>
Subject Re: Local join instead of data exchange - co-located blocks
Date Fri, 04 May 2018 13:40:57 GMT
Hi!

The query profile and the scan range mappings are attached
(query_profile.txt + scan_ranges.txt). The complete log file is also
attached. The mapping looks fine to me; I couldn't find any mistakes
there. For example, line 168 (scan_ranges.txt) shows that partition ID=4
is assigned to node_0 and partition ID=10 is assigned to node_1. Both
partitions contain all id=4 rows, which should be correct for the join.
But perhaps I have overlooked something in the log.

The partition/block setup is as follows:
6 Nodes (1 Namenode, 5 Datanodes)
Node 1:
Node 2: 0|0 5|5
Node 3: 1|1
Node 4: 2|2
Node 5: 3|3
Node 6: 4|4

0|0 means partition_0 of table A and partition_0 of table B.
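
To make the layout concrete, here is a minimal Java sketch of the modulo
placement rule described later in this thread. All names (NUM_DATA_NODES,
nodeForPartition, ...) are illustrative only; the actual placement was
done by hand when copying the Parquet files into HDFS:

    // Illustrative sketch of the manual placement rule: a row with a given
    // join id belongs to partition (id % numDataNodes), and the matching
    // partitions of table A and table B are stored on the same data node.
    public class PartitionPlacement {
        private static final int NUM_DATA_NODES = 5;

        // Partition index for a row with this join id.
        static int partitionForId(long id) {
            return (int) (id % NUM_DATA_NODES);
        }

        // Host of a partition; partition p of A and B share one node.
        static String nodeForPartition(int p) {
            return "node_" + p;
        }

        public static void main(String[] args) {
            for (long id : new long[] {0, 4, 5, 10}) {
                int p = partitionForId(id);
                System.out.println("id=" + id + " -> partition " + p
                        + " on " + nodeForPartition(p));
            }
        }
    }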

Also thanks to Lars for the logging option, which I have used!

Best regards
Philipp


On 04.05.2018 at 07:10, Lars Volker wrote:
> I haven't followed this thread closely, but you can also print all scan
> range assignments made by the scheduler by passing -vmodule=scheduler=2
> as a startup option. The logging happens in scheduler.cc:612
> <https://github.com/apache/impala/blob/master/be/src/scheduling/scheduler.cc#L612>.
>
> This wiki page has a way to achieve that using environment variables:
> https://cwiki.apache.org/confluence/display/IMPALA/Useful+Tips+for+New+Impala+Developers
>
> Cheers, Lars
>
> On Thu, May 3, 2018 at 8:54 PM, Alexander Behm
> <alex.behm@cloudera.com> wrote:
>
> No idea what's going on, but my guess is something is awry with the
> scan-range assignment. Can you attach the full profile? It's probably
> also good to print the scan ranges created in
> HdfsScanNode.computeScanRangeLocations().
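
A minimal sketch of what such a printout could look like at the end of
HdfsScanNode.computeScanRangeLocations(). The collection and accessor
names below are placeholders for whatever the surrounding code actually
uses, not the exact HdfsScanNode members:

    // Illustrative: log every scan range and the hosts it was mapped to.
    for (TScanRangeLocations locs : scanRanges) {
      THdfsFileSplit split = locs.getScan_range().getHdfs_file_split();
      StringBuilder sb = new StringBuilder("scan range:");
      sb.append(" file=").append(split.getFile_name())
        .append(" offset=").append(split.getOffset())
        .append(" len=").append(split.getLength())
        .append(" hosts=");
      for (TScanRangeLocation loc : locs.getLocations()) {
        sb.append(loc.getHost_idx()).append(' ');
      }
      LOG.info(sb.toString());
    }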
>
> On Thu, May 3, 2018 at 5:51 PM, Philipp Krause
> <philippkrause.mail@googlemail.com> wrote:
>
> Hello Alex,
>
> I have tried out several configurations, but I still couldn't find a
> solution for my problem :( In the query summary (see attachment) it
> looks as if no rows are read. Do you have an idea what I have to
> change? I am sorry for the inconvenience, and thank you once more for
> the great support in getting this working!
>
>
> On 29.04.2018 at 21:21, Philipp Krause wrote:
>> Hi Alex,
>> I got the modified version working on my cluster. The query plan looks
>> exactly as wanted (see attachment). This is awesome! Unfortunately, the
>> result set is empty. As you can see in query_state.png, the scan
>> progress always shows 50% although the query has finished.
>>
>> The only modification in the code is the if statement you pointed me to
>> (I set it to true). Maybe I have to give Impala the information about
>> the lhs/rhs join partition, since there are no exchange nodes now (like
>> in the following lines)? The corresponding partitions/blocks of each
>> table are on the same node.
>>
>> I think we are very close to the final result and I hope you can help
>> me once more. Thank you so much!
>>
>> Best regards
>> Philipp
>>
>>
>> On 24.04.2018 at 18:00, Alexander Behm wrote:
>>> On Tue, Apr 24, 2018 at 5:31 AM, Philipp Krause
>>> <philippkrause.mail@googlemail.com> wrote:
>>>
>>> To prevent the broadcast join I could simply use the shuffle operator
>>> in the query:
>>>
>>> SELECT * FROM business_partition_1 INNER JOIN [SHUFFLE]
>>> business_partition_2 WHERE
>>> business_partition_1.businessid = business_partition_2.businessid
>>>
>>>
>>> Not sure what version of Impala you are using, and whether hints
>>> override any changes you might make. I suggest you make the code work
>>> as you wish without requiring hints.
>>>
>>>
>>> I think the broadcast is currently only used because of my very small
>>> test tables.
>>>
>>> This gives me the plan attached as partitioned_shuffle.png. Since my
>>> modified version isn't working yet, I partitioned both tables on
>>> businessid in Impala. The "hack" should only help to get into the
>>> if-condition if I partition the data manually, right? But in this case
>>> (if the partitioning is done by Impala itself), Impala should get into
>>> the if-condition anyway. Unfortunately, I can't see a difference in
>>> the plan compared to my unpartitioned tables (unpartitioned.png)
>>> concerning the exchange nodes. My goal is actually to get rid of all
>>> exchange nodes since the corresponding data is already present on each
>>> node. The plan should then look the same except for the 03 and 04
>>> exchanges.
>>>
>>>
>>> I understand the picture and goal. I suggest you read, understand,
>>> and modify the code in DistributedPlanner.createHashJoinFragment() to
>>> create the plan shape that you want.
>>> I don't know how you are producing these plans. Are you sure your code
>>> changes are taking effect?
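
For reference, a minimal sketch of the kind of change being discussed,
modeled on the partitioned-join branch of createHashJoinFragment().
Member names and the surrounding checks differ between Impala versions,
so treat this as an illustration rather than the actual patch:

    // Sketch: force the "inputs are already suitably partitioned" path so
    // the join is placed directly into the left child's fragment and no
    // exchange nodes are created. This hypothetical helper mirrors what
    // the partitioned-join branch does.
    private PlanFragment createCollocatedJoinFragment(JoinNode node,
        PlanFragment leftChildFragment, PlanFragment rightChildFragment,
        List<PlanFragment> fragments) {
      node.setChild(0, leftChildFragment.getPlanRoot());
      node.setChild(1, rightChildFragment.getPlanRoot());
      // Both scans and the join now execute in one fragment per host; the
      // right child's fragment is dropped instead of being connected
      // through an exchange node.
      leftChildFragment.setPlanRoot(node);
      fragments.remove(rightChildFragment);
      return leftChildFragment;
    }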
>>>
>>>
>>> Are there other changes necessary to just take Table1-partition/block
>>> X and Table2-partition/block Y on each node and join them without any
>>> data exchange? Actually, each node should take all its local blocks
>>> for both tables, join them, and pass the results back to the
>>> coordinator where all results come together (please see
>>> explanation.png).
>>>
>>>
>>> No. You just need to create the plan without exchange nodes, as we've
>>> already gone through earlier in this thread.
>>>
>>>
>>> I'm looking forward to hearing from you. I hope we can realise this.
>>>
>>> Best regards
>>> Philipp
>>>
>>>
>>> On 24.04.2018 at 07:03, Alexander Behm wrote:
>>>> On Mon, Apr 23, 2018 at 7:24 PM, Philipp Krause
>>>> <philippkrause.mail@googlemail.com> wrote:
>>>>
>>>> Hi Alex,
>>>> thanks for the information! I've compiled the cdh5.13.1-release and
>>>> replaced impala-frontend-0.1-SNAPSHOT.jar (which seems to include the
>>>> changes in DistributedPlanner.java). There still seems to be a method
>>>> missing after replacing the jar, but I'll try to figure that out.
>>>>
>>>> I have two questions concerning the code fragment in
>>>> DistributedPlanner.java you pointed me to.
>>>>
>>>> First:
>>>> The attached graphic shows the query plan for two tables which are
>>>> partitioned on the join attribute (standard Impala version without
>>>> any changes). If I change the if-condition to true for my modified
>>>> version, I expect to get the same result for my "hand-made"
>>>> partitions (created outside of Impala). But why is there an exchange
>>>> broadcast (even in the standard version)? I mean, if I have a
>>>> partition of Table 1 with ID=0 on Node X, why is there a broadcast of
>>>> the partition of Table 2 with ID=0 on Node Y? Actually, this
>>>> partition only has to be sent to Node X where the matching partition
>>>> of Table 1 is located (instead of being sent to all nodes as a
>>>> broadcast). Or is this exactly what happens and it's only shown as a
>>>> broadcast here in the graphics?
>>>>
>>>>
>>>> You are reading the plan correctly. Impala simply does not implement
>>>> that optimization.
>>>>
>>>>
>>>> Second:
>>>> All corresponding blocks of the partitioned tables are on the same
>>>> node (e.g. Table 1 partition with ID=0 and Table 2 partition with
>>>> ID=0 => Node 1, etc.). This is what I did manually. As already
>>>> mentioned, I want to join these partitions (blocks) locally on each
>>>> node. But if I'm correct, the modification in the DistributedPlanner
>>>> will only lead to the plan in the attached graphic, so that no
>>>> further exchanges are created if I use my "hand-made" partitions. But
>>>> there is still the broadcast exchange which distributes the
>>>> partitions across the cluster, which isn't necessary because all
>>>> needed blocks are already on the same node and are ready to be joined
>>>> locally. Is there a way to realise that and get rid of the broadcast
>>>> exchange?
>>>>
>>>> You are right. That hack in DistributedPlanner only works for
>>>> partitioned hash joins. You can probably stitch the plan together at
>>>> an earlier place, e.g.:
>>>> https://github.com/apache/impala/blob/master/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java#L506
>>>>
>>>> Please correct me if I'm wrong with my assumptions.
>>>>
>>>> Thank you very much!
>>>>
>>>> Best regards
>>>> Philipp
>>>> On 18.04.2018 at 07:16, Alexander Behm wrote:
>>>>> Your CM-managed cluster must be running a "compatible" Impala
>>>>> version already for this trick to work. It looks like your catalog
>>>>> binary is trying to find a method which does not exist in the .jar,
>>>>> presumably because your .jar is built based on a different version
>>>>> of Impala where that method does not exist anymore.
>>>>>
>>>>> It looks like you have CDH 5.13.3 installed. CDH 5.13.3 is based on
>>>>> Impala 2.10, see:
>>>>> https://www.cloudera.com/documentation/enterprise/release-notes/topics/cdh_vd_cdh_package_tarball_513.html#cm_vd_cdh_package_tarball_513
>>>>>
>>>>> That means this binary copying trick will only work with a modified
>>>>> version of Impala 2.10, and very likely will not work with a
>>>>> different version.
>>>>>
>>>>> It's probably easier to test with the minicluster first.
>>>>> Alternatively, it "might" work if you replace all the binaries
>>>>> mentioned above, but it's quite possible that will not work.
>>>>>
>>>>> On Sun, Apr 15, 2018 at 7:13 PM, Philipp Krause
>>>>> <philippkrause.mail@googlemail.com> wrote:
>>>>>
>>>>> Hi Alex! Thank you for the list! The build of the modified
>>>>> cdh5-trunk branch (debug mode) was successful. After replacing
>>>>> "impala-frontend-0.1-SNAPSHOT.jar" in
>>>>> /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/jars/
>>>>> I got the following error in my existing cluster:
>>>>> F0416 01:16:45.402997 17897 catalog.cc:69] NoSuchMethodError: getCatalogObjects
>>>>> When I switch back to the original jar file, the error is gone. So
>>>>> it must be something wrong with this file, I guess. But I wonder
>>>>> about the error in catalog.cc because I didn't touch any .cc files.
>>>>>
>>>>> I also replaced "impala-data-source-api-1.0-SNAPSHOT.jar". The other
>>>>> jar files do not exist in my Impala installation (CDH-5.13.1).
>>>>>
>>>>> What am I doing wrong?
>>>>>
>>>>> Best regards
>>>>> Philipp
>>>>>
>>>>>
>>>>> On 13.04.2018 at 20:12, Alexander Behm wrote:
>>>>>> Here's the full list. It might not be minimal, but
>>>>>> copying/overwriting these should work.
>>>>>>
>>>>>> debug/service/impalad
>>>>>> debug/service/libfesupport.so
>>>>>> debug/service/libService.a
>>>>>> release/service/impalad
>>>>>> release/service/libfesupport.so
>>>>>> release/service/libService.a
>>>>>> yarn-extras-0.1-SNAPSHOT.jar
>>>>>> impala-data-source-api-1.0-SNAPSHOT-sources.jar
>>>>>> impala-data-source-api-1.0-SNAPSHOT.jar
>>>>>> impala-frontend-0.1-SNAPSHOT-tests.jar
>>>>>> impala-frontend-0.1-SNAPSHOT.jar
>>>>>> libkudu_client.so.0.1.0
>>>>>> libstdc++.so.6.0.20
>>>>>> impala-no-sse.bc
>>>>>> impala-sse.bc
>>>>>> libimpalalzo.so
>>>>>>
>>>>>> If you are only modifying the Java portion (like
>>>>>> DistributedPlanner), then only copying/replacing the *.jar files
>>>>>> should be sufficient.
>>>>>>
>>>>>> On Fri, Apr 13, 2018 at 11:00 AM, Philipp Krause
>>>>>> <philippkrause.mail@googlemail.com> wrote:
>>>>>>
>>>>>> Yes, I have a running (virtual) cluster. I would try to follow
>>>>>> your way with the custom Impala build (DistributedPlanner.java is
>>>>>> the only modified file at the moment). Thank you in advance for the
>>>>>> file list!
>>>>>>
>>>>>> Best regards
>>>>>> Philipp
>>>>>>
>>>>>> On Fri, Apr 13, 2018 at 18:45, Alexander Behm
>>>>>> <alex.behm@cloudera.com> wrote:
>>>>>>
>>>>>> I'm not really following your installation/setup and am not an
>>>>>> expert on Cloudera Manager installation/config. If you are going to
>>>>>> build Impala anyway, it's probably easiest to test on Impala's
>>>>>> minicluster first.
>>>>>>
>>>>>> In general, if you have a running Cloudera Managed cluster, you can
>>>>>> deploy a custom Impala build by simply overwriting the existing
>>>>>> Impala binaries and jars with the new build. If you want to go this
>>>>>> route, I can give you a full list of files you need to replace.
>>>>>>
>>>>>> On Tue, Apr 10, 2018 at 11:44 AM, Philipp Krause
>>>>>> <philippkrause.mail@googlemail.com> wrote:
>>>>>>
>>>>>> Thank you for the explanation! Yes, I'm using HDFS. The
>>>>>> single-replica setup is only for test purposes at the moment. I
>>>>>> think this makes it easier to gain some first results since fewer
>>>>>> modifications (scheduler etc.) are necessary.
>>>>>> I would like to test the DistributedPlanner modification in my
>>>>>> virtual cluster. I used a customized Vagrant script to install
>>>>>> Impala on multiple hosts (see attachment). It simply installs
>>>>>> cloudera-manager-server-db, cloudera-manager-server and
>>>>>> cloudera-manager-daemons via apt-get. What would be the simplest
>>>>>> solution to set up my modified version? Could I simply call
>>>>>> ./buildall.sh and change the script to something like this?
>>>>>>
>>>>>> echo "Install java..."
>>>>>> apt-get -q -y --force-yes install oracle-j2sdk1.7
>>>>>> echo "Download impala..."
>>>>>> wget https://... (where I uploaded my modified version)
>>>>>> echo "Extract impala..."
>>>>>> tar -xvzf Impala-cdh5-trunk.tar.gz
>>>>>> cd Impala-cdh5-trunk
>>>>>> echo "Build impala..."
>>>>>> ./buildall.sh
>>>>>> echo "Start impala instances..."
>>>>>> service cloudera-scm-server-db initdb
>>>>>> service cloudera-scm-server-db start
>>>>>> service cloudera-scm-server start
>>>>>>
>>>>>> Or is there another, maybe even easier, method to test the code?
>>>>>> Maybe via bootstrap_development.sh / the minicluster?
>>>>>>
>>>>>> Best regards
>>>>>> Philipp
>>>>>>
>>>>>> On 2018-04-05 at 18:39 (GMT+02:00), Alexander Behm
>>>>>> <alex.behm@cloudera.com> wrote:
>>>>>>
>>>>>> Apologies for the late response. Btw, your previous post was clear
>>>>>> enough to me, so no worries :)
>>>>>>
>>>>>> On Wed, Apr 4, 2018 at 7:46 AM, Philipp Krause
>>>>>> <philippkrause.mail@googlemail.com> wrote:
>>>>>>
>>>>>> Hello Alex,
>>>>>>
>>>>>> I think my previous post was too long and confusing. I apologize
>>>>>> for that!
>>>>>>
>>>>>> If replicas are completely deactivated, all scan ranges of a block
>>>>>> are mapped to the one host where the block is located. This host is
>>>>>> the "executor"/reader for all the scan ranges of this block. Is
>>>>>> that correct?
>>>>>>
>>>>>>
>>>>>> Yes, assuming you are using HDFS.
>>>>>>
>>>>>>
>>>>>> I tried to visualize my understanding of the scan_range-to-host
>>>>>> mapping for my use case (see attachment). Could you please have a
>>>>>> quick look at it and tell me if this is correct?
>>>>>>
>>>>>> "The existing scan range assignment is scan-node centric. For each
>>>>>> scan node, we independently decide which of its scan ranges should
>>>>>> be processed by which host."
>>>>>> Without replicas, all scan ranges of a block would be assigned to
>>>>>> the same host where this block is located. Isn't everything local
>>>>>> here, so that Table_A - Block_0 and Table_B - Block_0 can be joined
>>>>>> locally, or are further steps necessary? The condition in the
>>>>>> DistributedPlanner you pointed me to is set to false (no exchange
>>>>>> nodes).
>>>>>>
>>>>>> "You want it to be host-centric. For each host, collect the local
>>>>>> scan ranges of *all* scan nodes, and assign them to that host."
>>>>>> Wouldn't the standard setup from above work? Wouldn't I assign all
>>>>>> (the same) scan ranges to each host in this case?
>>>>>>
>>>>>>
>>>>>> The standard setup works only if every block has exactly one
>>>>>> replica. For our purposes, that is basically never the case (who
>>>>>> would store production data without replication?), so the
>>>>>> single-replica assumption was not clear to me.
>>>>>>
>>>>>> Does your current setup (only changing the planner and not the
>>>>>> scheduler) produce the expected results?
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thank you very much!
>>>>>>
>>>>>> Best regards
>>>>>> Philipp
>>>>>>
>>>>>> On 2018-03-28 at 21:04 (GMT+02:00), Philipp Krause
>>>>>> <philippkrause.mail@googlemail.com> wrote:
>>>>>>
>>>>>> Thank you for your answer, and sorry for my delay!
>>>>>>
>>>>>> If my understanding is correct, the list of scan nodes consists of
>>>>>> all nodes which contain a *local* block from a table that is needed
>>>>>> for the query (assumption: I have no replicas in my first tests).
>>>>>> If TableA-Block0 is on Node_0, isn't Node_0 automatically a scan
>>>>>> node? And wouldn't this scan node always be the host for the
>>>>>> complete scan range(s) then?
>>>>>>
>>>>>> "For each scan node, we independently decide which of its scan
>>>>>> ranges should be processed by which host."
>>>>>>
>>>>>> https://github.com/cloudera/Impala/blob/cdh5-trunk/be/src/scheduling/scheduler.cc#L532
>>>>>>
>>>>>> // Loop over all scan ranges, select an executor for those with local impalads and
>>>>>> // collect all others for later processing.
>>>>>>
>>>>>> So in this whole block, scan ranges are assigned to the closest
>>>>>> executor (= host?). But isn't the closest executor always the node
>>>>>> the block is located on (assuming impalad is installed and I have
>>>>>> no replicas)? And isn't this node always a scan node at the same
>>>>>> time? Otherwise a thread on a remote host would have to read the
>>>>>> corresponding scan range, which would be more expensive. The only
>>>>>> exception I can think of is when all threads on the local node are
>>>>>> busy. Or, if I use replicas and all other threads of my node with
>>>>>> the "original" block are busy, a thread on another node which
>>>>>> contains a replica could read a particular scan range of its local
>>>>>> block. Is my understanding correct here?
>>>>>>
>>>>>> Aren't all scan ranges read locally by their scan nodes if I have
>>>>>> impalad installed on all nodes? And am I right that a scan range is
>>>>>> only based on its length, which refers to maxScanRangeLength in
>>>>>> computeScanRangeLocations?
>>>>>> https://github.com/cloudera/Impala/blob/cdh5-trunk/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java#L721
>>>>>>
>>>>>> I hope you can help me with the scan node <-> scan range -> host
>>>>>> relationship. If I have Table_A-Block_0 and Table_B-Block_0 on the
>>>>>> same node (which I want to join locally), I don't get the point of
>>>>>> why scan ranges could be assigned to another host in my scenario.
>>>>>>
>>>>>> Best regards and thank you very much!
>>>>>> Philipp Krause
>>>>>>
>>>>>>
>>>>>> On 21.03.2018 at 05:21, Alexander Behm wrote:
>>>>>>> Thanks for following up. I think I understand your setup.
>>>>>>>
>>>>>>> If you want to not think about scan ranges, then you can modify
>>>>>>> HdfsScanNode.computeScanRangeLocations(). For example, you could
>>>>>>> change it to produce one scan range per file or per HDFS block.
>>>>>>> That way you'd know exactly what a scan range corresponds to.
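
A rough sketch of the one-scan-range-per-block variant (illustrative
only; the real computeScanRangeLocations() iterates over file descriptors
and block metadata with different names and more bookkeeping, and the
THdfsFileSplit constructor arguments shown here are assumed, not
verified):

    // Illustrative: emit exactly one scan range per HDFS block instead of
    // splitting blocks by maxScanRangeLength. Types and helpers here are
    // placeholders, not the actual HdfsScanNode code.
    for (HdfsPartition.FileDescriptor file : partition.getFileDescriptors()) {
      for (THdfsFileBlock block : file.getFileBlocks()) {
        TScanRange scanRange = new TScanRange();
        scanRange.setHdfs_file_split(new THdfsFileSplit(
            file.getFileName(), block.getOffset(), block.getLength(),
            partition.getId(), file.getFileLength(),
            file.getFileCompression(), file.getModificationTime()));
        addScanRangeWithBlockHosts(scanRange, block);  // hypothetical helper
      }
    }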
>>>>>>>
>>>>>>> I think the easiest/fastest way for you to make progress is to
>>>>>>> re-implement the existing scan range assignment logic in that
>>>>>>> place in the code I had pointed you to. There is no quick fix to
>>>>>>> change the existing behavior.
>>>>>>> The existing scan range assignment is scan-node centric. For each
>>>>>>> scan node, we independently decide which of its scan ranges should
>>>>>>> be processed by which host.
>>>>>>>
>>>>>>> I believe an algorithm to achieve your goal would look completely
>>>>>>> different. You want it to be host-centric. For each host, collect
>>>>>>> the local scan ranges of *all* scan nodes, and assign them to that
>>>>>>> host.
>>>>>>>
>>>>>>> Does that make sense?
>>>>>>>
>>>>>>> Alex
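
A Java-style sketch of that host-centric algorithm (the real assignment
logic is C++ in be/src/scheduling/scheduler.cc; ScanRange, getLocalHost()
and assignToHost() below are hypothetical):

    // Illustrative: group the scan ranges of *all* scan nodes by the host
    // that stores them locally, then assign each group to exactly that
    // host, so co-located blocks of both tables are scanned and joined on
    // the node where they reside.
    Map<String, List<ScanRange>> rangesByHost = new HashMap<>();
    for (ScanNode scanNode : scanNodes) {
      for (ScanRange range : scanNode.getScanRanges()) {
        // Single-replica assumption from this thread: each range has
        // exactly one local host, the datanode storing its block.
        String host = range.getLocalHost();
        rangesByHost.computeIfAbsent(host, h -> new ArrayList<>()).add(range);
      }
    }
    for (Map.Entry<String, List<ScanRange>> e : rangesByHost.entrySet()) {
      assignToHost(e.getKey(), e.getValue());  // hypothetical helper
    }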
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Mar 19, 2018 at 1:02 PM, Philipp Krause
>>>>>>> <philippkrause.mail@googlemail.com> wrote:
>>>>>>>
>>>>>>> I'd like to provide a small example for our purpose. The last
>>>>>>> post may be a bit confusing, so here's a very simple example in
>>>>>>> the attached pdf file. I hope it's understandable. Otherwise,
>>>>>>> please give me short feedback.
>>>>>>>
>>>>>>> Basically, I only want each data node to join all its local
>>>>>>> blocks. Is a range mapping needed, or is it possible to simply
>>>>>>> join all local blocks (regardless of their content) since
>>>>>>> everything is already "prepared"? Maybe you can clarify this for
>>>>>>> me.
>>>>>>>
>>>>>>> As you can see in the example, the tables are not partitioned by
>>>>>>> ID. The files are manually prepared with the help of the modulo
>>>>>>> function. So I don't have a range like [0,10], but something like
>>>>>>> 0,5,10,15 etc.
>>>>>>>
>>>>>>> I hope I didn't make it too complicated and confusing. I think the
>>>>>>> actual idea behind this is really simple, and I hope you can help
>>>>>>> me to get it working.
>>>>>>>
>>>>>>> Best regards and thank you very much for your time!
>>>>>>> Philipp
>>>>>>>
>>>>>>>
>>>>>>> On 18.03.2018 at 17:32, Philipp Krause wrote:
>>>>>>>>
>>>>>>>> Hi! At the moment the data-to-parquet (block) mapping is based
>>>>>>>> on a simple modulo function: id % #data_nodes. So with 5 data
>>>>>>>> nodes, all rows with ids 0,5,10,... are written to Parquet_0, ids
>>>>>>>> 1,6,11,... are written to Parquet_1, etc. That's what I did
>>>>>>>> manually. Since the parquet file size and the block size are both
>>>>>>>> set to 64MB, each parquet file will result in one block when I
>>>>>>>> transfer the parquet files to HDFS. By default, HDFS distributes
>>>>>>>> the blocks randomly. For test purposes I transferred
>>>>>>>> corresponding blocks from Table_A and Table_B to the same data
>>>>>>>> node (Table_A - Block_X with ids 0,5,10 and Table_B - Block_Y
>>>>>>>> with ids 0,5,10). In this case, they are transferred to
>>>>>>>> data_node_0 because the modulo function (which I want to
>>>>>>>> implement in the scheduler) returns 0 for these ids. This is also
>>>>>>>> done manually at the moment.
>>>>>>>>
>>>>>>>> 1.) DistributedPlanner: For the first upcoming tests, I simply
>>>>>>>> changed the first condition in the DistributedPlanner to true to
>>>>>>>> avoid exchange nodes.
>>>>>>>>
>>>>>>>> 2.) The scheduler: That's the part I'm currently struggling with.
>>>>>>>> For the first tests, block replication is deactivated. I'm not
>>>>>>>> sure how / where to implement the modulo function for the scan
>>>>>>>> range to host mapping. Without the modulo function, I'd have to
>>>>>>>> implement a hard-coded mapping (something like "range" 0-0, 5-5,
>>>>>>>> 10-10 -> data_node_0, etc.). Is that correct? Instead, I would
>>>>>>>> like to use a slightly more flexible solution with the help of
>>>>>>>> this modulo function for the host mapping.
>>>>>>>>
>>>>>>>> I would be really grateful if you could give me a hint for the
>>>>>>>> scheduling implementation. I'll try to go deeper through the code
>>>>>>>> meanwhile.
>>>>>>>>
>>>>>>>> Best regards and thank you in advance
>>>>>>>> Philipp
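
Continuing the host-centric sketch from above, the modulo rule could slot
in as the host-selection step. Everything here is hypothetical; it only
assumes that the smallest id in a block identifies its modulo class,
which follows from the manual file layout described in this mail:

    // Hypothetical: choose the target host for a scan range from the
    // modulo class of the ids in its block (Parquet_p holds ids with
    // id % numHosts == p, and partitions of A and B with the same p are
    // co-located).
    static int hostIndexForScanRange(long minIdInBlock, int numHosts) {
      return (int) (minIdInBlock % numHosts);
    }
    // Usage: hosts.get(hostIndexForScanRange(id, hosts.size())) yields the
    // same data node for the Table_A and Table_B blocks of one modulo class.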
>>>>>>>>
>>>>>>>>
>>>>>>>> On 14.03.2018 at 08:06, Philipp Krause wrote:
>>>>>>>>> Thank you very much for this information! I'll try to implement
>>>>>>>>> these two steps and post some updates within the next days!
>>>>>>>>>
>>>>>>>>> Best regards
>>>>>>>>> Philipp
>>>>>>>>>
>>>>>>>>> On 2018-03-13 at 5:38 (GMT+01:00), Alexander Behm
>>>>>>>>> <alex.behm@cloudera.com> wrote:
>>>>>>>>>
>>>>>>>>> Cool that you are working on a research project with Impala!
>>>>>>>>>
>>>>>>>>> Properly adding such a feature to Impala is a substantial
>>>>>>>>> effort, but hacking the code for an experiment or two seems
>>>>>>>>> doable.
>>>>>>>>>
>>>>>>>>> I think you will need to modify two things: (1) the planner to
>>>>>>>>> not add exchange nodes, and (2) the scheduler to assign the
>>>>>>>>> co-located scan ranges to the same host.
>>>>>>>>>
>>>>>>>>> Here are a few starting points in the code:
>>>>>>>>>
>>>>>>>>> 1) DistributedPlanner
>>>>>>>>> https://github.com/apache/impala/blob/master/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java#L318
>>>>>>>>>
>>>>>>>>> The first condition handles the case where no exchange nodes
>>>>>>>>> need to be added because the join inputs are already suitably
>>>>>>>>> partitioned. You could hack the code to always go into that
>>>>>>>>> codepath, so no exchanges are added.
>>>>>>>>>
>>>>>>>>> 2) The scheduler
>>>>>>>>> https://github.com/apache/impala/blob/master/be/src/scheduling/scheduler.cc#L226
>>>>>>>>>
> ...
>
> [Message clipped]
>
>

