impala-user mailing list archives

From: Alexander Behm <>
Subject: Re: Local join instead of data exchange - co-located blocks
Date: Tue, 24 Apr 2018 05:03:39 GMT
On Mon, Apr 23, 2018 at 7:24 PM, Philipp Krause <> wrote:

> Hi Alex,
> thanks for the information! I've compiled the cdh5.13.1-release and
> replaced impala-frontend-0.1-SNAPSHOT.jar (which seems to include the
> changes in the DistributedPlanner). There still seems to be a
> method missing after replacing the jar, but I'll try to figure that out.
> I have two questions concerning the code fragment in the
> DistributedPlanner you pointed me to.
> First:
> The attached graphic shows the query plan for two tables which are
> partitioned on the join attribute (standard impala version without any
> changes). If I change the if-condition to true for my modified version I
> expect to get the same result for my "hand made" partitions (outside of
> impala). But why is there an exchange broadcast (even in the standard
> version)? I mean, if I have a partition of Table 1 with ID=0 on Node X why
> is there a broadcast of the partition of Table 2 with ID=0 on Node Y?
> Actually this partition only has to be sent to Node X where the matching
> partition of Table 1 is located (instead of sending it to all nodes
> (broadcast)). Or is this exactly the case, and it's only shown as a
> broadcast here in the graphic?

You are reading the plan correctly. Impala simply does not implement that optimization.

> Second:
> All corresponding blocks of the partitioned tables are on the same node
> (e.g. Table 1 Partition with ID=0, Table 2 Partition with ID=0 => Node 1
> etc.). This is what I did manually. As already mentioned before I want to
> join these partitions (blocks) locally on each node. But if I'm correct,
> the modification in the DistributedPlanner will also only lead to the plan
> in the attached graphic so that no further exchanges are created if I use
> my "hand made" partitions. But there is still the broadcast exchange which
> distributes the partitions across the cluster, which isn't necessary
> because all needed blocks are already on the same node and are ready to get
> joined locally. Is there a way to realise that and get rid of the broadcast
> exchange?
You are right. That hack in the DistributedPlanner only works for partitioned
hash joins. You can probably stitch the plan together at an earlier place in
the planning process.
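For illustration, here is a minimal, self-contained Java sketch of the decision being hacked. The class and method names are invented for this example (they are not Impala's actual API); the real logic lives in DistributedPlanner.java's join-fragment creation.

import java.util.List;

// Toy model (not Impala source): the planner adds exchange nodes unless it
// can prove that both join inputs are already partitioned on the join keys.
// The hack discussed in this thread short-circuits that proof so the
// no-exchange codepath is always taken.
public class PlannerHackSketch {

  // Simplified stand-in for "both sides are hash-partitioned on the join keys".
  static boolean compatiblyPartitioned(
      List<String> lhsKeys, List<String> rhsKeys, List<String> joinKeys) {
    return lhsKeys.equals(joinKeys) && rhsKeys.equals(joinKeys);
  }

  // Returns true when no exchange nodes should be added before the join.
  static boolean skipExchanges(
      List<String> lhsKeys, List<String> rhsKeys, List<String> joinKeys) {
    boolean provable = compatiblyPartitioned(lhsKeys, rhsKeys, joinKeys);
    // The hack: always take the no-exchange codepath, even for "hand made"
    // partitions whose compatibility the planner cannot prove.
    return true; // was: return provable;
  }

  public static void main(String[] args) {
    // Hand-placed HDFS blocks: not provably compatible, but the hack says yes.
    System.out.println(skipExchanges(List.of(), List.of(), List.of("id")));
  }
}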

> Please correct me if I'm wrong with my assumptions.
> Thank you very much!
> Best regards
> Philipp
> On 18.04.2018 at 07:16, Alexander Behm wrote:
> Your CM-managed cluster must be running a "compatible" Impala version
> already for this trick to work. It looks like your catalog binary is trying
> to find a method which does not exist in the .jar, presumably because your
> .jar is built based on a different version of Impala where that method does
> not exist anymore.
> It looks like you have CDH 5.13.3 installed. CDH 5.13.3 is based on Impala
> 2.10, see:
> notes/topics/cdh_vd_cdh_package_tarball_513.html#cm_vd_cdh_package_tarball_513
> That means this binary copying trick will only work with a modified
> version of Impala 2.10, and very likely will not work with a different
> version.
> It's probably easier to test with the mini cluster first. Alternatively,
> it "might" work if you replace all the binaries mentioned above, but it's
> quite possible that will not work.
> On Sun, Apr 15, 2018 at 7:13 PM, Philipp Krause <philippkrause.mail@
>> wrote:
>> Hi Alex! Thank you for the list! The build of the modified cdh5-trunk
>> branch (debug mode) was successful. After replacing
>> "impala-frontend-0.1-SNAPSHOT.jar" in /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/jars/
>> I got the following error in my existing cluster:
>> F0416 01:16:45.402997 17897] NoSuchMethodError:
>> getCatalogObjects
>> When I switch back to the original jar file the error is gone. So it must
>> be something wrong with this file, I guess. But I wonder about the error
>> because I didn't touch any .cc files.
>> I also replaced "impala-data-source-api-1.0-SNAPSHOT.jar". The other jar
>> files do not exist in my impala installation (CDH-5.13.1).
>> What am I doing wrong?
>> Best regards
>> Philipp
>> On 13.04.2018 at 20:12, Alexander Behm wrote:
>> Here's the full list. It might not be minimal, but copying/overwriting
>> these should work.
>> debug/service/impalad
>> debug/service/
>> debug/service/libService.a
>> release/service/impalad
>> release/service/
>> release/service/libService.a
>> yarn-extras-0.1-SNAPSHOT.jar
>> impala-data-source-api-1.0-SNAPSHOT-sources.jar
>> impala-data-source-api-1.0-SNAPSHOT.jar
>> impala-frontend-0.1-SNAPSHOT-tests.jar
>> impala-frontend-0.1-SNAPSHOT.jar
>> impala-no-sse.bc
>> impala-sse.bc
>> If you are only modifying the Java portion (like DistributedPlanner),
>> then only copying/replacing the *.jar files should be sufficient.
>> On Fri, Apr 13, 2018 at 11:00 AM, Philipp Krause <
>>> wrote:
>>> Yes, I have a running (virtual) cluster. I would try to follow your way
>>> with the custom impala build (the DistributedPlanner is the only modified
>>> file at the moment). Thank you in advance for the file list!
>>> Best regards
>>> Philipp
>>> Alexander Behm <> wrote on Fri., 13 Apr 2018,
>>> 18:45:
>>>> I'm not really following your installation/setup and am not an expert
>>>> on Cloudera Manager installation/config. If you are going to build Impala
>>>> anyway, it's probably easiest to test on Impala's minicluster first.
>>>> In general, if you have a running Cloudera Managed cluster, you can
>>>> deploy a custom Impala build by simply overwriting the existing Impala
>>>> binaries and jars with the new build. If you want to go this route, I can
>>>> give you a full list of files you need to replace.
>>>> On Tue, Apr 10, 2018 at 11:44 AM, Philipp Krause <
>>>>> wrote:
>>>>> Thank you for the explanation! Yes, I'm using HDFS. The single replica
>>>>> setup is only for test purposes at the moment. I think this makes it
>>>>> easier to gain some first results, since fewer modifications (scheduler
>>>>> etc.) are necessary.
>>>>> I would like to test the DistributedPlanner modification in my virtual
>>>>> cluster. I used a customized Vagrant script to install Impala on multiple
>>>>> hosts (see attachment). It simply installs cloudera-manager-server-db,
>>>>> cloudera-manager-server and cloudera-manager-daemons via apt-get. What
>>>>> would be the simplest solution to set up my modified version? Could I
>>>>> call ./ and change the script to something like this?
>>>>> echo "Install java..."
>>>>> apt-get -q -y --force-yes install oracle-j2sdk1.7
>>>>> echo "Download impala..."
>>>>> wget https://...   # where I uploaded my modified version
>>>>> echo "Extract impala..."
>>>>> tar -xvzf Impala-cdh5-trunk.tar.gz
>>>>> cd Impala-cdh5-trunk
>>>>> echo "Build impala..."
>>>>> ./
>>>>> echo "Start impala instances..."
>>>>> service cloudera-scm-server-db initdb
>>>>> service cloudera-scm-server-db start
>>>>> service cloudera-scm-server start
>>>>> Or is there another, maybe even easier method, to test the code? Maybe
>>>>> via / minicluster?
>>>>> Best regards
>>>>> Philipp
>>>>> 2018-04-05 18:39 GMT+02:00 Alexander Behm <>:
>>>>>> Apologies for the late response. Btw, your previous post was clear
>>>>>> enough to me, so no worries :)
>>>>>> On Wed, Apr 4, 2018 at 7:46 AM, Philipp Krause <
>>>>>>> wrote:
>>>>>>> Hello Alex,
>>>>>>> I think my previous post has been too long and confusing. I
>>>>>>> apologize for that!
>>>>>>> If replicas are completely deactivated, all scan ranges of a block
>>>>>>> are mapped to the one host where the block is located. This host is
>>>>>>> the "executor"/reader for all the scan ranges of this block. Is that
>>>>>>> correct?
>>>>>> Yes, assuming you are using HDFS.
>>>>>>> I tried to visualize my understanding of the scan_range to host
>>>>>>> mapping for my use case (see attachment). Could you please have a
>>>>>>> quick look at it and tell me if this is correct?
>>>>>>> "The existing scan range assignment is scan-node centric. For
>>>>>>> scan node, we independently decide which of its scan ranges should
>>>>>>> processed by which host."
>>>>>>> Without replicas, all scan ranges of a block would be assigend
>>>>>>> the same host where this block is located on. Isn't everything
local here,
>>>>>>> so that Table_A - Block_0 and Table_B - Block_0 can be joined
local or are
>>>>>>> further steps neccessary? The condition in the DistributedPlanner
>>>>>>> pointed to me is set to false (no exchange nodes).
>>>>>>> "You want it to be host-centric. For each host, collect the local
>>>>>>> scan ranges of *all* scan nodes, and assign them to that host."
>>>>>>> Wouldn't the standard setup from above work? Wouldn't I assign
>>>>>>> (the same) scan ranges to each host in this case here?
>>>>>> The standard setup works only if every block has exactly one
>>>>>> replica. For our purposes, that is basically never the case (who would
>>>>>> store production data without replication?), so the single-replica
>>>>>> assumption was not clear to me.
>>>>>> Does your current setup (only changing the planner and not the
>>>>>> scheduler) produce the expected results?
>>>>>>> Thank you very much!
>>>>>>> Best regards
>>>>>>> Philipp
>>>>>>> 2018-03-28 21:04 GMT+02:00 Philipp Krause <
>>>>>>>> Thank you for your answer and sorry for my delay!
>>>>>>>> If my understanding is correct, the list of scan nodes consists of
>>>>>>>> all nodes which contain a *local* block from a table that is needed
>>>>>>>> for the query (assumption: I have no replicas in my first tests).
>>>>>>>> If TableA-Block0 is on Node_0, isn't Node_0 automatically a scan
>>>>>>>> node? And wouldn't this scan node always be the host for the
>>>>>>>> complete scan range(s)?
>>>>>>>> "For each scan node, we independently decide which of its scan
>>>>>>>> ranges should be processed by which host."
>>>>>>>> heduling/
>>>>>>>> // Loop over all scan ranges, select an executor for those with
>>>>>>>> // local impalads and collect all others for later processing.
>>>>>>>> So in this whole block, scan ranges are assigned to the closest
>>>>>>>> executor (=host?). But isn't the closest executor always the node
>>>>>>>> the block is located on (assuming impalad is installed and I have
>>>>>>>> no replicas)? And isn't this node always a scan node at the same
>>>>>>>> time? Otherwise a thread on a remote host would have to read the
>>>>>>>> corresponding scan range, which would be more expensive. The only
>>>>>>>> exception I can think of is when all threads on the local node are
>>>>>>>> busy. Or, if I use replicas and all other threads of my node with
>>>>>>>> the "original" block are busy, a thread on another node which
>>>>>>>> contains a replica could read a specific scan range of its local
>>>>>>>> block. Is my understanding correct here?
>>>>>>>> Aren't all scan ranges read locally by their scan nodes if I have
>>>>>>>> impalad installed on all nodes? And am I right that a scan range is
>>>>>>>> only based on its length, which refers to maxScanRangeLength in
>>>>>>>> computeScanRangeLocations?
>>>>>>>> in/java/org/apache/impala/planner/
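To make the maxScanRangeLength question concrete, here is a small illustrative Java sketch of purely length-based splitting; the names are invented for this example, not taken from the actual computeScanRangeLocations code.

import java.util.ArrayList;
import java.util.List;

// Toy model: a block of blockLength bytes is cut into consecutive scan
// ranges of at most maxScanRangeLength bytes each.
public class LengthBasedSplitting {

  record Range(long offset, long length) {}

  static List<Range> split(long blockLength, long maxScanRangeLength) {
    List<Range> ranges = new ArrayList<>();
    for (long off = 0; off < blockLength; off += maxScanRangeLength) {
      ranges.add(new Range(off, Math.min(maxScanRangeLength, blockLength - off)));
    }
    return ranges;
  }

  public static void main(String[] args) {
    // A 64MB block with a 16MB maximum range length yields 4 scan ranges.
    System.out.println(split(64L << 20, 16L << 20).size());
  }
}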
>>>>>>>> I hope you can help me with the scan node <-> scan range
>>>>>>>> relationship. If I have Table_A-Block_0 and Table_B-Block_0 on the
>>>>>>>> same node (which I want to join locally), I don't get the point of
>>>>>>>> why scan ranges could be assigned to another host in my scenario.
>>>>>>>> Best regards and thank you very much!
>>>>>>>> Philipp Krause
>>>>>>>> On 21.03.2018 at 05:21, Alexander Behm wrote:
>>>>>>>> Thanks for following up. I think I understand your setup.
>>>>>>>> If you want to not think about scan ranges, then you can modify
>>>>>>>> HdfsScanNode.computeScanRangeLocations(). For example, you could
>>>>>>>> change it to produce one scan range per file or per HDFS block. That
>>>>>>>> way you'd know exactly what a scan range corresponds to.
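As a rough, self-contained Java sketch of the "one scan range per HDFS block" idea (BlockDescriptor and ScanRange are invented types for illustration, not Impala's internal classes):

import java.util.List;
import java.util.stream.Collectors;

// Toy model: emit exactly one scan range per HDFS block, so a scan range
// maps 1:1 to a block (and hence to the single host storing it when
// replication is disabled).
public class OneRangePerBlock {

  record BlockDescriptor(String path, long offset, long length, String host) {}
  record ScanRange(String path, long offset, long length, String localHost) {}

  static List<ScanRange> computeScanRanges(List<BlockDescriptor> blocks) {
    // No splitting by maxScanRangeLength: the whole block is one range.
    return blocks.stream()
        .map(b -> new ScanRange(b.path(), b.offset(), b.length(), b.host()))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<BlockDescriptor> blocks = List.of(
        new BlockDescriptor("/tableA/part-0.parq", 0, 64L << 20, "node-0"));
    System.out.println(computeScanRanges(blocks));
  }
}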
>>>>>>>> I think the easiest/fastest way for you to make progress is to
>>>>>>>> re-implement the existing scan range assignment logic in that place
>>>>>>>> in the code I had pointed you to. There is no quick fix to change
>>>>>>>> the existing behavior.
>>>>>>>> The existing scan range assignment is scan-node centric. For each
>>>>>>>> scan node, we independently decide which of its scan ranges should
>>>>>>>> be processed by which host.
>>>>>>>> I believe an algorithm to achieve your goal would look completely
>>>>>>>> different. You want it to be host-centric. For each host, collect
>>>>>>>> the local scan ranges of *all* scan nodes, and assign them to that
>>>>>>>> host.
>>>>>>>> Does that make sense?
>>>>>>>> Alex
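Purely as illustration of that host-centric idea, a minimal Java sketch with invented types (the actual scheduler lives in Impala's C++ backend):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of host-centric assignment: pool the scan ranges of *all* scan
// nodes (e.g. both join inputs) and group them by the host that stores them,
// so co-located blocks of Table_A and Table_B land on the same executor.
public class HostCentricAssignment {

  record ScanRange(String table, int blockId, String localHost) {}

  static Map<String, List<ScanRange>> assignByHost(List<ScanRange> allRanges) {
    Map<String, List<ScanRange>> byHost = new HashMap<>();
    for (ScanRange r : allRanges) {
      // Single-replica assumption: each range has exactly one local host.
      byHost.computeIfAbsent(r.localHost(), h -> new ArrayList<>()).add(r);
    }
    return byHost;
  }

  public static void main(String[] args) {
    List<ScanRange> ranges = List.of(
        new ScanRange("Table_A", 0, "node-0"),
        new ScanRange("Table_B", 0, "node-0"),
        new ScanRange("Table_A", 1, "node-1"),
        new ScanRange("Table_B", 1, "node-1"));
    // Both Block_0 ranges go to node-0, both Block_1 ranges to node-1.
    System.out.println(assignByHost(ranges));
  }
}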
>>>>>>>> On Mon, Mar 19, 2018 at 1:02 PM, Philipp Krause <
>>>>>>>>> wrote:
>>>>>>>>> I'd like to provide a small example for our purpose. The last
>>>>>>>>> post may be a bit confusing, so here's a very simple example in
>>>>>>>>> the attached pdf file. I hope it's understandable. Otherwise,
>>>>>>>>> please give me a short feedback.
>>>>>>>>> Basically, I only want each data node to join all its blocks.
>>>>>>>>> Is there a range mapping needed or is it possible to easily join
>>>>>>>>> all local blocks (regardless of their content) since everything is
>>>>>>>>> already "prepared"? Maybe you can clarify this for me.
>>>>>>>>> As you can see in the example, the tables are not partitioned by
>>>>>>>>> ID. The files are manually prepared with the help of the modulo
>>>>>>>>> function. So I don't have a range like [0,10], but something like
>>>>>>>>> 0,5,10,15 etc.
>>>>>>>>> I hope I didn't make it too complicated and confusing. I think
>>>>>>>>> the actual idea behind this is really simple and I hope you can
>>>>>>>>> help me to get this working.
>>>>>>>>> Best regards and thank you very much for your time!
>>>>>>>>> Philipp
>>>>>>>>> On 18.03.2018 at 17:32, Philipp Krause wrote:
>>>>>>>>> Hi! At the moment the data to parquet (block) mapping is based on
>>>>>>>>> a simple modulo function: Id % #data_nodes. So with 5 data nodes,
>>>>>>>>> all rows with Id's 0,5,10,... are written to Parquet_0, Id's
>>>>>>>>> 1,6,11,... are written to Parquet_1 etc. That's what I did
>>>>>>>>> manually. Since the parquet file size and the block size are both
>>>>>>>>> set to 64MB, each parquet file will result in one block when I
>>>>>>>>> transfer the parquet files to HDFS. By default, HDFS distributes
>>>>>>>>> the blocks randomly. For test purposes I transferred the
>>>>>>>>> corresponding blocks from Table_A and Table_B to the same data node
>>>>>>>>> (Table_A - Block_X with Id's 0,5,10 and Table_B - Block_Y with Id's
>>>>>>>>> 0,5,10). In this case, they are transferred to data_node_0 because
>>>>>>>>> the modulo function (which I want to implement in the scheduler)
>>>>>>>>> returns 0 for these Id's. This is also done manually at the moment.
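A minimal Java sketch of this manual placement rule, just to make the modulo mapping concrete (file and node names are illustrative):

// Toy model of the data preparation described above: a row with a given Id
// is routed to a parquet file by Id % #data_nodes, and that file later
// becomes one HDFS block placed on the matching data node.
public class ModuloPlacement {

  static int targetIndex(long id, int numDataNodes) {
    return (int) (id % numDataNodes);
  }

  public static void main(String[] args) {
    int numDataNodes = 5;
    for (long id : new long[] {0, 1, 5, 6, 10, 11}) {
      int i = targetIndex(id, numDataNodes);
      // Id's 0,5,10 -> Parquet_0 / data_node_0; Id's 1,6,11 -> Parquet_1 ...
      System.out.printf("Id %d -> Parquet_%d on data_node_%d%n", id, i, i);
    }
  }
}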
>>>>>>>>> 1.) DistributedPlanner: For the first, upcoming tests I simply set
>>>>>>>>> the first condition in the DistributedPlanner to true to avoid
>>>>>>>>> exchange nodes.
>>>>>>>>> 2.) The scheduler: That's the part I'm currently struggling with.
>>>>>>>>> For the first tests, block replication is deactivated. I'm not
>>>>>>>>> sure how / where to implement the modulo function for the scan
>>>>>>>>> range to host mapping. Without the modulo function, I'd have to
>>>>>>>>> implement a hard-coded mapping (something like "range" 0-0, 5-5,
>>>>>>>>> 10-10 -> Data_node_0 etc.). Is that correct? Instead, I would like
>>>>>>>>> to use a slightly more flexible solution with the help of this
>>>>>>>>> modulo function for the host mapping (see the sketch below).
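The real change would go into the scheduler in Impala's C++ backend; for consistency with the other sketches, here is the tiny mapping expressed in Java with invented names:

import java.util.List;

// Toy model: the scheduler-side counterpart of the placement rule above.
// Given the Id (or partition id) behind a scan range, pick the host whose
// index matches Id % #hosts, i.e. the node that already stores the block.
public class ModuloHostMapping {

  static String hostFor(long id, List<String> hosts) {
    return hosts.get((int) (id % hosts.size()));
  }

  public static void main(String[] args) {
    List<String> hosts =
        List.of("node-0", "node-1", "node-2", "node-3", "node-4");
    System.out.println(hostFor(10, hosts)); // node-0, matching Parquet_0
  }
}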
>>>>>>>>> I would be really grateful if you could give me a hint for the
>>>>>>>>> scheduling implementation. I'll try to dig deeper into the code
>>>>>>>>> meanwhile.
>>>>>>>>> Best regards and thank you in advance
>>>>>>>>> Philipp
>>>>>>>>> On 14.03.2018 at 08:06, Philipp Krause wrote:
>>>>>>>>> Thank you very much for this information! I'll try to implement
>>>>>>>>> these two steps and post some updates within the next few days.
>>>>>>>>> Best regards
>>>>>>>>> Philipp
>>>>>>>>> 2018-03-13 5:38 GMT+01:00 Alexander Behm <>:
>>>>>>>>>> Cool that you are working on a research project with Impala!
>>>>>>>>>> Properly adding such a feature to Impala is a substantial effort,
>>>>>>>>>> but hacking the code for an experiment or two seems doable.
>>>>>>>>>> I think you will need to modify two things: (1) the planner to
>>>>>>>>>> not add exchange nodes, and (2) the scheduler to assign the
>>>>>>>>>> co-located scan ranges to the same host.
>>>>>>>>>> Here are a few starting points in the code:
>>>>>>>>>> 1) DistributedPlanner
>>>>>>>>>> a/org/apache/impala/planner/
>>>>>>>>>> The first condition handles the case where no exchange nodes need
>>>>>>>>>> to be added because the join inputs are already suitably
>>>>>>>>>> partitioned. You could hack the code to always go into that
>>>>>>>>>> codepath, so no exchanges are added.
>>>>>>>>>> 2) The scheduler
>>>>>>>>>> ng/
>>>>>>>>>> You'll need to dig through and understand that code so that you
>>>>>>>>>> can make the necessary changes. Change the scan range to host
>>>>>>>>>> mapping to your liking. The rest of the code should just work.
>>>>>>>>>> Cheers,
>>>>>>>>>> Alex
>>>>>>>>>> On Mon, Mar 12, 2018 at 6:55 PM, Philipp Krause <
>>>>>>>>>>> wrote:
>>>>>>>>>>> Thank you very much for your quick answers!
>>>>>>>>>>> The intention behind this is to improve the execution time and
>>>>>>>>>>> (primarily) to examine the impact of block co-location (research
>>>>>>>>>>> project) for this particular query (simplified):
>>>>>>>>>>> select A.x, B.y, A.z from tableA as A inner join tableB as B on
>>>>>>>>>>> A.id = B.id
>>>>>>>>>>> The "real" query includes three joins and the data size is in
>>>>>>>>>>> the PB range. Therefore several nodes (5 in the test environment
>>>>>>>>>>> with less data) are used (without any load balancer).
>>>>>>>>>>> Could you give me some hints what code changes are required and
>>>>>>>>>>> which files are affected? I don't know how to give Impala the
>>>>>>>>>>> information that it should only join the local data blocks on
>>>>>>>>>>> each node and then pass them to the "final" node which receives
>>>>>>>>>>> all intermediate results. I hope you can help me to get this
>>>>>>>>>>> working. That would be great!
>>>>>>>>>>> Best regards
>>>>>>>>>>> Philipp
>>>>>>>>>>> On 12.03.2018 at 18:38, Alexander Behm wrote:
>>>>>>>>>>> I suppose one exception is if your data lives only on a single
>>>>>>>>>>> node. Then you can set num_nodes=1 and make sure to send the
>>>>>>>>>>> query request to the impalad running on the same data node as
>>>>>>>>>>> the target data. Then you should get a local join.
>>>>>>>>>>> On Mon, Mar 12, 2018 at 9:30 AM, Alexander Behm
>>>>>>>>>>>> wrote:
>>>>>>>>>>>> Such a specific block arrangement is very uncommon for typical
>>>>>>>>>>>> Impala setups, so we don't attempt to recognize and optimize
>>>>>>>>>>>> this narrow case. In particular, such an arrangement tends to
>>>>>>>>>>>> be short-lived if you have the HDFS balancer turned on.
>>>>>>>>>>>> Without making code changes, there is no way today to remove
>>>>>>>>>>>> the data exchanges and make sure that the scheduler assigns
>>>>>>>>>>>> scan splits to nodes in the desired way (co-located, but with
>>>>>>>>>>>> possible load imbalance).
>>>>>>>>>>>> In what way is the current setup unacceptable to you? Is this
>>>>>>>>>>>> premature optimization? If you have certain
>>>>>>>>>>>> expectations/requirements for specific queries, we might be
>>>>>>>>>>>> able to help you improve those. If you want to pursue this
>>>>>>>>>>>> route, please help us by posting complete query profiles.
>>>>>>>>>>>> Alex
>>>>>>>>>>>> On Mon, Mar 12, 2018 at 6:29 AM, Philipp
Krause <
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> Hello everyone!
>>>>>>>>>>>>> In order to prevent network traffic, I'd like to perform
>>>>>>>>>>>>> local joins on each node instead of exchanging the data and
>>>>>>>>>>>>> performing a join over the complete data afterwards. My query
>>>>>>>>>>>>> is basically a join over three tables on an ID attribute. The
>>>>>>>>>>>>> blocks are perfectly distributed, so that e.g. Table A -
>>>>>>>>>>>>> Block 0 and Table B - Block 0 are on the same node. These
>>>>>>>>>>>>> blocks contain all data rows with an ID range [0,1]. Table A -
>>>>>>>>>>>>> Block 1 and Table B - Block 1 with an ID range [2,3] are on
>>>>>>>>>>>>> another node etc. So I want to perform a local join per node
>>>>>>>>>>>>> because any data exchange would be unnecessary (except for the
>>>>>>>>>>>>> last step when the final node receives all results of the
>>>>>>>>>>>>> other nodes). Is this possible?
>>>>>>>>>>>>> At the moment the query plan includes multiple data
>>>>>>>>>>>>> exchanges, although the blocks are already perfectly
>>>>>>>>>>>>> distributed (manually).
>>>>>>>>>>>>> I would be grateful for any help!
>>>>>>>>>>>>> Best regards
>>>>>>>>>>>>> Philipp Krause
