pig-dev mailing list archives

From "liyunzhang_intel (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (PIG-4771) Implement FR Join for spark engine
Date Tue, 23 Feb 2016 03:50:18 GMT

     [ https://issues.apache.org/jira/browse/PIG-4771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated PIG-4771:
----------------------------------
    Attachment: PIG-4771.patch

Use the algorithms in POFRJoin to implement FRJoin for Pig on Spark mode.
Let us use an example to explain this feature.
frJoin.pig
{code}
A = load './SkewedJoinInput1.txt' as (id,name,n);
B = load './SkewedJoinInput2.txt' as (id,name);
C = filter B by id > 100;
D = join A by (id,name), C by (id,name) using 'replicated';
store D into './testFRJoin.out';
{code}
Physical Plan
{code}
#-----------------------------------------------
# Physical Plan:
#-----------------------------------------------
D: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-17
|
|---D: FRJoin[tuple] - scope-11
    |   |
    |   Project[bytearray][0] - scope-7
    |   |
    |   Project[bytearray][1] - scope-8
    |   |
    |   Project[bytearray][0] - scope-9
    |   |
    |   Project[bytearray][1] - scope-10
    |
    |---A: Load(hdfs://zly1.sh.intel.com:8020/user/root/SkewedJoinInput1.txt:org.apache.pig.builtin.PigStorage) - scope-0
    |
    |---C: Filter[bag] - scope-2
        |   |
        |   Greater Than[boolean] - scope-6
        |   |
        |   |---Cast[int] - scope-4
        |   |   |
        |   |   |---Project[bytearray][0] - scope-3
        |   |
        |   |---Constant(100) - scope-5
        |
        |---B: Load(hdfs://zly1.sh.intel.com:8020/user/root/SkewedJoinInput2.txt:org.apache.pig.builtin.PigStorage) - scope-1
{code}

Spark plan
{code}
Spark node scope-40
Store(hdfs://zly1.sh.intel.com:8020/tmp/temp-862810646/tmp-1458847763:org.apache.pig.impl.io.InterStorage) - scope-41
|
|---C: Filter[bag] - scope-23
    |   |
    |   Greater Than[boolean] - scope-27
    |   |
    |   |---Cast[int] - scope-25
    |   |   |
    |   |   |---Project[bytearray][0] - scope-24
    |   |
    |   |---Constant(100) - scope-26
    |
    |---B: Load(hdfs://zly1.sh.intel.com:8020/user/root/SkewedJoinInput2.txt:org.apache.pig.builtin.PigStorage) - scope-22--------

Spark node scope-39
D: Store(hdfs://zly1.sh.intel.com:8020/user/root/testFRJoin.out:org.apache.pig.builtin.PigStorage) - scope-38
|
|---D: FRJoin[tuple] - scope-32
    |   |
    |   Project[bytearray][0] - scope-28
    |   |
    |   Project[bytearray][1] - scope-29
    |   |
    |   Project[bytearray][0] - scope-30
    |   |
    |   Project[bytearray][1] - scope-31
    |
    |---A: Load(hdfs://zly1.sh.intel.com:8020/user/root/SkewedJoinInput1.txt:org.apache.pig.builtin.PigStorage) - scope-21--------
{code}

In SparkCompiler#visitFRJoin, we create a SparkOperator that saves the result of the replicated
input to a temporary file on HDFS. We then load that temporary file in POFRJoin#setUpHashMap.
*Why create a new SparkOperator that just loads the file, stores it to a temporary file, and
then loads it again in POFRJoin#setUpHashMap? Why not just load the file directly in
POFRJoin#setUpHashMap?* Because we cannot guarantee that every predecessor of FRJoin in the
physical plan is a POLoad. In the example above, the predecessors of FRJoin are POFilter and
POLoad.
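
The general idea behind a replicated (hash) join can be sketched as follows. This is only an illustrative Python sketch, not the code in the patch: the helper names build_hash_map and fr_join and the sample rows are hypothetical, and the replicated input is assumed to be small enough to fit in memory. The already filtered relation C is indexed by the join key (the step that the materialized temporary file feeds), and the large relation A is then streamed and probed against that index.

```python
from collections import defaultdict


def build_hash_map(replicated_rows, key_indexes):
    """Index the small (replicated) relation by its join key."""
    table = defaultdict(list)
    for row in replicated_rows:
        key = tuple(row[i] for i in key_indexes)
        table[key].append(row)
    return table


def fr_join(large_rows, key_indexes, table):
    """Stream the large relation and probe the in-memory index."""
    for row in large_rows:
        key = tuple(row[i] for i in key_indexes)
        # .get avoids inserting empty entries into the defaultdict
        for match in table.get(key, ()):
            yield row + match


# Hypothetical sample rows standing in for the inputs of frJoin.pig
A = [(101, "alice", 1), (50, "bob", 2), (102, "carol", 3)]
B = [(101, "alice"), (50, "bob"), (103, "dave")]

# C = filter B by id > 100; in the Spark plan this result is what gets
# stored to the temporary file before the join reads it back.
C = [row for row in B if row[0] > 100]

table = build_hash_map(C, key_indexes=(0, 1))
D = list(fr_join(A, (0, 1), table))
print(D)
```

Because the index is built from whatever the predecessor operators produce (here a filter, not a plain load), the join cannot simply re-read the original input file. This mirrors the reason given above for materializing the replicated side first.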

*How do we guarantee that the replicated files are accessible to the Spark workers?*
Replicated files are stored in HDFS, where Spark workers can access them. We set mapred.submit.replication
to "10" to create more replicas of the replicated files, so that Spark workers are more likely
to read the data locally. We do not use the Distributed Cache (a MapReduce feature) as MR mode
does, because we cannot guarantee that users have MapReduce installed when they run Pig on Spark.


> Implement FR Join for spark engine
> ----------------------------------
>
>                 Key: PIG-4771
>                 URL: https://issues.apache.org/jira/browse/PIG-4771
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-4771.patch
>
>
> We currently use a regular join in place of FR join in the code base (fd31fda). We need to implement FR join.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
