crunch-user mailing list archives

From Josh Wills <jwi...@cloudera.com>
Subject Re: Multiple Writes in a single pipeline.
Date Sat, 23 Aug 2014 16:23:01 GMT
So I suspect the problem is enabling debug mode w/out having a
log4j.properties setting specified-- we should probably have a threshold
that causes an exception to be thrown (even in debug mode) to kill the job
completely after a certain number of caught exceptions in the debug
handler/logger.
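
A minimal sketch of the threshold idea described above, written against plain JDK types so it stands alone. The class name `ThresholdedDebugHandler`, the `maxCaught` parameter, and the use of `Consumer` are all hypothetical and are not part of the Crunch API; this only illustrates "log and continue, but fail hard after N caught exceptions".

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical illustration only; not a Crunch class.
class ThresholdedDebugHandler<T> {
  private final Consumer<T> delegate;   // the user's per-record logic
  private final int maxCaught;          // how many exceptions may be swallowed
  private final AtomicInteger caught = new AtomicInteger();

  ThresholdedDebugHandler(Consumer<T> delegate, int maxCaught) {
    this.delegate = delegate;
    this.maxCaught = maxCaught;
  }

  // Runs the delegate; logs and swallows failures until the threshold is exceeded.
  void process(T input) {
    try {
      delegate.accept(input);
    } catch (RuntimeException e) {
      int n = caught.incrementAndGet();
      System.err.println("Caught exception " + n + " (limit " + maxCaught + "): " + e);
      if (n > maxCaught) {
        // Past the limit: stop hiding the problem and fail the whole task.
        throw new IllegalStateException("Too many caught exceptions: " + n, e);
      }
    }
  }

  int caughtCount() {
    return caught.get();
  }

  public static void main(String[] args) {
    // Every input here fails to parse, so the third call rethrows.
    ThresholdedDebugHandler<String> handler =
        new ThresholdedDebugHandler<>(s -> Integer.decode(s), 2);
    handler.process("not-a-number");
    handler.process("still-not-a-number");
    try {
      handler.process("nope");
    } catch (IllegalStateException e) {
      System.out.println("job killed after " + handler.caughtCount() + " exceptions");
    }
  }
}
```

With a limit like this, a function that fails on every record would surface as a job failure instead of an empty output directory.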


On Fri, Aug 22, 2014 at 5:58 PM, Danny Morgan <unluckyboy@hotmail.com>
wrote:

> Awesome, you nailed it!
>
> In Avro
>
> Visit.id = int;
> Log.parameter2 = string;
>
> I had this line in my DoFn:
>
> visit.setId(Integer.decode((String) log.getParam2()));
>
> Changing it to this fixed everything.
>
> visit.setId(Integer.decode(log.getParam2().toString()));
>
> I had turned debugging on for the pipeline, but I didn't have a log4j
> properties file on the classpath, so everything was just spitting out to
> the console. I never saw an error or exception. Checking the MR History
> Server logs didn't turn anything up either.
>
> Any idea if I should have been seeing an exception? Is there some
> magic crunch mode I can turn on to debug what was happening?
>
> Thanks Josh!
>
> Danny
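
The fix quoted above is consistent with how Avro usually hands back string fields: generated getters are typed as CharSequence by default, and values read from data are typically org.apache.avro.util.Utf8 rather than java.lang.String, so a `(String)` cast compiles but fails at runtime. The sketch below reproduces that with plain JDK types; `StringBuilder` is only a stand-in for Utf8 (an assumption made so the example runs without Avro), and the class and method names are hypothetical.

```java
// Plain-JDK illustration; StringBuilder stands in for Avro's Utf8.
class CharSequenceCastDemo {

  // Compiles, but throws ClassCastException unless the value is a java.lang.String.
  static int decodeWithCast(CharSequence value) {
    return Integer.decode((String) value);
  }

  // Works for any CharSequence implementation.
  static int decodeWithToString(CharSequence value) {
    return Integer.decode(value.toString());
  }

  public static void main(String[] args) {
    CharSequence param = new StringBuilder("42"); // stand-in for a Utf8 field value
    System.out.println(decodeWithToString(param)); // prints 42
    try {
      decodeWithCast(param);
    } catch (ClassCastException e) {
      // This is the kind of failure that went unnoticed in the pipeline.
      System.out.println("cast failed: " + e.getClass().getSimpleName());
    }
  }
}
```

Calling `toString()` is safe whether the runtime type is String, Utf8, or another CharSequence, which is why it is the more defensive choice here.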
>
> ------------------------------
> From: jwills@cloudera.com
> Date: Fri, 22 Aug 2014 15:37:39 -0700
>
> Subject: Re: Multiple Writes in a single pipeline.
> To: user@crunch.apache.org
>
> Perversely fascinating. Does it matter if you do a mix of literals and
> values from Log? Does any value from Log cause it to not be written out?
>
> I can't help but wonder if we're swallowing an exception somewhere--
> there's nothing in the logs, right?
>
> J
>
>
> On Fri, Aug 22, 2014 at 12:08 PM, Danny Morgan <unluckyboy@hotmail.com>
> wrote:
>
> Sorry, I deleted a line by accident, but there should be a
>
> //v.setDatehour(l2.getDatehour());
>
> in the "This doesn't work" block of code.
>
> ------------------------------
> From: unluckyboy@hotmail.com
> To: user@crunch.apache.org
> Subject: RE: Multiple Writes in a single pipeline.
> Date: Fri, 22 Aug 2014 19:03:37 +0000
>
>
> I had to do a lot of bisecting to finally get to where the issue seems to
> be. It's going to take some time to get a separate chunk of code that I can
> share, but let me try to talk through the problem; maybe it's obvious.
>
> I have raw text data that gets parsed into Log Avro classes, which then go
> through another DoFn to get converted into Visit Avro classes, and I want
> to write out both collections of Log and Visit records. Basically, the issue
> seems to happen in the DoFn that takes in Log and creates Visit. If I try
> to set any of the fields in Visit using fields from Log, then I end up
> writing out no files; if I set the fields in Visit with literals, then I
> have output.
>
> Here's the code. I thought this might be a detached values issue and tried
> that as well, but it didn't seem to help. Originally the code just used "l"
> directly in process() and I didn't bother with "l2".
>
> public static PTable<Visit, Pair<Long, Long>> parseVisit(PCollection<Log> logs) {
>   return logs.parallelDo("second-parse",
>       new DoFn<Log, Pair<Visit, Pair<Long, Long>>>() {
>         private PType<Log> ptype;
>
>         @Override
>         public void initialize() {
>           this.ptype = Avros.specifics(Log.class);
>           this.ptype.initialize(getConfiguration());
>         }
>
>         @Override
>         public void process(Log l, Emitter<Pair<Visit, Pair<Long, Long>>> emitter) {
>           Log l2 = ptype.getDetachedValue(l);
>           increment("test", "visitcount");
>
>           // This works
>           Visit v = new Visit();
>           v.setDate(1234);
>           v.setDatehour("123");
>           v.setUser("123");
>           emitter.emit(Pair.of(v, Pair.of(1L, 1L)));
>
>           // This doesn't work
>           //Visit v = new Visit();
>           //v.setDate(l2.getDate());
>           //v.setUser(l2.getUser());
>           //emitter.emit(Pair.of(v, Pair.of(1L, 1L)));
>         }
>       },
>       Avros.tableOf(Avros.specifics(Visit.class),
>           Avros.pairs(Avros.longs(), Avros.longs())));
> }
>
> Thanks!
>
>
>
>
> ------------------------------
> From: josh.wills@gmail.com
> Date: Fri, 22 Aug 2014 08:57:13 -0700
> Subject: Re: Multiple Writes in a single pipeline.
> To: user@crunch.apache.org
>
> Many thanks for taking the time, would like to get this resolved before
> the next release.
>
> J
>
>
> On Fri, Aug 22, 2014 at 8:55 AM, Danny Morgan <unluckyboy@hotmail.com>
> wrote:
>
> Hi Josh,
>
> The Log and Visit classes are all in the same jar. The classloader fix is
> in place, but I still get the issue without setting the class loader.
>
> I'll put together the smallest reproduction I can and send out the code.
>
> Danny
>
> ------------------------------
> From: jwills@cloudera.com
> Date: Fri, 22 Aug 2014 08:42:06 -0700
>
> Subject: Re: Multiple Writes in a single pipeline.
> To: user@crunch.apache.org
>
> Hey Danny,
>
> I wrote the test that I inlined below and it ran successfully for me
> against master and the 0.10 branch, so there must be something more subtle
> going on here-- are the Log and Visit classes created in different jars?
> I'm assuming the classloader fix is in play here and I'm wondering if there
> is something weird there.
>
> import org.apache.crunch.MapFn;
> import org.apache.crunch.PCollection;
> import org.apache.crunch.Pipeline;
> import org.apache.crunch.impl.mr.MRPipeline;
> import org.apache.crunch.io.From;
> import org.apache.crunch.io.To;
> import org.apache.crunch.test.Employee;
> import org.apache.crunch.test.Person;
> import org.apache.crunch.test.TemporaryPath;
> import org.apache.crunch.test.TemporaryPaths;
> import org.apache.crunch.types.avro.Avros;
> import org.apache.hadoop.fs.FileStatus;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.junit.Rule;
> import org.junit.Test;
>
> public class MultiAvroOutputIT {
>
>   @Rule
>   public transient TemporaryPath tmpDir = TemporaryPaths.create();
>
>   @Test
>   public void testMultiAvroWrite() throws Exception {
>     Path person = tmpDir.getPath("person");
>     Path employee = tmpDir.getPath("employee");
>     Pipeline p = new MRPipeline(MultiAvroOutputIT.class,
>         tmpDir.getDefaultConfiguration());
>     PCollection<String> shakes =
>         p.read(From.textFile(tmpDir.copyResourcePath("shakes.txt")));
>
>     shakes.parallelDo(new PersonFn(), Avros.specifics(Person.class))
>         .write(To.avroFile(person));
>     shakes.parallelDo(new EmployeeFn(), Avros.specifics(Employee.class))
>         .write(To.avroFile(employee));
>     p.run();
>
>     FileSystem fs = FileSystem.get(tmpDir.getDefaultConfiguration());
>     System.out.println("Person");
>     for (FileStatus fstat : fs.listStatus(person)) {
>       System.out.println(fstat.getPath() + ": " + fstat.getLen());
>     }
>     System.out.println("Employee");
>     for (FileStatus fstat : fs.listStatus(employee)) {
>       System.out.println(fstat.getPath() + ": " + fstat.getLen());
>     }
>
>     p.done();
>   }
>
>   static class PersonFn extends MapFn<String, Person> {
>     @Override
>     public Person map(String input) {
>       return new Person();
>     }
>   }
>
>   static class EmployeeFn extends MapFn<String, Employee> {
>     @Override
>     public Employee map(String input) {
>       return new Employee();
>     }
>   }
>
> }
>
>
> On Fri, Aug 22, 2014 at 8:12 AM, Josh Wills <jwills@cloudera.com> wrote:
>
> That is super-interesting; let me try to replicate it in a test.
>
> J
>
>
> On Fri, Aug 22, 2014 at 7:26 AM, Danny Morgan <unluckyboy@hotmail.com>
> wrote:
>
> This issue looks similar to
> https://issues.apache.org/jira/browse/CRUNCH-67
>
> It turns out even if I get rid of the reduce phase and do just this:
>
>
> PTable<String, String> lines = this.read(mySource);
> PCollection<Log> parsed = lines.parallelDo("initial-parsing",
>     new myParser(), Avros.specifics(Log.class));
>
> PTable<Visit, Pair<Long, Long>> visits = parsed.parallelDo("visits-parsing",
>     new VisitsExtractor(),
>     Avros.tableOf(Avros.specifics(Visit.class),
>         Avros.pairs(Avros.longs(), Avros.longs())));
>
> visits.write(To.avroFile(outputPath + "/visits"), WriteMode.OVERWRITE);
> parsed.write(To.avroFile(outputPath + "/raw"), WriteMode.OVERWRITE);
> this.done();
>
> The plan shows I should be writing to two different targets in a single
> map phase; however, only "/raw" has data written out to it, and "/visits"
> just contains a _SUCCESS file and no data.
>
> Might this be an issue with writing out two different Avro types in the
> same phase?
>
> Thanks Again,
>
> Danny
>
>
> ------------------------------
> From: unluckyboy@hotmail.com
> To: user@crunch.apache.org
> Subject: RE: Multiple Writes in a single pipeline.
> Date: Fri, 22 Aug 2014 02:02:20 +0000
>
>
> Hi Josh,
>
>
> ------------------------------
> From: jwills@cloudera.com
> Date: Thu, 21 Aug 2014 17:40:25 -0700
> Subject: Re: Multiple Writes in a single pipeline.
> To: user@crunch.apache.org
>
> The two different executions you have are doing different things, however.
> In the first one, Crunch is running a single MapReduce job where the /raw
> directory is written as a mapper side-output, and the /visits directory is
> being written out on the reduce side (or at least, should be-- is there any
> evidence of a failure in the job in the logs? Are bytes being written out
> from the reducer?)
>
> No evidence of any failures in the logs; the single mapper and the reducers
> all succeed. The mapper definitely writes to HDFS, but the reducer does not.
> Here are the relevant counters from the reducer:
>
> FILE: Number of bytes read              6
> FILE: Number of bytes written           91811
> FILE: Number of large read operations   0
> FILE: Number of read operations         0
> FILE: Number of write operations        0
> HDFS: Number of bytes read              6205
> HDFS: Number of bytes written           0
> HDFS: Number of large read operations   0
> HDFS: Number of read operations         4
> HDFS: Number of write operations        2
>
> I couldn't find anything related on the crunch jira.
>
> For this problem, I think it would be more efficient to write the parsed
> -> /raw output first, call run(), then do the agg -> /visits output
> followed by done(), which would mean that you would only need to parse the
> raw input once, instead of twice.
>
> Would the first option be more efficient if it worked?
>
> A helpful trick for seeing how the Crunch planner is mapping your logic
> into MapReduce jobs is to look at the plan dot file via one of the
> following mechanisms:
>
> 1) Instead of calling Pipeline.run(), call Pipeline.runAsync() and then
> call the getPlanDotFile() method on the returned PipelineExecution object.
> You can print the dot file to a file and use a dot file viewer to look at
> how the DoFns are broken up into MR jobs and map/reduce phases.
> 2) Call MRPipeline.plan() directly, which returns a MRExecutor object that
> also implements PipelineExecution. (The difference being that calling
> MRPipeline.plan will not start the jobs running, whereas calling runAsync
> will.)
>
> I ran the two different versions through dot and you're right, they are two
> completely different executions. Pretty cool!
>
> Thanks!
>
>
>
>
> --
> Director of Data Science
> Cloudera <http://www.cloudera.com>
> Twitter: @josh_wills <http://twitter.com/josh_wills>



-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
