pig-dev mailing list archives

From "liyunzhang_intel (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (PIG-4871) Not use OperatorPlan#forceConnect in MultiQueryOptimizationSpark
Date Wed, 20 Apr 2016 02:46:25 GMT

     [ https://issues.apache.org/jira/browse/PIG-4871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated PIG-4871:
----------------------------------
    Attachment: PIG-4871_2.patch

[~kexianda],[~mohitsabharwal],[~pallavi.rao], [~xuefuz]:

Changes in PIG-4871_2.patch:
1. Remove OperatorPlan#forceConnect.
2. Clone the physical plan of the splitter and then merge the cloned plan into the splittee's plan, as MR mode does.

All unit tests pass with this patch applied. The patch for PIG-4797 depends on this one.

>  Not use OperatorPlan#forceConnect in MultiQueryOptimizationSpark
> -----------------------------------------------------------------
>
>                 Key: PIG-4871
>                 URL: https://issues.apache.org/jira/browse/PIG-4871
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-4871_2.patch
>
>
> In the current code base, MultiQueryOptimizationSpark uses OperatorPlan#forceConnect() when merging the physical plans of the splitter and the splittee.
> The difference between OperatorPlan#connect and OperatorPlan#forceConnect is that forceConnect does not check whether the from operator supports multiple outputs or whether the to operator supports multiple inputs.
> {code}
>  /**
>      * Connect from and to without the checks that connect() performs, i.e. without verifying
>      * whether the from operator supports multiple outputs
>      * and whether the to operator supports multiple inputs.
>      *
>      * @param from Operator data will flow from.
>      * @param to   Operator data will flow to.
>      * @throws PlanException if from or to is not in the plan
>      */
>     public void forceConnect(E from, E to) throws PlanException {
>         markDirty();
>         // Check that both nodes are in the plan.
>         checkInPlan(from);
>         checkInPlan(to);
>         mFromEdges.put(from, to);
>         mToEdges.put(to, from);
>     }
> {code}
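To make the difference concrete, here is a minimal toy model of a checked connect() next to an unchecked forceConnect(). It is a hypothetical sketch, not Pig's OperatorPlan: the classes ToyPlan, Op and the nested PlanException are made up, and the checks only approximate the behaviour described above (reject a second successor when the from operator does not support multiple outputs, and a second predecessor when the to operator does not support multiple inputs).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical toy plan (not org.apache.pig's OperatorPlan) contrasting a
 * checked connect() with an unchecked forceConnect().
 */
public class ToyPlan {

    /** Hypothetical operator: only records whether it allows several outputs/inputs. */
    static final class Op {
        final String name;
        final boolean multipleOutputs;
        final boolean multipleInputs;

        Op(String name, boolean multipleOutputs, boolean multipleInputs) {
            this.name = name;
            this.multipleOutputs = multipleOutputs;
            this.multipleInputs = multipleInputs;
        }

        @Override
        public String toString() {
            return name;
        }
    }

    static final class PlanException extends Exception {
        PlanException(String msg) {
            super(msg);
        }
    }

    private final Map<Op, List<Op>> fromEdges = new HashMap<>(); // op -> successors
    private final Map<Op, List<Op>> toEdges = new HashMap<>();   // op -> predecessors

    void add(Op op) {
        fromEdges.putIfAbsent(op, new ArrayList<>());
        toEdges.putIfAbsent(op, new ArrayList<>());
    }

    List<Op> successors(Op op) {
        return fromEdges.get(op);
    }

    private void checkInPlan(Op op) throws PlanException {
        if (!fromEdges.containsKey(op)) {
            throw new PlanException(op + " is not in the plan");
        }
    }

    /** Checked variant: refuses a second successor/predecessor unless the operator allows it. */
    void connect(Op from, Op to) throws PlanException {
        checkInPlan(from);
        checkInPlan(to);
        if (!from.multipleOutputs && !fromEdges.get(from).isEmpty()) {
            throw new PlanException(from + " does not support multiple outputs");
        }
        if (!to.multipleInputs && !toEdges.get(to).isEmpty()) {
            throw new PlanException(to + " does not support multiple inputs");
        }
        forceConnect(from, to);
    }

    /** Unchecked variant: only verifies that both operators are in the plan. */
    void forceConnect(Op from, Op to) throws PlanException {
        checkInPlan(from);
        checkInPlan(to);
        fromEdges.get(from).add(to);
        toEdges.get(to).add(from);
    }

    public static void main(String[] args) throws PlanException {
        ToyPlan plan = new ToyPlan();
        Op forEachA = new Op("ForEach(scope-10)", false, false);
        Op forEachB = new Op("ForEach(scope-20)", false, false);
        Op lrC = new Op("LocalRearrange(scope-25)", false, false);
        plan.add(forEachA);
        plan.add(forEachB);
        plan.add(lrC);

        plan.connect(forEachA, forEachB);       // first successor: accepted
        try {
            plan.connect(forEachA, lrC);        // second successor: rejected
        } catch (PlanException e) {
            System.out.println("connect: " + e.getMessage());
        }
        plan.forceConnect(forEachA, lrC);       // bypasses the multiple-outputs check
        System.out.println("successors: " + plan.successors(forEachA));
    }
}
```

Running main prints `connect: ForEach(scope-10) does not support multiple outputs` followed by `successors: [ForEach(scope-20), LocalRearrange(scope-25)]`, i.e. forceConnect silently produces the two-successor ForEach that connect refuses.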
> The following example explains why forceConnect was added in the first place.
> {code}
> A = load './split5'  AS (a0:int, a1:int, a2:int);
> B = foreach A generate a0, a1;
> C = join A by a0, B by a0;
> D = filter C by A::a1>=B::a1;
> store D into './split5.out';
> {code}
> Before multiquery optimization:
> {code}
> scope-37->scope-43 
> scope-43
> #--------------------------------------------------
> # Spark Plan                                  
> #--------------------------------------------------
> Spark node scope-37
> Store(hdfs://zly2.sh.intel.com:8020/tmp/temp-535495592/tmp-2029463812:org.apache.pig.impl.io.InterStorage) - scope-38
> |
> |---A: New For Each(false,false,false)[bag] - scope-10
>     |   |
>     |   Cast[int] - scope-2
>     |   |
>     |   |---Project[bytearray][0] - scope-1
>     |   |
>     |   Cast[int] - scope-5
>     |   |
>     |   |---Project[bytearray][1] - scope-4
>     |   |
>     |   Cast[int] - scope-8
>     |   |
>     |   |---Project[bytearray][2] - scope-7
>     |
>     |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-0--------
> Spark node scope-43
> D: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36
> |
> |---D: Filter[bag] - scope-32
>     |   |
>     |   Greater Than or Equal[boolean] - scope-35
>     |   |
>     |   |---Project[int][1] - scope-33
>     |   |
>     |   |---Project[int][4] - scope-34
>     |
>     |---C: New For Each(true,true)[tuple] - scope-31
>         |   |
>         |   Project[bag][1] - scope-29
>         |   |
>         |   Project[bag][2] - scope-30
>         |
>         |---C: Package(Packager)[tuple]{int} - scope-24
>             |
>             |---C: Global Rearrange[tuple] - scope-23
>                 |
>                 |---C: Local Rearrange[tuple]{int}(false) - scope-25
>                 |   |   |
>                 |   |   Project[int][0] - scope-26
>                 |   |
>                 |   |---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp-535495592/tmp-2029463812:org.apache.pig.impl.io.InterStorage) - scope-39
>                 |
>                 |---C: Local Rearrange[tuple]{int}(false) - scope-27
>                     |   |
>                     |   Project[int][0] - scope-28
>                     |
>                     |---B: New For Each(false,false)[bag] - scope-20
>                         |   |
>                         |   Project[int][0] - scope-16
>                         |   |
>                         |   Project[int][1] - scope-18
>                         |
>                         |---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp-535495592/tmp-2029463812:org.apache.pig.impl.io.InterStorage) - scope-41--------
> {code}
> After multiquery optimization:
> {code}
> after multiquery optimization:
> scope-37
> #--------------------------------------------------
> # Spark Plan                                  
> #--------------------------------------------------
> Spark node scope-37
> D: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36
> |
> |---D: Filter[bag] - scope-32
>     |   |
>     |   Greater Than or Equal[boolean] - scope-35
>     |   |
>     |   |---Project[int][1] - scope-33
>     |   |
>     |   |---Project[int][4] - scope-34
>     |
>     |---C: New For Each(true,true)[tuple] - scope-31
>         |   |
>         |   Project[bag][1] - scope-29
>         |   |
>         |   Project[bag][2] - scope-30
>         |
>         |---C: Package(Packager)[tuple]{int} - scope-24
>             |
>             |---C: Global Rearrange[tuple] - scope-23
>                 |
>                 |---C: Local Rearrange[tuple]{int}(false) - scope-25
>                 |   |   |
>                 |   |   Project[int][0] - scope-26
>                 |   |
>                 |   |---A: New For Each(false,false,false)[bag] - scope-10
>                 |       |   |
>                 |       |   Cast[int] - scope-2
>                 |       |   |
>                 |       |   |---Project[bytearray][0] - scope-1
>                 |       |   |
>                 |       |   Cast[int] - scope-5
>                 |       |   |
>                 |       |   |---Project[bytearray][1] - scope-4
>                 |       |   |
>                 |       |   Cast[int] - scope-8
>                 |       |   |
>                 |       |   |---Project[bytearray][2] - scope-7
>                 |       |
>                 |       |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-0
>                 |
>                 |---C: Local Rearrange[tuple]{int}(false) - scope-27
>                     |   |
>                     |   Project[int][0] - scope-28
>                     |
>                     |---B: New For Each(false,false)[bag] - scope-20
>                         |   |
>                         |   Project[int][0] - scope-16
>                         |   |
>                         |   Project[int][1] - scope-18
>                         |
>                         |---A: New For Each(false,false,false)[bag] - scope-10
>                             |   |
>                             |   Cast[int] - scope-2
>                             |   |
>                             |   |---Project[bytearray][0] - scope-1
>                             |   |
>                             |   Cast[int] - scope-5
>                             |   |
>                             |   |---Project[bytearray][1] - scope-4
>                             |   |
>                             |   Cast[int] - scope-8
>                             |   |
>                             |   |---Project[bytearray][2] - scope-7
>                             |
>                             |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-0--------
> {code}
> We connect ForEach(scope-10) in SparkNode(scope-37) to ForEach(scope-20) and LocalRearrange(scope-25) in SparkNode(scope-43), so after multiquery optimization ForEach(scope-10) has two successors: scope-20 and scope-25. We need OperatorPlan#forceConnect(from, to) here because POForEach#supportsMultipleOutputs returns false, so OperatorPlan#connect would reject the second edge. *Why is there no problem in MR mode?* In MR mode, ForEach(scope-10) is cloned as ForEach(scope-xxx) for each splittee, so every POForEach always has exactly one successor.
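The successor-count argument can be sketched with a plain adjacency list. This is a hypothetical illustration, not code from Pig or from the patch: the class CloneVsShare, its methods, and the `clone-N` operator names are made up. Sharing the splitter's ForEach between both consumers gives it two successors, while cloning the Load -> ForEach chain once per consumer leaves every operator with at most one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: successor counts when a splitter operator is shared vs. cloned. */
public class CloneVsShare {

    /** Adds the edge from -> to to an adjacency list (operator name -> successor names). */
    static void edge(Map<String, List<String>> plan, String from, String to) {
        plan.computeIfAbsent(from, k -> new ArrayList<>()).add(to);
        plan.computeIfAbsent(to, k -> new ArrayList<>());
    }

    /** Shared merge: every consumer of the temp file is fed by the single splitter ForEach. */
    static Map<String, List<String>> mergeShared(List<String> consumers) {
        Map<String, List<String>> plan = new LinkedHashMap<>();
        edge(plan, "Load(scope-0)", "ForEach(scope-10)");
        for (String consumer : consumers) {
            edge(plan, "ForEach(scope-10)", consumer);
        }
        return plan;
    }

    /** Cloned merge: each consumer gets its own copy of the Load -> ForEach chain. */
    static Map<String, List<String>> mergeCloned(List<String> consumers) {
        Map<String, List<String>> plan = new LinkedHashMap<>();
        int copy = 0;
        for (String consumer : consumers) {
            copy++;
            String load = "Load(clone-" + copy + ")";       // hypothetical clone name
            String forEach = "ForEach(clone-" + copy + ")"; // hypothetical clone name
            edge(plan, load, forEach);
            edge(plan, forEach, consumer);
        }
        return plan;
    }

    /** Largest number of successors of any operator in the plan. */
    static int maxSuccessors(Map<String, List<String>> plan) {
        int max = 0;
        for (List<String> successors : plan.values()) {
            max = Math.max(max, successors.size());
        }
        return max;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("ForEach(scope-20)", "LocalRearrange(scope-25)");
        System.out.println("shared: " + maxSuccessors(mergeShared(consumers)));
        System.out.println("cloned: " + maxSuccessors(mergeCloned(consumers)));
    }
}
```

Running main prints `shared: 2` and `cloned: 1`. The price of cloning is duplicated work (the load and cast run once per consumer), which is the trade-off the MR plans below make.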
> Before multiquery optimization in MR mode:
> {code}
> #--------------------------------------------------
> # Map Reduce Plan                                  
> #--------------------------------------------------
> MapReduce node scope-37
> Map Plan
> Store(hdfs://zly2.sh.intel.com:8020/tmp/temp825700611/tmp-47636243:org.apache.pig.impl.io.InterStorage) - scope-38
> |
> |---A: New For Each(false,false,false)[bag] - scope-10
>     |   |
>     |   Cast[int] - scope-2
>     |   |
>     |   |---Project[bytearray][0] - scope-1
>     |   |
>     |   Cast[int] - scope-5
>     |   |
>     |   |---Project[bytearray][1] - scope-4
>     |   |
>     |   Cast[int] - scope-8
>     |   |
>     |   |---Project[bytearray][2] - scope-7
>     |
>     |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-0--------
> Global sort: false
> ----------------
> MapReduce node scope-43
> Map Plan
> Union[tuple] - scope-44
> |
> |---C: Local Rearrange[tuple]{int}(false) - scope-25
> |   |   |
> |   |   Project[int][0] - scope-26
> |   |
> |   |---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp825700611/tmp-47636243:org.apache.pig.impl.io.InterStorage) - scope-39
> |
> |---C: Local Rearrange[tuple]{int}(false) - scope-27
>     |   |
>     |   Project[int][0] - scope-28
>     |
>     |---B: New For Each(false,false)[bag] - scope-20
>         |   |
>         |   Project[int][0] - scope-16
>         |   |
>         |   Project[int][1] - scope-18
>         |
>         |---Load(hdfs://zly2.sh.intel.com:8020/tmp/temp825700611/tmp-47636243:org.apache.pig.impl.io.InterStorage) - scope-41--------
> Reduce Plan
> D: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36
> |
> |---D: Filter[bag] - scope-32
>     |   |
>     |   Greater Than or Equal[boolean] - scope-35
>     |   |
>     |   |---Project[int][1] - scope-33
>     |   |
>     |   |---Project[int][4] - scope-34
>     |
>     |---C: Package(JoinPackager(true,true))[tuple]{int} - scope-24--------
> Global sort: false
> ----------------
> {code}
> After multiquery optimization in MR mode (scope-53 and scope-61 are clones of scope-10):

> {code}
> #--------------------------------------------------
> # Map Reduce Plan                                  
> #--------------------------------------------------
> MapReduce node scope-43
> Map Plan
> Union[tuple] - scope-44
> |
> |---C: Local Rearrange[tuple]{int}(false) - scope-25
> |   |   |
> |   |   Project[int][0] - scope-26
> |   |
> |   |---A: New For Each(false,false,false)[bag] - scope-53
> |       |   |
> |       |   Cast[int] - scope-48
> |       |   |
> |       |   |---Project[bytearray][0] - scope-47
> |       |   |
> |       |   Cast[int] - scope-50
> |       |   |
> |       |   |---Project[bytearray][1] - scope-49
> |       |   |
> |       |   Cast[int] - scope-52
> |       |   |
> |       |   |---Project[bytearray][2] - scope-51
> |       |
> |       |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-46
> |
> |---C: Local Rearrange[tuple]{int}(false) - scope-27
>     |   |
>     |   Project[int][0] - scope-28
>     |
>     |---B: New For Each(false,false)[bag] - scope-20
>         |   |
>         |   Project[int][0] - scope-16
>         |   |
>         |   Project[int][1] - scope-18
>         |
>         |---A: New For Each(false,false,false)[bag] - scope-61
>             |   |
>             |   Cast[int] - scope-56
>             |   |
>             |   |---Project[bytearray][0] - scope-55
>             |   |
>             |   Cast[int] - scope-58
>             |   |
>             |   |---Project[bytearray][1] - scope-57
>             |   |
>             |   Cast[int] - scope-60
>             |   |
>             |   |---Project[bytearray][2] - scope-59
>             |
>             |---A: Load(hdfs://zly2.sh.intel.com:8020/user/root/split5:org.apache.pig.builtin.PigStorage) - scope-54--------
> Reduce Plan
> D: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36
> |
> |---D: Filter[bag] - scope-32
>     |   |
>     |   Greater Than or Equal[boolean] - scope-35
>     |   |
>     |   |---Project[int][1] - scope-33
>     |   |
>     |   |---Project[int][4] - scope-34
>     |
>     |---C: Package(JoinPackager(true,true))[tuple]{int} - scope-24--------
> Global sort: false
> ----------------
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
