impala-user mailing list archives

From Alexander Behm <alex.b...@cloudera.com>
Subject Re: Local join instead of data exchange - co-located blocks
Date Fri, 04 May 2018 03:54:09 GMT
No idea what's going on, but my guess is something is awry with the
scan-range assignment. Can you attach the full profile? It's probably also
good to print the scan ranges created in
HdfsScanNode.computeScanRangeLocations().
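
A minimal sketch of such debug printing, as it could be added at the end of HdfsScanNode.computeScanRangeLocations() (illustrative only; it assumes the Thrift-generated TScanRangeLocationList type the frontend stores its scan ranges in, plus the class's existing LOG member):

    // Hypothetical helper: dump every scan range and its candidate locations so the
    // scan-range assignment can be compared against the expected block layout.
    private void logScanRanges(java.util.List<TScanRangeLocationList> scanRanges) {
      for (TScanRangeLocationList loc : scanRanges) {
        // Thrift-generated classes have a usable toString(); this prints the file
        // split (file, offset, length) together with the candidate replica locations.
        LOG.info("scan range: " + loc);
      }
    }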

On Thu, May 3, 2018 at 5:51 PM, Philipp Krause <
philippkrause.mail@googlemail.com> wrote:

> Hello Alex,
>
> I have tried out several configurations but I still couldn't find a
> solution for my problem :( In the query summary (see attachment) it looks
> as if no rows are read. Do you have an idea what I have to change? I am
> sorry for the inconvenience and thank you once more for the great support
> in getting this working!
>
> On 29.04.2018 at 21:21, Philipp Krause wrote:
>
> Hi Alex,
> I got the modified version working on my cluster. The query plan looks
> exactly as intended (see attachment). This is awesome! Unfortunately the
> result set is empty. As you can see in query_state.png, the scan progress
> always shows 50% although the query has finished.
>
> The only modification in the code is the if statement you pointed me to (I
> set it to true). Maybe I have to give Impala the information about the lhs
> / rhs join partition since there are no exchange nodes now (like in the
> following lines)? The corresponding partitions / blocks of each table are
> on the same node.
>
> I think we are very close to the final result and I hope you can help me
> once more. Thank you so much!
>
> Best regards
> Philipp
>
> On 24.04.2018 at 18:00, Alexander Behm wrote:
>
> On Tue, Apr 24, 2018 at 5:31 AM, Philipp Krause <philippkrause.mail@
> googlemail.com> wrote:
>
>> To prevent the broadcast join I could simply use the shuffle operator in
>> the query:
>>
>> SELECT * FROM business_partition_1 INNER JOIN [SHUFFLE] business_partition_2
>> WHERE business_partition_1.businessid=business_partition_2.businessid
>>
>
> Not sure what version of Impala you are using, and whether hints override
> any changes you might make. I suggest you make the code work as you wish
> without requiring hints.
>
>>
>> I think the broadcast is currently only used because of my very small
>> test tables.
>>
>> This gives me the plan attached as partitioned_shuffle.png. Since my
>> modified version isn't working yet, I partitioned both tables on businessid
>> in Impala. The "hack" should only help to get into the if-condition if I
>> partition the data manually, right? But in this case (if the partitioning
>> is done by Impala itself) Impala should get into the if-condition anyway.
>> Unfortunately I can't see a difference in the plan compared to my
>> unpartitioned tables (unpartitioned.png) concerning the exchange nodes. My
>> goal is actually to get rid of all exchange nodes since the corresponding
>> data is already present on each node. Actually the plan should look the
>> same except for the 03 and 04 exchanges then.
>>
>
> I understand the picture and goal. I suggest you read, understand, and
> modify the code in DistributedPlanner.createHashJoinFragment() to create
> the plan shape that you want.
> I don't know how you are producing these plans. Are you sure your code
> changes are taking effect?
>
>>
>> Are there other changes necessary to just take Table1-partition/block X
>> and Table2-partition/block Y on each node and join them without any data
>> exchange? Actually each node should take all its local blocks for both
>> tables, join them and pass the results back to the coordinator where all
>> results come together (please see explanation.png).
>>
>
> No. You just need to create the plan without exchange nodes, as we've
> already gone through early in this thread.
>
>>
>> I'm looking forward to hearing from you. I hope we can realise this.
>>
>> Best regards
>> Philipp
>>
>> On 24.04.2018 at 07:03, Alexander Behm wrote:
>>
>> On Mon, Apr 23, 2018 at 7:24 PM, Philipp Krause <
>> philippkrause.mail@googlemail.com> wrote:
>>
>>> Hi Alex,
>>> thanks for the information! I've compiled the cdh5.13.1-release and
>>> replaced impala-frontend-0.1-SNAPSHOT.jar (which seems to include the
>>> changes in DistributedPlanner.java). There still seems to be a
>>> method missing after replacing the jar but I'll try to figure that out.
>>>
>>> I have two questions concerning the code fragment in
>>> DistributedPlanner.java you pointed me to.
>>>
>>> First:
>>> The attached graphic shows the query plan for two tables which are
>>> partitioned on the join attribute (standard Impala version without any
>>> changes). If I change the if-condition to true for my modified version I
>>> expect to get the same result for my "hand-made" partitions (outside of
>>> Impala). But why is there an exchange broadcast (even in the standard
>>> version)? I mean, if I have a partition of Table 1 with ID=0 on Node X why
>>> is there a broadcast of the partition of Table 2 with ID=0 on Node Y?
>>> Actually this partition only has to be sent to Node X where the matching
>>> partition of Table 1 is located (instead of sending it to all nodes
>>> (broadcast)). Or is this exactly the case and it's only shown as a
>>> broadcast here in the graphics?
>>>
>>
>> You are reading the plan correctly. Impala simply does not implement that
>> optimization.
>>
>>>
>>> Second:
>>> All corresponding blocks of the partitioned tables are on the same node
>>> (e.g. Table 1 Partition with ID=0, Table 2 Partition with ID=0 => Node 1
>>> etc.). This is what I did manually. As already mentioned before I want to
>>> join these partitions (blocks) locally on each node. But if I'm correct,
>>> the modification in the DistributedPlanner will also only lead to the plan
>>> in the attached graphic so that no further exchanges are created if I use
>>> my "hand made" partitions. But there is still the broadcast exchange which
>>> distributes the partitions across the cluster which isn't neccessary
>>> because all needed blocks are already on the same node and are ready to get
>>> joined locally. Is there a way to realise that and get rid of the broadcast
>>> exchange?
>>>
>>> You are right. That hack in DistributedPlanner only works for
>> partitioned hash joins. You can probably stitch the plan together at an
>> earlier place, e.g.:
>> https://github.com/apache/impala/blob/master/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java#L506
>>
>>
>>> Please correct me if I'm wrong with my assumptions.
>>>
>>> Thank you very much!
>>>
>>> Best regards
>>> Philipp
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On 18.04.2018 at 07:16, Alexander Behm wrote:
>>>
>>> Your CM-managed cluster must be running a "compatible" Impala version
>>> already for this trick to work. It looks like your catalog binary is trying
>>> to find a method which does not exist in the .jar, presumably because your
>>> .jar is built based on a different version of Impala where that method does
>>> not exist anymore.
>>>
>>> It looks like you have CDH 5.13.3 installed. CDH 5.13.3 is based on
>>> Impala 2.10, see:
>>> https://www.cloudera.com/documentation/enterprise/release-notes/topics/cdh_vd_cdh_package_tarball_513.html#cm_vd_cdh_package_tarball_513
>>>
>>> That means this binary copying trick will only work with a modified
>>> version of Impala 2.10, and very likely will not work with a different
>>> version.
>>>
>>> It's probably easier to test with the mini cluster first. Alternatively,
>>> it "might" work if you replace all the binaries mentioned above, but it's
>>> quite possible that will not work.
>>>
>>> On Sun, Apr 15, 2018 at 7:13 PM, Philipp Krause <
>>> philippkrause.mail@googlemail.com> wrote:
>>>
>>>> Hi Alex! Thank you for the list! The build of the modified cdh5-trunk
>>>> branch (debug mode) was successful. After replacing
>>>> "impala-frontend-0.1-SNAPSHOT.jar" in /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/jars/
>>>> I got the following error in my existing cluster:
>>>> F0416 01:16:45.402997 17897 catalog.cc:69] NoSuchMethodError:
>>>> getCatalogObjects
>>>> When I switch back to the original jar file the error is gone, so
>>>> something must be wrong with this file, I guess. But I wonder about the
>>>> error in catalog.cc because I didn't touch any .cc files.
>>>>
>>>> I also replaced "impala-data-source-api-1.0-SNAPSHOT.jar". The other
>>>> jar files do not exist in my impala installation (CDH-5.13.1).
>>>>
>>>> What am I doing wrong?
>>>>
>>>> Best regards
>>>> Philipp
>>>>
>>>> On 13.04.2018 at 20:12, Alexander Behm wrote:
>>>>
>>>> Here's the full list. It might not be minimal, but copying/overwriting
>>>> these should work.
>>>>
>>>> debug/service/impalad
>>>> debug/service/libfesupport.so
>>>> debug/service/libService.a
>>>> release/service/impalad
>>>> release/service/libfesupport.so
>>>> release/service/libService.a
>>>> yarn-extras-0.1-SNAPSHOT.jar
>>>> impala-data-source-api-1.0-SNAPSHOT-sources.jar
>>>> impala-data-source-api-1.0-SNAPSHOT.jar
>>>> impala-frontend-0.1-SNAPSHOT-tests.jar
>>>> impala-frontend-0.1-SNAPSHOT.jar
>>>> libkudu_client.so.0.1.0
>>>> libstdc++.so.6.0.20
>>>> impala-no-sse.bc
>>>> impala-sse.bc
>>>> libimpalalzo.so
>>>>
>>>> If you are only modifying the Java portion (like DistributedPlanner),
>>>> then only copying/replacing the *.jar files should be sufficient.
>>>>
>>>> On Fri, Apr 13, 2018 at 11:00 AM, Philipp Krause <
>>>> philippkrause.mail@googlemail.com> wrote:
>>>>
>>>>> Yes, I have a running (virtual) cluster. I will try to follow your
>>>>> approach with the custom Impala build (DistributedPlanner.java is the only
>>>>> modified file at the moment). Thank you in advance for the file list!
>>>>>
>>>>> Best regards
>>>>> Philipp
>>>>>
>>>>> On Fri, Apr 13, 2018, 18:45, Alexander Behm <alex.behm@cloudera.com>
>>>>> wrote:
>>>>>
>>>>>> I'm not really following your installation/setup and am not an expert
>>>>>> on Cloudera Manager installation/config. If you are going to build Impala
>>>>>> anyway, it's probably easiest to test on Impala's minicluster first.
>>>>>>
>>>>>> In general, if you have a running Cloudera Managed cluster, you can
>>>>>> deploy a custom Impala build by simply overwriting the existing Impala
>>>>>> binaries and jars with the new build. If you want to go this route, I can
>>>>>> give you a full list of files you need to replace.
>>>>>>
>>>>>> On Tue, Apr 10, 2018 at 11:44 AM, Philipp Krause <
>>>>>> philippkrause.mail@googlemail.com> wrote:
>>>>>>
>>>>>>> Thank you for the explanation! Yes, I'm using HDFS. The single
>>>>>>> replica setup is only for test purposes at the moment. I think this makes
>>>>>>> it easier to gain some first results since fewer modifications (scheduler
>>>>>>> etc.) are necessary.
>>>>>>> I would like to test the DistributedPlanner modification in my
>>>>>>> virtual cluster. I used a customized Vagrant script to install Impala on
>>>>>>> multiple hosts (see attachment). It simply installs
>>>>>>> cloudera-manager-server-db, cloudera-manager-server and
>>>>>>> cloudera-manager-daemons via apt-get. What would be the simplest solution
>>>>>>> to set up my modified version? Could I simply call ./buildall.sh and
>>>>>>> change the script to something like this?
>>>>>>>
>>>>>>> echo "Install java..."
>>>>>>> apt-get -q -y --force-yes install oracle-j2sdk1.7
>>>>>>> echo "Download impala..."
>>>>>>> wget https://... where I uploaded my modified version
>>>>>>> echo "Extract impala..."
>>>>>>> tar -xvzf Impala-cdh5-trunk.tar.gz
>>>>>>> cd Impala-cdh5-trunk
>>>>>>> echo "Build impala..."
>>>>>>> ./buildall.sh
>>>>>>> echo "Start impala instances..."
>>>>>>> service cloudera-scm-server-db initdb
>>>>>>> service cloudera-scm-server-db start
>>>>>>> service cloudera-scm-server start
>>>>>>>
>>>>>>> Or is there another, maybe even easier method, to test the code?
>>>>>>> Maybe via bootstrap_development.sh / minicluster?
>>>>>>>
>>>>>>> Best regards
>>>>>>> Philipp
>>>>>>>
>>>>>>> 2018-04-05 18:39 GMT+02:00 Alexander Behm <alex.behm@cloudera.com>:
>>>>>>>
>>>>>>>> Apologies for the late response. Btw, your previous post was clear
>>>>>>>> enough to me, so no worries :)
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Apr 4, 2018 at 7:46 AM, Philipp Krause <
>>>>>>>> philippkrause.mail@googlemail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello Alex,
>>>>>>>>>
>>>>>>>>> I think my previous post was too long and confusing. I apologize
>>>>>>>>> for that!
>>>>>>>>>
>>>>>>>>> If replicas are completely deactivated, all scan ranges of a block
>>>>>>>>> are mapped to the one host where the block is located. This host is the
>>>>>>>>> "executor"/reader for all the scan ranges of this block. Is that correct?
>>>>>>>>>
>>>>>>>>
>>>>>>>> Yes, assuming you are using HDFS.
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>> I tried to visualize my understanding of the scan_range to host
>>>>>>>>> mapping for my use case (see attachment). Could you please have a quick
>>>>>>>>> look at it and tell me if this is correct?
>>>>>>>>>
>>>>>>>>> "The existing scan range assignment is scan-node centric.
For each
>>>>>>>>> scan node, we independently decide which of its scan
ranges should be
>>>>>>>>> processed by which host."
>>>>>>>>> Without replicas, all scan ranges of a block would be
assigend to
>>>>>>>>> the same host where this block is located on. Isn't everything
local here,
>>>>>>>>> so that Table_A - Block_0 and Table_B - Block_0 can be
joined local or are
>>>>>>>>> further steps neccessary? The condition in the DistributedPlanner
you
>>>>>>>>> pointed to me is set to false (no exchange nodes).
>>>>>>>>>
>>>>>>>>> "You want it to be host-centric. For each host, collect
the local
>>>>>>>>> scan ranges of *all* scan nodes, and assign them to that
host."
>>>>>>>>> Wouldn't the standard setup from above work? Wouldn't
I assign all
>>>>>>>>> (the same) scan ranges to each host in this case here?
>>>>>>>>>
>>>>>>>>
>>>>>>>> The standard setup works only if every block has exactly one
>>>>>>>> replica. For our purposes, that is basically never the case (who would
>>>>>>>> store production data without replication?), so the single-replica
>>>>>>>> assumption was not clear to me.
>>>>>>>>
>>>>>>>> Does your current setup (only changing the planner and not the
>>>>>>>> scheduler) produce the expected results?
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thank you very much!
>>>>>>>>>
>>>>>>>>> Best regards
>>>>>>>>> Philipp
>>>>>>>>>
>>>>>>>>> 2018-03-28 21:04 GMT+02:00 Philipp Krause <
>>>>>>>>> philippkrause.mail@googlemail.com>:
>>>>>>>>>
>>>>>>>>>> Thank you for your answer and sorry for my delay!
>>>>>>>>>>
>>>>>>>>>> If my understanding is correct, the list of scan nodes consists
>>>>>>>>>> of all nodes which contain a *local* block from a table that is needed for
>>>>>>>>>> the query (Assumption: I have no replicas in my first tests). If
>>>>>>>>>> TableA-Block0 is on Node_0, isn't Node_0 automatically a scan node? And
>>>>>>>>>> wouldn't this scan node always be the host for the complete scan range(s)
>>>>>>>>>> then?
>>>>>>>>>>
>>>>>>>>>> "For each scan node, we independently decide which
of its scan
>>>>>>>>>> ranges should be processed by which host."
>>>>>>>>>>
>>>>>>>>>> https://github.com/cloudera/Impala/blob/cdh5-trunk/be/src/scheduling/scheduler.cc#L532
>>>>>>>>>> // Loop over all scan ranges, select an executor for those with
>>>>>>>>>> // local impalads and collect all others for later processing.
>>>>>>>>>>
>>>>>>>>>> So in this whole block, scan ranges are assigned to the closest
>>>>>>>>>> executor (=host?). But isn't the closest executor always the node the block
>>>>>>>>>> is located on (assuming impalad is installed and I have no replicas)? And
>>>>>>>>>> isn't this node always a scan node at the same time? Otherwise a thread on
>>>>>>>>>> a remote host would have to read the corresponding scan range, which would
>>>>>>>>>> be more expensive. The only exception I can think of is when all threads on
>>>>>>>>>> the local node are busy. Or, if I use replicas and all other threads of my
>>>>>>>>>> node with the "original" block are busy, a thread on another node which
>>>>>>>>>> contains a replica could read a particular scan range of its local block.
>>>>>>>>>> Is my understanding correct here?
>>>>>>>>>>
>>>>>>>>>> Aren't all scan ranges read locally by their scan nodes if I have
>>>>>>>>>> impalad installed on all nodes? And am I right that the scan range is only
>>>>>>>>>> based on its length, which refers to maxScanRangeLength in
>>>>>>>>>> computeScanRangeLocations?
>>>>>>>>>> https://github.com/cloudera/Impala/blob/cdh5-trunk/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java#L721
>>>>>>>>>>
>>>>>>>>>> I hope you can help me with the scan node <-> scan range -> host
>>>>>>>>>> relationship. If I have Table_A-Block_0 and Table_B-Block_0 on the same
>>>>>>>>>> node (which I want to join locally), I don't get the point of why scan
>>>>>>>>>> ranges could be assigned to another host in my scenario.
>>>>>>>>>>
>>>>>>>>>> Best regards and thank you very much!
>>>>>>>>>> Philipp Krause
>>>>>>>>>>
>>>>>>>>>> On 21.03.2018 at 05:21, Alexander Behm wrote:
>>>>>>>>>>
>>>>>>>>>> Thanks for following up. I think I understand your setup.
>>>>>>>>>>
>>>>>>>>>> If you don't want to think about scan ranges, then you can modify
>>>>>>>>>> HdfsScanNode.computeScanRangeLocations(). For example, you could
>>>>>>>>>> change it to produce one scan range per file or per HDFS block. That way
>>>>>>>>>> you'd know exactly what a scan range corresponds to.
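
A simplified, self-contained sketch of the "one scan range per HDFS block" idea (the real computeScanRangeLocations() works on Impala's file and block descriptors and also records volume ids; the types below are stand-ins, not the frontend's classes):

    import java.util.ArrayList;
    import java.util.List;

    public final class BlockBasedRanges {
      /** Stand-in for the frontend's per-block metadata. */
      public static final class Block {
        final String path; final long offset; final long length;
        Block(String path, long offset, long length) {
          this.path = path; this.offset = offset; this.length = length;
        }
      }

      /** Stand-in for a scan range; in this scheme it maps 1:1 to an HDFS block. */
      public static final class ScanRange {
        final String path; final long offset; final long length;
        ScanRange(String path, long offset, long length) {
          this.path = path; this.offset = offset; this.length = length;
        }
      }

      /** One scan range per HDFS block instead of per maxScanRangeLength chunk. */
      public static List<ScanRange> oneRangePerBlock(List<Block> blocks) {
        List<ScanRange> ranges = new ArrayList<>();
        for (Block b : blocks) {
          ranges.add(new ScanRange(b.path, b.offset, b.length));
        }
        return ranges;
      }
    }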
>>>>>>>>>>
>>>>>>>>>> I think the easiest/fastest way for you to make progress is to
>>>>>>>>>> re-implement the existing scan range assignment logic in that place in
>>>>>>>>>> the code I had pointed you to. There is no quick fix to change the
>>>>>>>>>> existing behavior.
>>>>>>>>>> The existing scan range assignment is scan-node centric. For each
>>>>>>>>>> scan node, we independently decide which of its scan ranges should be
>>>>>>>>>> processed by which host.
>>>>>>>>>>
>>>>>>>>>> I believe an algorithm to achieve your goal would look completely
>>>>>>>>>> different. You want it to be host-centric. For each host, collect the
>>>>>>>>>> local scan ranges of *all* scan nodes, and assign them to that host.
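
A conceptual sketch of that host-centric idea (Java with simplified stand-in types; the actual assignment logic lives in the C++ scheduler):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public final class HostCentricAssignment {
      /** Simplified scan range: owning scan node plus its single local host (no replicas). */
      public static final class ScanRange {
        final int scanNodeId; final String localHost;
        ScanRange(int scanNodeId, String localHost) {
          this.scanNodeId = scanNodeId; this.localHost = localHost;
        }
      }

      // For each host, collect the local scan ranges of *all* scan nodes and assign
      // them to that host, so co-located blocks of both join inputs are read (and
      // joined) on the node that stores them.
      public static Map<String, List<ScanRange>> assignByHost(List<ScanRange> allRanges) {
        Map<String, List<ScanRange>> assignment = new HashMap<>();
        for (ScanRange r : allRanges) {
          assignment.computeIfAbsent(r.localHost, h -> new ArrayList<>()).add(r);
        }
        return assignment;
      }
    }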
>>>>>>>>>>
>>>>>>>>>> Does that make sense?
>>>>>>>>>>
>>>>>>>>>> Alex
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Mar 19, 2018 at 1:02 PM, Philipp Krause <
>>>>>>>>>> philippkrause.mail@googlemail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I'd like to provide a small example for our purpose. The last
>>>>>>>>>>> post may be a bit confusing, so here's a very simple example in the
>>>>>>>>>>> attached pdf file. I hope it's understandable. Otherwise, please give me
>>>>>>>>>>> short feedback.
>>>>>>>>>>>
>>>>>>>>>>> Basically, I only want each data node to join all its local
>>>>>>>>>>> blocks. Is there a range mapping needed or is it possible to easily join
>>>>>>>>>>> all local blocks (regardless of their content) since everything is
>>>>>>>>>>> already "prepared"? Maybe you can clarify this for me.
>>>>>>>>>>>
>>>>>>>>>>> As you can see in the example, the tables are not partitioned by
>>>>>>>>>>> ID. The files are manually prepared with the help of the modulo function.
>>>>>>>>>>> So I don't have a range like [0,10], but something like 0,5,10,15 etc.
>>>>>>>>>>>
>>>>>>>>>>> I hope I didn't make it too complicated and confusing. I think
>>>>>>>>>>> the actual idea behind this is really simple and I hope you can help me
>>>>>>>>>>> to get this working.
>>>>>>>>>>>
>>>>>>>>>>> Best regards and thank you very much for your time!
>>>>>>>>>>> Philipp
>>>>>>>>>>>
>>>>>>>>>>> On 18.03.2018 at 17:32, Philipp Krause wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi! At the moment the data to parquet (block) mapping is based
>>>>>>>>>>> on a simple modulo function: Id % #data_nodes. So with 5 data nodes all
>>>>>>>>>>> rows with Id's 0,5,10,... are written to Parquet_0, Id's 1,6,11,... are
>>>>>>>>>>> written to Parquet_1 etc. That's what I did manually. Since the parquet
>>>>>>>>>>> file size and the block size are both set to 64MB, each parquet file will
>>>>>>>>>>> result in one block when I transfer the parquet files to HDFS. By default,
>>>>>>>>>>> HDFS distributes the blocks randomly. For test purposes I transferred
>>>>>>>>>>> corresponding blocks from Table_A and Table_B to the same data node
>>>>>>>>>>> (Table_A - Block_X with Id's 0,5,10 and Table_B - Block_Y with Id's
>>>>>>>>>>> 0,5,10). In this case, they are transferred to data_node_0 because the
>>>>>>>>>>> modulo function (which I want to implement in the scheduler) returns 0 for
>>>>>>>>>>> these Id's. This is also done manually at the moment.
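
For illustration, the manual layout rule described above boils down to a one-liner (this is the data preparation step outside of Impala, not Impala code):

    public final class ManualLayout {
      // Id % #data_nodes decides which parquet file (and hence which data node) a row
      // goes to, e.g. with 5 nodes: ids 0,5,10 -> Parquet_0 and ids 1,6,11 -> Parquet_1.
      public static int targetFileIndex(long id, int numDataNodes) {
        return (int) (id % numDataNodes);
      }
    }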
>>>>>>>>>>>
>>>>>>>>>>> 1.) DistributedPlanner: For the first upcoming tests I simply
>>>>>>>>>>> changed the first condition in the DistributedPlanner to true to avoid
>>>>>>>>>>> exchange nodes.
>>>>>>>>>>>
>>>>>>>>>>> 2.) The scheduler: That's the part I'm currently struggling
>>>>>>>>>>> with. For the first tests, block replication is deactivated. I'm not sure
>>>>>>>>>>> how / where to implement the modulo function for the scan range to host
>>>>>>>>>>> mapping. Without the modulo function, I would have to implement a
>>>>>>>>>>> hard-coded mapping (something like "range" 0-0, 5-5, 10-10 ->
>>>>>>>>>>> Data_node_0 etc.). Is that correct? Instead I would like to use a
>>>>>>>>>>> slightly more flexible solution with the help of this modulo function for
>>>>>>>>>>> the host mapping.
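
The scheduler itself is C++ (scheduler.cc), but the mapping being asked about can be sketched conceptually like this (Java for illustration; how the partition id is recovered from a scan range is an assumption, not something the scheduler provides today):

    import java.util.List;

    public final class ModuloHostMapping {
      // Pick the executor for a scan range using the same modulo rule that was used
      // when the files were laid out, so each range lands on the node that already
      // stores the matching blocks of both tables.
      public static String chooseExecutor(long partitionId, List<String> dataNodeHosts) {
        int index = (int) (partitionId % dataNodeHosts.size());
        return dataNodeHosts.get(index);
      }
    }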
>>>>>>>>>>>
>>>>>>>>>>> I would be really grateful if you could give me a hint for the
>>>>>>>>>>> scheduling implementation. I'll try to dig deeper into the code in the
>>>>>>>>>>> meantime.
>>>>>>>>>>>
>>>>>>>>>>> Best regards and thank you in advance
>>>>>>>>>>> Philipp
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 14.03.2018 at 08:06, Philipp Krause wrote:
>>>>>>>>>>>
>>>>>>>>>>> Thank you very much for this information! I'll try to implement
>>>>>>>>>>> these two steps and post some updates within the next days!
>>>>>>>>>>>
>>>>>>>>>>> Best regards
>>>>>>>>>>> Philipp
>>>>>>>>>>>
>>>>>>>>>>> 2018-03-13 5:38 GMT+01:00 Alexander Behm <alex.behm@cloudera.com
>>>>>>>>>>> >:
>>>>>>>>>>>
>>>>>>>>>>>> Cool that you're working on a research project with Impala!
>>>>>>>>>>>>
>>>>>>>>>>>> Properly adding such a feature to Impala is a substantial
>>>>>>>>>>>> effort, but hacking the code for an experiment or two seems doable.
>>>>>>>>>>>>
>>>>>>>>>>>> I think you will need to modify two things: (1) the planner to
>>>>>>>>>>>> not add exchange nodes, and (2) the scheduler to assign the co-located
>>>>>>>>>>>> scan ranges to the same host.
>>>>>>>>>>>>
>>>>>>>>>>>> Here are a few starting points in the code:
>>>>>>>>>>>>
>>>>>>>>>>>> 1) DistributedPlanner
>>>>>>>>>>>> https://github.com/apache/impala/blob/master/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java#L318
>>>>>>>>>>>>
>>>>>>>>>>>> The first condition handles the case where no exchange nodes
>>>>>>>>>>>> need to be added because the join inputs are already suitably
>>>>>>>>>>>> partitioned. You could hack the code to always go into that codepath,
>>>>>>>>>>>> so no exchanges are added.
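
A simplified stand-in for that decision (illustrative only; the real condition in DistributedPlanner checks whether both join inputs are already compatibly partitioned on the join exprs):

    public final class JoinFragmentDecision {
      // Returns false when the join can be placed into the existing fragments without
      // any exchange nodes. Hard-wiring both flags to true mirrors the experiment of
      // forcing the first condition in the planner, so no exchanges are added.
      public static boolean needsExchangeNodes(boolean lhsCompatiblyPartitioned,
                                               boolean rhsCompatiblyPartitioned) {
        if (lhsCompatiblyPartitioned && rhsCompatiblyPartitioned) {
          return false;  // co-located / partitioned join: reuse child fragments as-is
        }
        return true;  // otherwise a partitioned or broadcast exchange is required
      }
    }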
>>>>>>>>>>>>
>>>>>>>>>>>> 2) The scheduler
>>>>>>>>>>>> https://github.com/apache/impala/blob/master/be/src/scheduling/scheduler.cc#L226
>>>>>>>>>>>>
>>>>>>>>>>>> You'll need to dig through and understand that code so that you
>>>>>>>>>>>> can make the necessary changes. Change the scan range to host mapping
>>>>>>>>>>>> to your liking. The rest of the code should just work.
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>
>>>>>>>>>>>> Alex
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Mar 12, 2018 at 6:55 PM, Philipp Krause <
>>>>>>>>>>>> philippkrause.mail@googlemail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thank you very much for your quick answers!
>>>>>>>>>>>>> The intention behind this is to improve the execution time and
>>>>>>>>>>>>> (primarily) to examine the impact of block co-location (research
>>>>>>>>>>>>> project) for this particular query (simplified):
>>>>>>>>>>>>>
>>>>>>>>>>>>> select A.x, B.y, A.z from tableA as A inner join tableB as B
>>>>>>>>>>>>> on A.id=B.id
>>>>>>>>>>>>>
>>>>>>>>>>>>> The "real" query includes three joins
and the data size is in
>>>>>>>>>>>>> pb-range. Therefore several nodes (5
in the test environment with less
>>>>>>>>>>>>> data) are used (without any load balancer).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Could you give me some hints on what code changes are required
>>>>>>>>>>>>> and which files are affected? I don't know how to give Impala the
>>>>>>>>>>>>> information that it should only join the local data blocks on each node
>>>>>>>>>>>>> and then pass it to the "final" node which receives all intermediate
>>>>>>>>>>>>> results. I hope you can help me to get this working. That would be
>>>>>>>>>>>>> awesome!
>>>>>>>>>>>>> Best regards
>>>>>>>>>>>>> Philipp
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 12.03.2018 at 18:38, Alexander Behm wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> I suppose one exception is if your data lives only on a single
>>>>>>>>>>>>> node. Then you can set num_nodes=1 and make sure to send the query
>>>>>>>>>>>>> request to the impalad running on the same data node as the target
>>>>>>>>>>>>> data. Then you should get a local join.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Mar 12, 2018 at 9:30 AM, Alexander Behm <
>>>>>>>>>>>>> alex.behm@cloudera.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Such a specific block arrangement is very uncommon for
>>>>>>>>>>>>>> typical Impala setups, so we don't attempt to recognize and optimize
>>>>>>>>>>>>>> this narrow case. In particular, such an arrangement tends to be
>>>>>>>>>>>>>> short-lived if you have the HDFS balancer turned on.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Without making code changes, there is no way today to remove
>>>>>>>>>>>>>> the data exchanges and make sure that the scheduler assigns scan splits
>>>>>>>>>>>>>> to nodes in the desired way (co-located, but with possible load
>>>>>>>>>>>>>> imbalance).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In what way is the current setup unacceptable to you? Is this
>>>>>>>>>>>>>> premature optimization? If you have certain performance
>>>>>>>>>>>>>> expectations/requirements for specific queries, we might be able to
>>>>>>>>>>>>>> help you improve those. If you want to pursue this route, please help
>>>>>>>>>>>>>> us by posting complete query profiles.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Alex
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Mar 12, 2018 at 6:29 AM, Philipp Krause <
>>>>>>>>>>>>>> philippkrause.mail@googlemail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hello everyone!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In order to prevent network traffic, I'd like to perform
>>>>>>>>>>>>>>> local joins on each node instead of exchanging the data and performing
>>>>>>>>>>>>>>> a join over the complete data afterwards. My query is basically a join
>>>>>>>>>>>>>>> over three tables on an ID attribute. The blocks are perfectly
>>>>>>>>>>>>>>> distributed, so that e.g. Table A - Block 0 and Table B - Block 0 are
>>>>>>>>>>>>>>> on the same node. These blocks contain all data rows with an ID range
>>>>>>>>>>>>>>> [0,1]. Table A - Block 1 and Table B - Block 1 with an ID range [2,3]
>>>>>>>>>>>>>>> are on another node etc. So I want to perform a local join per node
>>>>>>>>>>>>>>> because any data exchange would be unnecessary (except for the last
>>>>>>>>>>>>>>> step when the final node receives all results of the other nodes). Is
>>>>>>>>>>>>>>> this possible?
>>>>>>>>>>>>>>> At the moment the query plan includes multiple data exchanges,
>>>>>>>>>>>>>>> although the blocks are already perfectly distributed (manually).
>>>>>>>>>>>>>>> I would be grateful for any help!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best regards
>>>>>>>>>>>>>>> Philipp Krause
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>
>
>
