crunch-user mailing list archives

From Everett Anderson <ever...@nuna.com>
Subject Re: Hadoop InputFormat/RecordReader and Writable reuse
Date Wed, 06 Jul 2016 17:07:24 GMT
On Tue, Jul 5, 2016 at 10:06 PM, Josh Wills <josh.wills@gmail.com> wrote:

> That _may_ cause problems in Crunch if the DoFns that are processing those
> ByteBuffers don't convert them to an immutable data type, or if they need
> to cache some of those values along the way during processing (if there
> isn't any caching in the flow, it's not normally an issue to process the
> records one-at-a-time w/no deep copy necessary.) The best practice in
> Crunch for those situations (i.e., non-immutable data + some sort of
> caching/maintenance of state) is to use the PType.getDetachedValue(obj)
> function to do a deep copy of the value in a way that is independent of the
> underlying data type (where you can be clever for PTypes of immutable
> objects and just return the value itself.)
>

If I understand correctly, for non-immutable types the recommendation is to
be aware of the instance reuse and either add a DoFn that copies the data
right after read() or use a PType that does the copy. Is that right?

For example, this won't work (though if you were reading Text, you should
probably just use Writables.strings() to avoid the issue entirely):

    TableSource<LongWritable, Text> source =
        From.formattedFile("/path/to/textfile",
                           TextInputFormat.class,
                           Writables.writables(LongWritable.class),
                           Writables.writables(Text.class));

    PTable<LongWritable, Text> data = getPipeline().read(source);

    // Broken: the iterator may hand back Pairs that still reference the
    // reused Writable instances from the underlying RecordReader.
    for (Pair<LongWritable, Text> pair : data.materialize()) {
      System.out.println("offset: " + pair.first() +
                         ", text: " + pair.second().toString());
    }

but you could add something like this right after read() to force a copy
(which works because Crunch applies the DoFn before any serialization
boundary):

    final PType<Pair<LongWritable, Text>> pType = data.getPType();
    pType.initialize(getPipeline().getConfiguration());

    data = data.parallelDo(
        new DoFn<Pair<LongWritable, Text>, Pair<LongWritable, Text>>() {
          @Override
          public void process(Pair<LongWritable, Text> input,
                              Emitter<Pair<LongWritable, Text>> emitter) {
            // Deep-copy the pair so it no longer references the reused
            // Writable instances from the RecordReader.
            emitter.emit(pType.getDetachedValue(input));
          }
        }, data.getPTableType());
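
(A subtlety I'm assuming rather than quoting from the thread: the
initialize() call above runs on the client, and the captured PType is
serialized along with the anonymous DoFn when the job is distributed, so it
may arrive on the tasks uninitialized. A variant that re-initializes it on
the task side would look something like this --

    data = data.parallelDo(
        new DoFn<Pair<LongWritable, Text>, Pair<LongWritable, Text>>() {
          @Override
          public void initialize() {
            // Re-initialize the serialized PType with the task-side
            // configuration before using getDetachedValue().
            pType.initialize(getConfiguration());
          }

          @Override
          public void process(Pair<LongWritable, Text> input,
                              Emitter<Pair<LongWritable, Text>> emitter) {
            emitter.emit(pType.getDetachedValue(input));
          }
        }, data.getPTableType());

-- though someone who knows the serialization path better should confirm
whether that re-initialization is actually needed.)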

Or you could create your own PTypes to pass to From.formattedFile that make
the copies themselves.
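
For the raw-bytes case below, I think such a copying PType could be built
with Writables.derived() -- here's a rough sketch (the DEEP_COPY/IDENTITY
names and the copiedBytes() helper are mine, and I haven't verified that
From.formattedFile accepts a derived PType):

    // Copies the bytes out of the (possibly reused) backing array into a
    // fresh array, so the returned ByteBuffer is safe to hold across records.
    private static final MapFn<ByteBuffer, ByteBuffer> DEEP_COPY =
        new MapFn<ByteBuffer, ByteBuffer>() {
      @Override
      public ByteBuffer map(ByteBuffer input) {
        byte[] copy = new byte[input.remaining()];
        input.duplicate().get(copy);
        return ByteBuffer.wrap(copy);
      }
    };

    // Identity on the write path; nothing to undo when serializing.
    private static final MapFn<ByteBuffer, ByteBuffer> IDENTITY =
        new MapFn<ByteBuffer, ByteBuffer>() {
      @Override
      public ByteBuffer map(ByteBuffer input) {
        return input;
      }
    };

    // A ByteBuffer PType that detaches each value from the reused
    // BytesWritable as it is read.
    public static PType<ByteBuffer> copiedBytes() {
      return Writables.derived(
          ByteBuffer.class, DEEP_COPY, IDENTITY, Writables.bytes());
    }

As I understand it, derived() composes the copy MapFn after the base type's
input conversion, so the copy happens before any user code sees the value.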


> On Tue, Jul 5, 2016 at 3:39 PM, Everett Anderson <everett@nuna.com> wrote:
>
>> Hey,
>>
>> I recently implemented a Hadoop InputFormat that returns the raw bytes of
>> each record as a BytesWritable rather than as Text (as in TextInputFormat,
>> which assumes that the input is UTF-8).
>>
>> One thing I noticed is that Hadoop RecordReader
>> <https://hadoop.apache.org/docs/r2.7.2/api/org/apache/hadoop/mapreduce/RecordReader.html>
>> implementations generally re-use the Writable instance across multiple
>> {getCurrentKey() + getCurrentValue()} calls for efficiency, though this
>> isn't documented. See, for example, FixedLengthRecordReader
>> <https://github.com/apache/hadoop/blob/branch-2.7.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java#L178>,
>> KeyValueLineRecordReader
>> <https://github.com/apache/hadoop/blob/branch-2.7.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java#L118>,
>> and LineRecordReader
>> <https://github.com/apache/hadoop/blob/branch-2.7.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java#L214>.
>>
>> Crunch handles this for Text because Writables.strings() uses this
>> converter:
>>
>>   private static final MapFn<Text, String> TEXT_TO_STRING =
>>       new MapFn<Text, String>() {
>>     @Override
>>     public String map(Text input) {
>>       return input.toString();
>>     }
>>   };
>>
>> and toString() will create a copy of Text's data.
>>
>> However, here is its corresponding map implementation for
>> Writables.bytes():
>>
>>   private static final MapFn<BytesWritable, ByteBuffer> BW_TO_BB =
>>       new MapFn<BytesWritable, ByteBuffer>() {
>>     @Override
>>     public ByteBuffer map(BytesWritable input) {
>>       return ByteBuffer.wrap(input.getBytes(), 0, input.getLength());
>>     }
>>   };
>>
>> Since ByteBuffer.wrap() still references the BytesWritable's internal
>> state, and the BytesWritable instance is reused across multiple records,
>> this causes problems in Crunch when the BytesWritable came from a
>> RecordReader.
>>
>> One work-around is to construct a new WritableType whose MapFn copies the
>> data, and use it only when reading from a Hadoop InputFormat that returns
>> BytesWritable.
>>
>> Is there a more general way to solve this?
>>
