crunch-user mailing list archives

From Danny Morgan <unlucky...@hotmail.com>
Subject RE: Multiple Writes in a single pipeline.
Date Sat, 23 Aug 2014 20:21:27 GMT
Any chance you have a sample log4j.properties for working with crunch that you can share?
I want to investigate why the previous version would work when I would call pipeline.run()
after the first write but before the second.
Danny
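
(No sample made it into this thread; a minimal log4j.properties along the lines below is usually enough to get Crunch and Hadoop logging onto the console -- the exact levels and pattern are an assumption, not something confirmed here:)

log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %-5p %c{2}: %m%n
# Raise Crunch's own logging when chasing planner/runtime issues
log4j.logger.org.apache.crunch=DEBUG
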
From: jwills@cloudera.com
Date: Sat, 23 Aug 2014 09:23:01 -0700
Subject: Re: Multiple Writes in a single pipeline.
To: user@crunch.apache.org

So I suspect the problem is enabling debug mode without having a log4j.properties file specified -- we should probably have a threshold that causes an exception to be thrown (even in debug mode) to kill the job completely after a certain number of caught exceptions in the debug handler/logger.

On Fri, Aug 22, 2014 at 5:58 PM, Danny Morgan <unluckyboy@hotmail.com> wrote:

Awesome, you nailed it!

In Avro:

Visit.id = int;
Log.parameter2 = string;

I had this line in my DoFn:

visit.setId(Integer.decode((String) log.getParam2()));

Changing it to this fixed everything:

visit.setId(Integer.decode(log.getParam2().toString()));
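
(The likely mechanism, for anyone hitting the same thing: Avro specific records expose string fields as CharSequence -- typically org.apache.avro.util.Utf8 -- so the (String) cast throws a ClassCastException inside the DoFn, while toString() always yields a real java.lang.String. A minimal sketch; the Utf8 value here stands in for what getParam2() returns:)

CharSequence param2 = new org.apache.avro.util.Utf8("42");
// Integer.decode((String) param2);         // ClassCastException: Utf8 cannot be cast to String
int id = Integer.decode(param2.toString()); // fine: toString() returns a java.lang.String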

I had turned debugging on for the pipeline, but I didn't have a log4j.properties in the classpath, so everything was just spitting out to the console and I never saw an error or exception. Checking the MR History Server logs didn't turn anything up either.

Any idea if I should have been seeing an exception? Is there some magic Crunch mode I can turn on to debug what was happening?

Thanks Josh!

Danny

From: jwills@cloudera.com
Date: Fri, 22 Aug 2014 15:37:39 -0700
Subject: Re: Multiple Writes in a single pipeline.
To: user@crunch.apache.org

Perversely fascinating. Does it matter if you do a mix of literals and values from Log? Does any value from Log cause it to not be written out?

I can't help but wonder if we're swallowing an exception somewhere -- there's nothing in the logs, right?

J

On Fri, Aug 22, 2014 at 12:08 PM, Danny Morgan <unluckyboy@hotmail.com> wrote:

Sorry, I deleted a line by accident; there should be a

//v.setDatehour(l2.getDatehour());

in the "This doesn't work" block of code.

From: unluckyboy@hotmail.com
To: user@crunch.apache.org
Subject: RE: Multiple Writes in a single pipeline.
Date: Fri, 22 Aug 2014 19:03:37 +0000

Had to do a lot of bisecting to finally get to where the issue seems to be. It's going to take some time to get a separate chunk of code that I can share out, but let me try to talk through the problem; maybe it's obvious.

I have raw text data that gets parsed into Log Avro classes, which then go through another DoFn to get converted into Visit Avro classes, and I want to write out both collections of Log and Visit records. Basically the issue seems to happen in the DoFn that takes in Log and creates Visit. If I try to set any of the fields in Visit using fields from Log, then I end up writing out no files; if I set the fields in Visit with literals, then I have output.

Here's the code. I thought this might be a detached-values issue and tried that as well, but it didn't seem to help. Originally the code just used "l" directly in process() and didn't bother with "l2".

public static PTable<Visit, Pair<Long, Long>> parseVisit(PCollection<Log> logs) {
  return logs.parallelDo("second-parse", new DoFn<Log, Pair<Visit, Pair<Long, Long>>>() {

    private PType<Log> ptype;

    @Override
    public void initialize() {
      this.ptype = Avros.specifics(Log.class);
      this.ptype.initialize(getConfiguration());
    }

    @Override
    public void process(Log l, Emitter<Pair<Visit, Pair<Long, Long>>> emitter) {
      Log l2 = ptype.getDetachedValue(l);
      increment("test", "visitcount");

      // This works
      Visit v = new Visit();
      v.setDate(1234);
      v.setDatehour("123");
      v.setUser("123");
      emitter.emit(Pair.of(v, Pair.of(1L, 1L)));

      // This doesn't work
      //Visit v = new Visit();
      //v.setDate(l2.getDate());
      //v.setDatehour(l2.getDatehour());
      //v.setUser(l2.getUser());
      //emitter.emit(Pair.of(v, Pair.of(1L, 1L)));
    }
  }, Avros.tableOf(Avros.specifics(Visit.class), Avros.pairs(Avros.longs(), Avros.longs())));
}

Thanks!

From: josh.wills@gmail.com
Date: Fri, 22 Aug 2014 08:57:13 -0700
Subject: Re: Multiple Writes in a single pipeline.
To: user@crunch.apache.org

Many thanks for taking the time; I'd like to get this resolved before the next release.

J

On Fri, Aug 22, 2014 at 8:55 AM, Danny Morgan <unluckyboy@hotmail.com> wrote:

Hi Josh,

The Log and Visit classes are all in the same jar; the classloader fix is in place, but I still get the issue without setting the class loader.

I'll put together the smallest reproduction I can and send out the code.

Danny

From: jwills@cloudera.com
Date: Fri, 22 Aug 2014 08:42:06 -0700
Subject: Re: Multiple Writes in a single pipeline.
To: user@crunch.apache.org

Hey Danny,

I wrote the test that I inlined below, and it ran successfully for me against master and the 0.10 branch, so there must be something more subtle going on here -- are the Log and Visit classes created in different jars? I'm assuming the classloader fix is in play here, and I'm wondering if there is something weird there.

import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.io.To;
import org.apache.crunch.test.Employee;
import org.apache.crunch.test.Person;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.test.TemporaryPaths;
import org.apache.crunch.types.avro.Avros;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Rule;
import org.junit.Test;

public class MultiAvroOutputIT {

  @Rule
  public transient TemporaryPath tmpDir = TemporaryPaths.create();

  @Test
  public void testMultiAvroWrite() throws Exception {
    Path person = tmpDir.getPath("person");
    Path employee = tmpDir.getPath("employee");

    Pipeline p = new MRPipeline(MultiAvroOutputIT.class, tmpDir.getDefaultConfiguration());
    PCollection<String> shakes = p.read(From.textFile(tmpDir.copyResourcePath("shakes.txt")));

    shakes.parallelDo(new PersonFn(), Avros.specifics(Person.class))
        .write(To.avroFile(person));
    shakes.parallelDo(new EmployeeFn(), Avros.specifics(Employee.class))
        .write(To.avroFile(employee));
    p.run();

    FileSystem fs = FileSystem.get(tmpDir.getDefaultConfiguration());
    System.out.println("Person");
    for (FileStatus fstat : fs.listStatus(person)) {
      System.out.println(fstat.getPath() + ": " + fstat.getLen());
    }
    System.out.println("Employee");
    for (FileStatus fstat : fs.listStatus(employee)) {
      System.out.println(fstat.getPath() + ": " + fstat.getLen());
    }
    p.done();
  }

  static class PersonFn extends MapFn<String, Person> {
    @Override
    public Person map(String input) {
      return new Person();
    }
  }

  static class EmployeeFn extends MapFn<String, Employee> {
    @Override
    public Employee map(String input) {
      return new Employee();
    }
  }
}

On Fri, Aug 22, 2014 at 8:12 AM, Josh Wills <jwills@cloudera.com> wrote:

That is super-interesting; let me try to replicate it in a test.

J

On Fri, Aug 22, 2014 at 7:26 AM, Danny Morgan <unluckyboy@hotmail.com> wrote:

This issue looks similar to https://issues.apache.org/jira/browse/CRUNCH-67

It turns out even if I get rid of the reduce phase and do just this:

PTable<String, String> lines = this.read(mySource);
PCollection<Log> parsed = lines.parallelDo("initial-parsing", new myParser(),
    Avros.specifics(Log.class));
PTable<Visit, Pair<Long, Long>> visits = parsed.parallelDo("visits-parsing",
    new VisitsExtractor(),
    Avros.tableOf(Avros.specifics(Visit.class), Avros.pairs(Avros.longs(), Avros.longs())));

visits.write(To.avroFile(outputPath + "/visits"), WriteMode.OVERWRITE);
parsed.write(To.avroFile(outputPath + "/raw"), WriteMode.OVERWRITE);
this.done();

The plan shows I should be writing to two different targets in a single map phase; however, only "/raw" has data written out to it, and "/visits" just contains a _SUCCESS file and no data.

Might this be an issue writing out to two different Avro types in the same phase?

Thanks Again,

Danny


From: unluckyboy@hotmail.com
To: user@crunch.apache.org
Subject: RE: Multiple Writes in a single pipeline.
Date: Fri, 22 Aug 2014 02:02:20 +0000

Hi Josh,


From: jwills@cloudera.com
Date: Thu, 21 Aug 2014 17:40:25 -0700
Subject: Re: Multiple Writes in a single pipeline.
To: user@crunch.apache.org

The two different executions you have are doing different things, however. In the first one, Crunch is running a single MapReduce job where the /raw directory is written as a mapper side-output, and the /visits directory is being written out on the reduce side (or at least, should be -- is there any evidence of a failure in the job in the logs? Are bytes being written out from the reducer?)

No evidence of any failures in the logs; the single mapper and the reducer both succeed. The mapper definitely writes to HDFS, the reducer does not. Here are the relevant counters from the reducer:

FILE: Number of bytes read              6
FILE: Number of bytes written           91811
FILE: Number of large read operations   0
FILE: Number of read operations         0
FILE: Number of write operations        0
HDFS: Number of bytes read              6205
HDFS: Number of bytes written           0
HDFS: Number of large read operations   0
HDFS: Number of read operations         4
HDFS: Number of write operations        2

I couldn't find anything related on the Crunch JIRA.

For this problem, I think it would be more efficient to write the parsed -> /raw output
first, call run(), then do the agg -> /visits output followed by done(), which would mean
that you would only need to parse the raw input once, instead of twice.
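
(In code, that ordering would look roughly like the sketch below, reusing the parsed/visits definitions and outputPath from the earlier snippet; this.run() assumes the same Pipeline subclass as this.read()/this.done():)

parsed.write(To.avroFile(outputPath + "/raw"), WriteMode.OVERWRITE);
this.run();  // materialize /raw first, so the raw input is only parsed once

PTable<Visit, Pair<Long, Long>> visits = parsed.parallelDo("visits-parsing",
    new VisitsExtractor(),
    Avros.tableOf(Avros.specifics(Visit.class), Avros.pairs(Avros.longs(), Avros.longs())));
visits.write(To.avroFile(outputPath + "/visits"), WriteMode.OVERWRITE);
this.done();
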
Would the first option be more efficient if it worked?

A helpful trick for seeing how the Crunch planner is mapping your logic into MapReduce jobs is to look at the plan dot file via one of the following mechanisms:

1) Instead of calling Pipeline.run(), call Pipeline.runAsync() and then call the getPlanDotFile() method on the returned PipelineExecution object. You can write the dot file out and use a dot viewer to look at how the DoFns are broken up into MR jobs and map/reduce phases (see the sketch after this list).

2) Call MRPipeline.plan() directly, which returns an MRExecutor object that also implements PipelineExecution. (The difference being that calling MRPipeline.plan() will not start the jobs running, whereas calling runAsync() will.)
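
(A sketch of option 1, assuming a Pipeline named p; getPlanDotFile() returns the dot source as a String, and waitUntilDone() blocks until the jobs finish:)

PipelineExecution exec = p.runAsync();
java.nio.file.Files.write(java.nio.file.Paths.get("plan.dot"),
    exec.getPlanDotFile().getBytes(java.nio.charset.StandardCharsets.UTF_8));
exec.waitUntilDone();
// then render it, e.g.: dot -Tpng plan.dot -o plan.png
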
I ran the two different versions through dot, and you're right, they are two completely different executions. Pretty cool!

Thanks!
-- 
Director of Data Science, Cloudera
Twitter: @josh_wills