impala-user mailing list archives

From Philipp Krause <>
Subject Re: Local join instead of data exchange - co-located blocks
Date Wed, 04 Apr 2018 14:46:50 GMT
 Hello Alex,

I think my previous post was too long and confusing. I apologize for that.

If replicas are completely deactivated, all scan ranges of a block are
mapped to the single host on which the block is located. This host is the
"executor"/reader for all scan ranges of this block. Is that correct?

I tried to visualize my understanding of the scan_range to host mapping for
my use case (see attachment). Could you please have a quick look at it and
tell me if this is correct?

"The existing scan range assignment is scan-node centric. For each scan
node, we independently decide which of its scan ranges should be processed
by which host."
Without replicas, all scan ranges of a block would be assigned to the host
on which the block is located. Isn't everything local here, so that
Table_A - Block_0 and Table_B - Block_0 can be joined locally, or are further
steps necessary? The condition in the DistributedPlanner you pointed me to
is hard-coded so that no exchange nodes are added.
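
Just so we are talking about the same change, here is a minimal sketch of the
kind of hack I mean (the variable and method names are invented for
illustration; this is not the actual Impala code):

    // DistributedPlanner sketch (invented names): always take the branch for
    // "join inputs are already compatibly partitioned", so that no
    // ExchangeNode is created for either join input.
    boolean inputsAreCoPartitioned = true;  // forced for the experiment
    if (inputsAreCoPartitioned) {
      // Place the join into the fragment of the left child; both inputs are
      // read locally and no exchange is added.
      leftChildFragment.setPlanRoot(hashJoinNode);
      return leftChildFragment;
    }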

"You want it to be host-centric. For each host, collect the local scan
ranges of *all* scan nodes, and assign them to that host."
Wouldn't the standard setup from above work? Wouldn't I assign all (the
same) scan ranges to each host in this case?
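
To check whether I understood the host-centric idea correctly, here is a rough
sketch of the assignment I think you mean (Java-style for illustration only;
all types and names are invented, and I know the actual scheduler code is C++):

    import java.util.*;

    // Sketch: for each host, collect the local scan ranges of ALL scan nodes
    // and assign them to that host.
    class HostCentricAssignmentSketch {
      record ScanRange(int scanNodeId, String storageHost) {}

      static Map<String, List<ScanRange>> assign(List<String> hosts,
                                                 List<ScanRange> allScanRanges) {
        Map<String, List<ScanRange>> assignment = new HashMap<>();
        for (String host : hosts) {
          for (ScanRange range : allScanRanges) {
            // Without replicas, every scan range is local to exactly one host.
            if (range.storageHost().equals(host)) {
              assignment.computeIfAbsent(host, h -> new ArrayList<>()).add(range);
            }
          }
        }
        return assignment;
      }
    }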

Thank you very much!

Best regards

2018-03-28 21:04 GMT+02:00 Philipp Krause <

> Thank you for your answer and sorry for my delay!
> If my understanding is correct, the list of scan nodes consists of all
> nodes which contain a *local* block from a table that is needed for the
> query (Assumption: I have no replicas in my first tests). If TableA-Block0
> is on Node_0, isn't Node_0 automatically a scan node? And wouldn't this
> scan node always be the host for the complete scan range(s) then?
> "For each scan node, we independently decide which of its scan ranges
> should be processed by which host."
> scheduling/
> // Loop over all scan ranges, select an executor for those with local
> impalads and
> // collect all others for later processing.
> So in this whole block, scan ranges are assigned to the closest executor
> (= host?). But isn't the closest executor always the node the block is
> located on (assuming impalad is installed and I have no replicas)? And isn't
> this node always a scan node at the same time? Otherwise a thread on a
> remote host would have to read the corresponding scan range, which would be
> more expensive. The only exception I can think of is when all threads on the
> local node are busy. Or, if I use replicas and all other threads of the node
> with the "original" block are busy, a thread on another node which contains
> a replica could read a specific scan range of its local block. Is my
> understanding correct here?
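> In other words, I read the existing per-scan-node assignment roughly like
> this (only a sketch, with invented helper names):
>   // For one scan node: prefer an executor on the host that stores the range.
>   for (ScanRange range : scanNode.getScanRanges()) {
>     Executor local = findExecutorOnHost(range.getStorageHost());
>     if (local != null) {
>       assign(range, local);      // local read
>     } else {
>       remoteRanges.add(range);   // collected for later, remote read
>     }
>   }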
> Aren't all scan ranges read locally by their scan nodes if I have impalad
> installed on all nodes? And am I right that a scan range is only based
> on its length, which refers to maxScanRangeLength in
> computeScanRangeLocations?
> main/java/org/apache/impala/planner/
> I hope you can help me with the scan node <-> scan range -> host
> relationship. If I have Table_A-Block_0 and Table_B-Block_0 on the same
> node (which I want to join locally), I don't see why scan
> ranges could be assigned to another host in my scenario.
> Best regards and thank you very much!
> Philipp Krause
> Am 21.03.2018 um 05:21 schrieb Alexander Behm:
> Thanks for following up. I think I understand your setup.
> If you want to not think about scan ranges, then you can modify
> HdfsScanNode.computeScanRangeLocations(). For example, you could change
> it to produce one scan range per file or per HDFS block. That way you'd
> know exactly what a scan range corresponds to.
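> A rough sketch of the "one scan range per HDFS block" idea inside
> computeScanRangeLocations() (invented types and names, not the actual code):
>   // Emit exactly one scan range per HDFS block instead of splitting blocks
>   // by maxScanRangeLength.
>   for (BlockLocation block : fileBlockLocations) {
>     ScanRange range = new ScanRange(file.getPath(), block.getOffset(), block.getLength());
>     // Keep the block's hosts so the scheduler can pick a local executor.
>     range.setPreferredHosts(block.getHosts());
>     scanRanges.add(range);
>   }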
> I think the easiest/fastest way for you to make progress is to
> re-implement the existing scan range assignment logic in that place in the
> code I had pointed you to. There is no quick fix to change the existing
> behavior.
> The existing scan range assignment is scan-node centric. For each scan
> node, we independently decide which of its scan ranges should be processed
> by which host.
> I believe an algorithm to achieve your goal would look completely
> different. You want it to be host-centric. For each host, collect the local
> scan ranges of *all* scan nodes, and assign them to that host.
> Does that make sense?
> Alex
> On Mon, Mar 19, 2018 at 1:02 PM, Philipp Krause <philippkrause.mail@
>> wrote:
>> I'd like to provide a small example for our purpose. The last post may be
>> a bit confusing, so here's a very simple example in the attached pdf file.
>> I hope it's understandable. Otherwise, please give me short feedback.
>> Basically, I only want each data node to join all its local blocks. Is
>> there a range mapping needed, or is it possible to simply join all local
>> blocks (regardless of their content) since everything is already "prepared"?
>> Maybe you can clarify this for me.
>> As you can see in the example, the tables are not partitioned by ID. The
>> files are manually prepared with the help of the modulo function. So I don't
>> have a range like [0,10], but something like 0, 5, 10, 15, etc.
>> I hope I didn't make it too complicated and confusing. I think the
>> actual idea behind this is really simple, and I hope you can help me to get
>> this working.
>> Best regards and thank you very much for your time!
>> Philipp
>> Am 18.03.2018 um 17:32 schrieb Philipp Krause:
>> Hi! At the moment the data to parquet (block) mapping is based on a
>> simple modulo function: Id % #data_nodes. So with 5 data nodes, all rows
>> with IDs 0, 5, 10, ... are written to Parquet_0, IDs 1, 6, 11, ... are written
>> to Parquet_1, etc. That's what I did manually. Since the parquet file size and
>> the block size are both set to 64MB, each parquet file will result in one
>> block when I transfer the parquet files to HDFS. By default, HDFS
>> distributes the blocks randomly. For test purposes I transferred
>> corresponding blocks from Table_A and Table_B to the same data node
>> (Table_A - Block_X with Id's 0,5,10 and Table_B - Block_Y with Id's
>> 0,5,10). In this case, they are transferred to data_node_0 because the
>> modulo function (which I want to implement in the scheduler) returns 0 for
>> these Id's. This is also done manually at the moment.
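>> The placement rule itself is just this (a sketch, not my actual
>> preparation script):
>>   // Row with a given id goes into Parquet_<n> on data_node_<n>.
>>   static int targetNode(long id, int numDataNodes) {
>>     return (int) (id % numDataNodes);  // ids 0,5,10 -> 0; ids 1,6,11 -> 1 (5 nodes)
>>   }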
>> 1.) DistributedPlanner: For the first upcoming tests, I simply changed the
>> first condition in the DistributedPlanner to true to avoid exchange nodes.
>> 2.) The scheduler: That's the part I'm currently struggling with. For the
>> first tests, block replication is deactivated. I'm not sure how / where to
>> implement the modulo function for the scan range to host mapping. Without the
>> modulo function, I would have to implement a hard-coded mapping (something like
>> "range" 0-0, 5-5, 10-10 -> Data_node_0 etc.). Is that correct? Instead, I
>> would like to use a slightly more flexible solution with the help of this
>> modulo function for the host mapping.
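>> To make the idea concrete, this is the mapping I would like the scheduler
>> to apply (a Java-style sketch for illustration only; I know the real
>> scheduler is C++, and all names here are invented):
>>   // Map a scan range to the host that stores its block, using the same
>>   // modulo rule that was used to place the data.
>>   static String hostForScanRange(long smallestIdInBlock, List<String> dataNodes) {
>>     int node = (int) (smallestIdInBlock % dataNodes.size());
>>     return dataNodes.get(node);  // e.g. the block with ids 0,5,10 -> data_node_0
>>   }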
>> I would be really grateful if you could give me a hint for the scheduling
>> implementation. Meanwhile, I'll try to dig deeper into the code.
>> Best regards and thank you in advance
>> Philipp
>> Am 14.03.2018 um 08:06 schrieb Philipp Krause:
>> Thank you very much for this information! I'll try to implement these
>> two steps and post some updates within the next days!
>> Best regards
>> Philipp
>> 2018-03-13 5:38 GMT+01:00 Alexander Behm <>:
>>> Cool that you're working on a research project with Impala!
>>> Properly adding such a feature to Impala is a substantial effort, but
>>> hacking the code for an experiment or two seems doable.
>>> I think you will need to modify two things: (1) the planner to not add
>>> exchange nodes, and (2) the scheduler to assign the co-located scan ranges
>>> to the same host.
>>> Here are a few starting points in the code:
>>> 1) DistributedPlanner
>>> a/org/apache/impala/planner/
>>> The first condition handles the case where no exchange nodes need to be
>>> added because the join inputs are already suitably partitioned.
>>> You could hack the code to always go into that codepath, so no exchanges
>>> are added.
>>> 2) The scheduler
>>> ng/
>>> You'll need to dig through and understand that code so that you can make
>>> the necessary changes. Change the scan range to host mapping to your
>>> liking. The rest of the code should just work.
>>> Cheers,
>>> Alex
>>> On Mon, Mar 12, 2018 at 6:55 PM, Philipp Krause <
>>>> wrote:
>>>> Thank you very much for your quick answers!
>>>> The intention behind this is to improve the execution time and
>>>> (primarily) to examine the impact of block-co-location (research project)
>>>> for this particular query (simplified):
>>>> select A.x, B.y, A.z from tableA as A inner join tableB as B on
>>>> The "real" query includes three joins and the data size is in pb-range.
>>>> Therefore several nodes (5 in the test environment with less data) are used
>>>> (without any load balancer).
>>>> Could you give me some hints about what code changes are required and which
>>>> files are affected? I don't know how to give Impala the information that it
>>>> should only join the local data blocks on each node and then pass the results
>>>> to the "final" node which receives all intermediate results. I hope you can
>>>> help me to get this working. That would be awesome!
>>>> Best regards
>>>> Philipp
>>>> Am 12.03.2018 um 18:38 schrieb Alexander Behm:
>>>> I suppose one exception is if your data lives only on a single node.
>>>> Then you can set num_nodes=1 and make sure to send the query request to the
>>>> impalad running on the same data node as the target data. Then you should
>>>> get a local join.
>>>> On Mon, Mar 12, 2018 at 9:30 AM, Alexander Behm <
>>>> > wrote:
>>>>> Such a specific block arrangement is very uncommon for typical Impala
>>>>> setups, so we don't attempt to recognize and optimize this narrow case. In
>>>>> particular, such an arrangement tends to be short-lived if you have the
>>>>> HDFS balancer turned on.
>>>>> Without making code changes, there is no way today to remove the data
>>>>> exchanges and make sure that the scheduler assigns scan splits to nodes
>>>>> in the desired way (co-located, but with possible load imbalance).
>>>>> In what way is the current setup unacceptable to you? Is this
>>>>> premature optimization? If you have certain performance
>>>>> expectations/requirements for specific queries we might be able to help
>>>>> improve those. If you want to pursue this route, please help us by posting
>>>>> complete query profiles.
>>>>> Alex
>>>>> On Mon, Mar 12, 2018 at 6:29 AM, Philipp Krause <
>>>>>> wrote:
>>>>>> Hello everyone!
>>>>>> In order to prevent network traffic, I'd like to perform local joins
>>>>>> on each node instead of exchanging the data and performing a join over
>>>>>> the complete data afterwards. My query is basically a join over three
>>>>>> tables on an ID attribute. The blocks are perfectly distributed, so that
>>>>>> e.g. Table A - Block 0 and Table B - Block 0 are on the same node. These
>>>>>> blocks contain all data rows with an ID range of [0,1]. Table A - Block 1
>>>>>> and Table B - Block 1 with an ID range of [2,3] are on another node, etc.
>>>>>> So I want to perform a local join per node because any data exchange would
>>>>>> be unnecessary (except for the last step, when the final node receives the
>>>>>> results of the other nodes). Is this possible?
>>>>>> At the moment the query plan includes multiple data exchanges,
>>>>>> although the blocks are already perfectly distributed (manually).
>>>>>> I would be grateful for any help!
>>>>>> Best regards
>>>>>> Philipp Krause
