pig-dev mailing list archives

From kelly zhang <liyun.zh...@intel.com>
Subject Re: Review Request 57317: Support Pig On Spark
Date Sat, 25 Mar 2017 18:44:39 GMT


> On March 21, 2017, 8:36 p.m., Rohini Palaniswamy wrote:
> > src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java
> > Lines 97 (patched)
> > <https://reviews.apache.org/r/57317/diff/3/?file=1666865#file1666865line97>
> >
> >     Why capture metrics at task level? Pig client might run out of memory when there are lot of tasks

We need to get the task metrics from SparkListenerTaskEnd#taskMetrics in JobMetricsListener#onTaskEnd,
so that the latest metrics are updated as soon as each task ends. Would it be more suitable to
update the task metrics when the stage completes? If so, I will file a JIRA and change this once we
upgrade to Spark 2.0. (Currently we cannot move the code in onTaskEnd to onStageCompleted because the
Spark 1.6 API does not provide this information there.)
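To make the memory trade-off concrete, here is a minimal Java sketch (not the actual
JobMetricsListener) of folding each task's metrics into per-stage running totals as soon as the
task ends, so the driver keeps one entry per stage rather than one per task. TaskMetricsSnapshot,
StageTotals and the two counters are hypothetical stand-ins for Spark's TaskMetrics; no Spark API
is used.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch: fold each finished task's metrics into per-stage running totals so the
 * driver keeps one entry per stage instead of one per task.
 * TaskMetricsSnapshot is a hypothetical stand-in for Spark's TaskMetrics.
 */
public class StageMetricsAggregator {

    /** Hypothetical, simplified metrics reported when a task ends. */
    public static final class TaskMetricsSnapshot {
        final long recordsRead;
        final long bytesWritten;

        public TaskMetricsSnapshot(long recordsRead, long bytesWritten) {
            this.recordsRead = recordsRead;
            this.bytesWritten = bytesWritten;
        }
    }

    /** Running totals for one stage. */
    public static final class StageTotals {
        public long tasks;
        public long recordsRead;
        public long bytesWritten;
    }

    private final Map<Integer, StageTotals> totalsByStage = new HashMap<>();

    /** In a real listener this would be driven by the task-end callback. */
    public synchronized void onTaskEnd(int stageId, TaskMetricsSnapshot metrics) {
        StageTotals totals = totalsByStage.computeIfAbsent(stageId, id -> new StageTotals());
        totals.tasks++;
        totals.recordsRead += metrics.recordsRead;
        totals.bytesWritten += metrics.bytesWritten;
        // The per-task snapshot is not retained, so memory does not grow with task count.
    }

    public synchronized StageTotals totalsFor(int stageId) {
        return totalsByStage.get(stageId);
    }

    public synchronized int trackedStages() {
        return totalsByStage.size();
    }

    public static void main(String[] args) {
        StageMetricsAggregator aggregator = new StageMetricsAggregator();
        // 10,000 tasks spread over two stages still leave only two map entries.
        for (int i = 0; i < 10_000; i++) {
            aggregator.onTaskEnd(i % 2, new TaskMetricsSnapshot(10, 100));
        }
        StageTotals stage0 = aggregator.totalsFor(0);
        System.out.println(aggregator.trackedStages() + " stages; stage 0: "
                + stage0.tasks + " tasks, " + stage0.recordsRead + " records read");
    }
}
```

Whether the aggregation happens per task (as here) or once per completed stage, the point is the
same: keep totals, not a list of per-task objects.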


> On March 21, 2017, 8:36 p.m., Rohini Palaniswamy wrote:
> > src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java
> > Lines 179 (patched)
> > <https://reviews.apache.org/r/57317/diff/3/?file=1666865#file1666865line179>
> >
> >     What is the point of wait() if you are just returning false? Shouldn't wait be done in a loop till finishedJobIds.contains(jobId) ?
> >
> >     jobMetricsListener.waitForJobToEnd(jobID); which calls this method does not even check for the return type. Should it be changed to void?

The following comment in the code explains why we use wait(). Still, I will file a JIRA to verify
whether this problem still exists in the latest Spark version.

    // Even though we are not making any async calls to spark,
    // the SparkStatusTracker can still return RUNNING status
    // for a finished job.
    // Looks like there is a race condition between spark
    // "event bus" thread updating its internal listener and
    // this driver thread calling SparkStatusTracker.
    // To work around this, we will wait for this job to "finish".
    jobMetricsListener.waitForJobToEnd(jobID);
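A minimal sketch of what the reviewer suggests, assuming a finishedJobIds set guarded by the
listener's monitor: the driver thread waits in a loop until the job id appears, and the method
returns void. The class name JobEndWaiter and the onJobEnd(int) hook are hypothetical; in a real
listener the event-bus thread would add the id from its job-end callback.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Sketch of a guarded wait: block until the job id shows up in finishedJobIds.
 * Names mirror the review thread; this is not the actual JobMetricsListener code.
 */
public class JobEndWaiter {
    private final Set<Integer> finishedJobIds = new HashSet<>();

    /** Called from the listener (event bus) thread when a job finishes. */
    public synchronized void onJobEnd(int jobId) {
        finishedJobIds.add(jobId);
        notifyAll(); // wake every thread blocked in waitForJobToEnd
    }

    /** Blocks the calling (driver) thread until onJobEnd(jobId) has been observed. */
    public synchronized void waitForJobToEnd(int jobId) throws InterruptedException {
        // Re-check the condition after every wakeup: wait() may return spuriously
        // or because a different job finished.
        while (!finishedJobIds.contains(jobId)) {
            wait();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        JobEndWaiter waiter = new JobEndWaiter();
        Thread eventBus = new Thread(() -> {
            waiter.onJobEnd(1); // some other job finishes first
            waiter.onJobEnd(7);
        });
        eventBus.start();
        waiter.waitForJobToEnd(7);
        System.out.println("job 7 finished");
        eventBus.join();
    }
}
```

The loop is what makes the wait meaningful; a production version might also use wait(timeout) so
the driver cannot hang forever if the job-end event is never delivered.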


> On March 21, 2017, 8:36 p.m., Rohini Palaniswamy wrote:
> > src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java
> > Lines 79 (patched)
> > <https://reviews.apache.org/r/57317/diff/3/?file=1666867#file1666867line79>
> >
> >     Any reason for using a IndexedKey to store PigNullableWritable instead of directly using PigNullableWritable? It is to have Serializable implementation?
> >
> >     The comparators for PigNullableWritable have lot of conditions for the different data types taken care of and IndexedKey can miss some of that. Also since you are repeating the index and have another object wrapper, the size of each record will be more.

IndexedKey was introduced in PIG-4284. That JIRA fixed a problem where "(,3) from table a and
(,evening) from table b are considered to have the same key (NULL)". IndexedKey and
PigNullableWritable serve a similar purpose, but IndexedKey is used by many classes, so replacing
it may be a big change. I filed PIG-5197 to track this.
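The idea behind the PIG-4284 fix can be illustrated with a simplified sketch, assuming String keys
and a byte input index (this is not Pig's actual IndexedKey): non-null keys compare by value only,
so equal keys from different inputs still meet, while null keys are also distinguished by the input
they came from, so (,3) from table a and (,evening) from table b no longer share a key.

```java
/**
 * Simplified sketch (not Pig's actual IndexedKey): a key tagged with the index
 * of the input relation it came from. Keys are Strings here for brevity.
 */
public final class IndexedKeySketch implements Comparable<IndexedKeySketch> {
    private final byte index; // which input relation the record came from
    private final String key; // the join/group key; may be null

    public IndexedKeySketch(byte index, String key) {
        this.index = index;
        this.key = key;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof IndexedKeySketch)) {
            return false;
        }
        IndexedKeySketch other = (IndexedKeySketch) o;
        if (key == null || other.key == null) {
            // Null keys match only null keys from the same input.
            return key == null && other.key == null && index == other.index;
        }
        // Non-null keys match on value regardless of input.
        return key.equals(other.key);
    }

    @Override
    public int hashCode() {
        // Consistent with equals(): the index matters only for null keys.
        return key == null ? Byte.hashCode(index) : key.hashCode();
    }

    @Override
    public int compareTo(IndexedKeySketch other) {
        if (key == null && other.key == null) {
            return Byte.compare(index, other.index);
        }
        if (key == null) {
            return -1; // nulls sort before non-null keys
        }
        if (other.key == null) {
            return 1;
        }
        return key.compareTo(other.key);
    }

    public static void main(String[] args) {
        IndexedKeySketch nullFromA = new IndexedKeySketch((byte) 0, null);
        IndexedKeySketch nullFromB = new IndexedKeySketch((byte) 1, null);
        IndexedKeySketch xFromA = new IndexedKeySketch((byte) 0, "x");
        IndexedKeySketch xFromB = new IndexedKeySketch((byte) 1, "x");
        System.out.println(nullFromA.equals(nullFromB)); // false: nulls from different inputs
        System.out.println(xFromA.equals(xFromB));       // true: same non-null key
    }
}
```

The reviewer's concern still applies to a design like this: the extra wrapper and repeated index
add per-record overhead, and a hand-written comparator has to cover every type that the
PigNullableWritable comparators already handle.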


> On March 21, 2017, 8:36 p.m., Rohini Palaniswamy wrote:
> > src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java
> > Lines 102-103 (patched)
> > <https://reviews.apache.org/r/57317/diff/3/?file=1666867#file1666867line102>
> >
> >     Reflection for method is not required. You can just call getPartition() directly on the Partitioner

MapReducePartitionerWrapper implements org.apache.spark.Partitioner and uses reflection to call the
wrapped org.apache.hadoop.mapreduce.Partitioner. If my understanding is wrong, please let me know and
I will contact the contributor of this code.
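The reviewer's point can be sketched as follows. The two abstract classes are local stand-ins,
assumed to mirror the shape of org.apache.hadoop.mapreduce.Partitioner#getPartition(key, value,
numPartitions) and org.apache.spark.Partitioner#getPartition(key), so the example compiles without
Hadoop or Spark. Once the wrapped partitioner is held as a typed field, the wrapper can call
getPartition as an ordinary virtual call. Passing null as the value is an assumption that only holds
for partitioners that ignore the value.

```java
/**
 * Sketch: a Spark-style partitioner that delegates to a MapReduce-style
 * partitioner with a direct method call instead of reflection.
 * MrPartitioner and SparkStylePartitioner are local stand-ins, not the real classes.
 */
public class PartitionerWrapperSketch {

    /** Stand-in shaped like org.apache.hadoop.mapreduce.Partitioner<KEY, VALUE>. */
    abstract static class MrPartitioner<K, V> {
        abstract int getPartition(K key, V value, int numPartitions);
    }

    /** Same formula as Hadoop's HashPartitioner; the value is ignored. */
    static class HashMrPartitioner<K, V> extends MrPartitioner<K, V> {
        @Override
        int getPartition(K key, V value, int numPartitions) {
            return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    /** Stand-in shaped like org.apache.spark.Partitioner. */
    abstract static class SparkStylePartitioner {
        abstract int numPartitions();

        abstract int getPartition(Object key);
    }

    /** The wrapper holds the MapReduce partitioner as a typed field. */
    static class PartitionerWrapper extends SparkStylePartitioner {
        private final MrPartitioner<Object, Object> delegate;
        private final int numPartitions;

        PartitionerWrapper(MrPartitioner<Object, Object> delegate, int numPartitions) {
            this.delegate = delegate;
            this.numPartitions = numPartitions;
        }

        @Override
        int numPartitions() {
            return numPartitions;
        }

        @Override
        int getPartition(Object key) {
            // Plain virtual call: no Method lookup or invoke() needed.
            // Assumption: the delegate does not need the record value.
            return delegate.getPartition(key, null, numPartitions);
        }
    }

    public static void main(String[] args) {
        PartitionerWrapper wrapper = new PartitionerWrapper(new HashMrPartitioner<>(), 4);
        for (String key : new String[] {"pig", "spark", "hadoop"}) {
            System.out.println(key + " -> partition " + wrapper.getPartition(key));
        }
    }
}
```

Reflection may still be needed to instantiate a partitioner from a configured class name, but once
the instance exists, calling it through java.lang.reflect.Method only adds overhead and loses
compile-time checking.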


> On March 21, 2017, 8:36 p.m., Rohini Palaniswamy wrote:
> > src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
> > Lines 387-388 (patched)
> > <https://reviews.apache.org/r/57317/diff/3/?file=1666871#file1666871line387>
> >
> >     Why copy to local directory if already in hdfs?

The cache files stored in HDFS are copied to the local file system and then uploaded to the Spark
cluster with SparkContext#addFile, so that the Spark workers can download them.


> On March 21, 2017, 8:36 p.m., Rohini Palaniswamy wrote:
> > src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
> > Lines 576-580 (patched)
> > <https://reviews.apache.org/r/57317/diff/3/?file=1666871#file1666871line576>
> >
> >     This is redundant. Setting true again for the same setting.

This code forces spark.shuffle.service.enabled to true even when the user has set it to "false".
If my understanding is wrong, please let me know and I will contact the contributor of this code.
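Whether such a line is redundant depends on whether it overrides a user value or merely supplies a
default. A minimal sketch of the two behaviours, using a plain java.util.Properties as a stand-in for
the real configuration object (the method names are hypothetical, not taken from SparkLauncher):

```java
import java.util.Properties;

/**
 * Sketch contrasting two ways of setting spark.shuffle.service.enabled.
 * java.util.Properties stands in for the real configuration object;
 * the method names are hypothetical.
 */
public class ShuffleServiceConfigSketch {
    static final String SHUFFLE_SERVICE_KEY = "spark.shuffle.service.enabled";

    /** Overwrites whatever the user configured; a user value of "false" is lost. */
    static void forceShuffleService(Properties conf) {
        conf.setProperty(SHUFFLE_SERVICE_KEY, "true");
    }

    /** Only fills in "true" when the user has not set the key at all. */
    static void defaultShuffleService(Properties conf) {
        if (conf.getProperty(SHUFFLE_SERVICE_KEY) == null) {
            conf.setProperty(SHUFFLE_SERVICE_KEY, "true");
        }
    }

    public static void main(String[] args) {
        Properties forced = new Properties();
        forced.setProperty(SHUFFLE_SERVICE_KEY, "false");
        forceShuffleService(forced);

        Properties defaulted = new Properties();
        defaulted.setProperty(SHUFFLE_SERVICE_KEY, "false");
        defaultShuffleService(defaulted);

        System.out.println("forced:    " + forced.getProperty(SHUFFLE_SERVICE_KEY));    // true
        System.out.println("defaulted: " + defaulted.getProperty(SHUFFLE_SERVICE_KEY)); // false
    }
}
```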


- kelly


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/57317/#review169547
-----------------------------------------------------------


On March 17, 2017, 6:35 a.m., kelly zhang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/57317/
> -----------------------------------------------------------
> 
> (Updated March 17, 2017, 6:35 a.m.)
> 
> 
> Review request for pig, Daniel Dai and Rohini Palaniswamy.
> 
> 
> Bugs: PIG-4059, PIG-4854
>     https://issues.apache.org/jira/browse/PIG-4059
>     https://issues.apache.org/jira/browse/PIG-4854
> 
> 
> Repository: pig-git
> 
> 
> Description
> -------
> 
> Merge all changes from spark branch
> 
> 
> Diffs
> -----
> 
>   bin/pig e1212fa 
>   build.xml a0d2ca8 
>   ivy.xml 42daec9 
>   ivy/libraries.properties 481066e 
>   src/META-INF/services/org.apache.pig.ExecType 5c034c8 
>   src/docs/src/documentation/content/xdocs/start.xml c9a1491 
>   src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java e866b28
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java 0e35273
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java ecf780c
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java 3bad98b
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java 2376d03
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POBroadcastSpark.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java bcbfe2b
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java d80951a
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java 4dc6d54
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java 52cfb73
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java 4923d3f
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java 13f70c0
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java f2830c2
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java c3a82c3
>   src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/KryoSerializer.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkEngineConf.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecType.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLocalExecType.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/UDFJarsFinder.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/BroadcastConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IteratorTransform.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RDDConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SplitConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/UnionConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POGlobalRearrangeSpark.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POSampleSortSpark.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/ParallelismSetter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/DotSparkPrinter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompilerException.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/XMLSparkPrinter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/streaming/SparkExecutableManager.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java c4b44ad
>   src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java 889c01b
>   src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java 0b59c9c
>   src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java 951146f
>   src/org/apache/pig/data/SelfSpillBag.java d17f0a8
>   src/org/apache/pig/impl/plan/OperatorPlan.java 8b2e2e7
>   src/org/apache/pig/impl/util/UDFContext.java 09afc0a
>   src/org/apache/pig/tools/pigstats/PigStatsUtil.java e97625f
>   src/org/apache/pig/tools/pigstats/spark/SparkCounter.java PRE-CREATION
>   src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java PRE-CREATION
>   src/org/apache/pig/tools/pigstats/spark/SparkCounters.java PRE-CREATION
>   src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java PRE-CREATION
>   src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java PRE-CREATION
>   src/org/apache/pig/tools/pigstats/spark/SparkPigStatusReporter.java PRE-CREATION
>   src/org/apache/pig/tools/pigstats/spark/SparkScriptState.java PRE-CREATION
>   src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java PRE-CREATION
>   test/e2e/pig/build.xml 1ec9cf6
>   test/e2e/pig/conf/spark.conf PRE-CREATION
>   test/e2e/pig/drivers/TestDriverPig.pm bcec317
>   test/e2e/pig/tests/streaming.conf 18f2fb2
>   test/excluded-tests-spark PRE-CREATION
>   test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java 94b34b3
>   test/org/apache/pig/spark/TestIndexedKey.java PRE-CREATION 
>   test/org/apache/pig/spark/TestSecondarySortSpark.java PRE-CREATION 
>   test/org/apache/pig/test/MiniGenericCluster.java 9347269 
>   test/org/apache/pig/test/SparkMiniCluster.java PRE-CREATION 
>   test/org/apache/pig/test/TestAssert.java 6d4b5c6 
>   test/org/apache/pig/test/TestCase.java c9bb2fa 
>   test/org/apache/pig/test/TestCollectedGroup.java a958d33 
>   test/org/apache/pig/test/TestCombiner.java df44293 
>   test/org/apache/pig/test/TestCubeOperator.java de96e6c 
>   test/org/apache/pig/test/TestEmptyInputDir.java a9a46af 
>   test/org/apache/pig/test/TestEvalPipeline.java 48ece69 
>   test/org/apache/pig/test/TestEvalPipeline2.java c8f51d7 
>   test/org/apache/pig/test/TestEvalPipelineLocal.java c12d595 
>   test/org/apache/pig/test/TestFinish.java f18c103 
>   test/org/apache/pig/test/TestForEachNestedPlanLocal.java 63d8f67 
>   test/org/apache/pig/test/TestGrunt.java f16ff60 
>   test/org/apache/pig/test/TestHBaseStorage.java 864985e 
>   test/org/apache/pig/test/TestLimitVariable.java 53b9dae 
>   test/org/apache/pig/test/TestLineageFindRelVisitor.java e8e6aeb 
>   test/org/apache/pig/test/TestMapSideCogroup.java 2c78b4a 
>   test/org/apache/pig/test/TestMergeJoinOuter.java 81aee55 
>   test/org/apache/pig/test/TestMultiQuery.java c32eab7 
>   test/org/apache/pig/test/TestMultiQueryLocal.java b9ac035 
>   test/org/apache/pig/test/TestNativeMapReduce.java c4f6573 
>   test/org/apache/pig/test/TestNullConstant.java 3ea4509 
>   test/org/apache/pig/test/TestPigRunner.java 25380e4 
>   test/org/apache/pig/test/TestPigServer.java 8e28646 
>   test/org/apache/pig/test/TestPigServerLocal.java fbabd03 
>   test/org/apache/pig/test/TestProjectRange.java 2e3e7b8 
>   test/org/apache/pig/test/TestPruneColumn.java f05e0ec 
>   test/org/apache/pig/test/TestRank1.java 9e4ef62 
>   test/org/apache/pig/test/TestRank2.java fc802a9 
>   test/org/apache/pig/test/TestRank3.java 43af10d 
>   test/org/apache/pig/test/TestSecondarySort.java 8991010 
>   test/org/apache/pig/test/TestSkewedJoin.java 947a31b 
>   test/org/apache/pig/test/TestStoreBase.java eb3b253 
>   test/org/apache/pig/test/TezMiniCluster.java 0bf7c5a 
>   test/org/apache/pig/test/Util.java 18b241e 
>   test/org/apache/pig/test/YarnMiniCluster.java PRE-CREATION 
> 
> 
> Diff: https://reviews.apache.org/r/57317/diff/3/
> 
> 
> Testing
> -------
> 
> All tests pass.
> 
> 
> Thanks,
> 
> kelly zhang
> 
>

