impala-user mailing list archives

From Alexander Behm <>
Subject Re: Local join instead of data exchange - co-located blocks
Date Wed, 18 Apr 2018 05:16:02 GMT
Your CM-managed cluster must be running a "compatible" Impala version
already for this trick to work. It looks like your catalog binary is trying
to find a method which does not exist in the .jar, presumably because your
.jar is built based on a different version of Impala where that method does
not exist anymore.

It looks like you have CDH 5.13.1 installed. CDH 5.13 is based on Impala
2.10, see:

That means this binary copying trick will only work with a modified version
of Impala 2.10, and very likely will not work with a different version.
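Alex's point can be turned into a pre-flight check before anything is copied. Below is a minimal Python sketch. It assumes that the parcel directory name encodes the CDH version, as in Philipp's path, and that you know which Impala version your custom build is based on. The helper names are hypothetical. The only version mapping used is the one stated in this thread (CDH 5.13 is based on Impala 2.10). Passing the check is necessary but not sufficient: a build from a differently modified branch with the same base version can still diverge.

```python
import re

# Only the mapping stated in this thread (CDH 5.13.x -> Impala 2.10);
# other CDH releases are deliberately not listed here.
CDH_TO_IMPALA = {(5, 13): "2.10"}


def cdh_version_from_parcel(dirname: str) -> tuple:
    """Extract (major, minor, patch) from a name like 'CDH-5.13.1-1.cdh5.13.1.p0.2'."""
    m = re.match(r"CDH-(\d+)\.(\d+)\.(\d+)", dirname)
    if m is None:
        raise ValueError(f"not a CDH parcel directory name: {dirname!r}")
    return tuple(int(g) for g in m.groups())


def build_is_compatible(parcel_dir: str, build_impala_version: str) -> bool:
    """True if the custom build's major.minor Impala version matches the parcel's base."""
    major, minor, _patch = cdh_version_from_parcel(parcel_dir)
    expected = CDH_TO_IMPALA.get((major, minor))
    if expected is None:
        raise KeyError(f"no known Impala base version for CDH {major}.{minor}")
    # Compare major.minor only, e.g. "2.10.0-SNAPSHOT" -> "2.10".
    return ".".join(build_impala_version.split(".")[:2]) == expected
```

For Philipp's parcel, a build based on Impala 2.10 passes the check, while a build based on any other minor version is rejected before it can fail at runtime with a `NoSuchMethodError`.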

It's probably easier to test with the mini cluster first. Alternatively, it
"might" work if you replace all the binaries mentioned above, but it's
quite possible that will not work.

On Sun, Apr 15, 2018 at 7:13 PM, Philipp Krause <> wrote:

> Hi Alex! Thank you for the list! The build of the modified cdh5-trunk
> branch (debug mode) was successful. After replacing
> "impala-frontend-0.1-SNAPSHOT.jar" in /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/jars/
> I got the following error in my existing cluster:
> F0416 01:16:45.402997 17897] NoSuchMethodError:
> getCatalogObjects
> When I switch back to the original jar file, the error is gone, so I guess
> something must be wrong with this file. But the location of the error
> puzzles me, because I didn't touch any .cc files.
> I also replaced "impala-data-source-api-1.0-SNAPSHOT.jar". The other jar
> files do not exist in my impala installation (CDH-5.13.1).
> What am I doing wrong?
> Best regards
> Philipp
> On 13.04.2018 at 20:12, Alexander Behm wrote:
> Here's the full list. It might not be minimal, but copying/overwriting
> these should work.
> debug/service/impalad
> debug/service/
> debug/service/libService.a
> release/service/impalad
> release/service/
> release/service/libService.a
> yarn-extras-0.1-SNAPSHOT.jar
> impala-data-source-api-1.0-SNAPSHOT-sources.jar
> impala-data-source-api-1.0-SNAPSHOT.jar
> impala-frontend-0.1-SNAPSHOT-tests.jar
> impala-frontend-0.1-SNAPSHOT.jar
> impala-no-sse.bc
> impala-sse.bc
> If you are only modifying the Java portion (like DistributedPlanner), then
> only copying/replacing the *.jar files should be sufficient.
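The jar-only route can be scripted so that the originals are easy to restore, which matters because switching back to the original jar is exactly what Philipp ends up doing. Below is a minimal Python sketch. It assumes that the freshly built jars have been collected in one directory and that the target is the parcel's `jars/` directory. The function name and the `.orig` backup convention are hypothetical. The sketch only overwrites jars that already exist in the installation, since Philipp reports that some of the listed jars are not present in his CDH 5.13.1 install.

```python
import shutil
from pathlib import Path


def replace_jars(build_dir: Path, target_dir: Path) -> list:
    """Overwrite jars in target_dir with same-named jars from build_dir.

    Before the first overwrite, each original is copied to '<name>.orig' so it
    can be restored. Jars that do not already exist in target_dir are skipped.
    Returns the names of the jars that were replaced.
    """
    replaced = []
    for new_jar in sorted(build_dir.glob("*.jar")):
        old_jar = target_dir / new_jar.name
        if not old_jar.exists():
            continue  # not part of this installation; don't add it
        backup = old_jar.with_name(old_jar.name + ".orig")
        if not backup.exists():
            shutil.copy2(old_jar, backup)  # keep only the very first original
        shutil.copy2(new_jar, old_jar)
        replaced.append(new_jar.name)
    return replaced
```

Restoring means copying each `.orig` file back over its jar. Because the backup is never overwritten, running the script twice still keeps the CDH-shipped version.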
> On Fri, Apr 13, 2018 at 11:00 AM, Philipp Krause <philippkrause.mail@
>> wrote:
>> Yes, I have a running (virtual) cluster. I would like to follow your
>> approach with the custom Impala build ( is the only modified
>> file at the moment). Thank you in advance for the file list!
>> Best regards
>> Philipp
>> On Fri, 13 Apr 2018, 18:45, Alexander Behm <> wrote:
>>> I'm not really following your installation/setup and am not an expert on
>>> Cloudera Manager installation/config. If you are going to build Impala
>>> anyway, it's probably easiest to test on Impala's minicluster first.
>>> In general, if you have a running Cloudera Manager-managed cluster, you
>>> can deploy a custom Impala build by simply overwriting the existing Impala
>>> binaries and jars with the new build. If you want to go this route, I can
>>> give you a full list of files you need to replace.
>>> On Tue, Apr 10, 2018 at 11:44 AM, Philipp Krause <
>>>> wrote:
>>>> Thank you for the explanation! Yes, I'm using HDFS. The single-replica
>>>> setup is only for test purposes at the moment. I think this makes it
>>>> easier to get some first results, since fewer modifications (scheduler
>>>> etc.) are necessary.
>>>> I would like to test the DistributedPlanner modification in my virtual
>>>> cluster. I used a customized Vagrant script to install Impala on multiple
>>>> hosts (see attachment). It simply installs cloudera-manager-server-db,
>>>> cloudera-manager-server and cloudera-manager-daemons via apt-get. What
>>>> would be the simplest way to set up my modified version? Could I simply
>>>> call ./ and change the script to something like this?
>>>> echo "Install java..."
>>>> apt-get -q -y --force-yes install oracle-j2sdk1.7
>>>> echo "Download impala..."
>>>> wget https://... where I uploaded my modified version
>>>> echo "Extract impala..."
>>>> tar -xvzf Impala-cdh5-trunk.tar.gz
>>>> cd Impala-cdh5-trunk
>>>> echo "Build impala..."
>>>> ./
>>>> echo "Start impala instances..."
>>>> service cloudera-scm-server-db initdb
>>>> service cloudera-scm-server-db start
>>>> service cloudera-scm-server start
>>>> Or is there another, maybe even easier, method to test the code? Maybe
>>>> via / minicluster?
>>>> Best regards
>>>> Philipp
>>>> 2018-04-05 18:39 GMT+02:00 Alexander Behm <>:
>>>>> Apologies for the late response. Btw, your previous post was clear
>>>>> enough to me, so no worries :)
>>>>> On Wed, Apr 4, 2018 at 7:46 AM, Philipp Krause <
>>>>>> wrote:
>>>>>> Hello Alex,
>>>>>> I think my previous post was too long and confusing. I apologize
>>>>>> for that!
>>>>>> If replicas are completely deactivated, all scan ranges of a block
>>>>>> are mapped to the one host where the block is located. This host is
>>>>>> the "executor"/reader for all the scan ranges of this block. Is that
>>>>>> correct?
>>>>> Yes, assuming you are using HDFS.
>>>>>> I tried to visualize my understanding of the scan range to host
>>>>>> mapping for my use case (see attachment). Could you please have a quick
>>>>>> look at it and tell me if this is correct?
>>>>>> "The existing scan range assignment is scan-node centric. For each
>>>>>> scan node, we independently decide which of its scan ranges should be
>>>>>> processed by which host."
>>>>>> Without replicas, all scan ranges of a block would be assigned to the
>>>>>> same host where this block is located. Isn't everything local here, so
>>>>>> that Table_A - Block_0 and Table_B - Block_0 can be joined locally, or
>>>>>> are further steps necessary? The condition in the DistributedPlanner
>>>>>> you pointed me to is set to false (no exchange nodes).
>>>>>> "You want it to be host-centric. For each host, collect the local
>>>>>> scan ranges of *all* scan nodes, and assign them to that host."
>>>>>> Wouldn't the standard setup from above work? Wouldn't I assign all
>>>>>> (the same) scan ranges to each host in this case here?
>>>>> The standard setup works only if every block has exactly one
>>>>> replica. For our purposes, that is basically never the case (who would
>>>>> store production data without replication?), so the single-replica
>>>>> assumption was not clear to me.
>>>>> Does your current setup (only changing the planner and not the
>>>>> scheduler) produce the expected results?
>>>>>> Thank you very much!
>>>>>> Best regards
>>>>>> Philipp
>>>>>> 2018-03-28 21:04 GMT+02:00 Philipp Krause <
>>>>>>> Thank you for your answer and sorry for my delay!
>>>>>>> If my understanding is correct, the list of scan nodes consists of
>>>>>>> all nodes which contain a *local* block from a table that is needed
>>>>>>> for the query (assumption: I have no replicas in my first tests). If a
>>>>>>> block is on Node_0, isn't Node_0 automatically a scan node? And
>>>>>>> wouldn't this scan node then always be the host for the complete scan
>>>>>>> range(s)?
>>>>>>> "For each scan node, we independently decide which of its scan
>>>>>>> ranges should be processed by which host."
>>>>>>> heduling/
>>>>>>> // Loop over all scan ranges, select an executor for those with
>>>>>>> local impalads and
>>>>>>> // collect all others for later processing.
>>>>>>> So in this whole block, scan ranges are assigned to the closest
>>>>>>> executor (=host?). But isn't the closest executor always the node the
>>>>>>> block is located on (assuming impalad is installed and I have no
>>>>>>> replicas)? And isn't this node always a scan node at the same time?
>>>>>>> Otherwise a thread on a remote host would have to read the
>>>>>>> corresponding scan range, which would be more expensive. The only
>>>>>>> exception I can think of is when all threads on the local node are
>>>>>>> busy. Or, if I use replicas and all other threads of my node with the
>>>>>>> "original" block are busy, a thread on another node which contains a
>>>>>>> replica could read a particular scan range of its local block. Is my
>>>>>>> understanding correct here?
>>>>>>> Aren't all scan ranges read locally by their scan nodes if I have
>>>>>>> impalad installed on all nodes? And am I right that a scan range is
>>>>>>> determined only by its length, which refers to maxScanRangeLength in
>>>>>>> computeScanRangeLocations?
>>>>>>> in/java/org/apache/impala/planner/
>>>>>>> I hope you can help me with the scan node <-> scan range -> host
>>>>>>> relationship. If I have Table_A-Block_0 and Table_B-Block_0 on the
>>>>>>> same node (which I want to join locally), I don't see why scan ranges
>>>>>>> could be assigned to another host in my scenario.
>>>>>>> Best regards and thank you very much!
>>>>>>> Philipp Krause
>>>>>>> On 21.03.2018 at 05:21, Alexander Behm wrote:
>>>>>>> Thanks for following up. I think I understand your setup.
>>>>>>> If you don't want to think about scan ranges, then you can modify
>>>>>>> HdfsScanNode.computeScanRangeLocations(). For example, you could
>>>>>>> change it to produce one scan range per file or per HDFS block. That
>>>>>>> way you'd know exactly what a scan range corresponds to.
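To make "one scan range per file or per HDFS block" concrete, here is a toy Python model of how a file can be cut into scan ranges. This is not Impala's HdfsScanNode code. The `ScanRange` type, the function and its parameters are hypothetical. In this model, ranges never cross an HDFS block boundary. Each block is either kept whole or split into pieces of at most `max_range_len` bytes, which loosely mirrors the `maxScanRangeLength` parameter mentioned in this thread.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class ScanRange:
    path: str
    offset: int
    length: int
    hosts: tuple  # hosts storing the HDFS block this range lies in


def compute_scan_ranges(path: str, file_len: int, block_size: int,
                        block_hosts: List[tuple],
                        max_range_len: Optional[int] = None) -> List[ScanRange]:
    """Split a file into scan ranges that never cross an HDFS block boundary.

    With max_range_len=None, every block becomes exactly one scan range, so a
    range corresponds one-to-one to an HDFS block. Otherwise each block is cut
    into pieces of at most max_range_len bytes.
    """
    ranges = []
    for i, block_start in enumerate(range(0, file_len, block_size)):
        block_len = min(block_size, file_len - block_start)
        step = block_len if max_range_len is None else max_range_len
        for off in range(block_start, block_start + block_len, step):
            length = min(step, block_start + block_len - off)
            ranges.append(ScanRange(path, off, length, block_hosts[i]))
    return ranges
```

With Philipp's layout (one 64 MB Parquet file per 64 MB block, one replica), each file yields exactly one range whose `hosts` is the single data node holding it. This is the case in which a range maps unambiguously to one host.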
>>>>>>> I think the easiest/fastest way for you to make progress is to
>>>>>>> re-implement the existing scan range assignment logic in the place in
>>>>>>> the code I had pointed you to. There is no quick fix to change the
>>>>>>> behavior.
>>>>>>> The existing scan range assignment is scan-node centric. For each
>>>>>>> scan node, we independently decide which of its scan ranges should be
>>>>>>> processed by which host.
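To see why scan-node-centric assignment does not keep co-located blocks together, here is a deliberately simplified Python model. It is not Impala's scheduler. Each scan node's ranges are assigned in isolation. For each range, the model picks the replica host with the fewest bytes assigned so far. This load rule is chosen for illustration only. Load is tracked per scan node, so two tables' matching ranges can diverge once there is more than one replica. A scan range is modelled as a hypothetical tuple `(range_id, replica_hosts, length)`.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Toy model: a scan range is (range_id, replica_hosts, length_in_bytes).
RangeInfo = Tuple[str, Tuple[str, ...], int]


def assign_scan_node_centric(
        scan_nodes: Dict[str, List[RangeInfo]]) -> Dict[str, Dict[str, str]]:
    """Return {scan_node: {range_id: host}}, deciding each scan node on its own."""
    assignment = {}
    for node, ranges in scan_nodes.items():
        assigned_bytes = defaultdict(int)  # load is tracked per scan node only
        per_node = {}
        for range_id, hosts, length in ranges:
            # Always a local read: pick the least-loaded replica host,
            # breaking ties by host name.
            host = min(hosts, key=lambda h: (assigned_bytes[h], h))
            assigned_bytes[host] += length
            per_node[range_id] = host
        assignment[node] = per_node
    return assignment
```

For example, if A0/B0 and A1/B1 all have replicas on n0 and n1 but table B's scan node sees B1 first, A0 lands on n0 and B0 on n1. The matching blocks end up on different hosts. With a single replica per block, `min` has only one candidate, so both tables' blocks stay on the node that stores them. This matches the remark in this thread that the standard setup only works with exactly one replica.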
>>>>>>> I believe an algorithm to achieve your goal would look completely
>>>>>>> different. You want it to be host-centric. For each host, collect the
>>>>>>> local scan ranges of *all* scan nodes, and assign them to that host.
>>>>>>> Does that make sense?
>>>>>>> Alex
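Here is a matching sketch of the host-centric idea, using the same simplified tuple model. The function is hypothetical and is not a patch for Impala. The model visits hosts in a fixed order. Each host claims every still-unassigned range, from all scan nodes, that has a replica on it. Matching blocks with the same replica set are therefore always claimed together by the same host.

```python
from typing import Dict, List, Tuple

# Toy model: a scan range is (range_id, replica_hosts, length_in_bytes).
RangeInfo = Tuple[str, Tuple[str, ...], int]


def assign_host_centric(hosts: List[str],
                        scan_nodes: Dict[str, List[RangeInfo]]) -> Dict[str, Dict[str, str]]:
    """For each host in turn, claim every unassigned local range of *all* scan nodes."""
    assignment: Dict[str, Dict[str, str]] = {node: {} for node in scan_nodes}
    for host in hosts:
        for node, ranges in scan_nodes.items():
            for range_id, replica_hosts, _length in ranges:
                if range_id not in assignment[node] and host in replica_hosts:
                    assignment[node][range_id] = host
    unassigned = [(n, r) for n, rs in scan_nodes.items() for r, _, _ in rs
                  if r not in assignment[n]]
    if unassigned:
        raise ValueError(f"ranges with no replica on any listed host: {unassigned}")
    return assignment
```

The price is the load imbalance mentioned elsewhere in this thread. Whichever host comes first claims everything it stores, and a later host may get nothing at all.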
>>>>>>> On Mon, Mar 19, 2018 at 1:02 PM, Philipp Krause <
>>>>>>>> wrote:
>>>>>>>> I'd like to provide a small example for our purpose. The last post
>>>>>>>> may be a bit confusing, so here's a very simple example in the
>>>>>>>> attached PDF file. I hope it's understandable. Otherwise, please give
>>>>>>>> me some short feedback.
>>>>>>>> Basically, I only want each data node to join all its local
>>>>>>>> blocks. Is a range mapping needed, or is it possible to easily join
>>>>>>>> all local blocks (regardless of their content), since everything is
>>>>>>>> already "prepared"? Maybe you can clarify this for me.
>>>>>>>> As you can see in the example, the tables are not partitioned by
>>>>>>>> ID. The files are manually prepared with the help of the modulo
>>>>>>>> function. So I don't have a range like [0,10], but something like
>>>>>>>> 0,5,10,15.
>>>>>>>> I hope I didn't make it too complicated and confusing. I think
>>>>>>>> the actual idea behind this is really simple, and I hope you can help
>>>>>>>> me to get this working.
>>>>>>>> Best regards and thank you very much for your time!
>>>>>>>> Philipp
>>>>>>>> On 18.03.2018 at 17:32, Philipp Krause wrote:
>>>>>>>> Hi! At the moment, the data to Parquet (block) mapping is based on a
>>>>>>>> simple modulo function: Id % #data_nodes. So with 5 data nodes, all
>>>>>>>> rows with IDs 0,5,10,... are written to Parquet_0, IDs 1,6,11,... are
>>>>>>>> written to Parquet_1, etc. That's what I did manually. Since the
>>>>>>>> Parquet file size and the block size are both set to 64MB, each
>>>>>>>> Parquet file will result in one block when I transfer the Parquet
>>>>>>>> files to HDFS. By default, HDFS distributes the blocks randomly. For
>>>>>>>> test purposes, I transferred the corresponding blocks from Table_A and
>>>>>>>> Table_B to the same data node (Table_A - Block_X with IDs 0,5,10 and
>>>>>>>> Table_B - Block_Y with IDs 0,5,10). In this case, they are transferred
>>>>>>>> to data_node_0 because the modulo function (which I want to implement
>>>>>>>> in the scheduler) returns 0 for these IDs. This is also done manually
>>>>>>>> at the moment.
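The placement rule described above can be written down in a few lines. Under this rule, a row goes to Parquet_k with k = Id % #data_nodes, and Parquet_k of every table is placed on data_node_k. Below is a minimal Python sketch with hypothetical helper names. It models only the bookkeeping, not writing Parquet files or moving HDFS blocks.

```python
from collections import defaultdict
from typing import Dict, Iterable, List

BLOCK_SIZE = 64 * 1024 * 1024  # 64 MB: the Parquet file and HDFS block size used here


def partition_ids(ids: Iterable[int], num_data_nodes: int) -> Dict[int, List[int]]:
    """Group row IDs by file index k = id % num_data_nodes, i.e. Parquet_k."""
    files: Dict[int, List[int]] = defaultdict(list)
    for row_id in ids:
        files[row_id % num_data_nodes].append(row_id)
    return dict(files)


def target_node(file_index: int) -> str:
    """Parquet_k of every table goes to data_node_k, so equal IDs share a node."""
    return f"data_node_{file_index}"


def num_hdfs_blocks(file_size: int, block_size: int = BLOCK_SIZE) -> int:
    """Number of blocks a file occupies: ceil(file_size / block_size)."""
    return -(-file_size // block_size)
```

With 5 data nodes, `partition_ids(range(15), 5)` puts 0, 5 and 10 in file 0 and 1, 6 and 11 in file 1. The block count shows why the 64 MB cap matters: a file just one byte larger would span two blocks, which HDFS may place on different nodes.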
>>>>>>>> 1.) DistributedPlanner: For the first upcoming tests, I simply set
>>>>>>>> the first condition in the DistributedPlanner to true to avoid
>>>>>>>> exchange nodes.
>>>>>>>> 2.) The scheduler: That's the part I'm currently struggling with.
>>>>>>>> For the first tests, block replication is deactivated. I'm not sure
>>>>>>>> how or where to implement the modulo function for the scan range to
>>>>>>>> host mapping. Without the modulo function, I would have to implement a
>>>>>>>> hard-coded mapping (like "range" 0-0, 5-5, 10-10 -> Data_node_0 etc.).
>>>>>>>> Is that correct? Instead, I would like to use a slightly more flexible
>>>>>>>> solution based on this modulo function for the host mapping.
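One way to replace a hard-coded "range -> Data_node_0" table with the modulo rule is to derive the host from the file that a scan range belongs to. This minimal Python sketch rests on two hypothetical conventions, neither of which comes from Impala. First, each file's name ends in its partition index, as in Parquet_0, Parquet_1, and so on. Second, the host list is ordered data_node_0, data_node_1, and so on. Every table uses the same rule, so Table_A's and Table_B's ranges for the same index land on the same host.

```python
import re
from typing import List


def host_for_scan_range(file_path: str, hosts: List[str]) -> str:
    """Map a scan range to hosts[k % len(hosts)], where k is parsed from '..._<k>[.ext]'."""
    m = re.search(r"_(\d+)(?:\.\w+)?$", file_path)
    if m is None:
        raise ValueError(f"cannot derive a partition index from {file_path!r}")
    return hosts[int(m.group(1)) % len(hosts)]
```

Compared with a hard-coded table, only the naming convention has to stay fixed. Adding IDs or rows requires no change to the mapping.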
>>>>>>>> I would be really grateful if you could give me a hint for the
>>>>>>>> scheduling implementation. Meanwhile, I'll try to go deeper through
>>>>>>>> the code.
>>>>>>>> Best regards and thank you in advance
>>>>>>>> Philipp
>>>>>>>> On 14.03.2018 at 08:06, Philipp Krause wrote:
>>>>>>>> Thank you very much for this information! I'll try to implement
>>>>>>>> these two steps and post some updates within the next days!
>>>>>>>> Best regards
>>>>>>>> Philipp
>>>>>>>> 2018-03-13 5:38 GMT+01:00 Alexander Behm <>:
>>>>>>>>> Cool that you are working on a research project with Impala!
>>>>>>>>> Properly adding such a feature to Impala is a substantial effort,
>>>>>>>>> but hacking the code for an experiment or two seems doable.
>>>>>>>>> I think you will need to modify two things: (1) the planner to not
>>>>>>>>> add exchange nodes, and (2) the scheduler to assign the co-located
>>>>>>>>> scan ranges to the same host.
>>>>>>>>> Here are a few starting points in the code:
>>>>>>>>> 1) DistributedPlanner
>>>>>>>>> java/org/apache/impala/planner/
>>>>>>>>> The first condition handles the case where no exchange nodes need
>>>>>>>>> to be added because the join inputs are already suitably
>>>>>>>>> partitioned. You could hack the code to always go into that
>>>>>>>>> codepath, so no exchanges are added.
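Here is a toy Python model of the decision described above. It is not the DistributedPlanner code. The function, its parameters and the step labels are hypothetical, and a plain scan is modelled as having no partition key. In the model, a join can skip exchanges when both inputs are already partitioned on the join key, and the experimental hack simply forces that branch. Forcing it is only safe because the blocks were placed by hand so that matching IDs share a host. Otherwise, rows whose join partners live on another host would simply be missing from the result.

```python
from typing import List, Optional


def plan_join(left_partition_key: Optional[str], right_partition_key: Optional[str],
              join_key: str, force_colocated: bool = False) -> List[str]:
    """Return the steps of a partitioned join in this toy model."""
    # "Suitably partitioned" here means both inputs are already partitioned
    # on the join key; a plain scan is modelled with partition key None.
    already_partitioned = left_partition_key == join_key == right_partition_key
    if force_colocated or already_partitioned:
        # First branch: join the inputs where they are, without exchange nodes.
        return ["scan left", "scan right", "local hash join"]
    # Otherwise both inputs are repartitioned on the join key before the join.
    return ["scan left", "exchange(hash(join_key))",
            "scan right", "exchange(hash(join_key))", "hash join"]
```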
>>>>>>>>> 2) The scheduler
>>>>>>>>> ng/
>>>>>>>>> You'll need to dig through and understand that code so that you
>>>>>>>>> can make the necessary changes. Change the scan range to host
>>>>>>>>> mapping to your liking. The rest of the code should just work.
>>>>>>>>> Cheers,
>>>>>>>>> Alex
>>>>>>>>> On Mon, Mar 12, 2018 at 6:55 PM, Philipp Krause <
>>>>>>>>>> wrote:
>>>>>>>>>> Thank you very much for your quick answers!
>>>>>>>>>> The intention behind this is to improve the execution time and
>>>>>>>>>> (primarily) to examine the impact of block co-location (research
>>>>>>>>>> project) for this particular query (simplified):
>>>>>>>>>> select A.x, B.y, A.z from tableA as A inner join tableB as B on
>>>>>>>>>> The "real" query includes three joins, and the data size is in the
>>>>>>>>>> PB range. Therefore several nodes (5 in the test environment, with
>>>>>>>>>> less data) are used (without any load balancer).
>>>>>>>>>> Could you give me some hints on what code changes are required
>>>>>>>>>> and which files are affected? I don't know how to tell Impala that
>>>>>>>>>> it should only join the local data blocks on each node and then pass
>>>>>>>>>> the result to the "final" node, which receives all intermediate
>>>>>>>>>> results. I hope you can help me to get this working. That would be
>>>>>>>>>> awesome!
>>>>>>>>>> Best regards
>>>>>>>>>> Philipp
>>>>>>>>>> On 12.03.2018 at 18:38, Alexander Behm wrote:
>>>>>>>>>> I suppose one exception is if your data lives only on a single
>>>>>>>>>> node. Then you can set num_nodes=1 and make sure to send the query
>>>>>>>>>> request to the impalad running on the same data node as the target
>>>>>>>>>> data. Then you should get a local join.
>>>>>>>>>> On Mon, Mar 12, 2018 at 9:30 AM, Alexander Behm <
>>>>>>>>>>> wrote:
>>>>>>>>>>> Such a specific block arrangement is very uncommon for typical
>>>>>>>>>>> Impala setups, so we don't attempt to recognize and optimize this
>>>>>>>>>>> narrow case. In particular, such an arrangement tends to be
>>>>>>>>>>> short-lived if you have the HDFS balancer turned on.
>>>>>>>>>>> Without making code changes, there is no way today to remove the
>>>>>>>>>>> data exchanges and make sure that the scheduler assigns scan
>>>>>>>>>>> splits to nodes in the desired way (co-located, but with possible
>>>>>>>>>>> load imbalance).
>>>>>>>>>>> In what way is the current setup unacceptable to you? Is this
>>>>>>>>>>> premature optimization? If you have certain
>>>>>>>>>>> expectations/requirements for specific queries, we might be able
>>>>>>>>>>> to help you improve those. If you want to pursue this route,
>>>>>>>>>>> please help us by posting complete query profiles.
>>>>>>>>>>> Alex
>>>>>>>>>>> On Mon, Mar 12, 2018 at 6:29 AM, Philipp Krause
>>>>>>>>>>>> wrote:
>>>>>>>>>>>> Hello everyone!
>>>>>>>>>>>> In order to prevent network traffic, I'd like to perform local
>>>>>>>>>>>> joins on each node instead of exchanging the data and performing
>>>>>>>>>>>> a join over the complete data afterwards. My query is basically a
>>>>>>>>>>>> join over three tables on an ID attribute. The blocks are
>>>>>>>>>>>> perfectly distributed, so that e.g. Table A - Block 0 and Table
>>>>>>>>>>>> B - Block 0 are on the same node. These blocks contain all data
>>>>>>>>>>>> rows with an ID range [0,1]. Table A - Block 1 and Table B -
>>>>>>>>>>>> Block 1, with an ID range [2,3], are on another node, etc. So I
>>>>>>>>>>>> want to perform a local join per node, because any data exchange
>>>>>>>>>>>> would be unnecessary (except for the last step, when the final
>>>>>>>>>>>> node receives all results of the other nodes). Is this possible?
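A small simulation shows why local joins suffice in this layout and what happens when the layout is broken. This minimal Python sketch assumes the layout described here: all tables are split by the same ID ranges, and the matching pieces sit on the same node. Plain lists stand in for blocks, and the node names and rows are made up. Each node joins only its own blocks, the results are concatenated as the "final" node would gather them, and the outcome is compared with one global join.

```python
from typing import Dict, List, Tuple

Row = Tuple[int, str]  # (id, payload)


def hash_join(left: List[Row], right: List[Row]) -> List[Tuple[int, str, str]]:
    """Inner equi-join on id: build a hash table on the right side, probe with the left."""
    table: Dict[int, List[str]] = {}
    for rid, payload in right:
        table.setdefault(rid, []).append(payload)
    return [(lid, lp, rp) for lid, lp in left for rp in table.get(lid, [])]


def local_joins_then_union(
        nodes: Dict[str, Tuple[List[Row], List[Row]]]) -> List[Tuple[int, str, str]]:
    """Join only the blocks stored on each node, then gather all results."""
    result = []
    for left_block, right_block in nodes.values():
        result.extend(hash_join(left_block, right_block))
    return result
```

When matching IDs are co-located, the gathered local results equal the global join. If a single row of Table B sits on the "wrong" node, its match is silently lost, which is why the planner normally inserts exchanges unless it knows the inputs are partitioned compatibly.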
>>>>>>>>>>>> At the moment, the query plan includes multiple data exchanges,
>>>>>>>>>>>> although the blocks are already perfectly distributed (manually).
>>>>>>>>>>>> I would be grateful for any help!
>>>>>>>>>>>> Best regards
>>>>>>>>>>>> Philipp Krause
