pig-dev mailing list archives

From kelly zhang <liyun.zh...@intel.com>
Subject Re: Review Request 57317: Support Pig On Spark
Date Tue, 02 May 2017 22:11:16 GMT


> On March 24, 2017, 4:14 a.m., Rohini Palaniswamy wrote:
> > src/org/apache/pig/backend/hadoop/executionengine/spark/SparkEngineConf.java
> > Lines 49-51 (patched)
> > <https://reviews.apache.org/r/57317/diff/3/?file=1666868#file1666868line49>
> >
> >     Why do we need these settings with spark prefixed when we have udf.import.list, UDFContext.UDF_CONTEXT and UDFContext.CLIENT_SYS_PROPS

PIG-4920 traced why these settings are needed.

In Spark mode, UDFContext#udfConfs is initialized only after all objects have been deserialized, whereas in MR/Tez mode it is initialized first.
Let's use TestHBaseStorage#testLoadWithProjection_1 to explain further:
HBaseStorage#defaultCaster is set in the HBaseStorage constructor, and its value comes from UDFContext.getUDFContext().getClientSystemProps():
public HBaseStorage(String columnList, String optString) throws ParseException, IOException {
    ...
    String defaultCaster = UDFContext.getUDFContext().getClientSystemProps().getProperty(CASTER_PROPERTY, STRING_CASTER);
}


In MR mode, UDFContext is initialized first (PigGenericMapBase#setup -> MapRedUtil.setupUDFContext -> UDFContext#deserialize), and only then is the HBaseStorage constructor called.
In Spark mode, Spark deserializes all objects first and then starts the executor to run the program, so the HBaseStorage constructor runs before UDFContext is initialized (PigInputFormatSpark#createRecordReader -> MapRedUtil#setupUDFContext -> UDFContext#deserialize). The client system properties have not been restored yet at that point, so an NPE is thrown. The solution is to serialize and deserialize UDFContext#udfConfs and UDFContext#clientSysProps in SparkEngineConf.
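
The ordering problem and the fix can be sketched in plain Java. This is a simplified model, not Pig's code: ClientContext stands in for UDFContext, StorageSketch for HBaseStorage, and EngineConfSketch for SparkEngineConf; the property key "example.caster" and the caster names are hypothetical. The sketch assumes the engine conf is deserialized on the executor before any loader constructor runs, and restores the client properties from its readObject hook.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Properties;

// Hypothetical stand-in for UDFContext: a per-JVM holder for client system properties.
final class ClientContext {
    private static Properties clientSysProps; // null until something restores it

    static Properties getClientSystemProps() { return clientSysProps; }
    static void setClientSystemProps(Properties p) { clientSysProps = p; }
}

// Hypothetical stand-in for a loader such as HBaseStorage: its constructor reads the context.
final class StorageSketch {
    static final String CASTER_KEY = "example.caster";  // hypothetical key
    static final String DEFAULT_CASTER = "string-caster"; // hypothetical default
    final String defaultCaster;

    StorageSketch() {
        // Throws NullPointerException if the context has not been restored yet.
        this.defaultCaster = ClientContext.getClientSystemProps().getProperty(CASTER_KEY, DEFAULT_CASTER);
    }
}

// Hypothetical stand-in for SparkEngineConf: ships the client properties inside the task.
final class EngineConfSketch implements Serializable {
    private static final long serialVersionUID = 1L;
    private final Properties clientSysProps; // java.util.Properties is Serializable

    EngineConfSketch(Properties clientSysProps) { this.clientSysProps = clientSysProps; }

    // Invoked by Java serialization on the receiving (executor) side.
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        ClientContext.setClientSystemProps(clientSysProps); // restore before any loader is built
    }
}

final class UdfContextOrderDemo {
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    // Failure case: the loader is constructed before the context is restored.
    static boolean constructBeforeRestoreFails() {
        ClientContext.setClientSystemProps(null);
        try {
            new StorageSketch();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // Fixed case: the engine conf is deserialized first, then the loader is constructed.
    static String casterAfterRestore(String caster) throws IOException, ClassNotFoundException {
        Properties clientProps = new Properties();
        clientProps.setProperty(StorageSketch.CASTER_KEY, caster);
        byte[] shipped = serialize(new EngineConfSketch(clientProps)); // "driver" side
        ClientContext.setClientSystemProps(null);                       // fresh "executor" state
        deserialize(shipped);                                           // readObject restores the context
        return new StorageSketch().defaultCaster;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(constructBeforeRestoreFails());          // true
        System.out.println(casterAfterRestore("binary-caster"));    // binary-caster
    }
}
```

Hooking the restore into readObject ties it to the deserialization of the conf itself, so no separate setup call has to run first; if the conf were deserialized after the loader, the sketch would fail in the same way as the NPE described above.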

The "spark.*" naming was not suitable, so the prefix has been changed to "pig.spark.*".


> On March 24, 2017, 4:14 a.m., Rohini Palaniswamy wrote:
> > src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java
> > Lines 72 (patched)
> > <https://reviews.apache.org/r/57317/diff/3/?file=1666916#file1666916line72>
> >
> >     Can you make this method in mapreduce NoopFilterRemover as public static and call that instead of duplicating here?

mapReduceLayer.NoopFilterRemover is a private class and cannot be used by the NoopFilterRemover in the spark package, so I created NoopFilterRemoverUtil with a NoopFilterRemoverUtil#removeFilter method that both can call.
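
A rough sketch of that refactoring in plain Java. The plan is modelled as a simple list, and PlanOp, NoopFilterRemoverUtilSketch and the two visitor classes are hypothetical stand-ins, not Pig's actual classes or signatures; treating a FILTER whose predicate is the constant true as a no-op is an assumption of this model. The point is only the shape: one public static removal helper, called from both engine-specific visitors.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified plan operator.
final class PlanOp {
    final String kind;        // e.g. "LOAD", "FILTER", "STORE"
    final boolean alwaysTrue; // only meaningful when kind is "FILTER"

    PlanOp(String kind, boolean alwaysTrue) {
        this.kind = kind;
        this.alwaysTrue = alwaysTrue;
    }

    boolean isNoopFilter() {
        return "FILTER".equals(kind) && alwaysTrue;
    }
}

// Shared helper in the spirit of NoopFilterRemoverUtil#removeFilter: public and static so
// visitors in different packages can reuse it instead of duplicating the logic.
final class NoopFilterRemoverUtilSketch {
    private NoopFilterRemoverUtilSketch() {}

    // In a linear pipeline, removing the filter implicitly connects its predecessor
    // to its successor.
    public static void removeFilter(List<PlanOp> pipeline, PlanOp filter) {
        pipeline.remove(filter);
    }
}

// Stand-in for the MR visitor; delegates removal to the shared helper.
final class MrNoopFilterRemoverSketch {
    static void visit(List<PlanOp> pipeline) {
        for (PlanOp op : new ArrayList<>(pipeline)) { // iterate over a copy while mutating the original
            if (op.isNoopFilter()) {
                NoopFilterRemoverUtilSketch.removeFilter(pipeline, op);
            }
        }
    }
}

// Stand-in for the Spark visitor; same helper, no duplicated removal logic.
final class SparkNoopFilterRemoverSketch {
    static void visit(List<PlanOp> pipeline) {
        for (PlanOp op : new ArrayList<>(pipeline)) {
            if (op.isNoopFilter()) {
                NoopFilterRemoverUtilSketch.removeFilter(pipeline, op);
            }
        }
    }
}

final class NoopFilterDemo {
    public static void main(String[] args) {
        List<PlanOp> plan = new ArrayList<>(Arrays.asList(
                new PlanOp("LOAD", false), new PlanOp("FILTER", true),
                new PlanOp("FILTER", false), new PlanOp("STORE", false)));
        SparkNoopFilterRemoverSketch.visit(plan);
        System.out.println(plan.size()); // 3: only the always-true FILTER was removed
    }
}
```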


- kelly


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/57317/#review169617
-----------------------------------------------------------


On April 21, 2017, 5:31 a.m., kelly zhang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/57317/
> -----------------------------------------------------------
> 
> (Updated April 21, 2017, 5:31 a.m.)
> 
> 
> Review request for pig, Daniel Dai and Rohini Palaniswamy.
> 
> 
> Bugs: PIG-4059 and PIG-4854
>     https://issues.apache.org/jira/browse/PIG-4059
>     https://issues.apache.org/jira/browse/PIG-4854
> 
> 
> Repository: pig-git
> 
> 
> Description
> -------
> 
> Merge all changes from the spark branch.
> 
> 
> Diffs
> -----
> 
>   bin/pig e1212fa 
>   build.xml a0d2ca8 
>   ivy.xml 42daec9 
>   ivy/libraries.properties 481066e 
>   src/META-INF/services/org.apache.pig.ExecType 5c034c8 
>   src/docs/src/documentation/content/xdocs/start.xml c9a1491 
>   src/org/apache/pig/PigConfiguration.java d25f81a 
>   src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java ac03d40
>   src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemover.java 4d91556
>   src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/NoopFilterRemoverUtil.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java 6fe8ff3
>   src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java e866b28
>   src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java 8170f02
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java 0e35273
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java ecf780c
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java 3bad98b
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java 2376d03
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POBroadcastSpark.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java bcbfe2b
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java d80951a
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java 4dc6d54
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java 52cfb73
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java 4923d3f
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java 13f70c0
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java f2830c2
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java c3a82c3
>   src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/KryoSerializer.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkEngineConf.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecType.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLocalExecType.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPOUserFuncVisitor.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigContext.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigRecordReader.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/UDFJarsFinder.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/BroadcastConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IteratorTransform.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RDDConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SplitConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/UnionConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POGlobalRearrangeSpark.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POSampleSortSpark.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/ParallelismSetter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SparkSecondaryKeyOptimizerUtil.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/DotSparkPrinter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompilerException.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/plan/XMLSparkPrinter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/streaming/SparkExecutableManager.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/AccumulatorOptimizer.java eb5b801
>   src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java caf1786
>   src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java c4b44ad
>   src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java 889c01b
>   src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java 0b59c9c
>   src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java 1826131 
>   src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java 951146f 
>   src/org/apache/pig/data/SelfSpillBag.java d17f0a8 
>   src/org/apache/pig/impl/PigContext.java d43949f 
>   src/org/apache/pig/impl/builtin/StreamingUDF.java de98539 
>   src/org/apache/pig/impl/plan/OperatorPlan.java 8b2e2e7 
>   src/org/apache/pig/impl/util/UDFContext.java 09afc0a 
>   src/org/apache/pig/scripting/groovy/GroovyEvalFunc.java cff59db 
>   src/org/apache/pig/tools/pigstats/PigStatsUtil.java e97625f 
>   src/org/apache/pig/tools/pigstats/spark/SparkCounter.java PRE-CREATION 
>   src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java PRE-CREATION 
>   src/org/apache/pig/tools/pigstats/spark/SparkCounters.java PRE-CREATION 
>   src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java PRE-CREATION 
>   src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java PRE-CREATION 
>   src/org/apache/pig/tools/pigstats/spark/SparkPigStatusReporter.java PRE-CREATION 
>   src/org/apache/pig/tools/pigstats/spark/SparkScriptState.java PRE-CREATION 
>   src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java PRE-CREATION 
>   test/e2e/pig/build.xml 1ec9cf6 
>   test/e2e/pig/conf/spark.conf PRE-CREATION 
>   test/e2e/pig/drivers/TestDriverPig.pm bcec317 
>   test/e2e/pig/tests/bigdata.conf c51ae48 
>   test/e2e/pig/tests/cmdline.conf d72c8c0 
>   test/e2e/pig/tests/grunt.conf 9c794d4 
>   test/e2e/pig/tests/hcat.conf 5910b2b 
>   test/e2e/pig/tests/multiquery.conf 667659a 
>   test/e2e/pig/tests/negative.conf 9a92528 
>   test/e2e/pig/tests/nightly.conf fe5b447 
>   test/e2e/pig/tests/orc.conf 9498d88 
>   test/e2e/pig/tests/streaming.conf 18f2fb2 
>   test/e2e/pig/tests/turing_jython.conf 077231b 
>   test/excluded-tests-spark PRE-CREATION 
>   test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java 94b34b3
>   test/org/apache/pig/pigunit/PigTest.java 0866979 
>   test/org/apache/pig/spark/TestIndexedKey.java PRE-CREATION 
>   test/org/apache/pig/spark/TestSecondarySortSpark.java PRE-CREATION 
>   test/org/apache/pig/spark/TestSparkCompiler.java PRE-CREATION 
>   test/org/apache/pig/test/MiniGenericCluster.java 9347269 
>   test/org/apache/pig/test/SparkMiniCluster.java PRE-CREATION 
>   test/org/apache/pig/test/TestAssert.java 6d4b5c6 
>   test/org/apache/pig/test/TestCase.java c9bb2fa 
>   test/org/apache/pig/test/TestCollectedGroup.java a958d33 
>   test/org/apache/pig/test/TestCombiner.java df44293 
>   test/org/apache/pig/test/TestCubeOperator.java de96e6c 
>   test/org/apache/pig/test/TestEmptyInputDir.java a9a46af 
>   test/org/apache/pig/test/TestEvalPipeline.java 48ece69 
>   test/org/apache/pig/test/TestEvalPipeline2.java c8f51d7 
>   test/org/apache/pig/test/TestEvalPipelineLocal.java c12d595 
>   test/org/apache/pig/test/TestFinish.java f18c103 
>   test/org/apache/pig/test/TestForEachNestedPlanLocal.java 63d8f67 
>   test/org/apache/pig/test/TestGrunt.java f16ff60 
>   test/org/apache/pig/test/TestHBaseStorage.java 864985e 
>   test/org/apache/pig/test/TestLimitVariable.java 53b9dae 
>   test/org/apache/pig/test/TestLineageFindRelVisitor.java e8e6aeb 
>   test/org/apache/pig/test/TestMapSideCogroup.java 2c78b4a 
>   test/org/apache/pig/test/TestMergeJoinOuter.java 81aee55 
>   test/org/apache/pig/test/TestMultiQuery.java c32eab7 
>   test/org/apache/pig/test/TestMultiQueryLocal.java b9ac035 
>   test/org/apache/pig/test/TestNativeMapReduce.java c4f6573 
>   test/org/apache/pig/test/TestNullConstant.java 3ea4509 
>   test/org/apache/pig/test/TestPigRunner.java 25380e4 
>   test/org/apache/pig/test/TestPigServer.java 8e28646 
>   test/org/apache/pig/test/TestPigServerLocal.java fbabd03 
>   test/org/apache/pig/test/TestProjectRange.java 2e3e7b8 
>   test/org/apache/pig/test/TestPruneColumn.java f05e0ec 
>   test/org/apache/pig/test/TestRank1.java 9e4ef62 
>   test/org/apache/pig/test/TestRank2.java fc802a9 
>   test/org/apache/pig/test/TestRank3.java 43af10d 
>   test/org/apache/pig/test/TestSecondarySort.java 8991010 
>   test/org/apache/pig/test/TestStoreBase.java eb3b253 
>   test/org/apache/pig/test/TezMiniCluster.java 0bf7c5a 
>   test/org/apache/pig/test/Util.java 18b241e 
>   test/org/apache/pig/test/YarnMiniCluster.java PRE-CREATION 
>   test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-dot.gld PRE-CREATION
>   test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-text.gld PRE-CREATION
>   test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-xml.gld PRE-CREATION
> 
> 
> Diff: https://reviews.apache.org/r/57317/diff/6/
> 
> 
> Testing
> -------
> 
> All tests pass.
> 
> 
> Thanks,
> 
> kelly zhang
> 
>

