pig-dev mailing list archives

From Praveen R <prav...@sigmoidanalytics.com>
Subject Re: How to deal with visitLocalRearrange in spark mode
Date Tue, 10 Feb 2015 10:02:02 GMT
Hi Kelly,

In the case of pig-spark, we may not need to split the operators into map,
reduce and combine plans. Instead, all related operators can sit on the same
plan, since Spark takes care of handling intermediate data and optimising
the flow of data transformations.
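
To make the contrast concrete, here is a toy Java sketch. It is not Pig
code: PlanSketch, PhasedOper, compilePhased and compileSingle are
hypothetical names, operators are plain strings, and the input is assumed
to be one linear chain (load, group, group again, store) rather than the
two-input join from the script. It only illustrates why a map/reduce split
forces a temp-file store at a second POLocalRearrange, while a single plan
does not.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PlanSketch {

    // Toy stand-in for an MR-style operator with separate phase plans.
    static final class PhasedOper {
        final List<String> mapPlan = new ArrayList<>();
        final List<String> reducePlan = new ArrayList<>();
        boolean mapDone; // set once a shuffle boundary has been seen
    }

    // MR-style compile: a LocalRearrange arriving after the map phase is
    // closed forces a temp store and a fresh job that reloads the temp file.
    static List<PhasedOper> compilePhased(List<String> ops) {
        List<PhasedOper> jobs = new ArrayList<>();
        PhasedOper cur = new PhasedOper();
        jobs.add(cur);
        for (String op : ops) {
            if (op.equals("LocalRearrange")) {
                if (cur.mapDone) {
                    cur.reducePlan.add("Store(tmp)");
                    cur = new PhasedOper();
                    cur.mapPlan.add("Load(tmp)");
                    jobs.add(cur);
                }
                cur.mapPlan.add(op);
            } else if (op.equals("GlobalRearrange")) {
                cur.mapDone = true; // shuffle: later ops go to the reduce side
            } else if (cur.mapDone) {
                cur.reducePlan.add(op);
            } else {
                cur.mapPlan.add(op);
            }
        }
        return jobs;
    }

    // Single-plan compile (assumption: the engine handles shuffles and
    // intermediate data itself): every operator stays on one plan.
    static List<String> compileSingle(List<String> ops) {
        return new ArrayList<>(ops);
    }

    public static void main(String[] args) {
        List<String> ops = Arrays.asList(
                "Load", "LocalRearrange", "GlobalRearrange", "Package",
                "LocalRearrange", "GlobalRearrange", "Package", "Store");
        System.out.println("phased jobs: " + compilePhased(ops).size());
        System.out.println("single plan: " + compileSingle(ops));
    }
}
```

For the chain in main, the phased compile ends up with two jobs linked by
a temp file, while the single plan keeps all eight operators together and
never adds a temp store. Whether the real SparkCompiler can always avoid
the split is a separate question that this sketch does not answer.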

Regards,
Praveen

On Thu, Feb 5, 2015 at 2:16 PM, Zhang, Liyun <liyun.zhang@intel.com> wrote:

>  Hi all:
>
> I'm now working on PIG-4374
> <https://issues.apache.org/jira/browse/PIG-4374> (Add SparkPlan in spark
> package). I ran into a problem with the following script in spark mode.
>
> Join.pig
>
> A = load '/SkewedJoinInput1.txt' as (id,name,n);
>
> B = load '/SkewedJoinInput2.txt' as (id,name);
>
> C = group A by id;
>
> D = group B by id;
>
> E = join C by group, D by group;
>
> store E into '/skewedjoin.out';
>
> explain E;
>
>
>
> The physical plan is converted to an MR plan that contains 3 mapreduce
> nodes (see attached mr_join.txt).
>
> "logroup" converts to "poLocalRearrange", "poGlobalRearrange",
> "poPackage"
>
> "lojoin" converts to "poLocalRearrange", "poGlobalRearrange",
> "poPackage", "poPackage"
>
>
>
>    In mapreduce mode, MapReduceOper holds a mapPlan, a reducePlan and a
> combinePlan.
>
>
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.visitLocalRearrange
>
>
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.addToMap
>
>    private void addToMap(PhysicalOperator op)
>            throws PlanException, IOException {
>
>          if (compiledInputs.length == 1) {
>
>              //For speed
>
>              MapReduceOper mro = compiledInputs[0];
>
>              if (!mro.isMapDone()) {
>
>                  mro.mapPlan.addAsLeaf(op);
>
>              } else if (mro.isMapDone() && !mro.isReduceDone()) {
>
>                  FileSpec fSpec = getTempFileSpec();
>
>
>
>                  // MyComment: first add a POStore to mro.reducePlan and
>                  // store the mro result in a tmp file, then create a new
>                  // MROper containing a POLoad that loads that tmp file.
>                  POStore st = getStore();
>
>                  st.setSFile(fSpec);
>
>                  mro.reducePlan.addAsLeaf(st);
>
>                  mro.setReduceDone(true);
>
>                  mro = startNew(fSpec, mro);
>
>                  mro.mapPlan.addAsLeaf(op);
>
>                  compiledInputs[0] = mro;
>
>              } else {
>
>                  int errCode = 2022;
>
>                  String msg = "Both map and reduce phases have been done."
>                          + " This is unexpected while compiling.";
>
>                  throw new PlanException(msg, errCode, PigException.BUG);
>
>              }
>
>              curMROp = mro;
>
>
>
>           ....
>
>           }
>
>
>
>       The SparkOper I created has only a single plan.
>
>       How can I deal with the situation described above? For now I handle
> it as follows:
>
>
> org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler.visitLocalRearrange
>
>
> org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler.addToMap
>
>       private void addToMap(POLocalRearrange op)
>               throws PlanException, IOException {
>
>               if (compiledInputs.length == 1) {
>
>                   SparkOper sparkOp = compiledInputs[0];
>
>                   // MyComment: first look up the predecessor of the
>                   // POLocalRearrange.
>                   List<PhysicalOperator> preds = plan.getPredecessors(op);
>
>                   if (preds != null && preds.size() == 1) {
>
>                       // If the predecessor is not a POLoad (the predecessor
>                       // of a POLocalRearrange is usually a POLoad when using
>                       // "group" or "join"), add a POStore to sparkOper.plan
>                       // and store the sparkOper result in a tmp file, then
>                       // create a new SparkOper containing a POLoad that
>                       // loads that tmp file.
>                       if (!(preds.get(0) instanceof POLoad)) {
>
>                           FileSpec fSpec = getTempFileSpec();
>
>                           POStore st = getStore();
>
>                           st.setSFile(fSpec);
>
>                           sparkOp.plan.addAsLeaf(st);
>
>                           sparkOp = startNew(fSpec, sparkOp);
>
>                           compiledInputs[0] = sparkOp;
>
>                       }
>
>                   }
>
>                   sparkOp.plan.addAsLeaf(op);
>
>                   curSparkOp = sparkOp;
>
>               } else {
>
>               }
>
>                   .....
>
>               }
>
>
>
>
>
> Can anyone tell me how Tez deals with this situation? I would like to
> reference how other execution modes, such as mapreduce and tez, handle it.
>
>
>
>
>
> Best regards
>
> Zhang,Liyun
>
>
>
