impala-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Alexander Behm <>
Subject Re: Local join instead of data exchange - co-located blocks
Date Fri, 13 Apr 2018 18:12:45 GMT
Here's the foll list. It might not be minimal, but copying/overwriting
these should work.


If you are only modifying the Java portion (like DistributedPlanner), then
only copying/replacing the *.jar files should be sufficient.

On Fri, Apr 13, 2018 at 11:00 AM, Philipp Krause <> wrote:

> Yes, I have a running (virtual) cluster. I would try to follow your way
> with the custom impala build ( is the only modified
> file at the moment). Thank you in advance for the file list!
> Best regards
> Philipp
> Alexander Behm <> schrieb am Fr., 13. Apr. 2018,
> 18:45:
>> I'm not really following your installation/setup and am not an expert on
>> Cloudera Manager installation/config. If you are going to build Impala
>> anyway, it's probably easiest to test on Impala's minicluster first.
>> In general, if you have a running Cloudera Managed cluster, you can
>> deploy a custom Impala build by simply overwriting the Impala existing
>> binaries and jars with the new build. If you want to go this route, I can
>> give you a full list of files you need to replace.
>> On Tue, Apr 10, 2018 at 11:44 AM, Philipp Krause <philippkrause.mail@
>>> wrote:
>>> Thank you for the explanation! Yes, I'm using HDFS. The single replica
>>> setup is only for test purposes at the moment. I think this makes it easier
>>> to gain some first results since less modifications (scheduler etc.) are
>>> neccessary.
>>> I would like to test the DistributedPlanner modification in my virtual
>>> cluster. I used a customized Vagrant script to install Impala on multiple
>>> hosts (s.attachment). It simply installs cloudera-manager-server-db,
>>> cloudera-manager-server and cloudera-manager-daemons via apt-get. What
>>> would be the simplest solution to setup my modified version? Could I simply
>>> call ./ and change the script to sth. like this?
>>> echo "Install java..."
>>> apt-get -q -y --force-yes install oracle-j2sdk1.7
>>> echo "Download impala..."
>>> wget https://... where I uploaded my modified version
>>> echo "Extract impala..."
>>> tar -xvzf Impala-cdh5-trunk.tar.gz
>>> cd Impala-cdh5-trunk
>>> echo "Build impala..."
>>> ./
>>> echo "Start impala instances..."
>>> service cloudera-scm-server-db initdb
>>> service cloudera-scm-server-db start
>>> service cloudera-scm-server start
>>> Or is there another, maybe even easier method, to test the code? Maybe
>>> via / minicluster?
>>> Best regards
>>> Philipp
>>> 2018-04-05 18:39 GMT+02:00 Alexander Behm <>:
>>>> Apologies for the late response. Btw, your previous post was clear
>>>> enough to me, so no worries :)
>>>> On Wed, Apr 4, 2018 at 7:46 AM, Philipp Krause <philippkrause.mail@
>>>>> wrote:
>>>>> Hello Alex,
>>>>> I think my previous post has been too long and confusing. I apologize
>>>>> for that!
>>>>> If replicas are completely deactivated, all scan ranges of a block are
>>>>> mapped to the one host, where the block is located on. This host is the
>>>>> "executor"/reader for all the scan ranges of this block. Is that correct?
>>>> Yes, assuming you are using HDFS.
>>>>> I tried to visualize my understanding of the scan_range to host
>>>>> mapping for my use case (s. attachment). Could you please have a quick
>>>>> at it and tell me if this is correct?
>>>>> "The existing scan range assignment is scan-node centric. For each
>>>>> scan node, we independently decide which of its scan ranges should be
>>>>> processed by which host."
>>>>> Without replicas, all scan ranges of a block would be assigend to the
>>>>> same host where this block is located on. Isn't everything local here,
>>>>> that Table_A - Block_0 and Table_B - Block_0 can be joined local or are
>>>>> further steps neccessary? The condition in the DistributedPlanner you
>>>>> pointed to me is set to false (no exchange nodes).
>>>>> "You want it to be host-centric. For each host, collect the local scan
>>>>> ranges of *all* scan nodes, and assign them to that host."
>>>>> Wouldn't the standard setup from above work? Wouldn't I assign all
>>>>> (the same) scan ranges to each host in this case here?
>>>> The standard setup works only in if every block only has exactly one
>>>> replica. For our purposes, that is basically never the case (who would
>>>> store production data without replication?), so the single-replica
>>>> assumption was not clear to me.
>>>> Does your current setup (only changing the planner and not the
>>>> scheduler) produce the expected results?
>>>>> Thank you very much!
>>>>> Best regards
>>>>> Philipp
>>>>> 2018-03-28 21:04 GMT+02:00 Philipp Krause <philippkrause.mail@
>>>>>> Thank you for your answer and sorry for my delay!
>>>>>> If my understanding is correct, the list of scan nodes consists of
>>>>>> all nodes which contain a *local* block from a table that is needed
for the
>>>>>> query (Assumption: I have no replicas in my first tests). If TableA-Block0
>>>>>> is on Node_0, isn't Node_0 automatically a scan node? And wouldn't
>>>>>> scan node always be the host for the complete scan range(s) then?
>>>>>> "For each scan node, we independently decide which of its scan ranges
>>>>>> should be processed by which host."
>>>>>> scheduling/
>>>>>> // Loop over all scan ranges, select an executor for those with local
>>>>>> impalads and
>>>>>> // collect all others for later processing.
>>>>>> So in this whole block, scan ranges are assigned to the closest
>>>>>> executor (=host?). But isn't the closest executor always the node
the block
>>>>>> is located on (assumed impalad is installed and I have no replicas)?
>>>>>> isn't this node always a scan node at the same time? Otherwise a
thread on
>>>>>> a remote host had to read the corresponding scan range, which would
be more
>>>>>> expensive. The only exception I can think of is when all threads
on the
>>>>>> local node are busy. Or, if I use replicas and all other threads
of my node
>>>>>> with the "original" block are busy, a thread on another node which
>>>>>> a replica could read a special scan range of its local block. Is
>>>>>> understanding correct here?
>>>>>> Aren't all scan ranges read locally by its scan nodes if I have
>>>>>> impalad installed on all nodes? And am I right, that the scan range
is only
>>>>>> based on its length which refers to maxScanRangeLength in
>>>>>> computeScanRangeLocations?
>>>>>> main/java/org/apache/impala/planner/
>>>>>> I hope you can help me with the scan node <-> scan range->host
>>>>>> relationship. If I have Table_A-Block_0 and Table_B_Block_0 on the
>>>>>> node (which I want to join locally), I don't get the point of why
>>>>>> ranges could be assigned to another host in my scenario.
>>>>>> Best regads and thank you very much!
>>>>>> Philipp Krause
>>>>>> Am 21.03.2018 um 05:21 schrieb Alexander Behm:
>>>>>> Thanks for following up. I think I understand your setup.
>>>>>> If you want to not think about scan ranges, then you can modify
>>>>>> HdfsScanNode.computeScanRangeLocations(). For example, you could
>>>>>> change it to produce one scan range per file or per HDFS block. That
>>>>>> you'd know exactly what a scan range corresponds to.
>>>>>> I think the easiest/fastest way for you to make progress is to
>>>>>> re-implement the existing scan range assignment logic in that place
in the
>>>>>> code I had pointed you to. There is no quick fix to change the existing
>>>>>> behavior.
>>>>>> The existing scan range assignment is scan-node centric. For each
>>>>>> scan node, we independently decide which of its scan ranges should
>>>>>> processed by which host.
>>>>>> I believe an algorithm to achieve your goal would look completely
>>>>>> different. You want it to be host-centric. For each host, collect
the local
>>>>>> scan ranges of *all* scan nodes, and assign them to that host.
>>>>>> Does that make sense?
>>>>>> Alex
>>>>>> On Mon, Mar 19, 2018 at 1:02 PM, Philipp Krause <philippkrause.mail@
>>>>>>> wrote:
>>>>>>> I'd like to provide a small example for our purpose. The last
>>>>>>> may be a bit confusing, so here's a very simple example in the
attached pdf
>>>>>>> file. I hope, it's understandable. Otherwise, please give me
a short
>>>>>>> feedback.
>>>>>>> Basically, I only want each data node to join all it's local
>>>>>>> Is there a range mapping needed or is it possible to easily join
all local
>>>>>>> blocks (regardless of its content) since everything is already
>>>>>>> Maybe you can clarify this for me.
>>>>>>> As you can see in the example, the tables are not partitioned
by ID.
>>>>>>> The files are manually prepared by the help of the modulo function.
So I
>>>>>>> don't have a range like [0,10], but something like 0,5,10,15
>>>>>>> I hope, I didn't make it too complicated and confusing. I think,
>>>>>>> actual idea behind this is really simple and I hope you can help
me to get
>>>>>>> this working.
>>>>>>> Best regards and thank you very much for your time!
>>>>>>> Philipp
>>>>>>> Am 18.03.2018 um 17:32 schrieb Philipp Krause:
>>>>>>> Hi! At the moment the data to parquet (block) mapping is based
on a
>>>>>>> simple modulo function: Id % #data_nodes. So with 5 data nodes
all rows
>>>>>>> with Id's 0,5,10,... are written to Parquet_0, Id's 1,4,9 are
written to
>>>>>>> Parquet_1 etc. That's what I did manually. Since the parquet
file size and
>>>>>>> the block size are both set to 64MB, each parquet file will result
in one
>>>>>>> block when I transfer the parquet files to HDFS. By default,
>>>>>>> distributes the blocks randomly. For test purposes I transferred
>>>>>>> corresponding blocks from Table_A and Table_B to the same data
>>>>>>> (Table_A - Block_X with Id's 0,5,10 and Table_B - Block_Y with
>>>>>>> 0,5,10). In this case, they are transferred to data_node_0 because
>>>>>>> modulo function (which I want to implement in the scheduler)
returns 0 for
>>>>>>> these Id's. This is also done manually at the moment.
>>>>>>> 1.) DistributedPlanner: For first, upcoming tests I simply changed
>>>>>>> the first condition in the DistributedPlanner to true to avoid
>>>>>>> nodes.
>>>>>>> 2.) The scheduler: That's the part I'm currently struggling with.
>>>>>>> For first tests, block replication is deactivated. I'm not sure
how / where
>>>>>>> to implement the modulo function for scan range to host mapping.
>>>>>>> the modulo function, I had to implement a hard coded mapping
>>>>>>> like "range" 0-0, 5-5, 10-10 -> Data_node_0 etc.). Is that
correct? Instead
>>>>>>> I would like to use a slightly more flexible solution by the
help of this
>>>>>>> modulo function for the host mapping.
>>>>>>> I would be really grateful if you could give me a hint for the
>>>>>>> scheduling implementation. I try to go deeper through the code
>>>>>>> Best regards and thank you in advance
>>>>>>> Philipp
>>>>>>> Am 14.03.2018 um 08:06 schrieb Philipp Krause:
>>>>>>> Thank you very much for these information! I'll try to implement
>>>>>>> these two steps and post some updates within the next days!
>>>>>>> Best regards
>>>>>>> Philipp
>>>>>>> 2018-03-13 5:38 GMT+01:00 Alexander Behm <>:
>>>>>>>> Cool that you working on a research project with Impala!
>>>>>>>> Properly adding such a feature to Impala is a substantial
>>>>>>>> but hacking the code for an experiment or two seems doable.
>>>>>>>> I think you will need to modify two things: (1) the planner
to not
>>>>>>>> add exchange nodes, and (2) the scheduler to assign the co-located
>>>>>>>> ranges to the same host.
>>>>>>>> Here are a few starting points in the code:
>>>>>>>> 1) DistributedPlanner
>>>>>>>> main/java/org/apache/impala/planner/
>>>>>>>> The first condition handles the case where no exchange nodes
>>>>>>>> to be added because the join inputs are already suitably
>>>>>>>> You could hack the code to always go into that codepath,
so no
>>>>>>>> exchanges are added.
>>>>>>>> 2) The scheduler
>>>>>>>> scheduling/
>>>>>>>> You'll need to dig through and understand that code so that
you can
>>>>>>>> make the necessary changes. Change the scan range to host
mapping to your
>>>>>>>> liking. The rest of the code should just work.
>>>>>>>> Cheers,
>>>>>>>> Alex
>>>>>>>> On Mon, Mar 12, 2018 at 6:55 PM, Philipp Krause <
>>>>>>>>> wrote:
>>>>>>>>> Thank you very much for your quick answers!
>>>>>>>>> The intention behind this is to improve the execution
time and
>>>>>>>>> (primarily) to examine the impact of block-co-location
(research project)
>>>>>>>>> for this particular query (simplified):
>>>>>>>>> select A.x, B.y, A.z from tableA as A inner join tableB
as B on
>>>>>>>>> The "real" query includes three joins and the data size
is in
>>>>>>>>> pb-range. Therefore several nodes (5 in the test environment
with less
>>>>>>>>> data) are used (without any load balancer).
>>>>>>>>> Could you give me some hints what code changes are required
>>>>>>>>> which files are affected? I don't know how to give Impala
the information
>>>>>>>>> that it should only join the local data blocks on each
node and then pass
>>>>>>>>> it to the "final" node which receives all intermediate
results. I hope you
>>>>>>>>> can help me to get this working. That would be awesome!
>>>>>>>>> Best regards
>>>>>>>>> Philipp
>>>>>>>>> Am 12.03.2018 um 18:38 schrieb Alexander Behm:
>>>>>>>>> I suppose one exception is if your data lives only on
a single
>>>>>>>>> node. Then you can set num_nodes=1 and make sure to send
the query request
>>>>>>>>> to the impalad running on the same data node as the target
data. Then you
>>>>>>>>> should get a local join.
>>>>>>>>> On Mon, Mar 12, 2018 at 9:30 AM, Alexander Behm <
>>>>>>>>>> wrote:
>>>>>>>>>> Such a specific block arrangement is very uncommon
for typical
>>>>>>>>>> Impala setups, so we don't attempt to recognize and
optimize this narrow
>>>>>>>>>> case. In particular, such an arrangement tends to
be short lived if you
>>>>>>>>>> have the HDFS balancer turned on.
>>>>>>>>>> Without making code changes, there is no way today
to remove the
>>>>>>>>>> data exchanges and make sure that the scheduler assigns
scan splits to
>>>>>>>>>> nodes in the desired way (co-located, but with possible
load imbalance).
>>>>>>>>>> In what way is the current setup unacceptable to
you? Is this
>>>>>>>>>> pre-mature optimization? If you have certain performance
>>>>>>>>>> expectations/requirements for specific queries we
might be able to help you
>>>>>>>>>> improve those. If you want to pursue this route,
please help us by posting
>>>>>>>>>> complete query profiles.
>>>>>>>>>> Alex
>>>>>>>>>> On Mon, Mar 12, 2018 at 6:29 AM, Philipp Krause <
>>>>>>>>>>> wrote:
>>>>>>>>>>> Hello everyone!
>>>>>>>>>>> In order to prevent network traffic, I'd like
to perform local
>>>>>>>>>>> joins on each node instead of exchanging the
data and perform a join over
>>>>>>>>>>> the complete data afterwards. My query is basically
a join over three three
>>>>>>>>>>> tables on an ID attribute. The blocks are perfectly
distributed, so that
>>>>>>>>>>> e.g. Table A - Block 0  and Table B - Block 0
 are on the same node. These
>>>>>>>>>>> blocks contain all data rows with an ID range
[0,1]. Table A - Block 1  and
>>>>>>>>>>> Table B - Block 1 with an ID range [2,3] are
on another node etc. So I want
>>>>>>>>>>> to perform a local join per node because any
data exchange would be
>>>>>>>>>>> unneccessary (except for the last step when the
final node recevieves all
>>>>>>>>>>> results of the other nodes). Is this possible?
>>>>>>>>>>> At the moment the query plan includes multiple
data exchanges,
>>>>>>>>>>> although the blocks are already perfectly distributed
>>>>>>>>>>> I would be grateful for any help!
>>>>>>>>>>> Best regards
>>>>>>>>>>> Philipp Krause

View raw message