impala-user mailing list archives

From Philipp Krause <philippkrause.m...@googlemail.com>
Subject Re: Local join instead of data exchange - co-located blocks
Date Mon, 07 May 2018 11:00:48 GMT
I just wanted to add that I tried the join with two other, minimal and 
"fresh" tables. All blocks from both tables were on the same node, but I 
got the same result: no data was processed. To me, the scan range 
mapping of my modified version looks the same as in the original one. 
I only noticed a difference in the query profile:
Filter 0 (1.00 MB):
              - Files processed: 1 (1)
              - Files rejected: 1 (1)
...

This filter only appears in my modified version. Hopefully we can find 
the mistake.


On 04.05.2018 at 15:40, Philipp Krause wrote:
>
> Hi!
>
> The query profile and the scan range mappings are attached 
> (query_profile.txt + scan_ranges.txt). The complete log file is also 
> attached. The mapping looks fine to me; I couldn't find any mistakes 
> there. For example, line 168 (scan_ranges.txt) shows that partition 
> ID=4 is assigned to node_0 and partition ID=10 is assigned to node_1. 
> Both partitions contain all id=4 rows, which should be correct for the 
> join. But I have probably overlooked something in the log.
>
> The partition/block setup is as follows:
> 6 Nodes (1 Namenode, 5 Datanodes)
> Node 1:
> Node 2: 0|0 5|5
> Node 3: 1|1
> Node 4: 2|2
> Node 5: 3|3
> Node 6: 4|4
>
> 0|0 means partition_0 from table A and B.
>
> Also thanks to Lars for the logging option, which I have used!
>
> Best regards
> Philipp
>
>
> On 04.05.2018 at 07:10, Lars Volker wrote:
>> I haven't followed this thread closely, but you can also print all 
>> scan range assignments made by the scheduler by passing 
>> -vmodule=scheduler=2 as a startup option. The logging happens in 
>> scheduler.cc:612 
>> <https://github.com/apache/impala/blob/master/be/src/scheduling/scheduler.cc#L612>.
>>
>>
>> This wiki page has a way to achieve that using environment variables: 
>> https://cwiki.apache.org/confluence/display/IMPALA/Useful+Tips+for+New+Impala+Developers
>>
>> Cheers, Lars
>>
>> On Thu, May 3, 2018 at 8:54 PM, Alexander Behm <alex.behm@cloudera.com> wrote:
>>
>>     No idea what's going on, but my guess is something is awry with
>>     the scan-range assignment. Can you attach the full profile? It's
>>     probably also good to print the scan ranges created in
>>     HdfsScanNode.computeScanRangeLocations().
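For illustration, a sketch of the kind of debug printing meant here. The placement and the
names used (scanRanges_, its element type, LOG) are assumptions that vary between Impala
versions, so treat this as a sketch rather than a drop-in patch:

    // At the end of HdfsScanNode.computeScanRangeLocations(): dump every scan range
    // that was just created, together with the host locations it was mapped to.
    for (TScanRangeLocationList scanRange : scanRanges_) {
      LOG.info("Scan node " + getId() + " created scan range: " + scanRange);
    }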
>>
>>     On Thu, May 3, 2018 at 5:51 PM, Philipp Krause <philippkrause.mail@googlemail.com> wrote:
>>
>>         Hello Alex,
>>
>>         I have tried out several configurations, but I still couldn't
>>         find a solution for my problem :( In the query summary (see
>>         attachment) it looks as if no rows are read. Do you have an
>>         idea what I have to change? I am sorry for the inconvenience,
>>         and thank you once more for the great support to get this
>>         working!
>>
>>
>>         On 29.04.2018 at 21:21, Philipp Krause wrote:
>>>
>>>         Hi Alex,
>>>         I got the modified version working on my cluster. The query
>>>         plan looks exactly as intended (see attachment). This is
>>>         awesome! Unfortunately, the result set is empty. As you can
>>>         see in query_state.png, the scan progress always shows 50%
>>>         although the query has finished.
>>>
>>>         The only modification in the code is the if statement you
>>>         pointed me to (I set it to true). Maybe I have to give
>>>         Impala the information about the lhs / rhs join partition
>>>         since there are no exchange nodes now (like in the following
>>>         lines)? The corresponding partitions / blocks of each table
>>>         are on the same node.
>>>
>>>         I think we are very close to the final result and I hope you
>>>         can help me once more. Thank you so much!
>>>
>>>         Best regards
>>>         Philipp
>>>
>>>
>>>         On 24.04.2018 at 18:00, Alexander Behm wrote:
>>>>         On Tue, Apr 24, 2018 at 5:31 AM, Philipp Krause <philippkrause.mail@googlemail.com> wrote:
>>>>
>>>>             To prevent the broadcast join I could simply use the
>>>>             shuffle operator in the query:
>>>>
>>>>             SELECT * FROM business_partition_1 INNER JOIN [SHUFFLE]
>>>>             business_partition_2 WHERE
>>>>             business_partition_1.businessid=business_partition_2.businessid
>>>>
>>>>
>>>>         Not sure what version of Impala you are using, and whether
>>>>         hints override any changes you might make. I suggest you
>>>>         make the code work as you wish without requiring hints.
>>>>
>>>>
>>>>             I think the broadcast is currently only used because of
>>>>             my very small test tables.
>>>>
>>>>             This gives me the plan attached as
>>>>             partitioned_shuffle.png. Since my modified version
>>>>             isn't working yet, I partitioned both tables on
>>>>             businessid in Impala. The "hack" should only help to
>>>>             get into the if-condition if I partition the data
>>>>             manually, right? But in this case (if the partitioning
>>>>             is done by Impala itself) Impala should get into the
>>>>             if-condition anyway. Unfortunately I can't see a
>>>>             difference in the plan compared to my unpartitioned
>>>>             tables (unpartitioned.png) concerning the exchange
>>>>             nodes. My goal is actually to get rid of all exchange
>>>>             nodes since the corresponding data is already present
>>>>             on each node. Actually the plan should look the same
>>>>             except for the 03 and 04 exchange then.
>>>>
>>>>
>>>>         I understand the picture and goal. I suggest you read,
>>>>         understand, and modify the code in
>>>>         DistributedPlanner.createHashJoinFragment() to create the
>>>>         plan shape that you want.
>>>>         I don't know how you are producing these plans. Are you
>>>>         sure your code changes are taking effect?
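For illustration, a rough sketch of the kind of change meant here: placing the hash join
directly into the probe-side fragment so that neither a broadcast nor a hash-partitioning
exchange is created. Method and variable names below are simplified assumptions and do not
match the actual DistributedPlanner code exactly:

    // Hypothetical helper inside DistributedPlanner: build a join fragment for the case
    // where matching partitions/blocks of both inputs already sit on the same host.
    private PlanFragment createCollocatedHashJoinFragment(
        HashJoinNode node, PlanFragment leftChildFragment, PlanFragment rightChildFragment) {
      // Connect both children directly to the join node instead of going through
      // ExchangeNodes.
      node.setChild(0, leftChildFragment.getPlanRoot());
      node.setChild(1, rightChildFragment.getPlanRoot());
      // Reuse the probe-side fragment as the join fragment; the build-side fragment is
      // absorbed because its plan tree now hangs off the join node.
      leftChildFragment.setPlanRoot(node);
      return leftChildFragment;
    }

This only produces a correct result if the co-location assumption actually holds, which is
why the scheduler side of the assignment matters as well.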
>>>>
>>>>
>>>>             Are there other changes necessary to just take
>>>>             Table1-partition/block X and Table2-partition/block Y
>>>>             on each node and join them without any data exchange?
>>>>             Actually each node should take all its local blocks for
>>>>             both tables, join them and pass the results back to the
>>>>             coordinator where all results come together (please see
>>>>             explanation.png).
>>>>
>>>>
>>>>         No. You just need to create the plan without exchange
>>>>         nodes, as we've already gone through early in this thread.
>>>>
>>>>
>>>>             I'm looking forward to hearing from you. I hope we can
>>>>             realise this.
>>>>
>>>>             Best regards
>>>>             Philipp
>>>>
>>>>
>>>>             On 24.04.2018 at 07:03, Alexander Behm wrote:
>>>>>             On Mon, Apr 23, 2018 at 7:24 PM, Philipp Krause <philippkrause.mail@googlemail.com> wrote:
>>>>>
>>>>>                 Hi Alex,
>>>>>                 thanks for the information! I've compiled the
>>>>>                 cdh5.13.1-release and replaced
>>>>>                 impala-frontend-0.1-SNAPSHOT.jar (which seems to
>>>>>                 include the changes in the
>>>>>                 DistributedPlanner.java). There still seems to
>>>>>                 be a method missing after replacing the jar, but
>>>>>                 I'll try to figure that out.
>>>>>
>>>>>                 I have two questions concerning the code fragment
>>>>>                 in the DistributedPlanner.java you pointed me to.
>>>>>
>>>>>                 First:
>>>>>                 The attached graphic shows the query plan for two
>>>>>                 tables which are partitioned on the join attribute
>>>>>                 (standard impala version without any changes). If
>>>>>                 I change the if-condition to true for my modified
>>>>>                 version I expect to get the same result for my
>>>>>                 "hand made" partitions (outside of impala). But
>>>>>                 why is there an exchange broadcast (even in the
>>>>>                 standard version)? I mean, if I have a partition
>>>>>                 of Table 1 with ID=0 on Node X why is there a
>>>>>                 broadcast of the partition of Table 2 with ID=0 on
>>>>>                 Node Y? Actually this partition only has to be
>>>>>                 sent to Node X where the matching partition of
>>>>>                 Table 1 is located (instead of sending it to all
>>>>>                 nodes (broadcast)). Or is this exactly the case
>>>>>                 and it's only shown as a broadcast here in the
>>>>>                 graphics?
>>>>>
>>>>>
>>>>>             You are reading the plan correctly. Impala simply does
>>>>>             not implement that optimization.
>>>>>
>>>>>
>>>>>                 Second:
>>>>>                 All corresponding blocks of the partitioned tables
>>>>>                 are on the same node (e.g. Table 1 Partition with
>>>>>                 ID=0, Table 2 Partition with ID=0 => Node 1 etc.).
>>>>>                 This is what I did manually. As already mentioned
>>>>>                 before I want to join these partitions (blocks)
>>>>>                 locally on each node. But if I'm correct, the
>>>>>                 modification in the DistributedPlanner will also
>>>>>                 only lead to the plan in the attached graphic so
>>>>>                 that no further exchanges are created if I use my
>>>>>                 "hand made" partitions. But there is still the
>>>>>                 broadcast exchange which distributes the
>>>>>                 partitions across the cluster, which isn't
>>>>>                 necessary because all needed blocks are already
>>>>>                 on the same node and are ready to get joined
>>>>>                 locally. Is there a way to realise that and get
>>>>>                 rid of the broadcast exchange?
>>>>>
>>>>>             You are right. That hack in DistributedPlanner only
>>>>>             works for partitioned hash joins. You can probably
>>>>>             stitch the plan together at an earlier place, e.g.:
>>>>>             https://github.com/apache/impala/blob/master/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java#L506
>>>>>
>>>>>                 Please correct me if I'm wrong with my assumptions.
>>>>>
>>>>>                 Thank you very much!
>>>>>
>>>>>                 Best regards
>>>>>                 Philipp
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>                 On 18.04.2018 at 07:16, Alexander Behm wrote:
>>>>>>                 Your CM-managed cluster must be running a
>>>>>>                 "compatible" Impala version already for this
>>>>>>                 trick to work. It looks like your catalog binary
>>>>>>                 is trying to find a method which does not exist
>>>>>>                 in the .jar, presumably because your .jar is
>>>>>>                 built based on a different version of Impala
>>>>>>                 where that method does not exist anymore.
>>>>>>
>>>>>>                 It looks like you have CDH 5.13.3 installed. CDH
>>>>>>                 5.13.3 is based on Impala 2.10, see:
>>>>>>                 https://www.cloudera.com/documentation/enterprise/release-notes/topics/cdh_vd_cdh_package_tarball_513.html#cm_vd_cdh_package_tarball_513
>>>>>>
>>>>>>                 That means this binary copying trick will only
>>>>>>                 work with a modified version of Impala 2.10, and
>>>>>>                 very likely will not work with a different version.
>>>>>>
>>>>>>                 It's probably easier to test with the mini
>>>>>>                 cluster first. Alternatively, it "might" work if
>>>>>>                 you replace all the binaries mentioned above, but
>>>>>>                 it's quite possible that will not work.
>>>>>>
>>>>>>                 On Sun, Apr 15, 2018 at 7:13 PM, Philipp Krause <philippkrause.mail@googlemail.com> wrote:
>>>>>>
>>>>>>                     Hi Alex! Thank you for the list! The build of
>>>>>>                     the modified cdh5-trunk branch (debug mode)
>>>>>>                     was successful. After replacing
>>>>>>                     "impala-frontend-0.1-SNAPSHOT.jar" in
>>>>>>                     /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/jars/
>>>>>>                     I got the following error in my existing
>>>>>>                     cluster:
>>>>>>                     F0416 01:16:45.402997 17897 catalog.cc:69]
>>>>>>                     NoSuchMethodError: getCatalogObjects
>>>>>>                     When I switch back to the original jar file
>>>>>>                     the error is gone. So it must be something
>>>>>>                     wrong with this file I guess. But I wonder
>>>>>>                     about the error in catalog.cc because I
>>>>>>                     didn't touch any .cc files.
>>>>>>
>>>>>>                     I also replaced
>>>>>>                     "impala-data-source-api-1.0-SNAPSHOT.jar".
>>>>>>                     The other jar files do not exist in my impala
>>>>>>                     installation (CDH-5.13.1).
>>>>>>
>>>>>>                     What am I doing wrong?
>>>>>>
>>>>>>                     Best regards
>>>>>>                     Philipp
>>>>>>
>>>>>>
>>>>>>                     On 13.04.2018 at 20:12, Alexander Behm wrote:
>>>>>>>                     Here's the full list. It might not be
>>>>>>>                     minimal, but copying/overwriting these
>>>>>>>                     should work.
>>>>>>>
>>>>>>>                     debug/service/impalad
>>>>>>>                     debug/service/libfesupport.so
>>>>>>>                     debug/service/libService.a
>>>>>>>                     release/service/impalad
>>>>>>>                     release/service/libfesupport.so
>>>>>>>                     release/service/libService.a
>>>>>>>                     yarn-extras-0.1-SNAPSHOT.jar
>>>>>>>                     impala-data-source-api-1.0-SNAPSHOT-sources.jar
>>>>>>>                     impala-data-source-api-1.0-SNAPSHOT.jar
>>>>>>>                     impala-frontend-0.1-SNAPSHOT-tests.jar
>>>>>>>                     impala-frontend-0.1-SNAPSHOT.jar
>>>>>>>                     libkudu_client.so.0.1.0
>>>>>>>                     libstdc++.so.6.0.20
>>>>>>>                     impala-no-sse.bc
>>>>>>>                     impala-sse.bc
>>>>>>>                     libimpalalzo.so
>>>>>>>
>>>>>>>                     If you are only modifying the Java portion
>>>>>>>                     (like DistributedPlanner), then only
>>>>>>>                     copying/replacing the *.jar files should be
>>>>>>>                     sufficient.
>>>>>>>
>>>>>>>                     On Fri, Apr 13, 2018 at 11:00 AM, Philipp Krause <philippkrause.mail@googlemail.com> wrote:
>>>>>>>
>>>>>>>                         Yes, I have a running (virtual) cluster.
>>>>>>>                         I would try to follow your way with the
>>>>>>>                         custom impala build
>>>>>>>                         (DistributedPlanner.java is the only
>>>>>>>                         modified file at the moment). Thank you
>>>>>>>                         in advance for the file list!
>>>>>>>
>>>>>>>                         Best regards
>>>>>>>                         Philipp
>>>>>>>
>>>>>>>                         Alexander Behm <alex.behm@cloudera.com> wrote on Fri, Apr 13, 2018, 18:45:
>>>>>>>
>>>>>>>                             I'm not really following your installation/setup and am not an
>>>>>>>                             expert on Cloudera Manager installation/config. If you are
>>>>>>>                             going to build Impala anyway, it's probably easiest to test on
>>>>>>>                             Impala's minicluster first.
>>>>>>>
>>>>>>>                             In general, if you have a running Cloudera Managed cluster,
>>>>>>>                             you can deploy a custom Impala build by simply overwriting the
>>>>>>>                             existing Impala binaries and jars with the new build. If you
>>>>>>>                             want to go this route, I can give you a full list of files you
>>>>>>>                             need to replace.
>>>>>>>
>>>>>>>                             On Tue, Apr 10, 2018 at 11:44 AM, Philipp Krause <philippkrause.mail@googlemail.com> wrote:
>>>>>>>
>>>>>>>                                 Thank you for the explanation! Yes, I'm using HDFS. The
>>>>>>>                                 single replica setup is only for test purposes at the
>>>>>>>                                 moment. I think this makes it easier to gain some first
>>>>>>>                                 results since fewer modifications (scheduler etc.) are
>>>>>>>                                 necessary.
>>>>>>>                                 I would like to test the DistributedPlanner modification
>>>>>>>                                 in my virtual cluster. I used a customized Vagrant script
>>>>>>>                                 to install Impala on multiple hosts (see attachment). It
>>>>>>>                                 simply installs cloudera-manager-server-db,
>>>>>>>                                 cloudera-manager-server and cloudera-manager-daemons via
>>>>>>>                                 apt-get. What would be the simplest solution to set up my
>>>>>>>                                 modified version? Could I simply call ./buildall.sh and
>>>>>>>                                 change the script to something like this?
>>>>>>>
>>>>>>>                                 echo "Install java..."
>>>>>>>                                 apt-get -q -y --force-yes install oracle-j2sdk1.7
>>>>>>>                                 echo "Download impala..."
>>>>>>>                                 wget https://... where I uploaded my modified version
>>>>>>>                                 echo "Extract impala..."
>>>>>>>                                 tar -xvzf Impala-cdh5-trunk.tar.gz
>>>>>>>                                 cd Impala-cdh5-trunk
>>>>>>>                                 echo "Build impala..."
>>>>>>>                                 ./buildall.sh
>>>>>>>                                 echo "Start impala instances..."
>>>>>>>                                 service cloudera-scm-server-db initdb
>>>>>>>                                 service cloudera-scm-server-db start
>>>>>>>                                 service cloudera-scm-server start
>>>>>>>
>>>>>>>                                 Or is there another, maybe even easier method, to test the
>>>>>>>                                 code? Maybe via bootstrap_development.sh / minicluster?
>>>>>>>
>>>>>>>                                 Best regards
>>>>>>>                                 Philipp
>>>>>>>
>>>>>>>                                 2018-04-05 18:39 GMT+02:00 Alexander Behm <alex.behm@cloudera.com>:
>>>>>>>
>>>>>>>                                     Apologies for the late response. Btw, your previous
>>>>>>>                                     post was clear enough to me, so no worries :)
>>>>>>>
>>>>>>>
>>>>>>>                                     On Wed, Apr 4, 2018 at 7:46 AM, Philipp Krause <philippkrause.mail@googlemail.com> wrote:
>>>>>>>
>>>>>>>                                         Hello Alex,
>>>>>>>
>>>>>>>                                         I think my previous post has been too long and
>>>>>>>                                         confusing. I apologize for that!
>>>>>>>
>>>>>>>                                         If replicas are completely deactivated, all scan
>>>>>>>                                         ranges of a block are mapped to the one host where
>>>>>>>                                         the block is located. This host is the
>>>>>>>                                         "executor"/reader for all the scan ranges of this
>>>>>>>                                         block. Is that correct?
>>>>>>>
>>>>>>>
>>>>>>>                                     Yes, assuming you are using
>>>>>>>                                     HDFS.
>>>>>>>
>>>>>>>
>>>>>>>                                         I tried to visualize my understanding of the
>>>>>>>                                         scan_range to host mapping for my use case (see
>>>>>>>                                         attachment). Could you please have a quick look at
>>>>>>>                                         it and tell me if this is correct?
>>>>>>>
>>>>>>>                                         "The existing scan range assignment is scan-node
>>>>>>>                                         centric. For each scan node, we independently
>>>>>>>                                         decide which of its scan ranges should be
>>>>>>>                                         processed by which host."
>>>>>>>                                         Without replicas, all scan ranges of a block would
>>>>>>>                                         be assigned to the same host where this block is
>>>>>>>                                         located. Isn't everything local here, so that
>>>>>>>                                         Table_A - Block_0 and Table_B - Block_0 can be
>>>>>>>                                         joined locally, or are further steps necessary?
>>>>>>>                                         The condition in the DistributedPlanner you
>>>>>>>                                         pointed me to is set to false (no exchange nodes).
>>>>>>>
>>>>>>>                                         "You want it to be host-centric. For each host,
>>>>>>>                                         collect the local scan ranges of *all* scan nodes,
>>>>>>>                                         and assign them to that host."
>>>>>>>                                         Wouldn't the standard setup from above work?
>>>>>>>                                         Wouldn't I assign all (the same) scan ranges to
>>>>>>>                                         each host in this case here?
>>>>>>>
>>>>>>>
>>>>>>>                                     The standard setup works
>>>>>>>                                     only in if every block only
>>>>>>>                                     has exactly one replica. For
>>>>>>>                                     our purposes, that is
>>>>>>>                                     basically never the case
>>>>>>>                                     (who would store production
>>>>>>>                                     data without replication?),
>>>>>>>                                     so the single-replica
>>>>>>>                                     assumption was not clear to me.
>>>>>>>
>>>>>>>                                     Does your current setup
>>>>>>>                                     (only changing the planner
>>>>>>>                                     and not the scheduler)
>>>>>>>                                     produce the expected results?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>                                         Thank you very much!
>>>>>>>
>>>>>>>                                         Best regards
>>>>>>>                                         Philipp
>>>>>>>
>>>>>>>                                         2018-03-28 21:04 GMT+02:00 Philipp Krause <philippkrause.mail@googlemail.com>:
>>>>>>>
>>>>>>>                                             Thank you for your answer and sorry for the delay!
>>>>>>>
>>>>>>>                                             If my understanding is correct, the list of scan nodes
>>>>>>>                                             consists of all nodes which contain a *local* block from
>>>>>>>                                             a table that is needed for the query (assumption: I have
>>>>>>>                                             no replicas in my first tests). If TableA-Block0 is on
>>>>>>>                                             Node_0, isn't Node_0 automatically a scan node? And
>>>>>>>                                             wouldn't this scan node always be the host for the
>>>>>>>                                             complete scan range(s) then?
>>>>>>>
>>>>>>>                                             "For each scan node, we independently decide which of
>>>>>>>                                             its scan ranges should be processed by which host."
>>>>>>>
>>>>>>>                                             https://github.com/cloudera/Impala/blob/cdh5-trunk/be/src/scheduling/scheduler.cc#L532
>>>>>>>
>>>>>>>                                             // Loop over all scan ranges, select an executor for
>>>>>>>                                             // those with local impalads and collect all others for
>>>>>>>                                             // later processing.
>>>>>>>
>>>>>>>                                             So in this whole block, scan ranges are assigned to the
>>>>>>>                                             closest executor (=host?). But isn't the closest
>>>>>>>                                             executor always the node the block is located on
>>>>>>>                                             (assuming impalad is installed and I have no replicas)?
>>>>>>>                                             And isn't this node always a scan node at the same
>>>>>>>                                             time? Otherwise a thread on a remote host would have to
>>>>>>>                                             read the corresponding scan range, which would be more
>>>>>>>                                             expensive. The only exception I can think of is when
>>>>>>>                                             all threads on the local node are busy. Or, if I use
>>>>>>>                                             replicas and all other threads of my node with the
>>>>>>>                                             "original" block are busy, a thread on another node
>>>>>>>                                             which contains a replica could read a particular scan
>>>>>>>                                             range of its local block. Is my understanding correct
>>>>>>>                                             here?
>>>>>>>
>>>>>>>                                             Aren't all scan ranges read locally by their scan nodes
>>>>>>>                                             if I have impalad installed on all nodes? And am I
>>>>>>>                                             right that the scan range is only based on its length,
>>>>>>>                                             which refers to maxScanRangeLength in
>>>>>>>                                             computeScanRangeLocations?
>>>>>>>                                             https://github.com/cloudera/Impala/blob/cdh5-trunk/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java#L721
>>>>>>>
>>>>>>>                                             I hope you can help me with the scan node <-> scan
>>>>>>>                                             range -> host relationship. If I have Table_A-Block_0
>>>>>>>                                             and Table_B_Block_0 on the same node (which I want to
>>>>>>>                                             join locally), I don't get the point of why scan ranges
>>>>>>>                                             could be assigned to another host in my scenario.
>>>>>>>
>>>>>>>                                             Best regards and thank you very much!
>>>>>>>                                             Philipp Krause
>>>>>>>
>>>>>>>
>>>>>>>                                             On 21.03.2018 at 05:21, Alexander Behm wrote:
>>>>>>>>                                             Thanks for following up. I think I understand your
>>>>>>>>                                             setup.
>>>>>>>>
>>>>>>>>                                             If you want to not think about scan ranges, then you
>>>>>>>>                                             can modify HdfsScanNode.computeScanRangeLocations().
>>>>>>>>                                             For example, you could change it to produce one scan
>>>>>>>>                                             range per file or per HDFS block. That way you'd know
>>>>>>>>                                             exactly what a scan range corresponds to.
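For illustration, a sketch of the "one scan range per HDFS block" variant suggested above.
The class and method names (FileDescriptor, FileBlock, addScanRange) are simplified
assumptions; the real computeScanRangeLocations() works on Thrift structures and differs
between Impala versions:

    // Inside HdfsScanNode.computeScanRangeLocations(): emit exactly one scan range per
    // HDFS block instead of splitting blocks into smaller ranges.
    for (HdfsPartition.FileDescriptor fileDesc : partition.getFileDescriptors()) {
      for (HdfsPartition.FileBlock block : fileDesc.getFileBlocks()) {
        // addScanRange(...) is a hypothetical helper that wraps the offset/length into a
        // TScanRange and records the block's replica hosts as its candidate locations.
        addScanRange(partition, fileDesc, block.getOffset(), block.getLength());
      }
    }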
>>>>>>>>
>>>>>>>>                                             I think the easiest/fastest way for you to make
>>>>>>>>                                             progress is to re-implement the existing scan range
>>>>>>>>                                             assignment logic in that place in the code I had
>>>>>>>>                                             pointed you to. There is no quick fix to change the
>>>>>>>>                                             existing behavior.
>>>>>>>>                                             The existing scan range assignment is scan-node
>>>>>>>>                                             centric. For each scan node, we independently decide
>>>>>>>>                                             which of its scan ranges should be processed by which
>>>>>>>>                                             host.
>>>>>>>>
>>>>>>>>                                             I believe an algorithm to achieve your goal would
>>>>>>>>                                             look completely different. You want it to be
>>>>>>>>                                             host-centric. For each host, collect the local scan
>>>>>>>>                                             ranges of *all* scan nodes, and assign them to that
>>>>>>>>                                             host.
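For illustration, a conceptual model of such a host-centric assignment. The real assignment
code is C++ in be/src/scheduling/scheduler.cc; the standalone Java below is used only for
consistency with the other sketches, and all types and values are invented for the example:

    import java.util.*;

    // Group the scan ranges of *all* scan nodes by the single host that stores the
    // corresponding block locally (with replication disabled there is exactly one).
    public class HostCentricAssignmentSketch {
      record ScanRange(String table, int blockId, String localHost) {}

      public static void main(String[] args) {
        List<ScanRange> allScanRanges = List.of(
            new ScanRange("Table_A", 0, "data_node_0"),
            new ScanRange("Table_B", 0, "data_node_0"),
            new ScanRange("Table_A", 1, "data_node_1"),
            new ScanRange("Table_B", 1, "data_node_1"));

        // For each host, collect the local scan ranges of all scan nodes ...
        Map<String, List<ScanRange>> rangesPerHost = new HashMap<>();
        for (ScanRange range : allScanRanges) {
          rangesPerHost.computeIfAbsent(range.localHost(), h -> new ArrayList<>()).add(range);
        }
        // ... and assign them to that host, so co-located blocks of Table_A and Table_B
        // can be joined locally without any exchange in between.
        rangesPerHost.forEach((host, ranges) -> System.out.println(host + " -> " + ranges));
      }
    }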
>>>>>>>>
>>>>>>>>                                             Does that make sense?
>>>>>>>>
>>>>>>>>                                             Alex
>>>>>>>>
>>>>>>>>
>>>>>>>>                                             On Mon, Mar 19, 2018 at 1:02 PM, Philipp Krause <philippkrause.mail@googlemail.com> wrote:
>>>>>>>>
>>>>>>>>                                                 I'd like to provide a small example for our
>>>>>>>>                                                 purpose. The last post may be a bit confusing, so
>>>>>>>>                                                 here's a very simple example in the attached pdf
>>>>>>>>                                                 file. I hope it's understandable. Otherwise,
>>>>>>>>                                                 please give me a short feedback.
>>>>>>>>
>>>>>>>>                                                 Basically, I only want each data node to join all
>>>>>>>>                                                 its local blocks. Is there a range mapping needed
>>>>>>>>                                                 or is it possible to easily join all local blocks
>>>>>>>>                                                 (regardless of their content) since everything is
>>>>>>>>                                                 already "prepared"? Maybe you can clarify this
>>>>>>>>                                                 for me.
>>>>>>>>
>>>>>>>>                                                 As you can see in the example, the tables are not
>>>>>>>>                                                 partitioned by ID. The files are manually
>>>>>>>>                                                 prepared with the help of the modulo function. So
>>>>>>>>                                                 I don't have a range like [0,10], but something
>>>>>>>>                                                 like 0,5,10,15 etc.
>>>>>>>>
>>>>>>>>                                                 I hope I didn't make it too complicated and
>>>>>>>>                                                 confusing. I think the actual idea behind this is
>>>>>>>>                                                 really simple and I hope you can help me to get
>>>>>>>>                                                 this working.
>>>>>>>>
>>>>>>>>                                                 Best regards and thank you very much for your
>>>>>>>>                                                 time!
>>>>>>>>                                                 Philipp
>>>>>>>>
>>>>>>>>
>>>>>>>>                                                 On 18.03.2018 at 17:32, Philipp Krause wrote:
>>>>>>>>>
>>>>>>>>>                                                 Hi! At the moment the data to parquet (block)
>>>>>>>>>                                                 mapping is based on a simple modulo function:
>>>>>>>>>                                                 Id % #data_nodes. So with 5 data nodes, all rows
>>>>>>>>>                                                 with IDs 0,5,10,... are written to Parquet_0,
>>>>>>>>>                                                 IDs 1,6,11,... are written to Parquet_1, etc.
>>>>>>>>>                                                 That's what I did manually. Since the parquet
>>>>>>>>>                                                 file size and the block size are both set to
>>>>>>>>>                                                 64MB, each parquet file will result in one block
>>>>>>>>>                                                 when I transfer the parquet files to HDFS. By
>>>>>>>>>                                                 default, HDFS distributes the blocks randomly.
>>>>>>>>>                                                 For test purposes I transferred corresponding
>>>>>>>>>                                                 blocks from Table_A and Table_B to the same data
>>>>>>>>>                                                 node (Table_A - Block_X with IDs 0,5,10 and
>>>>>>>>>                                                 Table_B - Block_Y with IDs 0,5,10). In this
>>>>>>>>>                                                 case, they are transferred to data_node_0
>>>>>>>>>                                                 because the modulo function (which I want to
>>>>>>>>>                                                 implement in the scheduler) returns 0 for these
>>>>>>>>>                                                 IDs. This is also done manually at the moment.
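For illustration, a small self-contained example of the manual layout described above
(IDs routed to Parquet files, and thus HDFS blocks / data nodes, by Id % #data_nodes).
This is plain Java for illustration only, not Impala code:

    public class ModuloLayoutExample {
      public static void main(String[] args) {
        int numDataNodes = 5;
        // With 5 data nodes, IDs 0,5,10,... end up in Parquet_0 on data_node_0,
        // IDs 1,6,11,... in Parquet_1 on data_node_1, and so on for both tables,
        // so matching IDs of Table_A and Table_B land on the same node.
        for (int id = 0; id <= 12; id++) {
          int target = id % numDataNodes;
          System.out.println("id " + id + " -> Parquet_" + target + " (data_node_" + target + ")");
        }
      }
    }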
>>>>>>>>>
>>>>>>>>>                                                 1.) DistributedPlanner:
>>>>>>>>>                                                 For the first upcoming tests I simply changed
>>>>>>>>>                                                 the first condition in the DistributedPlanner to
>>>>>>>>>                                                 true to avoid exchange nodes.
>>>>>>>>>
>>>>>>>>>                                                 2.) The scheduler:
>>>>>>>>>                                                 That's the part I'm currently struggling with.
>>>>>>>>>                                                 For the first tests, block replication is
>>>>>>>>>                                                 deactivated. I'm not sure how / where to
>>>>>>>>>                                                 implement the modulo function for the scan range
>>>>>>>>>                                                 to host mapping. Without the modulo function, I
>>>>>>>>>                                                 would have to implement a hard-coded mapping
>>>>>>>>>                                                 (something like "range" 0-0, 5-5, 10-10 ->
>>>>>>>>>                                                 Data_node_0 etc.). Is that correct? Instead, I
>>>>>>>>>                                                 would like to use a slightly more flexible
>>>>>>>>>                                                 solution with the help of this modulo function
>>>>>>>>>                                                 for the host mapping.
>>>>>>>>>
>>>>>>>>>                                                 I would be really grateful if you could give me
>>>>>>>>>                                                 a hint for the scheduling implementation. I'll
>>>>>>>>>                                                 try to go deeper through the code in the
>>>>>>>>>                                                 meantime.
>>>>>>>>>
>>>>>>>>>                                                 Best regards and thank you in advance
>>>>>>>>>                                                 Philipp
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>                                                 On 14.03.2018 at 08:06, Philipp Krause wrote:
>>>>>>>>>>                                                 Thank you very much for this information! I'll
>>>>>>>>>>                                                 try to implement these two steps and post some
>>>>>>>>>>                                                 updates within the next days!
>>>>>>>>>>
>>>>>>>>>>                                                 Best regards
>>>>>>>>>>                                                 Philipp
>>>>>>>>>>
>>>>>>>>>>                                                 2018-03-13 5:38 GMT+01:00 Alexander Behm <alex.behm@cloudera.com>:
>>>>>>>>>>
>>>>>>>>>>                                                     Cool that you are working on a research
>>>>>>>>>>                                                     project with Impala!
>>>>>>>>>>
>>>>>>>>>>                                                     Properly adding such a feature to Impala is
>>>>>>>>>>                                                     a substantial effort, but hacking the code
>>>>>>>>>>                                                     for an experiment or two seems doable.
>>>>>>>>>>
>>>>>>>>>>                                                     I think you will need to modify two things:
>>>>>>>>>>                                                     (1) the planner to not add exchange nodes,
>>>>>>>>>>                                                     and (2) the scheduler to assign the
>>>>>>>>>>                                                     co-located scan ranges to the same host.
>>>>>>>>>>
>>>>>>>>>>                                                     Here are a few starting points in the code:
>>>>>>>>>>
>>>>>>>>>>                                                     1) DistributedPlanner
>>>>>>>>>>                                                     https://github.com/apache/impala/blob/master/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java#L318
>>>>>>>>>>
>>>>>>>>>>                                                     The first condition handles the case where
>>>>>>>>>>                                                     no exchange nodes need to be added because
>>>>>>>>>>                                                     the join inputs are already suitably
>>>>>>>>>>                                                     partitioned. You could hack the code to
>>>>>>>>>>                                                     always go into that codepath, so no
>>>>>>>>>>                                                     exchanges are added.
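For illustration, the hack described above could look roughly like the following (around
DistributedPlanner.java#L318). The variable names are simplified assumptions and the actual
condition differs between Impala versions:

    // The real code checks whether both join inputs are already hash-partitioned on the
    // join exprs. Hard-wiring both flags to true forces the "no exchange needed" branch.
    boolean lhsHasCompatPartition = true;  // normally derived from the lhs data partition
    boolean rhsHasCompatPartition = true;  // normally derived from the rhs data partition
    if (lhsHasCompatPartition && rhsHasCompatPartition) {
      // Existing branch: place the join into the left child fragment and add no
      // ExchangeNodes. Only correct if matching partitions really are co-located.
    }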
>>>>>>>>>>
>>>>>>>>>>                                                     2) The scheduler
>>>>>>>>>>                                                     https://github.com/apache/impala/blob/master/be/src/scheduling/scheduler.cc#L226
>>>>>>>>>>
>>     ...
>>
>>     [Message clipped] 
>>
>>
>

