crunch-user mailing list archives

From Josh Wills <jwi...@cloudera.com>
Subject Re: Non Deterministic Record Drops
Date Tue, 28 Jul 2015 01:49:59 GMT
That was a deeply satisfying bug. Fix is up here:
https://issues.apache.org/jira/browse/CRUNCH-553

On Mon, Jul 27, 2015 at 6:29 PM, Jeff Quinn <jeff@nuna.com> wrote:

> Wow, thanks so much for looking into it. That minimal example
> seems accurate. Previously, when we dug deeper into which records were
> dropped, it appeared that entire files were being dropped, not just parts
> of one file, so that sounds consistent with what you are seeing.
>
> On Monday, July 27, 2015, Josh Wills <jwills@cloudera.com> wrote:
>
>> Hey Jeff,
>>
>> Okay cool-- I think I've managed to create a simple test that replicates
>> the behavior you're seeing. I can run this test a few different times, and
>> sometimes I'll get the correct output, but other times I'll get an error
>> b/c no records are processed. I'm going to investigate further and see if I
>> can identify the source of the randomness.
>>
>> public class RecordDropIT {
>>   @Rule
>>   public TemporaryPath tmpDir = TemporaryPaths.create();
>>
>>   @Test
>>   public void testMultiReadCount() throws Exception {
>>     int numReads = 2;
>>     MRPipeline p = new MRPipeline(RecordDropIT.class, tmpDir.getDefaultConfiguration());
>>     Path shakes = tmpDir.copyResourcePath("shakes.txt");
>>     TableSource<LongWritable, Text> src = From.formattedFile(shakes, TextInputFormat.class, LongWritable.class, Text.class);
>>     List<Iterable<Integer>> values = Lists.newArrayList();
>>     for (int i = 0; i < numReads; i++) {
>>       PCollection<Integer> cnt = p.read(src).parallelDo(new LineCountFn<Pair<LongWritable, Text>>(), Writables.ints());
>>       values.add(cnt.materialize());
>>     }
>>     for (Iterable<Integer> iter : values) {
>>       System.out.println(Iterables.getOnlyElement(iter));
>>     }
>>     p.done();
>>   }
>>
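>>   // Counts the records this task sees in process() and emits the total
>>   // exactly once, from cleanup().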
>>   public static class LineCountFn<T> extends DoFn<T, Integer> {
>>
>>     private int count = 0;
>>
>>     @Override
>>     public void process(T input, Emitter<Integer> emitter) {
>>       count++;
>>     }
>>
>>     @Override
>>     public void cleanup(Emitter<Integer> emitter) {
>>       emitter.emit(count);
>>     }
>>   }
>> }
>>
>>
>> On Mon, Jul 27, 2015 at 6:11 PM, Jeff Quinn <jeff@nuna.com> wrote:
>>
>>> Hi Josh,
>>>
>>> Thanks so much for your suggestions.
>>>
>>> The counts are determined with two methods: I am using a simple Pig
>>> script to count records, and I am also tallying the size in bytes of all
>>> HDFS output files. Both measures show dropped records / fewer than
>>> expected output bytes.
>>>
>>> To your second point, I will go back and do a sweep for that, but I am
>>> fairly sure no DoFns are making use of intermediate state values without
>>> getDetachedValue. Our team is aware of the getDetachedValue gotchas, as
>>> they have bitten us before.
>>>
>>> Thanks !
>>>
>>> Jeff
>>>
>>>
>>> On Monday, July 27, 2015, Josh Wills <jwills@cloudera.com> wrote:
>>>
>>>> One more thought-- are any of these DoFns keeping records around as
>>>> intermediate state values w/o using PType.getDetachedValue to make copies
>>>> of them?
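>>>>
>>>> Roughly the pattern I mean, as a minimal sketch (BufferingFn is
>>>> hypothetical; the PType is passed in so the fn can make detached copies):
>>>>
>>>> public static class BufferingFn<T> extends DoFn<T, T> {
>>>>   private final PType<T> ptype;
>>>>   private transient List<T> buffer;
>>>>
>>>>   public BufferingFn(PType<T> ptype) {
>>>>     this.ptype = ptype;
>>>>   }
>>>>
>>>>   @Override
>>>>   public void initialize() {
>>>>     ptype.initialize(getConfiguration());
>>>>     buffer = Lists.newArrayList();
>>>>   }
>>>>
>>>>   @Override
>>>>   public void process(T input, Emitter<T> emitter) {
>>>>     // The framework reuses the same Writable instance for every record,
>>>>     // so holding a reference to 'input' would alias all buffered values;
>>>>     // getDetachedValue makes a copy that is safe to keep around.
>>>>     buffer.add(ptype.getDetachedValue(input));
>>>>   }
>>>>
>>>>   @Override
>>>>   public void cleanup(Emitter<T> emitter) {
>>>>     for (T value : buffer) {
>>>>       emitter.emit(value);
>>>>     }
>>>>   }
>>>> }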
>>>>
>>>> J
>>>>
>>>> On Mon, Jul 27, 2015 at 5:47 PM, Josh Wills <jwills@cloudera.com>
>>>> wrote:
>>>>
>>>>> Hey Jeff,
>>>>>
>>>>> Are the counts determined by Counters? Or is it the length of the
>>>>> output files? Or both?
>>>>>
>>>>> J
>>>>>
>>>>> On Mon, Jul 27, 2015 at 5:29 PM, David Ortiz <dpo5003@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Out of curiosity, any reason you went with multiple reads as opposed
>>>>>> to just performing multiple operations on the same PTable? parallelDo
>>>>>> returns a new object rather than modifying the initial one, so a single
>>>>>> collection can start multiple execution flows.
>>>>>>
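>>>>>> Roughly like this (LineLengthFn and UpperCaseFn are hypothetical DoFns;
>>>>>> the shape is the point -- one read feeding two flows):
>>>>>>
>>>>>> PCollection<String> lines = pipeline.readTextFile("/path/to/input");
>>>>>> PCollection<Integer> lengths = lines.parallelDo(new LineLengthFn(), Writables.ints());
>>>>>> PCollection<String> upper = lines.parallelDo(new UpperCaseFn(), Writables.strings());
>>>>>>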
>>>>>> On Mon, Jul 27, 2015, 8:11 PM Jeff Quinn <jeff@nuna.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> We have observed and replicated strange behavior in our Crunch
>>>>>>> application while running on MapReduce via the AWS ElasticMapReduce
>>>>>>> service. Running a very simple job, which is mostly map-only, we see
>>>>>>> that an undetermined subset of records is getting dropped.
>>>>>>> Specifically, we expect 30,136,686 output records but have seen the
>>>>>>> following counts on different trials (running over the same data with
>>>>>>> the same binary):
>>>>>>>
>>>>>>> 22,177,119 records
>>>>>>> 26,435,670 records
>>>>>>> 22,362,986 records
>>>>>>> 29,798,528 records
>>>>>>>
>>>>>>> These are all the things about our application which might be
>>>>>>> unusual and relevant:
>>>>>>>
>>>>>>> - We use a custom file input format, via From.formattedFile. It looks
>>>>>>> like this (basically a carbon copy of
>>>>>>> org.apache.hadoop.mapreduce.lib.input.TextInputFormat):
>>>>>>>
>>>>>>> import org.apache.hadoop.io.LongWritable;
>>>>>>> import org.apache.hadoop.io.Text;
>>>>>>> import org.apache.hadoop.mapreduce.InputSplit;
>>>>>>> import org.apache.hadoop.mapreduce.RecordReader;
>>>>>>> import org.apache.hadoop.mapreduce.TaskAttemptContext;
>>>>>>> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
>>>>>>> import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
>>>>>>>
>>>>>>> import java.io.IOException;
>>>>>>>
>>>>>>> public class ByteOffsetInputFormat extends FileInputFormat<LongWritable, Text> {
>>>>>>>
>>>>>>>   @Override
>>>>>>>   public RecordReader<LongWritable, Text> createRecordReader(
>>>>>>>       InputSplit split, TaskAttemptContext context) throws IOException,
>>>>>>>       InterruptedException {
>>>>>>>     return new LineRecordReader();
>>>>>>>   }
>>>>>>> }
>>>>>>>
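>>>>>>> For completeness, we wire this format up the same way as the built-in
>>>>>>> one (inputPath here standing in for one of our actual file paths):
>>>>>>>
>>>>>>> TableSource<LongWritable, Text> src = From.formattedFile(
>>>>>>>     inputPath, ByteOffsetInputFormat.class, LongWritable.class, Text.class);
>>>>>>>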
>>>>>>> - We call org.apache.crunch.Pipeline#read using this InputFormat many
>>>>>>> times; for the job in question it is called ~160 times, as the input is
>>>>>>> ~100 different files. Each file ranges in size from 100MB to 8GB. Our
>>>>>>> job uses only this input format for all input files.
>>>>>>>
>>>>>>> - For some files, org.apache.crunch.Pipeline#read is called twice on
>>>>>>> the same file, and the resulting PTables are processed in different
>>>>>>> ways.
>>>>>>>
>>>>>>> - Only the data from files on which org.apache.crunch.Pipeline#read
>>>>>>> has been called more than once during a job has dropped records; all
>>>>>>> other files consistently do not have dropped records.
>>>>>>>
>>>>>>> Curious if any Crunch users have experienced similar behavior
before, or if any of these details about my job raise any red flags.
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> Jeff Quinn
>>>>>>>
>>>>>>> Data Engineer
>>>>>>>
>>>>>>> Nuna
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>
>>
>>
>>
>



-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
