From Philipp Krause
Subject Re: Local join instead of data exchange - co-located blocks
Date Sun, 18 Mar 2018 16:32:31 GMT
Hi! At the moment the mapping from rows to Parquet files (blocks) is
based on a simple modulo function: Id % #data_nodes. So with 5 data
nodes, all rows with Ids 0, 5, 10, ... are written to Parquet_0, rows
with Ids 1, 6, 11, ... are written to Parquet_1, etc. I did this
manually. Since the Parquet file size and the HDFS block size are both
set to 64MB, each Parquet file results in one block when I transfer the
Parquet files to HDFS. By default, HDFS distributes the blocks randomly.
For test purposes I transferred corresponding blocks from Table_A and
Table_B to the same data node (Table_A - Block_X with Ids 0, 5, 10 and
Table_B - Block_Y with Ids 0, 5, 10). In this case, they are placed on
data_node_0 because the modulo function (which I want to implement in
the scheduler) returns 0 for these Ids. This is also done manually at
the moment.
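
To make the mapping concrete, here is a minimal Python sketch of the
modulo partitioning described above. The function names are made up for
illustration, and the 5 nodes simply mirror my test setup; it only shows
which Ids end up in which Parquet file / on which data node.

```python
from collections import defaultdict

NUM_DATA_NODES = 5  # assumption: mirrors the 5-node test environment


def bucket_for(row_id: int, num_nodes: int = NUM_DATA_NODES) -> int:
    """Return n such that the row belongs to Parquet_n / data_node_n."""
    return row_id % num_nodes


def partition_ids(ids, num_nodes: int = NUM_DATA_NODES):
    """Group row Ids by bucket, the way the manual split does."""
    buckets = defaultdict(list)
    for row_id in ids:
        buckets[bucket_for(row_id, num_nodes)].append(row_id)
    return dict(buckets)


if __name__ == "__main__":
    for bucket, ids in sorted(partition_ids(range(12)).items()):
        print(f"Parquet_{bucket}: {ids}")
```

Because Table_A and Table_B are split with the same function, rows with
equal Ids always land in the same bucket number for both tables.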

1.) DistributedPlanner: For the first upcoming tests, I simply changed
the first condition in the DistributedPlanner to true to avoid exchange
nodes.

2.) The scheduler: That's the part I'm currently struggling with. For
the first tests, block replication is deactivated. I'm not sure how /
where to implement the modulo function for the scan range to host
mapping. Without the modulo function, I would have to implement a
hard-coded mapping (something like "range" 0-0, 5-5, 10-10 ->
Data_node_0 etc.). Is that correct? Instead, I would like to use a
slightly more flexible solution that uses this modulo function for the
host mapping.
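
As a rough illustration of the host mapping I have in mind (this is not
Impala's scheduler code, just a standalone Python sketch): it assumes
the bucket number can be read from the file name suffix (Parquet_0,
Parquet_1, ...) and that the host list is ordered so that index n is
data_node_n. The paths, host names and function names are hypothetical.

```python
import re

# Assumption: index n in this list corresponds to data_node_n.
HOSTS = ["data_node_0", "data_node_1", "data_node_2",
         "data_node_3", "data_node_4"]


def bucket_from_path(path: str) -> int:
    """Extract the trailing bucket number from a name like '.../Parquet_3'."""
    match = re.search(r"_(\d+)(?:\.\w+)?$", path)
    if match is None:
        raise ValueError(f"no bucket suffix in file name: {path!r}")
    return int(match.group(1))


def host_for_scan_range(path: str, hosts=HOSTS) -> str:
    """Pick the host for a scan range with the same modulo rule as the data split."""
    return hosts[bucket_from_path(path) % len(hosts)]


if __name__ == "__main__":
    for path in ("/warehouse/table_a/Parquet_0", "/warehouse/table_b/Parquet_0"):
        print(path, "->", host_for_scan_range(path))
```

With such a rule, matching files of Table_A and Table_B are assigned to
the same host without listing every Id range explicitly.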

I would be really grateful if you could give me a hint for the
scheduling implementation. In the meantime, I'll keep digging deeper
into the code.

Best regards and thank you in advance

On 14.03.2018 at 08:06, Philipp Krause wrote:
> Thank you very much for this information! I'll try to implement these 
> two steps and post some updates within the next few days!
> Best regards
> Philipp
> 2018-03-13 5:38 GMT+01:00 Alexander Behm:
>     Cool that you are working on a research project with Impala!
>     Properly adding such a feature to Impala is a substantial effort,
>     but hacking the code for an experiment or two seems doable.
>     I think you will need to modify two things: (1) the planner to not
>     add exchange nodes, and (2) the scheduler to assign the co-located
>     scan ranges to the same host.
>     Here are a few starting points in the code:
>     1) DistributedPlanner
>     The first condition handles the case where no exchange nodes need
>     to be added because the join inputs are already suitably partitioned.
>     You could hack the code to always go into that codepath, so no
>     exchanges are added.
>     2) The scheduler
>     You'll need to dig through and understand that code so that you
>     can make the necessary changes. Change the scan range to host
>     mapping to your liking. The rest of the code should just work.
>     Cheers,
>     Alex
>     On Mon, Mar 12, 2018 at 6:55 PM, Philipp Krause wrote:
>         Thank you very much for your quick answers!
>         The intention behind this is to improve the execution time and
>         (primarily) to examine the impact of block-co-location
>         (research project) for this particular query (simplified):
>         select A.x, B.y, A.z from tableA as A inner join tableB as B
>         on
>         The "real" query includes three joins and the data size is in
>         the petabyte range. Therefore several nodes (5 in the test
>         environment with less data) are used (without any load balancer).
>         Could you give me some hints on which code changes are required
>         and which files are affected? I don't know how to tell Impala
>         that it should only join the local data blocks on each node
>         and then pass the results to the "final" node, which
>         receives all intermediate results. I hope you can help me to
>         get this working. That would be awesome!
>         Best regards
>         Philipp
>         On 12.03.2018 at 18:38, Alexander Behm wrote:
>>         I suppose one exception is if your data lives only on a
>>         single node. Then you can set num_nodes=1 and make sure to
>>         send the query request to the impalad running on the same
>>         data node as the target data. Then you should get a local join.
>>         On Mon, Mar 12, 2018 at 9:30 AM, Alexander Behm wrote:
>>             Such a specific block arrangement is very uncommon for
>>             typical Impala setups, so we don't attempt to recognize
>>             and optimize this narrow case. In particular, such an
>>             arrangement tends to be short lived if you have the HDFS
>>             balancer turned on.
>>             Without making code changes, there is no way today to
>>             remove the data exchanges and make sure that the
>>             scheduler assigns scan splits to nodes in the desired way
>>             (co-located, but with possible load imbalance).
>>             In what way is the current setup unacceptable to you? Is
>>             this premature optimization? If you have certain
>>             performance expectations/requirements for specific
>>             queries we might be able to help you improve those. If
>>             you want to pursue this route, please help us by posting
>>             complete query profiles.
>>             Alex
>>             On Mon, Mar 12, 2018 at 6:29 AM, Philipp Krause wrote:
>>                 Hello everyone!
>>                 In order to prevent network traffic, I'd like to
>>                 perform local joins on each node instead of
>>                 exchanging the data and performing a join over the
>>                 complete data afterwards. My query is basically a
>>                 join over three tables on an ID attribute. The
>>                 blocks are perfectly distributed, so that e.g. Table
>>                 A - Block 0 and Table B - Block 0 are on the same
>>                 node. These blocks contain all data rows with an ID
>>                 in the range [0,1]. Table A - Block 1 and Table B -
>>                 Block 1 with an ID range [2,3] are on another node
>>                 etc. So I want to perform a local join per node
>>                 because any data exchange would be unnecessary
>>                 (except for the last step when the final node
>>                 receives all results of the other nodes). Is this
>>                 possible?
>>                 At the moment the query plan includes multiple data
>>                 exchanges, although the blocks are already perfectly
>>                 distributed (manually).
>>                 I would be grateful for any help!
>>                 Best regards
>>                 Philipp Krause

