pig-dev mailing list archives

From kelly zhang <liyun.zh...@intel.com>
Subject Re: Review Request 45667: Support Pig On Spark
Date Tue, 14 Jun 2016 03:25:20 GMT


> On May 22, 2016, 9:57 p.m., Rohini Palaniswamy wrote:
> > test/org/apache/pig/test/TestBuiltin.java, line 3255
> > <https://reviews.apache.org/r/45667/diff/1/?file=1323870#file1323870line3255>
> >
> >     This testcase is broken if you have 0-0 repeating twice. It is not UniqueID anymore.

0-0 repeats twice because UniqueID#exec builds the ID from the task index:
public String exec(Tuple input) throws IOException {
    String taskIndex = PigMapReduce.sJobConfInternal.get().get(PigConstants.TASK_INDEX);
    String sequenceId = taskIndex + "-" + Long.toString(sequence);
    sequence++;
    return sequenceId;
}
In MR, we initialize PigConstants.TASK_INDEX in org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce.Reduce#setup:

protected void setup(Context context) throws IOException, InterruptedException {
    ...
    context.getConfiguration().set(PigConstants.TASK_INDEX, Integer.toString(context.getTaskAttemptID().getTaskID().getId()));
    ...
}

But Spark does not provide a function like PigGenericMapReduce.Reduce#setup to initialize
PigConstants.TASK_INDEX when a job starts.
I suggest filing a new JIRA (Initialize PigConstants.TASK_INDEX when spark job starts) and
skipping this unit test until that JIRA is resolved.
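
To make the collision concrete, here is a minimal standalone sketch (not Pig code). IdGenerator and run are hypothetical names that only mimic the taskIndex + "-" + sequence logic of UniqueID#exec, and the sketch assumes that, without a setup hook, every Spark task ends up reading the same task index:

```java
import java.util.ArrayList;
import java.util.List;

public class UniqueIdSketch {

    /** Hypothetical stand-in for the per-task state kept by UniqueID. */
    static final class IdGenerator {
        private final String taskIndex; // value the UDF would read from the job configuration
        private long sequence = 0;      // per-task counter, starts at 0 in every task

        IdGenerator(String taskIndex) {
            this.taskIndex = taskIndex;
        }

        String next() {
            return taskIndex + "-" + Long.toString(sequence++);
        }
    }

    /** Simulates one generator per task, each task emitting rowsPerTask IDs. */
    static List<String> run(String[] taskIndexes, int rowsPerTask) {
        List<String> ids = new ArrayList<>();
        for (String taskIndex : taskIndexes) {
            IdGenerator generator = new IdGenerator(taskIndex);
            for (int i = 0; i < rowsPerTask; i++) {
                ids.add(generator.next());
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        // Distinct task indexes (what Reduce#setup gives us in MR): IDs are unique.
        System.out.println(run(new String[] {"0", "1"}, 2)); // [0-0, 0-1, 1-0, 1-1]
        // Same index in both tasks (assumed Spark situation): 0-0 shows up twice.
        System.out.println(run(new String[] {"0", "0"}, 2)); // [0-0, 0-1, 0-0, 0-1]
    }
}
```

The IDs are only unique across tasks if each task sees a different index, which is why the index has to be set when each Spark task starts.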


> On May 22, 2016, 9:57 p.m., Rohini Palaniswamy wrote:
> > src/org/apache/pig/data/SelfSpillBag.java, line 32
> > <https://reviews.apache.org/r/45667/diff/1/?file=1323848#file1323848line32>
> >
> >     Why is bag even being serialized by Spark?

SelfSpillBag is used in TestHBaseStorage; if it is not marked transient, a
NotSerializableException is thrown.
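
For reference, the general Java behaviour can be shown with a small standalone sketch. The class and field names below (MemoryTracker, BagWithPlainField, BagWithTransientField) are hypothetical and are not the actual SelfSpillBag members; the sketch only shows that a Serializable object holding a non-serializable field fails to serialize unless that field is transient:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class TransientSketch {

    /** Hypothetical helper type that does NOT implement Serializable. */
    static class MemoryTracker {
        long limit = 1024;
    }

    /** Serializable holder with a plain (non-transient) reference to the helper. */
    static class BagWithPlainField implements Serializable {
        private static final long serialVersionUID = 1L;
        MemoryTracker tracker = new MemoryTracker();
    }

    /** Same holder, but the helper field is skipped during serialization. */
    static class BagWithTransientField implements Serializable {
        private static final long serialVersionUID = 1L;
        transient MemoryTracker tracker = new MemoryTracker();
    }

    /** Returns true if Java serialization of the object succeeds. */
    static boolean serializes(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false; // a non-transient field of a non-serializable type was reached
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new BagWithPlainField()));     // false
        System.out.println(serializes(new BagWithTransientField())); // true
    }
}
```

Note that a transient field comes back as null after deserialization, so code that reads it has to tolerate that.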


> On May 22, 2016, 9:57 p.m., Rohini Palaniswamy wrote:
> > test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java, line 66
> > <https://reviews.apache.org/r/45667/diff/1/?file=1323865#file1323865line66>
> >
> >     Why does A[3,4] repeat?

The Pig script looks like this:

            LOAD '" + Util.encodeEscape(input.getAbsolutePath()) + "' using PigStorage();\n"
               "B = GROUP A BY $0;\n"
                "A = FOREACH B GENERATE COUNT(A);\n"
               "STORE A INTO '" + Util.encodeEscape(output.getAbsolutePath()) + "';");

The Spark plan is:
A: Store(/tmp/pig_junit_tmp1755582848/test6087259092054964214output:org.apache.pig.builtin.PigStorage) - scope-9
|
|---A: New For Each(false)[tuple] - scope-13
    |   |
    |   Project[bag][1] - scope-11
    |   
    |   POUserFunc(org.apache.pig.builtin.COUNT$Final)[long] - scope-12
    |   |
    |   |---Project[bag][1] - scope-28
    |
    |---Reduce By(false,false)[tuple] - scope-18
        |   |
        |   Project[bytearray][0] - scope-19
        |   |
        |   POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] - scope-20
        |   |
        |   |---Project[bag][1] - scope-21
        |
        |---B: Local Rearrange[tuple]{bytearray}(false) - scope-24
            |   |
            |   Project[bytearray][0] - scope-26
            |
            |---A: New For Each(false,false)[bag] - scope-14
                |   |
                |   Project[bytearray][0] - scope-15
                |   |
                |   POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-16
                |   |
                |   |---Project[bag][1] - scope-17
                |
                |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-27
                    |
                    |---A: Load(/tmp/pig_junit_tmp1755582848/test7108242581632795697input:PigStorage) - scope-0--------

There are two ForEach operators (scope-13 and scope-14) in the Spark plan, so A[3,4] appears twice.

Compare this with the MR plan:
#--------------------------------------------------
# Map Reduce Plan                                 
#--------------------------------------------------
MapReduce node scope-10
Map Plan
B: Local Rearrange[tuple]{bytearray}(false) - scope-22
|   |
|   Project[bytearray][0] - scope-24
|
|---A: New For Each(false,false)[bag] - scope-11
    |   |
    |   Project[bytearray][0] - scope-12
    |   |
    |   POUserFunc(org.apache.pig.builtin.COUNT$Initial)[tuple] - scope-13
    |   |
    |   |---Project[bag][1] - scope-14
    |
    |---Pre Combiner Local Rearrange[tuple]{Unknown} - scope-25
        |
        |---A: Load(/tmp/pig_junit_tmp910232853/test2548400580131197161input:PigStorage) - scope-0--------
Combine Plan
B: Local Rearrange[tuple]{bytearray}(false) - scope-26
|   |
|   Project[bytearray][0] - scope-28
|
|---A: New For Each(false,false)[bag] - scope-15
    |   |
    |   Project[bytearray][0] - scope-16
    |   |
    |   POUserFunc(org.apache.pig.builtin.COUNT$Intermediate)[tuple] - scope-17
    |   |
    |   |---Project[bag][1] - scope-18
    |
    |---B: Package(CombinerPackager)[tuple]{bytearray} - scope-21--------
Reduce Plan
A: Store(/tmp/pig_junit_tmp910232853/test9096852332434708302output:org.apache.pig.builtin.PigStorage) - scope-9
|
|---A: New For Each(false)[bag] - scope-8
    |   |
    |   POUserFunc(org.apache.pig.builtin.COUNT$Final)[long] - scope-6
    |   |
    |   |---Project[bag][1] - scope-19
    |
    |---B: Package(CombinerPackager)[tuple]{bytearray} - scope-2--------
Global sort: false
----------------

The MR plan also has ForEach operators in the Map plan (scope-11) and the Reduce plan (scope-8),
so A[3,4] also appears more than once in the MR result (M: A[1,4],A[3,4],B[2,4] C: A[3,4],B[2,4] R: A[3,4]).


> On May 22, 2016, 9:57 p.m., Rohini Palaniswamy wrote:
> > test/org/apache/pig/test/TestMultiQuery.java, line 116
> > <https://reviews.apache.org/r/45667/diff/1/?file=1323886#file1323886line116>
> >
> >     Why are we using checkQueryOutputsAfterSortRecursive in many places when checkQueryOutputsAfterSort would do? It will unnecessarily increase the test execution time. Can they all be changed? Or am I missing something and checkQueryOutputsAfterSort cannot be used for some reason?

The difference between Util.checkQueryOutputsAfterSortRecursive and Util.checkQueryOutputsAfterSort
is that the former accepts a schema (LogicalSchema). With the schema, the function converts
expectedResArray (String[]) into expectedRes (ArrayList<Tuple>) with properly typed fields
(int, string, or other types):

static public void checkQueryOutputsAfterSortRecursive(Iterator<Tuple> actualResultsIt,
        String[] expectedResArray, LogicalSchema schema) throws IOException {
    ...
}
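
To illustrate why the schema matters, here is a rough standalone sketch of schema-driven parsing. It is not the Pig implementation: FieldType and parseRow are hypothetical names, and the row format "(a,b)" is an assumption made for the example. The point is only that a typed schema turns each expected string into typed field values before comparison:

```java
import java.util.ArrayList;
import java.util.List;

public class TypedExpectedSketch {

    /** Hypothetical, much reduced stand-in for the field types of a schema. */
    enum FieldType { INT, STRING }

    /** Parses a row such as "(10,apple)" into typed values according to the schema. */
    static List<Object> parseRow(String row, FieldType[] schema) {
        String body = row.substring(1, row.length() - 1); // drop the surrounding parentheses
        String[] parts = body.split(",", -1);
        if (parts.length != schema.length) {
            throw new IllegalArgumentException("row does not match schema: " + row);
        }
        List<Object> fields = new ArrayList<>();
        for (int i = 0; i < parts.length; i++) {
            if (schema[i] == FieldType.INT) {
                fields.add(Integer.valueOf(parts[i].trim())); // typed as Integer, not String
            } else {
                fields.add(parts[i]);
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        FieldType[] schema = {FieldType.INT, FieldType.STRING};
        List<Object> row = parseRow("(10,apple)", schema);
        System.out.println(row.get(0) instanceof Integer); // true
        System.out.println(row);                           // [10, apple]
    }
}
```

With typed values, 9 compares as less than 10, whereas the raw strings "9" and "10" would compare the other way round.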


> On May 22, 2016, 9:57 p.m., Rohini Palaniswamy wrote:
> > ivy.xml, line 451
> > <https://reviews.apache.org/r/45667/diff/1/?file=1323775#file1323775line451>
> >
> >     What does 2.10 in spark-core_2.10 and spark-yarn_2.10 signify?

2.10 is the Scala version. It is hard-coded in the artifact name when we declare the ivy dependency (http://mvnrepository.com/artifact/org.apache.spark).


- kelly


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/45667/#review134255
-----------------------------------------------------------


On April 4, 2016, 5:19 a.m., Pallavi Rao wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/45667/
> -----------------------------------------------------------
> 
> (Updated April 4, 2016, 5:19 a.m.)
> 
> 
> Review request for pig, Daniel Dai and Rohini Palaniswamy.
> 
> 
> Bugs: PIG-4059 and PIG-4854
>     https://issues.apache.org/jira/browse/PIG-4059
>     https://issues.apache.org/jira/browse/PIG-4854
> 
> 
> Repository: pig-git
> 
> 
> Description
> -------
> 
> The patch contains all the work done in the spark branch, so far.
> 
> 
> Diffs
> -----
> 
>   bin/pig 81f1426 
>   build.xml 8db1a80 
>   ivy.xml dd9878e 
>   ivy/libraries.properties 55d9aed 
>   shims/test/hadoop20/org/apache/pig/test/SparkMiniCluster.java PRE-CREATION 
>   shims/test/hadoop23/org/apache/pig/test/SparkMiniCluster.java PRE-CREATION 
>   src/META-INF/services/org.apache.pig.ExecType 5c034c8 
>   src/docs/src/documentation/content/xdocs/start.xml 36f9952 
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java 1ff1abd
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java ecf780c
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java 2376d03
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java bcbfe2b
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java 894cda7
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java 21b75f1
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java 52cfb73
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java 6adfa91
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java c3a82c3
>   src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/KryoSerializer.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecType.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLocalExecType.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/UDFJarsFinder.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IteratorTransform.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RDDConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SplitConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/UnionConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POGlobalRearrangeSpark.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/ParallelismSetter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/DotSparkPrinter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompilerException.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java c4b44ad
>   src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java 889c01b
>   src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java 0b59c9c
>   src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java e0581d9 
>   src/org/apache/pig/data/SelfSpillBag.java d17f0a8 
>   src/org/apache/pig/impl/PigContext.java d43949f 
>   src/org/apache/pig/impl/plan/OperatorPlan.java 8b2e2e7 
>   src/org/apache/pig/tools/pigstats/PigStatsUtil.java 542cc2e 
>   src/org/apache/pig/tools/pigstats/spark/SparkCounter.java PRE-CREATION 
>   src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java PRE-CREATION 
>   src/org/apache/pig/tools/pigstats/spark/SparkCounters.java PRE-CREATION 
>   src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java PRE-CREATION 
>   src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java PRE-CREATION 
>   src/org/apache/pig/tools/pigstats/spark/SparkPigStatusReporter.java PRE-CREATION 
>   src/org/apache/pig/tools/pigstats/spark/SparkScriptState.java PRE-CREATION 
>   src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java PRE-CREATION 
>   test/e2e/pig/build.xml f7c38ba 
>   test/e2e/pig/conf/spark.conf PRE-CREATION 
>   test/e2e/pig/drivers/TestDriverPig.pm bf9c302 
>   test/e2e/pig/tests/streaming.conf 18f2fb2 
>   test/excluded-tests-spark PRE-CREATION 
>   test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java 94b34b3
>   test/org/apache/pig/spark/TestIndexedKey.java PRE-CREATION 
>   test/org/apache/pig/spark/TestSecondarySortSpark.java PRE-CREATION 
>   test/org/apache/pig/test/MiniGenericCluster.java 9347269 
>   test/org/apache/pig/test/TestAssert.java 6d4b5c6 
>   test/org/apache/pig/test/TestBuiltin.java 44b4d09 
>   test/org/apache/pig/test/TestCase.java c9bb2fa 
>   test/org/apache/pig/test/TestCollectedGroup.java a958d33 
>   test/org/apache/pig/test/TestCombiner.java df44293 
>   test/org/apache/pig/test/TestCubeOperator.java de96e6c 
>   test/org/apache/pig/test/TestEvalPipeline.java 9efde13 
>   test/org/apache/pig/test/TestEvalPipeline2.java c8f51d7 
>   test/org/apache/pig/test/TestEvalPipelineLocal.java c12d595 
>   test/org/apache/pig/test/TestFinish.java f18c103 
>   test/org/apache/pig/test/TestForEachNestedPlanLocal.java b0aa3a8 
>   test/org/apache/pig/test/TestGrunt.java ef121a3 
>   test/org/apache/pig/test/TestHBaseStorage.java 8d2ad85 
>   test/org/apache/pig/test/TestLimitVariable.java 53b9dae 
>   test/org/apache/pig/test/TestMapSideCogroup.java 2c78b4a 
>   test/org/apache/pig/test/TestMergeJoin.java f1a9608 
>   test/org/apache/pig/test/TestMergeJoinOuter.java 81aee55 
>   test/org/apache/pig/test/TestMultiQuery.java 40684b4 
>   test/org/apache/pig/test/TestMultiQueryLocal.java b9ac035 
>   test/org/apache/pig/test/TestNativeMapReduce.java c4f6573 
>   test/org/apache/pig/test/TestNullConstant.java 3ea4509 
>   test/org/apache/pig/test/TestPigRunner.java fde8609 
>   test/org/apache/pig/test/TestPigServerLocal.java fbabd03 
>   test/org/apache/pig/test/TestProjectRange.java 2e3e7b8 
>   test/org/apache/pig/test/TestPruneColumn.java 3936332 
>   test/org/apache/pig/test/TestRank1.java 9e4ef62 
>   test/org/apache/pig/test/TestRank2.java fc802a9 
>   test/org/apache/pig/test/TestRank3.java 43af10d 
>   test/org/apache/pig/test/TestSecondarySort.java 8991010 
>   test/org/apache/pig/test/TestSkewedJoin.java dba2241 
>   test/org/apache/pig/test/TestStoreBase.java eb3b253 
>   test/org/apache/pig/test/Util.java 8dae247 
>   test/spark-tests PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/45667/diff/
> 
> 
> Testing
> -------
> 
> New UTs were added where required and ensure old UTs pass -> https://builds.apache.org/job/Pig-spark/
> 
> 
> Thanks,
> 
> Pallavi Rao
> 
>

