impala-user mailing list archives

From Lars Volker <>
Subject Re: Local join instead of data exchange - co-located blocks
Date Thu, 17 May 2018 21:32:22 GMT
Hi Philipp,

My idea was to debug the query startup, both on the coordinator and on the
executors. These are the same process in your case, but they still
communicate through the ExecQueryFInstances RPC. You'll want to find where
the rows get lost between the scan nodes and the join. Can you attach the
profile and logs for the same query with num_nodes=0?

Cheers, Lars

On Wed, May 16, 2018 at 6:52 AM, Philipp Krause <> wrote:

> Hi Lars,
> thank you very much for your quick response! I disabled runtime filters,
> so that one of the scan nodes now passes its rows to the hash join
> fragment. If I additionally set num_nodes=1, the second scan node also
> receives and passes its data to the join fragment and the query works fine
> (the split stats are not empty - the query profile is attached). I
> rechecked my scan assignments and they seem to be fine. I also created two
> other partitioned tables with only a few rows for testing. Here, all
> blocks/partitions are on the same node (so no manual block movement was
> necessary). Unfortunately the result was the same: one of the scan nodes
> remains empty unless I set num_nodes=1.
> I'm not really sure what exactly to debug in the lines you mentioned and
> what to look for. Should I try to print the schedule object itself via
> schedule_.get()? Maybe you have a presumption of what the problem might be?
> I'll try to proceed with debugging in the meantime. If you need any other
> logs or anything else, please let me know. I'm really eager to get this
> working.
> Best regards and thank you very much for your help,
> Philipp
> On 14.05.2018 at 18:39, Lars Volker wrote:
> Hi Philipp,
> Looking at the profile, one of your scan nodes doesn't seem to receive any
> scan ranges ("Hdfs split stats" is empty). The other one receives one
> split, but it gets filtered out by the runtime filter coming from that
> first node ("Files rejected: 1"). You might want to disable runtime filters
> for now until you get it sorted out.
> Then you might want to start debugging in be/src/service/,
> which is where the scheduler gets called. You mentioned that your
> assignments look OK, so up to that point things should be correct. If
> you're uncomfortable poking it all apart with GDB, you can always print
> objects using the methods in debug-util.h. From there go down
> coord_->Exec() in L480. Set the query option num_nodes=1 to execute
> everything at the coordinator for easier debugging. Otherwise, the
> coordinator will start remote fragments, which you can intercept with a
> debugger in ImpalaInternalService::ExecQueryFInstances (be/src/service/impala-
> Cheers, Lars
> On Mon, May 14, 2018 at 1:18 AM, Philipp Krause <philippkrause.mail@
>> wrote:
>> Hello Alex,
>> I suppose you're very busy, so I apologize for the interruption. If you
>> have any idea of what I could try to solve this problem, please let me
>> know. Currently I don't know how to progress and I'd appreciate any help
>> you can give me.
>> Best regards
>> Philipp
>> Philipp Krause <> wrote on Mon., May 7, 2018, 12:59:
>>> I just wanted to add that I tried the join with two other, minimal and
>>> "fresh" tables. All blocks from both tables were on the same node, but I
>>> got the same result: no data was processed. To me, the scan range mapping
>>> of my modified version looks the same as the original one. I only
>>> noticed a difference in the query profile:
>>> Filter 0 (1.00 MB):
>>>              - Files processed: 1 (1)
>>>              - Files rejected: 1 (1)
>>> ...
>>> This filter only appears in my modified version. Hopefully we can find
>>> the mistake.
>>> On 04.05.2018 at 15:40, Philipp Krause wrote:
>>> Hi!
>>> The query profile and the scan range mappings are attached
>>> (query_profile.txt + scan_ranges.txt). The complete log file is also
>>> attached. The mapping looks fine to me; I couldn't find any mistakes
>>> there. For example, line 168 (scan_ranges.txt) shows that partition ID=4
>>> is assigned to node_0 and partition ID=10 is assigned to node_1. Both
>>> partitions contain all id=4 rows, which should be correct for the join.
>>> But probably I have overlooked something in the log.
>>> The partition/block setup is as follows:
>>> 6 Nodes (1 Namenode, 5 Datanodes)
>>> Node 1:
>>> Node 2: 0|0 5|5
>>> Node 3: 1|1
>>> Node 4: 2|2
>>> Node 5: 3|3
>>> Node 6: 4|4
>>> 0|0 means partition_0 from table A and B.
>>> Also thanks to Lars for the logging option, which I have used!
>>> Best regards
>>> Philipp
>>> On 04.05.2018 at 07:10, Lars Volker wrote:
>>> I haven't followed this thread closely, but you can also print all scan
>>> range assignments made by the scheduler by passing -vmodule=scheduler=2 as
>>> a startup option. The logging happens in
>>> <>
>>> This wiki page has a way to achieve that using environment variables:
>>> A/Useful+Tips+for+New+Impala+Developers
>>> Cheers, Lars
>>> On Thu, May 3, 2018 at 8:54 PM, Alexander Behm <>
>>> wrote:
>>>> No idea what's going on, but my guess is something is awry with the
>>>> scan-range assignment. Can you attach the full profile? It's probably also
>>>> good to print the scan ranges created in
>>>> HdfsScanNode.computeScanRangeLocations().
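>>>> For example, something like this (type and field names are approximate
>>>> and may differ in your version, so treat it as a hint rather than
>>>> verbatim code):
>>>>
>>>>   // At the end of HdfsScanNode.computeScanRangeLocations(), dump every
>>>>   // scan range this node produced, with its replica locations.
>>>>   for (TScanRangeLocations loc : scanRanges_) {
>>>>     LOG.info("Scan node " + getId() + " scan range: " + loc);
>>>>   }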
>>>> On Thu, May 3, 2018 at 5:51 PM, Philipp Krause <
>>>>> wrote:
>>>>> Hello Alex,
>>>>> I have tried out several configurations, but I still couldn't find a
>>>>> solution for my problem :( In the query summary (see attachment) it
>>>>> looks as if no rows are read. Do you have an idea what I have to
>>>>> change? I am sorry for the circumstances and thank you once more for
>>>>> the great support to get this working!
>>>>> On 29.04.2018 at 21:21, Philipp Krause wrote:
>>>>> Hi Alex,
>>>>> I got the modified version working on my cluster. The query plan looks
>>>>> exactly as wanted (see attachment). This is awesome! Unfortunately the
>>>>> result set is empty. As you can see in query_state.png, the scan
>>>>> progress always shows 50% although the query has finished.
>>>>> The only modification in the code is the if statement you pointed me to
>>>>> (I set it to true). Maybe I have to give Impala the information about
>>>>> the lhs / rhs join partition since there are no exchange nodes now
>>>>> (like the following lines)? The corresponding partitions / blocks of
>>>>> each table are on the same node.
>>>>> I think we are very close to the final result and I hope you can help
>>>>> me once more. Thank you so much!
>>>>> Best regards
>>>>> Philipp
>>>>> On 24.04.2018 at 18:00, Alexander Behm wrote:
>>>>> On Tue, Apr 24, 2018 at 5:31 AM, Philipp Krause <
>>>>>> wrote:
>>>>>> To prevent the broadcast join I could simply use the shuffle operator
>>>>>> in the query:
>>>>>> SELECT * FROM business_partition_1 INNER JOIN [SHUFFLE]
>>>>>> business_partition_2 WHERE
>>>>>> business_partition_1.businessid = business_partition_2.businessid
>>>>> Not sure what version of Impala you are using, and whether hints
>>>>> override any changes you might make. I suggest you make the code work
>>>>> as you wish without requiring hints.
>>>>>> I think the broadcast is currently only used because of my very small
>>>>>> test tables.
>>>>>> This gives me the plan attached as partitioned_shuffle.png. Since my
>>>>>> modified version isn't working yet, I partitioned both tables on the
>>>>>> join attribute in Impala. The "hack" should only help to get into the
>>>>>> if-condition if I partition the data manually, right? But in this case
>>>>>> (if the partitioning is done by Impala itself) Impala should get into
>>>>>> the if-condition anyway. Unfortunately I can't see a difference in the
>>>>>> plan compared to my unpartitioned tables (unpartitioned.png)
>>>>>> concerning the exchange nodes. My goal is actually to get rid of all
>>>>>> exchange nodes since the corresponding data is already present on each
>>>>>> node. Actually the plan should look the same except for the 03 and 04
>>>>>> exchanges then.
>>>>> I understand the picture and goal. I suggest you read, understand, and
>>>>> modify the code in DistributedPlanner.createHashJoinFragment() to
>>>>> create the plan shape that you want.
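>>>>> For example, a rough sketch of the kind of change (variable names are
>>>>> approximations of the 2.x frontend code, not verbatim):
>>>>>
>>>>>   // Inside DistributedPlanner.createHashJoinFragment(): pretend both
>>>>>   // inputs are already partitioned compatibly on the join exprs, so
>>>>>   // the join is placed into the left scan fragment and no exchange
>>>>>   // nodes are added.
>>>>>   boolean lhsCompatible = true;  // was: derived from lhs data partition
>>>>>   boolean rhsCompatible = true;  // was: derived from rhs data partition
>>>>>   if (lhsCompatible && rhsCompatible) {
>>>>>     node.setChild(0, leftChildFragment.getPlanRoot());
>>>>>     node.setChild(1, rightChildFragment.getPlanRoot());
>>>>>     leftChildFragment.setPlanRoot(node);
>>>>>     return leftChildFragment;
>>>>>   }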
>>>>> I don't know how you are producing these plans. Are you sure your code
>>>>> changes are taking effect?
>>>>>> Are there other changes necessary to just take Table1-partition/block
>>>>>> X and Table2-partition/block Y on each node and join them without any
>>>>>> data exchange? Actually each node should take all its local blocks for
>>>>>> both tables, join them, and pass the results back to the coordinator
>>>>>> where all results come together (please see explanation.png).
>>>>> No. You just need to create the plan without exchange nodes, as we've
>>>>> already gone through early in this thread.
>>>>>> I'm looking forward to hearing from you. I hope we can realise this.
>>>>>> Best regards
>>>>>> Philipp
>>>>>> On 24.04.2018 at 07:03, Alexander Behm wrote:
>>>>>> On Mon, Apr 23, 2018 at 7:24 PM, Philipp Krause <
>>>>>>> wrote:
>>>>>>> Hi Alex,
>>>>>>> thanks for the information! I've compiled the cdh5.13.1-release and
>>>>>>> replaced impala-frontend-0.1-SNAPSHOT.jar (which seems to include
>>>>>>> the changes in the DistributedPlanner). There still seems to be a
>>>>>>> method missing after replacing the jar, but I'll try to figure that
>>>>>>> out.
>>>>>>> I have two questions concerning the code fragment in the
>>>>>>> DistributedPlanner you pointed me to.
>>>>>>> First:
>>>>>>> The attached graphic shows the query plan for two tables which are
>>>>>>> partitioned on the join attribute (standard Impala version without
>>>>>>> changes). If I change the if-condition to true for my modified
>>>>>>> version, I expect to get the same result for my "hand made"
>>>>>>> partitions (outside Impala). But why is there an exchange broadcast
>>>>>>> (even in the standard version)? I mean, if I have a partition of
>>>>>>> Table 1 with ID=0 on Node X, why is there a broadcast of the
>>>>>>> partition of Table 2 with ID=0 on Node Y? Actually this partition
>>>>>>> only has to be sent to Node X where the corresponding partition of
>>>>>>> Table 1 is located (instead of sending it to all nodes (broadcast)).
>>>>>>> Or is this exactly the case and it's only shown as a broadcast here
>>>>>>> in the graphics?
>>>>>> You are reading the plan correctly. Impala simply does not implement
>>>>>> that optimization.
>>>>>>> Second:
>>>>>>> All corresponding blocks of the partitioned tables are on the same
>>>>>>> node (e.g. Table 1 partition with ID=0 and Table 2 partition with
>>>>>>> ID=0 => Node 1, etc.). This is what I did manually. As already
>>>>>>> mentioned before, I want to join these partitions (blocks) locally on
>>>>>>> each node. But if I'm right, the modification in the
>>>>>>> DistributedPlanner will also only lead to the plan in the attached
>>>>>>> graphic, so that no further exchanges are created if I use my "hand
>>>>>>> made" partitions. But there is still the broadcast exchange which
>>>>>>> distributes the partitions across the cluster, which isn't necessary
>>>>>>> because all needed blocks are already on the same node and are ready
>>>>>>> to get joined locally. Is there a way to realise that and get rid of
>>>>>>> the broadcast exchange?
>>>>>> You are right. That hack in DistributedPlanner only works for
>>>>>> partitioned hash joins. You can probably stitch the plan together at
>>>>>> an earlier place, e.g.:
>>>>>> java/org/apache/impala/planner/
>>>>>>> Please correct me if I'm wrong with my assumptions.
>>>>>>> Thank you very much!
>>>>>>> Best regards
>>>>>>> Philipp
>>>>>>> On 18.04.2018 at 07:16, Alexander Behm wrote:
>>>>>>> Your CM-managed cluster must be running a "compatible" Impala
>>>>>>> version already for this trick to work. It looks like your catalog
>>>>>>> is trying to find a method which does not exist in the .jar,
>>>>>>> because your .jar is built based on a different version of Impala
>>>>>>> where that method does not exist anymore.
>>>>>>> It looks like you have CDH 5.13.3 installed. CDH 5.13.3 is based on
>>>>>>> Impala 2.10, see:
>>>>>>> notes/topics/cdh_vd_cdh_package_tarball_513.html#cm_vd_cdh_package_tarball_513
>>>>>>> That means this binary copying trick will only work with a modified
>>>>>>> version of Impala 2.10, and very likely will not work with a
>>>>>>> different version.
>>>>>>> It's probably easier to test with the mini cluster first.
>>>>>>> Alternatively, it "might" work if you replace all the binaries listed
>>>>>>> above, but it's quite possible that will not work.
>>>>>>> On Sun, Apr 15, 2018 at 7:13 PM, Philipp Krause <
>>>>>>>> wrote:
>>>>>>>> Hi Alex! Thank you for the list! The build of the modified
>>>>>>>> cdh5-trunk branch (debug mode) was successful. After replacing
>>>>>>>> "impala-frontend-0.1-SNAPSHOT.jar" in
>>>>>>>> /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/jars/ I got the
>>>>>>>> following error in my existing cluster:
>>>>>>>> F0416 01:16:45.402997 17897] NoSuchMethodError:
>>>>>>>> getCatalogObjects
>>>>>>>> When I switch back to the original jar file the error is gone. So
>>>>>>>> it must be something wrong with this file, I guess. But I wonder
>>>>>>>> about the error in a .cc file because I didn't touch any .cc files.
>>>>>>>> I also replaced "impala-data-source-api-1.0-SNAPSHOT.jar". The
>>>>>>>> other jar files do not exist in my impala installation (CDH-5.13.1).
>>>>>>>> What am I doing wrong?
>>>>>>>> Best regards
>>>>>>>> Philipp
>>>>>>>> On 13.04.2018 at 20:12, Alexander Behm wrote:
>>>>>>>> Here's the full list. It might not be minimal, but
>>>>>>>> copying/overwriting these should work.
>>>>>>>> debug/service/impalad
>>>>>>>> debug/service/
>>>>>>>> debug/service/libService.a
>>>>>>>> release/service/impalad
>>>>>>>> release/service/
>>>>>>>> release/service/libService.a
>>>>>>>> yarn-extras-0.1-SNAPSHOT.jar
>>>>>>>> impala-data-source-api-1.0-SNAPSHOT-sources.jar
>>>>>>>> impala-data-source-api-1.0-SNAPSHOT.jar
>>>>>>>> impala-frontend-0.1-SNAPSHOT-tests.jar
>>>>>>>> impala-frontend-0.1-SNAPSHOT.jar
>>>>>>>> impala-no-sse.bc
>>>>>>>> impala-sse.bc
>>>>>>>> If you are only modifying the Java portion (like the
>>>>>>>> DistributedPlanner), then only copying/replacing the *.jar files
>>>>>>>> should be sufficient.
>>>>>>>> On Fri, Apr 13, 2018 at 11:00 AM, Philipp Krause <
>>>>>>>>> wrote:
>>>>>>>>> Yes, I have a running (virtual) cluster. I would try to follow
>>>>>>>>> your way with the custom impala build ( is
>>>>>>>>> the only modified file at the moment). Thank you in advance for the
>>>>>>>>> file list!
>>>>>>>>> Best regards
>>>>>>>>> Philipp
>>>>>>>>> Alexander Behm <> wrote on Fri., Apr. 13, 2018, 18:45:
>>>>>>>>>> I'm not really following your installation/setup and am not an
>>>>>>>>>> expert on Cloudera Manager installation/config. If you are going
>>>>>>>>>> to build Impala anyway, it's probably easiest to test on Impala's
>>>>>>>>>> minicluster first.
>>>>>>>>>> In general, if you have a running Cloudera Managed cluster, you
>>>>>>>>>> can deploy a custom Impala build by simply overwriting the
>>>>>>>>>> existing Impala binaries and jars with the new build. If you want
>>>>>>>>>> to go this route, I can give you a full list of files you need to
>>>>>>>>>> replace.
>>>>>>>>>> On Tue, Apr 10, 2018 at 11:44 AM, Philipp Krause
>>>>>>>>>>> wrote:
>>>>>>>>>>> Thank you for the explanation! Yes, I'm using HDFS. The single
>>>>>>>>>>> replica setup is only for test purposes at the moment. I think
>>>>>>>>>>> this makes it easier to gain some first results, since fewer
>>>>>>>>>>> modifications (scheduler etc.) are necessary.
>>>>>>>>>>> I would like to test the DistributedPlanner modification in my
>>>>>>>>>>> virtual cluster. I used a customized Vagrant script to install
>>>>>>>>>>> Impala on multiple hosts (see attachment). It simply installs
>>>>>>>>>>> cloudera-manager-server-db, cloudera-manager-server and
>>>>>>>>>>> cloudera-manager-daemons via apt-get. What would be the simplest
>>>>>>>>>>> solution to set up my modified version? Could I simply call
>>>>>>>>>>> ./ and change the script to sth. like this?
>>>>>>>>>>> echo "Install java..."
>>>>>>>>>>> apt-get -q -y --force-yes install oracle-j2sdk1.7
>>>>>>>>>>> echo "Download impala..."
>>>>>>>>>>> wget https://... (where I uploaded my modified version)
>>>>>>>>>>> echo "Extract impala..."
>>>>>>>>>>> tar -xvzf Impala-cdh5-trunk.tar.gz
>>>>>>>>>>> cd Impala-cdh5-trunk
>>>>>>>>>>> echo "Build impala..."
>>>>>>>>>>> ./
>>>>>>>>>>> echo "Start impala instances..."
>>>>>>>>>>> service cloudera-scm-server-db initdb
>>>>>>>>>>> service cloudera-scm-server-db start
>>>>>>>>>>> service cloudera-scm-server start
>>>>>>>>>>> Or is there another, maybe even easier, method to test the code?
>>>>>>>>>>> Maybe via / the minicluster?
>>>>>>>>>>> Best regards
>>>>>>>>>>> Philipp
>>>>>>>>>>> 2018-04-05 18:39 GMT+02:00 Alexander Behm <
>>>>>>>>>>>> Apologies for the late response. Btw, your previous post was
>>>>>>>>>>>> clear enough to me, so no worries :)
>>>>>>>>>>>> On Wed, Apr 4, 2018 at 7:46 AM, Philipp Krause
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> Hello Alex,
>>>>>>>>>>>>> I think my previous post has been too long and confusing. I
>>>>>>>>>>>>> apologize for that!
>>>>>>>>>>>>> If replicas are completely deactivated, all scan ranges of a
>>>>>>>>>>>>> block are mapped to the one host where the block is located.
>>>>>>>>>>>>> This host is the "executor"/reader for all the scan ranges of
>>>>>>>>>>>>> this block. Is that correct?
>>>>>>>>>>>> Yes, assuming you are using HDFS.
>>>>>>>>>>>>> I tried to visualize my understanding of the scan_range to
>>>>>>>>>>>>> host mapping for my use case (see attachment). Could you please
>>>>>>>>>>>>> have a quick look at it and tell me if this is correct?
>>>>>>>>>>>>> "The existing scan range assignment is
scan-node centric. For
>>>>>>>>>>>>> each scan node, we independently decide
which of its scan ranges should be
>>>>>>>>>>>>> processed by which host."
>>>>>>>>>>>>> Without replicas, all scan ranges of a block would be assigned
>>>>>>>>>>>>> to the same host where this block is located. Isn't everything
>>>>>>>>>>>>> local here, so that Table_A - Block_0 and Table_B - Block_0 can
>>>>>>>>>>>>> be joined locally, or are further steps necessary? The
>>>>>>>>>>>>> condition in the DistributedPlanner you pointed me to is set to
>>>>>>>>>>>>> false (no exchange nodes).
>>>>>>>>>>>>> "You want it to be host-centric. For
each host, collect the
>>>>>>>>>>>>> local scan ranges of *all* scan nodes,
and assign them to that host."
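>>>>>>>>>>>>> If I sketch that idea in Java-style pseudocode (the real
>>>>>>>>>>>>> scheduler code is C++, and these names are invented), I picture
>>>>>>>>>>>>> something like:
>>>>>>>>>>>>>
>>>>>>>>>>>>>   // Host-centric: group the local scan ranges of *all* scan
>>>>>>>>>>>>>   // nodes by host, then hand each host its own group so that
>>>>>>>>>>>>>   // co-located blocks are scanned (and joined) on that host.
>>>>>>>>>>>>>   Map<String, List<ScanRange>> rangesByHost = new HashMap<>();
>>>>>>>>>>>>>   for (ScanNode scan : scanNodes) {
>>>>>>>>>>>>>     for (ScanRange range : scan.getScanRanges()) {
>>>>>>>>>>>>>       rangesByHost
>>>>>>>>>>>>>           .computeIfAbsent(range.localHost(), h -> new ArrayList<>())
>>>>>>>>>>>>>           .add(range);
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>   }
>>>>>>>>>>>>>   // Then assign rangesByHost.get(h) to the executor on host h.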
>>>>>>>>>>>>> Wouldn't the standard setup from above work? Wouldn't I assign
>>>>>>>>>>>>> all (the same) scan ranges to each host in this case?
>>>>>>>>>>>> The standard setup works only if every block has exactly one
>>>>>>>>>>>> replica. For our purposes, that is basically never the case
>>>>>>>>>>>> (who would store production data without replication?), so the
>>>>>>>>>>>> single-replica assumption was not clear to me.
>>>>>>>>>>>> Does your current setup (only changing the planner and not the
>>>>>>>>>>>> scheduler) produce the expected results?
>>>>>>>>>>>>> Thank you very much!
>>>>>>>>>>>>> Best regards
>>>>>>>>>>>>> Philipp
>>>>>>>>>>>>> 2018-03-28 21:04 GMT+02:00 Philipp Krause
>>>>>>>>>>>>>> Thank you for your answer and sorry for my delay!
>>>>>>>>>>>>>> If my understanding is correct, the list of scan nodes
>>>>>>>>>>>>>> consists of all nodes which contain a *local* block from a
>>>>>>>>>>>>>> table that is needed for the query (assumption: I have no
>>>>>>>>>>>>>> replicas in my first tests). If TableA-Block0 is on Node_0,
>>>>>>>>>>>>>> isn't Node_0 automatically a scan node? And wouldn't this scan
>>>>>>>>>>>>>> node always be the host for the complete scan range(s) then?
>>>>>>>>>>>>>> "For each scan node, we independently
decide which of its
>>>>>>>>>>>>>> scan ranges should be processed by
which host."
>>>>>>>>>>>>>> heduling/
>>>>>>>>>>>>>> // Loop over all scan ranges, select an executor for those
>>>>>>>>>>>>>> // with local impalads and collect all others for later
>>>>>>>>>>>>>> // processing.
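>>>>>>>>>>>>>> As far as I understand that block, the logic is roughly this
>>>>>>>>>>>>>> (a Java-style sketch with invented names; the real code is
>>>>>>>>>>>>>> C++):
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>   for (ScanRange range : scanRanges) {
>>>>>>>>>>>>>>     // Prefer an executor holding a local replica of the range.
>>>>>>>>>>>>>>     Executor local = findExecutorWithLocalReplica(range);
>>>>>>>>>>>>>>     if (local != null) {
>>>>>>>>>>>>>>       assignments.get(local).add(range);  // local read
>>>>>>>>>>>>>>     } else {
>>>>>>>>>>>>>>       remoteRanges.add(range);  // assigned to some executor later
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>   }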
>>>>>>>>>>>>>> So in this whole block, scan ranges are assigned to the
>>>>>>>>>>>>>> closest executor (=host?). But isn't the closest executor
>>>>>>>>>>>>>> always the node the block is located on (assuming impalad is
>>>>>>>>>>>>>> installed and I have no replicas)? And isn't this node always
>>>>>>>>>>>>>> a scan node at the same time? Otherwise a thread on a remote
>>>>>>>>>>>>>> host would have to read the corresponding scan range, which
>>>>>>>>>>>>>> would be more expensive. The only exception I can think of is
>>>>>>>>>>>>>> when all threads on the local node are busy. Or, if I use
>>>>>>>>>>>>>> replicas and all other threads of my node with the "original"
>>>>>>>>>>>>>> block are busy, a thread on another node which contains a
>>>>>>>>>>>>>> replica could read a special scan range of its local block. Is
>>>>>>>>>>>>>> my understanding correct here?
>>>>>>>>>>>>>> Aren't all scan ranges read locally by their scan nodes if I
>>>>>>>>>>>>>> have impalad installed on all nodes? And am I right that the
>>>>>>>>>>>>>> scan range is only based on its length, which refers to
>>>>>>>>>>>>>> maxScanRangeLength in computeScanRangeLocations?
>>>>>>>>>>>>>> in/java/org/apache/impala/planner/
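>>>>>>>>>>>>>> My mental model of that length-based splitting is something
>>>>>>>>>>>>>> like this (a rough sketch, not the actual
>>>>>>>>>>>>>> computeScanRangeLocations code):
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>   // Chop one block into scan ranges of at most
>>>>>>>>>>>>>>   // maxScanRangeLength bytes each.
>>>>>>>>>>>>>>   long offset = 0;
>>>>>>>>>>>>>>   while (offset < blockLength) {
>>>>>>>>>>>>>>     long len = Math.min(maxScanRangeLength, blockLength - offset);
>>>>>>>>>>>>>>     scanRanges.add(new ScanRange(filePath, offset, len));
>>>>>>>>>>>>>>     offset += len;
>>>>>>>>>>>>>>   }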
>>>>>>>>>>>>>> I hope you can help me with the scan node <-> scan
>>>>>>>>>>>>>> range->host relationship. If I have Table_A-Block_0 and
>>>>>>>>>>>>>> Table_B_Block_0 on the same node (which I want to join
>>>>>>>>>>>>>> locally), I don't get the point of why scan ranges could be
>>>>>>>>>>>>>> assigned to another host in my scenario.
>>>>>>>>>>>>>> Best regards and thank you very much!
>>>>>>>>>>>>>> Philipp Krause
>>>>>>>>>>>>>> On 21.03.2018 at 05:21, Alexander Behm wrote:
