crunch-user mailing list archives

From Josh Wills <jwi...@cloudera.com>
Subject Re: Non Deterministic Record Drops
Date Tue, 28 Jul 2015 01:17:56 GMT
Hey Jeff,

Okay cool-- I think I've managed to create a simple test that replicates
the behavior you're seeing. I can run this test a few different times, and
sometimes I'll get the correct output, but other times I'll get an error
b/c no records are processed. I'm going to investigate further and see if I
can identify the source of the randomness.

public class RecordDropIT {
  @Rule
  public TemporaryPath tmpDir = TemporaryPaths.create();

  @Test
  public void testMultiReadCount() throws Exception {
    int numReads = 2;
    MRPipeline p = new MRPipeline(RecordDropIT.class, tmpDir.getDefaultConfiguration());
    Path shakes = tmpDir.copyResourcePath("shakes.txt");
    TableSource<LongWritable, Text> src = From.formattedFile(shakes,
        TextInputFormat.class, LongWritable.class, Text.class);
    List<Iterable<Integer>> values = Lists.newArrayList();
    for (int i = 0; i < numReads; i++) {
      PCollection<Integer> cnt = p.read(src).parallelDo(
          new LineCountFn<Pair<LongWritable, Text>>(), Writables.ints());
      values.add(cnt.materialize());
    }
    for (Iterable<Integer> iter : values) {
      System.out.println(Iterables.getOnlyElement(iter));
    }
    p.done();
  }

  public static class LineCountFn<T> extends DoFn<T, Integer> {

    private int count = 0;

    @Override
    public void process(T input, Emitter<Integer> emitter) {
      count++;
    }

    @Override
    public void cleanup(Emitter<Integer> emitter) {
      emitter.emit(count);
    }
  }
}


On Mon, Jul 27, 2015 at 6:11 PM, Jeff Quinn <jeff@nuna.com> wrote:

> Hi Josh,
>
> Thanks so much for your suggestions.
>
> The counts are determined with two methods: I am using a simple Pig script
> to count records, and I am also tallying the size in bytes of all HDFS
> output files. Both measures show dropped records / fewer than expected
> output bytes.
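>
> (In case it's useful, a minimal sketch of the byte tally, assuming a single
> HDFS output directory; the path is just a command-line argument here:)
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
>
> public class OutputSizeCheck {
>   public static void main(String[] args) throws Exception {
>     Path output = new Path(args[0]); // the job's HDFS output directory
>     FileSystem fs = output.getFileSystem(new Configuration());
>     // Total length in bytes of every file under the output directory.
>     long bytes = fs.getContentSummary(output).getLength();
>     System.out.println(output + ": " + bytes + " bytes");
>   }
> }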
>
> To your second point I will go back and do a sweep for that, but I am
> fairly sure no DoFns are making use of intermediate state values without
> getDetachedValue. Our team is aware of the getDetachedValue gotchas, as I
> think they have bitten us before.
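>
> (For reference, the safe pattern looks roughly like this; a minimal sketch
> with a hypothetical DoFn that keeps the largest record seen per task:)
>
> import org.apache.crunch.DoFn;
> import org.apache.crunch.Emitter;
> import org.apache.crunch.Pair;
> import org.apache.crunch.types.PType;
>
> public class KeepMaxFn extends DoFn<Pair<String, Long>, Pair<String, Long>> {
>   private final PType<Pair<String, Long>> ptype;
>   private Pair<String, Long> biggest;
>
>   public KeepMaxFn(PType<Pair<String, Long>> ptype) {
>     this.ptype = ptype;
>   }
>
>   @Override
>   public void initialize() {
>     ptype.initialize(getConfiguration()); // required before getDetachedValue
>   }
>
>   @Override
>   public void process(Pair<String, Long> input, Emitter<Pair<String, Long>> emitter) {
>     if (biggest == null || input.second() > biggest.second()) {
>       // The runtime may reuse the object passed to process(), so copy it
>       // with getDetachedValue before holding it across calls.
>       biggest = ptype.getDetachedValue(input);
>     }
>   }
>
>   @Override
>   public void cleanup(Emitter<Pair<String, Long>> emitter) {
>     if (biggest != null) {
>       emitter.emit(biggest);
>     }
>   }
> }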
>
> Thanks !
>
> Jeff
>
> On Monday, July 27, 2015, Josh Wills <jwills@cloudera.com> wrote:
>
>> One more thought-- are any of these DoFns keeping records around as
>> intermediate state values w/o using PType.getDetachedValue to make copies
>> of them?
>>
>> J
>>
>> On Mon, Jul 27, 2015 at 5:47 PM, Josh Wills <jwills@cloudera.com> wrote:
>>
>>> Hey Jeff,
>>>
>>> Are the counts determined by Counters? Or is it the length of the output
>>> files? Or both?
>>>
>>> J
>>>
>>> On Mon, Jul 27, 2015 at 5:29 PM, David Ortiz <dpo5003@gmail.com> wrote:
>>>
>>>> Out of curiosity, any reason you went with multiple reads as opposed to
>>>> just performing multiple operations on the same PTable? parallelDo returns
>>>> a new object rather than modifying the initial one, so a single collection
>>>> can start multiple execution flows.
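>>>>
>>>> Roughly what I mean (a minimal sketch; the DoFn names here are made up,
>>>> just to show two flows hanging off one read):
>>>>
>>>> PTable<LongWritable, Text> lines = p.read(src);  // read the source once
>>>> // parallelDo returns a new PCollection and leaves "lines" untouched,
>>>> // so both branches below consume the same read.
>>>> PCollection<Integer> counts = lines.parallelDo(
>>>>     new CountLinesFn(), Writables.ints());
>>>> PCollection<String> text = lines.parallelDo(
>>>>     new ExtractTextFn(), Writables.strings());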
>>>>
>>>> On Mon, Jul 27, 2015, 8:11 PM Jeff Quinn <jeff@nuna.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> We have observed and replicated strange behavior with our crunch
>>>>> application while running on MapReduce via the AWS ElasticMapReduce
>>>>> service. Running a very simple job which is mostly map only, we see that an
>>>>> undetermined subset of records is getting dropped. Specifically, we
>>>>> expect 30,136,686 output records and have seen output on different trials
>>>>> (running over the same data with the same binary):
>>>>>
>>>>> 22,177,119 records
>>>>> 26,435,670 records
>>>>> 22,362,986 records
>>>>> 29,798,528 records
>>>>>
>>>>> These are all the things about our application which might be unusual
>>>>> and relevant:
>>>>>
>>>>> - We use a custom file input format, via From.formattedFile. It looks
>>>>> like this (basically a carbon copy
>>>>> of org.apache.hadoop.mapreduce.lib.input.TextInputFormat):
>>>>>
>>>>> import org.apache.hadoop.io.LongWritable;
>>>>> import org.apache.hadoop.io.Text;
>>>>> import org.apache.hadoop.mapreduce.InputSplit;
>>>>> import org.apache.hadoop.mapreduce.RecordReader;
>>>>> import org.apache.hadoop.mapreduce.TaskAttemptContext;
>>>>> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
>>>>> import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
>>>>>
>>>>> import java.io.IOException;
>>>>>
>>>>> public class ByteOffsetInputFormat extends FileInputFormat<LongWritable, Text> {
>>>>>
>>>>>   @Override
>>>>>   public RecordReader<LongWritable, Text> createRecordReader(
>>>>>       InputSplit split, TaskAttemptContext context) throws IOException,
>>>>>       InterruptedException {
>>>>>     return new LineRecordReader();
>>>>>   }
>>>>> }
>>>>>
>>>>> - We call org.apache.crunch.Pipeline#read using this InputFormat many
>>>>> times; for the job in question it is called ~160 times, as the input is
>>>>> ~100 different files. Each file ranges in size from 100MB to 8GB. Our job
>>>>> uses only this input format for all input files.
>>>>>
>>>>> - For some files, org.apache.crunch.Pipeline#read is called twice on the
>>>>> same file, and the resulting PTables are processed in different ways.
>>>>>
>>>>> - It is only the files on which org.apache.crunch.Pipeline#read has been
>>>>> called more than once during a job that have dropped records; all other
>>>>> files consistently do not have dropped records.
>>>>>
>>>>> Curious if any Crunch users have experienced similar behavior before,
>>>>> or if any of these details about my job raise any red flags.
>>>>>
>>>>> Thanks!
>>>>>
>>>>> Jeff Quinn
>>>>>
>>>>> Data Engineer
>>>>>
>>>>> Nuna
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Director of Data Science
>>> Cloudera <http://www.cloudera.com>
>>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>>
>>
>>
>>
>> --
>> Director of Data Science
>> Cloudera <http://www.cloudera.com>
>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>
>



-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
