crunch-user mailing list archives

From Josh Wills <josh.wi...@gmail.com>
Subject	Re: Hadoop InputFormat/RecordReader and Writable reuse
Date Wed, 06 Jul 2016 18:17:00 GMT
That is correct, yes.

On Wed, Jul 6, 2016 at 10:07 AM, Everett Anderson <everett@nuna.com> wrote:

>
>
> On Tue, Jul 5, 2016 at 10:06 PM, Josh Wills <josh.wills@gmail.com> wrote:
>
>> That _may_ cause problems in Crunch if the DoFns that process those
>> ByteBuffers don't convert them to an immutable data type, or if they
>> need to cache some of those values along the way during processing.
>> (If there isn't any caching in the flow, it's not normally an issue to
>> process the records one-at-a-time with no deep copy necessary.) The
>> best practice in Crunch for those situations (i.e., non-immutable data
>> + some sort of caching/maintenance of state) is to use the
>> PType.getDetachedValue(obj) function to do a deep copy of the value in
>> a way that is independent of the underlying data type (where PTypes of
>> immutable objects can be clever and just return the value itself).
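>>
>> For example (an untested sketch -- this BufferingFn and its buffering
>> policy are made up just to illustrate the pattern), a DoFn that caches
>> values across process() calls would detach each one before storing it:
>>
>>     public static class BufferingFn extends DoFn<ByteBuffer, ByteBuffer> {
>>       // PTypes are Serializable, so this can live in a DoFn field
>>       private final PType<ByteBuffer> ptype = Writables.bytes();
>>       private transient List<ByteBuffer> buffer;
>>
>>       @Override
>>       public void initialize() {
>>         ptype.initialize(getConfiguration());
>>         buffer = new ArrayList<ByteBuffer>();
>>       }
>>
>>       @Override
>>       public void process(ByteBuffer input, Emitter<ByteBuffer> emitter) {
>>         // deep copy before caching -- the backing BytesWritable is reused
>>         buffer.add(ptype.getDetachedValue(input));
>>       }
>>
>>       @Override
>>       public void cleanup(Emitter<ByteBuffer> emitter) {
>>         for (ByteBuffer b : buffer) {
>>           emitter.emit(b);
>>         }
>>         buffer.clear();
>>       }
>>     }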
>>
>
> If I understand correctly, for non-immutable types, the recommendation
> is to be aware and either add a DoFn that copies the data right after
> read() or use a PType that does this. Is that right?
>
> For example (though if you were using Text, you should probably just use
> Writables.strings() to avoid this), this won't work:
>
>     TableSource<LongWritable, Text> source =
>         From.formattedFile("/path/to/textfile",
>                            TextInputFormat.class,
>                            Writables.writables(LongWritable.class),
>                            Writables.writables(Text.class));
>
>     PTable<LongWritable, Text> data = getPipeline().read(source);
>
>     for (Pair<LongWritable, Text> pair : data.materialize()) {
>       System.out.println("offset: " + pair.first() +
>                          ", text: " + pair.second().toString());
>     }
>
> but you could add something like this right after read() to force a copy
> (which will work because Crunch would do this before a serialization
> boundary) --
>
>     final PType<Pair<LongWritable, Text>> pType = data.getPType();
>     pType.initialize(getPipeline().getConfiguration());
>
>     data = data.parallelDo(
>         new DoFn<Pair<LongWritable, Text>, Pair<LongWritable, Text>>() {
>           @Override
>           public void process(Pair<LongWritable, Text> input,
>                               Emitter<Pair<LongWritable, Text>> emitter) {
>             emitter.emit(pType.getDetachedValue(input));
>           }
>         }, data.getPTableType());
>
>  or you could create your own PTypes to give to From.formattedFile that
> make copies.
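>
> For instance (an untested sketch -- I haven't actually wired this up),
> a derived PType whose input MapFn copies eagerly, so the reused Text
> coming out of the RecordReader is safe to hold onto:
>
>     PType<Text> copiedText = Writables.derived(
>         Text.class,
>         new MapFn<Text, Text>() {
>           @Override
>           public Text map(Text input) {
>             return new Text(input);  // Text's copy constructor deep-copies
>           }
>         },
>         new MapFn<Text, Text>() {
>           @Override
>           public Text map(Text input) {
>             return input;  // no copy needed on the way out
>           }
>         },
>         Writables.writables(Text.class));
>
> and then pass copiedText to From.formattedFile in place of
> Writables.writables(Text.class).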
>
>
>> On Tue, Jul 5, 2016 at 3:39 PM, Everett Anderson <everett@nuna.com>
>> wrote:
>>
>>> Hey,
>>>
>>> I recently implemented a Hadoop InputFormat that returns the raw bytes
>>> of each record as a BytesWritable rather than as Text (as in
>>> TextInputFormat, which assumes that the input is UTF-8).
>>>
>>> One thing I noticed is that Hadoop RecordReader
>>> <https://hadoop.apache.org/docs/r2.7.2/api/org/apache/hadoop/mapreduce/RecordReader.html>
>>> implementations generally
>>> <https://github.com/apache/hadoop/blob/branch-2.7.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java#L178>
>>> re-use
>>> <https://github.com/apache/hadoop/blob/branch-2.7.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java#L118>
>>> the
>>> <https://github.com/apache/hadoop/blob/branch-2.7.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java#L214>
>>> Writable instance across multiple {getCurrentKey() + getCurrentValue()}
>>> calls for efficiency, though this isn't documented.
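>>>
>>> Condensed (and paraphrased) from the LineRecordReader linked above,
>>> the pattern is:
>>>
>>>   private LongWritable key;
>>>   private Text value;
>>>
>>>   public boolean nextKeyValue() throws IOException {
>>>     if (key == null) {
>>>       key = new LongWritable();   // allocated once
>>>     }
>>>     key.set(pos);
>>>     if (value == null) {
>>>       value = new Text();         // allocated once
>>>     }
>>>     // ... in.readLine(value, ...) then overwrites that same Text
>>>     // instance in place on every call ...
>>>   }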
>>>
>>> Crunch handles this for Text because Writables.strings() uses this
>>> converter:
>>>
>>>   private static final MapFn<Text, String> TEXT_TO_STRING =
>>>       new MapFn<Text, String>() {
>>>     @Override
>>>     public String map(Text input) {
>>>       return input.toString();
>>>     }
>>>   };
>>>
>>> and toString() will create a copy of Text's data.
>>>
>>> However, here is its corresponding map implementation for
>>> Writables.bytes():
>>>
>>>   private static final MapFn<BytesWritable, ByteBuffer> BW_TO_BB =
>>>       new MapFn<BytesWritable, ByteBuffer>() {
>>>     @Override
>>>     public ByteBuffer map(BytesWritable input) {
>>>       return ByteBuffer.wrap(input.getBytes(), 0, input.getLength());
>>>     }
>>>   };
>>>
>>> Since ByteBuffer.wrap() will still reference the BytesWritable's
>>> internal state, and the BytesWritable instance is reused across
>>> multiple records, this causes problems in Crunch if the BytesWritable
>>> came from a RecordReader.
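>>>
>>> Here's a standalone demonstration of the aliasing (this assumes, as on
>>> the Hadoop versions I looked at, that BytesWritable.set() reuses its
>>> backing array when capacity allows):
>>>
>>>   import java.nio.ByteBuffer;
>>>
>>>   import org.apache.hadoop.io.BytesWritable;
>>>
>>>   public class ReuseDemo {
>>>     public static void main(String[] args) {
>>>       // stands in for the single instance a RecordReader would hold
>>>       BytesWritable reused = new BytesWritable();
>>>
>>>       reused.set("record-1".getBytes(), 0, 8);
>>>       ByteBuffer first =
>>>           ByteBuffer.wrap(reused.getBytes(), 0, reused.getLength());
>>>
>>>       // "reading the next record" overwrites the same backing array
>>>       reused.set("record-2".getBytes(), 0, 8);
>>>
>>>       byte[] copy = new byte[first.remaining()];
>>>       first.duplicate().get(copy);
>>>       System.out.println(new String(copy));  // record-2, not record-1
>>>     }
>>>   }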
>>>
>>> One work-around is to construct a new WritableType that uses a MapFn
>>> that creates a copy of the data, and only use it when reading from a Hadoop
>>> InputFormat that returns a BytesWritable.
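>>>
>>> i.e., something like this (untested; uses java.util.Arrays):
>>>
>>>   private static final MapFn<BytesWritable, ByteBuffer> BW_TO_BB_COPY =
>>>       new MapFn<BytesWritable, ByteBuffer>() {
>>>     @Override
>>>     public ByteBuffer map(BytesWritable input) {
>>>       // Arrays.copyOfRange allocates a fresh array, so the returned
>>>       // ByteBuffer no longer aliases the reused BytesWritable
>>>       return ByteBuffer.wrap(
>>>           Arrays.copyOfRange(input.getBytes(), 0, input.getLength()));
>>>     }
>>>   };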
>>>
>>> Is there a more general way to solve this?
>>>
>>
>>
>
