From: "liyunzhang_intel (JIRA)"
To: pig-dev@hadoop.apache.org
Date: Fri, 5 Feb 2016 08:48:39 +0000 (UTC)
Subject: [jira] [Commented] (PIG-4243) Fix "TestStore" for Spark engine

    [ https://issues.apache.org/jira/browse/PIG-4243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15133852#comment-15133852 ]

liyunzhang_intel commented on PIG-4243:
---------------------------------------

https://builds.apache.org/job/Pig-spark/298/#showFailuresLink shows that the following unit tests fail:
org.apache.pig.test.TestStore.testCleanupOnFailureMultiStore
org.apache.pig.test.TestStore.testCleanupOnFailure

PIG-4243.patch fixes these two failures. Changes in PIG-4243.patch:
1. Add "clean up for all of the stores" (call PigStorage#cleanupOnFailure).
2. Add checks in TestStoreBase#testCleanupOnFailureMultiStore so that different results are expected in different engine modes.

More detail on TestStoreBase#testCleanupOnFailureMultiStore. The script looks like the following:
{code}
A = load 'xx';
store A into '1.out' using DummyStore('true','1');  -- first job should fail
store A into '2.out' using DummyStore('false','1'); -- second job should succeed
{code}

After multiquery optimization, the Spark plan is:
{code}
Split - scope-14
|   |
|   a: Store(hdfs://1.out:myudfs.DummyStore('true','1')) - scope-4
|   |
|   a: Store(hdfs://2.out:myudfs.DummyStore('false','1')) - scope-7
|
|---a: Load(hdfs://zly2.sh.intel.com:8020/user/root/multiStore.txt:org.apache.pig.builtin.PigStorage) - scope-0
{code}

In Spark mode, when there are two POStores in the sub-plan of a POSplit, once the first job fails and throws an exception, the second job is not executed, so FILE_SETUPJOB_CALLED (or FILE_SETUPTASK_CALLED) of the second job is not generated. *But why is FILE_SETUPJOB_CALLED (or FILE_SETUPTASK_CALLED) of the second job generated in MR mode, even though the second job is not executed there either?*

In MR mode:
FILE_SETUPJOB_CALLED is generated in org.apache.pig.test.TestStore.DummyOutputCommitter#setupJob.
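To make the marker mechanism concrete, here is a minimal sketch of a committer that records lifecycle calls by touching marker files, which is the idea DummyOutputCommitter relies on. The class name and the markerDir field are made up for illustration; this is not the actual TestStore code:
{code}
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Hypothetical test double: each lifecycle callback touches a marker file,
// so a test can later assert which callbacks actually ran.
public class MarkerFileOutputCommitter extends OutputCommitter {
    private final File markerDir; // made-up location for the marker files

    public MarkerFileOutputCommitter(File markerDir) {
        this.markerDir = markerDir;
    }

    private void touch(String name) throws IOException {
        new File(markerDir, name).createNewFile();
    }

    @Override
    public void setupJob(JobContext context) throws IOException {
        touch("FILE_SETUPJOB_CALLED"); // the marker the test asserts on
    }

    @Override
    public void setupTask(TaskAttemptContext context) throws IOException {
        touch("FILE_SETUPTASK_CALLED");
    }

    @Override
    public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
        return false; // nothing to commit in this sketch
    }

    @Override
    public void commitTask(TaskAttemptContext context) throws IOException { }

    @Override
    public void abortTask(TaskAttemptContext context) throws IOException { }
}
{code}
Whether setupJob runs for a given store, and therefore whether its marker file exists afterwards, is exactly what the two engines answer differently below.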
DummyOutputCommitter#setupJob stack trace:
{code}
DummyOutputCommitter.setupJob
  -> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputCommitter.setupJob(PigOutputCommitter.java:407)
    -> org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:511)
{code}

org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputCommitter#PigOutputCommitter:
{code}
public PigOutputCommitter(TaskAttemptContext context,
        List<POStore> mapStores, List<POStore> reduceStores)
        throws IOException {
    // create and store the map and reduce output committers.
    // Kelly's comment: in the case above there will be 2 mapOutputCommitters;
    // DummyOutputCommitter#setupJob is later invoked for each of them, so
    // FILE_SETUPJOB_CALLED of both the first store and the second store is
    // generated before the MR job starts to compute.
    mapOutputCommitters = getCommitters(context, mapStores);
    reduceOutputCommitters = getCommitters(context, reduceStores);
    recoverySupported = context.getConfiguration().getBoolean(
            PigConfiguration.PIG_OUTPUT_COMMITTER_RECOVERY, false);
}
{code}

In Spark mode:
DummyOutputCommitter#setupJob stack trace:
{code}
DummyOutputCommitter.setupJob
  -> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputCommitter.setupJob(PigOutputCommitter.java:407)
    -> org.apache.spark.rdd.PairRDDFunctions#saveAsNewAPIHadoopDataset
{code}

In Spark mode, one store generates one Spark job, and the PigOutputCommitter has only one reduceOutputCommitter for that Spark job.

StoreConverter#configureStorer:
{code}
// Kelly's comment: we only set the location of the current store as
// JobControlCompiler.PIG_REDUCE_STORES, even when there is more than one
// POStore in the script. In Spark, a store is an action, so 1 store
// generates 1 job. In the case above there are two jobs and they are
// executed one by one; when the first job fails, the second job is not
// started, so FILE_SETUPJOB_CALLED (FILE_SETUPTASK_CALLED) of the second
// job is not generated.
private static POStore configureStorer(JobConf jobConf,
        PhysicalOperator op) throws IOException {
    ....
    jobConf.set(JobControlCompiler.PIG_MAP_STORES,
            ObjectSerializer.serialize(Lists.newArrayList()));
    jobConf.set(JobControlCompiler.PIG_REDUCE_STORES,
            ObjectSerializer.serialize(storeLocations));
    ....
}
{code}

A small standalone sketch contrasting the two behaviors is appended at the end of this message.

[~pallavi.rao], [~mohitsabharwal], [~kexianda]: please help review PIG-4243.patch, thanks.

> Fix "TestStore" for Spark engine
> --------------------------------
>
>                 Key: PIG-4243
>                 URL: https://issues.apache.org/jira/browse/PIG-4243
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-4243.patch, TEST-org.apache.pig.test.TestStore.txt
>
>
> 1. Build spark and pig env according to PIG-4168
> 2. add TestStore to $PIG_HOME/test/spark-tests
> cat $PIG_HOME/test/spark-tests
> **/TestStore
> 3. run unit test TestStore
> ant test-spark
> 4. the unit test fails
> error log is attached
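As referenced above, here is a small self-contained sketch that models the difference. It is plain Java with made-up names, not Pig source; it only mimics the ordering of setupJob calls in the two engines:
{code}
import java.util.Arrays;
import java.util.List;

public class SetupJobOrderSketch {

    static class Store {
        final String name;
        final boolean fails;
        Store(String name, boolean fails) { this.name = name; this.fails = fails; }
        void setupJob() { System.out.println(name + ": FILE_SETUPJOB_CALLED"); }
        void compute()  { if (fails) throw new RuntimeException(name + " failed"); }
    }

    // MR style: one job with one composite committer. setupJob runs for every
    // store before any data is computed, so both markers appear even though
    // the job then fails.
    static void mrStyle(List<Store> stores) {
        for (Store s : stores) s.setupJob();
        for (Store s : stores) s.compute();
    }

    // Spark style: each store is its own job and jobs run one by one, so the
    // first failure aborts before the second store's setupJob is reached.
    static void sparkStyle(List<Store> stores) {
        for (Store s : stores) { s.setupJob(); s.compute(); }
    }

    public static void main(String[] args) {
        List<Store> stores = Arrays.asList(new Store("1.out", true),
                new Store("2.out", false));
        // Prints markers for both 1.out and 2.out before failing:
        try { mrStyle(stores); } catch (RuntimeException e) { System.out.println("MR: " + e.getMessage()); }
        // Prints the marker for 1.out only:
        try { sparkStyle(stores); } catch (RuntimeException e) { System.out.println("Spark: " + e.getMessage()); }
    }
}
{code}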