impala-user mailing list archives

From Philipp Krause <>
Subject Re: Local join instead of data exchange - co-located blocks
Date Mon, 14 May 2018 08:18:55 GMT
Hello Alex,

I suppose you're very busy, so I apologize for the interruption. If you
have any idea of what I could try to solve this problem, please let me
know. Currently I don't know how to progress and I'd appreciate any help
you can give me.

Best regards

Philipp Krause <> wrote on Mon, 7 May 2018, 12:59:

> I just wanted to add that I tried the join with two other, minimal and
> "fresh" tables. All blocks from both tables were on the same node, but I got
> the same result: no data was processed. To me, the scan range mapping
> of my modified version looks the same as the original one. I only
> noticed a difference in the query profile:
> Filter 0 (1.00 MB):
>              - Files processed: 1 (1)
>              - Files rejected: 1 (1)
> ...
> This filter only appears in my modified version. Hopefully we can find the
> mistake.
> On 04.05.2018 at 15:40, Philipp Krause wrote:
> Hi!
> The query profile and the scan range mappings are attached
> (query_profile.txt + scan_ranges.txt). The complete log file is also
> attached. The mapping looks fine to me; I couldn't find any mistakes there.
> For example, line 168 (scan_ranges.txt) shows that partition ID=4 is
> assigned to node_0 and partition ID=10 is assigned to node_1. Both
> partitions contain all id=4 rows, which should be correct for the join. But
> perhaps I have overlooked something in the log.
> The partition/block setup is as follows:
> 6 Nodes (1 Namenode, 5 Datanodes)
> Node 1:
> Node 2: 0|0 5|5
> Node 3: 1|1
> Node 4: 2|2
> Node 5: 3|3
> Node 6: 4|4
> 0|0 means partition_0 of table A and partition_0 of table B.
> Also thanks to Lars for the logging option, which I have used!
> Best regards
> Philipp
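The layout above is the case Philipp is aiming for: partition i of table A and partition i of table B sit on the same datanode, so each node can hash-join its own partitions and the coordinator only has to concatenate the per-node outputs, with no exchange in between. Below is a minimal in-memory sketch of that idea under stated assumptions: `ColocatedJoin`, the `int[]{key, value}` row model and the node-name maps are hypothetical illustrations, not Impala planner or executor code. It also checks the precondition that makes a purely local join correct, namely that every join key lives on exactly one node across both tables.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch (not Impala code): join co-located partitions per node, then union. */
final class ColocatedJoin {

    /** Rows are int[]{joinKey, value}. Emits {joinKey, probeValue, buildValue} per match. */
    static List<int[]> localHashJoin(List<int[]> buildSide, List<int[]> probeSide) {
        // Build phase: index the build side by join key.
        Map<Integer, List<int[]>> index = new HashMap<>();
        for (int[] row : buildSide) {
            index.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row);
        }
        // Probe phase: look up each probe row's key in the index.
        List<int[]> out = new ArrayList<>();
        for (int[] probe : probeSide) {
            for (int[] build : index.getOrDefault(probe[0], List.of())) {
                out.add(new int[] {probe[0], probe[1], build[1]});
            }
        }
        return out;
    }

    /** Precondition: each join key may appear on only one node, across both tables. */
    static boolean keysColocated(Map<String, List<int[]>> tableA, Map<String, List<int[]>> tableB) {
        Map<Integer, String> owner = new HashMap<>();
        for (Map<String, List<int[]>> table : List.of(tableA, tableB)) {
            for (Map.Entry<String, List<int[]>> node : table.entrySet()) {
                for (int[] row : node.getValue()) {
                    String prev = owner.putIfAbsent(row[0], node.getKey());
                    if (prev != null && !prev.equals(node.getKey())) {
                        return false;
                    }
                }
            }
        }
        return true;
    }

    /** Each node joins only its local rows (no exchange); the "coordinator" concatenates results. */
    static List<int[]> joinAllNodes(Map<String, List<int[]>> tableA, Map<String, List<int[]>> tableB) {
        if (!keysColocated(tableA, tableB)) {
            throw new IllegalArgumentException("keys not co-located; a local join would miss rows");
        }
        List<int[]> result = new ArrayList<>();
        for (Map.Entry<String, List<int[]>> node : tableA.entrySet()) {
            List<int[]> localB = tableB.getOrDefault(node.getKey(), List.of());
            result.addAll(localHashJoin(localB, node.getValue()));
        }
        return result;
    }
}
```

For the Node 2 and Node 3 part of the layout (partitions 0 and 5 on Node 2, partition 1 on Node 3), `joinAllNodes` returns the same rows a shuffled join would, because no key is split across nodes. Without that guarantee a purely local join would miss matches, which is why the sketch refuses to run in that case.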
> On 04.05.2018 at 07:10, Lars Volker wrote:
> I haven't followed this thread closely, but you can also print all scan
> range assignments made by the scheduler by passing -vmodule=scheduler=2 as
> a startup option. The logging happens in
> <>
> This wiki page has a way to achieve that using environment variables:
> Cheers, Lars
> On Thu, May 3, 2018 at 8:54 PM, Alexander Behm <>
> wrote:
>> No idea what's going on, but my guess is something is awry with the
>> scan-range assignment. Can you attach the full profile? It's probably also
>> good to print the scan ranges created in
>> HdfsScanNode.computeScanRangeLocations().
>> On Thu, May 3, 2018 at 5:51 PM, Philipp Krause <> wrote:
>>> Hello Alex,
>>> I have tried out several configurations but I still couldn't find a
>>> solution for my problem :( In the query summary (see attachment) it looks
>>> as if no rows are read. Do you have an idea what I have to change? I
>>> am sorry for the trouble and thank you once more for the great
>>> support in getting this working!
>>> On 29.04.2018 at 21:21, Philipp Krause wrote:
>>> Hi Alex,
>>> I got the modified version working on my cluster. The query plan looks
>>> exactly as wanted (see attachment). This is awesome! Unfortunately the
>>> result set is empty. As you can see in query_state.png, the scan progress
>>> always shows 50% although the query has finished.
>>> The only modification in the code is the if statement you pointed me to
>>> (I set it to true). Maybe I have to give Impala the information about the
>>> lhs / rhs join partition since there are no exchange nodes now (like in the
>>> following lines)? The corresponding partitions / blocks of each table are
>>> on the same node.
>>> I think we are very close to the final result and I hope you can help me
>>> once more. Thank you so much!
>>> Best regards
>>> Philipp
>>> On 24.04.2018 at 18:00, Alexander Behm wrote:
>>> On Tue, Apr 24, 2018 at 5:31 AM, Philipp Krause <> wrote:
>>>> To prevent the broadcast join I could simply use the [SHUFFLE] join hint
>>>> in the query:
>>>> SELECT * FROM business_partition_1 INNER JOIN [SHUFFLE]
>>>> business_partition_2 WHERE
>>>> business_partition_1.businessid=business_partition_2.businessid
>>> Not sure what version of Impala you are using, and whether hints
>>> override any changes you might make. I suggest you make the code work as
>>> you wish without requiring hints.
>>>> I think the broadcast is currently only used because of my very small
>>>> test tables.
>>>> This gives me the plan attached as partitioned_shuffle.png. Since my
>>>> modified version isn't working yet, I partitioned both tables on businessid
>>>> in Impala. The "hack" should only help to get into the if-condition if I
>>>> partition the data manually, right? But in this case (if the partitioning
>>>> is done by Impala itself) Impala should get into the if-condition anyway.
>>>> Unfortunately I can't see a difference in the plan compared to my
>>>> unpartitioned tables (unpartitioned.png) concerning the exchange nodes. My
>>>> goal is actually to get rid of all exchange nodes since the corresponding
>>>> data is already present on each node. The plan should then look the
>>>> same except for exchanges 03 and 04.
>>> I understand the picture and goal. I suggest you read, understand, and
>>> modify the code in DistributedPlanner.createHashJoinFragment() to create
>>> the plan shape that you want.
>>> I don't know how you are producing these plans. Are you sure your code
>>> changes are taking effect?
>>>> Are there other changes necessary to just take Table1-partition/block
>>>> X and Table2-partition/block Y on each node and join them without any data
>>>> exchange? Ideally, each node should take all its local blocks for both
>>>> tables, join them and pass the results back to the coordinator where all
>>>> results come together (please see explanation.png).
>>> No. You just need to create the plan without exchange nodes, as we've
>>> already gone through earlier in this thread.
>>>> I'm looking forward to hearing from you. I hope we can realise this.
>>>> Best regards
>>>> Philipp
>>>> On 24.04.2018 at 07:03, Alexander Behm wrote:
>>>> On Mon, Apr 23, 2018 at 7:24 PM, Philipp Krause <> wrote:
>>>>> Hi Alex,
>>>>> thanks for the information! I've compiled the cdh5.13.1-release and
>>>>> replaced impala-frontend-0.1-SNAPSHOT.jar (which seems to include the
>>>>> changes in the There still seems to be a
>>>>> method missing after replacing the jar but I'll try to figure that out.
>>>>> I have two questions concerning the code fragment in the
>>>>> you pointed me to.
>>>>> First:
>>>>> The attached graphic shows the query plan for two tables which are
>>>>> partitioned on the join attribute (standard Impala version without any
>>>>> changes). If I change the if-condition to true for my modified version, I
>>>>> expect to get the same result for my "hand made" partitions (outside
>>>>> Impala). But why is there an exchange broadcast (even in the standard
>>>>> version)? I mean, if I have a partition of Table 1 with ID=0 on Node X, why
>>>>> is there a broadcast of the partition of Table 2 with ID=0 on Node Y?
>>>>> Actually this partition only has to be sent to Node X, where the matching
>>>>> partition of Table 1 is located (instead of sending it to all nodes
>>>>> (broadcast)). Or is this exactly the case and it's only shown as a
>>>>> broadcast here in the graphic?
>>>> You are reading the plan correctly. Impala simply does not implement
>>>> that optimization.
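Alex confirms that Impala broadcasts the whole build side instead of shipping each partition only to its matching probe partition. A back-of-the-envelope sketch of the difference in network traffic, under loudly stated assumptions: one probe-side instance per node, no compression, no coordinator traffic, and a partition costs nothing when it already lives on the receiving node. `JoinShuffleCost` and its inputs are hypothetical, and this is not how Impala's planner estimates cost.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical cost model: bytes sent over the network for the build (right) side of a join. */
final class JoinShuffleCost {

    /**
     * Broadcast: every build partition goes to every probe node except the one it already lives on.
     * lhsNode[i] / rhsNode[i] are the nodes holding probe / build partition i; rhsSize[i] is its size.
     */
    static long broadcastBytes(int[] lhsNode, int[] rhsNode, long[] rhsSize) {
        Set<Integer> probeNodes = new HashSet<>();
        for (int n : lhsNode) {
            probeNodes.add(n);
        }
        long total = 0;
        for (int i = 0; i < rhsSize.length; i++) {
            int receivers = probeNodes.size() - (probeNodes.contains(rhsNode[i]) ? 1 : 0);
            total += rhsSize[i] * receivers;
        }
        return total;
    }

    /**
     * Partition-aware alternative (which, per this thread, Impala does not implement):
     * build partition i is sent only to the node of probe partition i, and not at all if co-located.
     */
    static long targetedBytes(int[] lhsNode, int[] rhsNode, long[] rhsSize) {
        long total = 0;
        for (int i = 0; i < rhsSize.length; i++) {
            if (rhsNode[i] != lhsNode[i]) {
                total += rhsSize[i];
            }
        }
        return total;
    }
}
```

With the six-partition layout listed near the top of this thread (partitions 0 and 5 on Node 2, partitions 1 to 4 on Nodes 3 to 6) and 100 bytes per build partition, the broadcast model moves 6 × 100 × 4 = 2400 bytes, while the partition-aware model moves nothing because every pair is already co-located.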
>>>>> Second:
>>>>> All corresponding blocks of the partitioned tables are on the same
>>>>> node (e.g. Table 1 Partition with ID=0, Table 2 Partition with ID=0 => Node
>>>>> 1 etc.). This is what I did manually. As already mentioned, I want to
>>>>> join these partitions (blocks) locally on each node. But if I'm correct,
>>>>> the modification in the DistributedPlanner will also only lead to the plan
>>>>> in the attached graphic, so that no further exchanges are created if I use
>>>>> my "hand made" partitions. But there is still the broadcast exchange, which
>>>>> distributes the partitions across the cluster. This isn't necessary,
>>>>> because all needed blocks are already on the same node and are ready to get
>>>>> joined locally. Is there a way to realise that and get rid of the broadcast
>>>>> exchange?
>>>> You are right. That hack in DistributedPlanner only works for
>>>> partitioned hash joins. You can probably stitch the plan together at an
>>>> earlier place, e.g.:
>>>>> Please correct me if any of my assumptions are wrong.
>>>>> Thank you very much!
>>>>> Best regards
>>>>> Philipp
>>>>> On 18.04.2018 at 07:16, Alexander Behm wrote:
>>>>> Your CM-managed cluster must be running a "compatible" Impala version
>>>>> already for this trick to work. It looks like your catalog binary is trying
>>>>> to find a method which does not exist in the .jar, presumably because your
>>>>> .jar is built based on a different version of Impala where that method does
>>>>> not exist anymore.
>>>>> It looks like you have CDH 5.13.3 installed. CDH 5.13.3 is based on
>>>>> Impala 2.10, see:
>>>>> That means this binary copying trick will only work with a modified
>>>>> version of Impala 2.10, and very likely will not work with a different
>>>>> version.
>>>>> It's probably easier to test with the mini cluster first.
>>>>> Alternatively, it "might" work if you replace all the binaries mentioned
>>>>> above, but it's quite possible that will not work.
>>>>> On Sun, Apr 15, 2018 at 7:13 PM, Philipp Krause <> wrote:
>>>>>> Hi Alex! Thank you for the list! The build of the modified cdh5-trunk
>>>>>> branch (debug mode) was successful. After replacing
>>>>>> "impala-frontend-0.1-SNAPSHOT.jar" in
>>>>>> /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/jars/ I got the following
>>>>>> error in my existing cluster:
>>>>>> F0416 01:16:45.402997 17897] NoSuchMethodError:
>>>>>> getCatalogObjects
>>>>>> When I switch back to the original jar file the error is gone. So there
>>>>>> must be something wrong with this file, I guess. But I wonder about the
>>>>>> error in because I didn't touch any .cc files.
>>>>>> I also replaced "impala-data-source-api-1.0-SNAPSHOT.jar". The other
>>>>>> jar files do not exist in my impala installation (CDH-5.13.1).
>>>>>> What am I doing wrong?
>>>>>> Best regards
>>>>>> Philipp
>>>>>> On 13.04.2018 at 20:12, Alexander Behm wrote:
>>>>>> Here's the full list. It might not be minimal, but
>>>>>> copying/overwriting these should work.
>>>>>> debug/service/impalad
>>>>>> debug/service/
>>>>>> debug/service/libService.a
>>>>>> release/service/impalad
>>>>>> release/service/
>>>>>> release/service/libService.a
>>>>>> yarn-extras-0.1-SNAPSHOT.jar
>>>>>> impala-data-source-api-1.0-SNAPSHOT-sources.jar
>>>>>> impala-data-source-api-1.0-SNAPSHOT.jar
>>>>>> impala-frontend-0.1-SNAPSHOT-tests.jar
>>>>>> impala-frontend-0.1-SNAPSHOT.jar
>>>>>> impala-no-sse.bc
>>>>>> impala-sse.bc
>>>>>> If you are only modifying the Java portion (like DistributedPlanner),
>>>>>> then only copying/replacing the *.jar files should be sufficient.
>>>>>> On Fri, Apr 13, 2018 at 11:00 AM, Philipp Krause <> wrote:
>>>>>>> Yes, I have a running (virtual) cluster. I would try to follow that
>>>>>>> route with the custom Impala build ( is the only
>>>>>>> modified file at the moment). Thank you in advance for the file list.
>>>>>>> Best regards
>>>>>>> Philipp
>>>>>>> Alexander Behm <> wrote on Fri, 13 Apr 2018, 18:45:
>>>>>>>> I'm not really following your installation/setup and am not an
>>>>>>>> expert on Cloudera Manager installation/config. If you are going to build
>>>>>>>> Impala anyway, it's probably easiest to test on Impala's minicluster first.
>>>>>>>> In general, if you have a running Cloudera Managed cluster, you can
>>>>>>>> deploy a custom Impala build by simply overwriting the Impala
>>>>>>>> binaries and jars with the new build. If you want to go this route, I can
>>>>>>>> give you a full list of files you need to replace.
>>>>>>>> On Tue, Apr 10, 2018 at 11:44 AM, Philipp Krause <> wrote:
>>>>>>>>> Thank you for the explanation! Yes, I'm using HDFS. The
>>>>>>>>> replica setup is only for test purposes at the moment. I think this makes
>>>>>>>>> it easier to gain some first results since fewer modifications
>>>>>>>>> are necessary.
>>>>>>>>> I would like to test the DistributedPlanner modification in my
>>>>>>>>> virtual cluster. I used a customized Vagrant script to install Impala on
>>>>>>>>> multiple hosts (see attachment). It simply installs
>>>>>>>>> cloudera-manager-server-db, cloudera-manager-server and
>>>>>>>>> cloudera-manager-daemons via apt-get. What would be the simplest solution
>>>>>>>>> to set up my modified version? Could I simply call ./ and change
>>>>>>>>> the script to something like this?
>>>>>>>>> echo "Install java..."
>>>>>>>>> apt-get -q -y --force-yes install oracle-j2sdk1.7
>>>>>>>>> echo "Download impala..."
>>>>>>>>> wget https://... where I uploaded my modified version
>>>>>>>>> echo "Extract impala..."
>>>>>>>>> tar -xvzf Impala-cdh5-trunk.tar.gz
>>>>>>>>> cd Impala-cdh5-trunk
>>>>>>>>> echo "Build impala..."
>>>>>>>>> ./
>>>>>>>>> echo "Start impala instances..."
>>>>>>>>> service cloudera-scm-server-db initdb
>>>>>>>>> service cloudera-scm-server-db start
>>>>>>>>> service cloudera-scm-server start
>>>>>>>>> Or is there another, maybe even easier, method to test the code?
>>>>>>>>> Maybe via / minicluster?
>>>>>>>>> Best regards
>>>>>>>>> Philipp
>>>>>>>>> 2018-04-05 18:39 GMT+02:00 Alexander Behm <>:
>>>>>>>>>> Apologies for the late response. Btw, your previous post was
>>>>>>>>>> clear enough to me, so no worries :)
>>>>>>>>>> On Wed, Apr 4, 2018 at 7:46 AM, Philipp Krause <> wrote:
>>>>>>>>>>> Hello Alex,
>>>>>>>>>>> I think my previous post has been too long and confusing. I
>>>>>>>>>>> apologize for that!
>>>>>>>>>>> If replicas are completely deactivated, all scan ranges of a
>>>>>>>>>>> block are mapped to the one host where the block is located. This host
>>>>>>>>>>> is the "executor"/reader for all the scan ranges of this block. Is that
>>>>>>>>>>> correct?
>>>>>>>>>> Yes, assuming you are using HDFS.
>>>>>>>>>>> I tried to visualize my understanding of the scan_range to host
>>>>>>>>>>> mapping for my use case (see attachment). Could you please have a quick look
>>>>>>>>>>> at it and tell me if this is correct?
>>>>>>>>>>> "The existing scan range assignment is scan-node centric. For
>>>>>>>>>>> each scan node, we independently decide which of its scan ranges should be
>>>>>>>>>>> processed by which host."
>>>>>>>>>>> Without replicas, all scan ranges of a block would be assigned
>>>>>>>>>>> to the same host where this block is located. Isn't everything local
>>>>>>>>>>> here, so that Table_A - Block_0 and Table_B - Block_0 can be joined locally,
>>>>>>>>>>> or are further steps necessary? The condition in the DistributedPlanner
>>>>>>>>>>> you pointed me to is set to false (no exchange
>>>>>>>>>>> "You want it to be host-centric. For each host, collect the
>>>>>>>>>>> local scan ranges of *all* scan nodes, and assign them to that host."
>>>>>>>>>>> Wouldn't the standard setup from above work? Wouldn't I assign
>>>>>>>>>>> all (the same) scan ranges to each host in this case here?
>>>>>>>>>> The standard setup works only if every block has exactly
>>>>>>>>>> one replica. For our purposes, that is basically never the case (who would
>>>>>>>>>> store production data without replication?), so the
>>>>>>>>>> assumption was not clear to me.
>>>>>>>>>> Does your current setup (only changing the planner and not the
>>>>>>>>>> scheduler) produce the expected results?
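Alex's point about replicas can be made concrete with a small model. In a scan-node-centric scheduler each table's scan node picks a replica host per block on its own; the policy used here (least-loaded replica so far, ties to the first listed replica, one load counter shared by all scan nodes) is an assumed stand-in, not Impala's actual scheduler logic. With exactly one replica per block the choice is forced and matching blocks end up on the same host; with two replicas the independent choices can diverge. `assignPairs` is a simplified, hypothetical stand-in for the host-centric idea: it decides once per block pair and only picks a host that stores both blocks. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical model of per-scan-node vs. per-block-pair replica selection. */
final class ReplicaAssignment {

    /**
     * Scan-node-centric: assigns each block of ONE table to one of its replica hosts,
     * picking the host with the fewest blocks assigned so far (ties: first listed replica).
     * The load map is shared across calls, so earlier scan nodes influence later ones.
     */
    static List<String> assignPerScanNode(List<List<String>> replicasPerBlock, Map<String, Integer> load) {
        List<String> chosen = new ArrayList<>();
        for (List<String> replicas : replicasPerBlock) {
            String best = replicas.get(0);
            for (String host : replicas) {
                if (load.getOrDefault(host, 0) < load.getOrDefault(best, 0)) {
                    best = host;
                }
            }
            load.merge(best, 1, Integer::sum);
            chosen.add(best);
        }
        return chosen;
    }

    /** True if block i of table A and block i of table B went to the same host for every i. */
    static boolean joinIsLocal(List<String> hostsA, List<String> hostsB) {
        return hostsA.equals(hostsB);
    }

    /**
     * Pair-wise stand-in for host-centric assignment: for block pair i, pick the first host
     * in A's replica list that also stores B's block i; null if the pair has no common host.
     */
    static List<String> assignPairs(List<List<String>> replicasA, List<List<String>> replicasB) {
        List<String> chosen = new ArrayList<>();
        for (int i = 0; i < replicasA.size(); i++) {
            String common = null;
            for (String host : replicasA.get(i)) {
                if (replicasB.get(i).contains(host)) {
                    common = host;
                    break;
                }
            }
            chosen.add(common);
        }
        return chosen;
    }
}
```

For example, if both A_0 and B_0 have replicas on h1 and h2, the scan node for A takes h1 and, because h1 is now busier, the scan node for B takes h2, so joining A_0 with B_0 would need an exchange. With a single replica on h1 for both blocks, both land on h1.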
>>>>>>>>>>> Thank you very much!
>>>>>>>>>>> Best regards
>>>>>>>>>>> Philipp
>>>>>>>>>>> 2018-03-28 21:04 GMT+02:00 Philipp Krause <>:
>>>>>>>>>>>> Thank you for your answer and sorry for my
>>>>>>>>>>>> If my understanding is correct, the list of scan nodes consists
>>>>>>>>>>>> of all nodes which contain a *local* block from a table that is needed for
>>>>>>>>>>>> the query (assumption: I have no replicas in my first tests). If
>>>>>>>>>>>> TableA-Block0 is on Node_0, isn't Node_0 automatically a scan node? And
>>>>>>>>>>>> wouldn't this scan node always be the host for the complete scan range(s)
>>>>>>>>>>>> then?
>>>>>>>>>>>> "For each scan node, we independently decide which of its scan
>>>>>>>>>>>> ranges should be processed by which host."
>>>>>>>>>>>> // Loop over all scan ranges, select an executor for those with local impalads and
>>>>>>>>>>>> // collect all others for later processing.
>>>>>>>>>>>> So in this whole block, scan ranges are assigned to the closest
>>>>>>>>>>>> executor (=host?). But isn't the closest executor always the node the block
>>>>>>>>>>>> is located on (assuming impalad is installed and I have no replicas)? And
>>>>>>>>>>>> isn't this node always a scan node at the same time? Otherwise a thread on
>>>>>>>>>>>> a remote host would have to read the corresponding scan range, which would be more
>>>>>>>>>>>> expensive. The only exception I can think of is when all threads on the
>>>>>>>>>>>> local node are busy. Or, if I use replicas and all other threads of my node
>>>>>>>>>>>> with the "original" block are busy, a thread on another node which contains
>>>>>>>>>>>> a replica could read a particular scan range of its local block. Is my
>>>>>>>>>>>> understanding correct here?
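The quoted scheduler comment describes a two-pass structure: first assign every scan range that has an impalad next to one of its replicas, then handle the leftovers. A simplified sketch of that first pass, assuming each scan range lists the hosts that store a replica of its block and that the first replica host running an impalad is taken; a real scheduler may also weigh load among several local candidates, which is left out here. `LocalFirstScheduler` and `FirstPass` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of "assign local scan ranges first, collect the rest for later". */
final class LocalFirstScheduler {

    /** Result of the first pass: host -> local scan ranges, plus ranges with no local executor. */
    static final class FirstPass {
        final Map<String, List<String>> localAssignments = new LinkedHashMap<>();
        final List<String> remoteRanges = new ArrayList<>();
    }

    /**
     * @param replicaHosts scan-range id -> hosts that store a replica of its block
     * @param executors    hosts that run an impalad
     */
    static FirstPass assignLocal(Map<String, List<String>> replicaHosts, Set<String> executors) {
        FirstPass pass = new FirstPass();
        for (Map.Entry<String, List<String>> range : replicaHosts.entrySet()) {
            String local = null;
            for (String host : range.getValue()) {
                if (executors.contains(host)) {
                    local = host;
                    break;
                }
            }
            if (local != null) {
                pass.localAssignments.computeIfAbsent(local, h -> new ArrayList<>()).add(range.getKey());
            } else {
                // No impalad next to any replica: some remote executor must read it later.
                pass.remoteRanges.add(range.getKey());
            }
        }
        return pass;
    }
}
```

With no replicas and an impalad on every datanode, as Philipp assumes, every range lands in `localAssignments` on the host that stores its block and `remoteRanges` stays empty.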
>>>>>>>>>>>> Aren't all scan ranges read locally by their scan nodes if I have
>>>>>>>>>>>> impalad installed on all nodes? And am I right that the scan range is only
>>>>>>>>>>>> based on its length, which refers to maxScanRangeLength in
>>>>>>>>>>>> computeScanRangeLocations?
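The question above assumes that scan ranges are derived from blocks purely by length. Under that assumption, a block of `blockLength` bytes is cut into contiguous `{offset, length}` pieces of at most `maxScanRangeLength` bytes, and every piece inherits the block's replica hosts, so with a single replica all pieces still point to the same host. This only illustrates the assumption; it is not a copy of `HdfsScanNode.computeScanRangeLocations()`, and the class name `ScanRangeSplitter` as well as treating a non-positive maximum as "do not split" are hypothetical choices.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: split one block into scan ranges by a maximum length. */
final class ScanRangeSplitter {

    /**
     * Splits [0, blockLength) into {offset, length} pairs of at most maxLen bytes.
     * maxLen <= 0 (assumed to mean "no limit") or maxLen >= blockLength yields one range.
     */
    static List<long[]> split(long blockLength, long maxLen) {
        List<long[]> ranges = new ArrayList<>();
        if (maxLen <= 0 || maxLen >= blockLength) {
            ranges.add(new long[] {0, blockLength});
            return ranges;
        }
        for (long offset = 0; offset < blockLength; offset += maxLen) {
            // The last piece may be shorter than maxLen.
            ranges.add(new long[] {offset, Math.min(maxLen, blockLength - offset)});
        }
        return ranges;
    }
}
```

For instance, `split(250, 100)` yields ranges starting at offsets 0, 100 and 200, the last one 50 bytes long; all three would be read on the block's single host when there are no replicas.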
>>>>>>>>>>>> I hope you can help me with the scan node <-> scan range -> host
>>>>>>>>>>>> relationship. If I have Table_A-Block_0 and Table_B-Block_0 on the same
>>>>>>>>>>>> node (which I want to join locally), I don't see why scan
>>>>>>>>>>>> ranges could be assigned to another host in my scenario.
>>>>>>>>>>>> Best regards and thank you very much!
>>>>>>>>>>>> Philipp Krause
>>>>>>>>>>>> On 21.03.2018 at 05:21, Alexander wrote
