crunch-dev mailing list archives

From "Ben Roling (JIRA)" <j...@apache.org>
Subject [jira] [Created] (CRUNCH-316) Data Corruption when DatumWriter.write() throws MapBufferTooSmallException when called by SafeAvroSerialization
Date Tue, 31 Dec 2013 17:03:51 GMT
Ben Roling created CRUNCH-316:
---------------------------------

             Summary: Data Corruption when DatumWriter.write() throws MapBufferTooSmallException when called by SafeAvroSerialization
                 Key: CRUNCH-316
                 URL: https://issues.apache.org/jira/browse/CRUNCH-316
             Project: Crunch
          Issue Type: Bug
          Components: Core
            Reporter: Ben Roling
            Assignee: Josh Wills


Recently we encountered an issue when processing a Crunch join with a large Avro record.
The job failed in the reduce phase with the attached ArrayIndexOutOfBoundsException while
deserializing an Avro record.

One of the first things I noticed when looking into the problem was the following message:

2013-12-31 10:33:02,489 INFO  [pool-1-thread-1] org.apache.hadoop.mapred.MapTask Record too large for in-memory buffer: 99615133 bytes

The message indicates that a record is too large to fit in the sort buffer, whose size is
set by io.sort.mb (default 100 MB). Increasing io.sort.mb made the problem go away, but I
wanted to find the root cause.

After some lengthy debugging, I traced the problem to SafeAvroSerialization. When a record
is too large to fit in the sort buffer, org.apache.hadoop.mapred.MapTask$MapOutputBuffer$Buffer.write()
throws MapBufferTooSmallException. MapTask.collect() handles this exception by spilling the
record to disk. However, the BufferedBinaryEncoder used by SafeAvroSerialization is never
flushed after the failed write, so bytes from the oversized record remain in its buffer.
When the next record is serialized, those leftover bytes are flushed into it, corrupting it.

I confirmed the diagnosis by leaving io.sort.mb at its default and modifying
SafeAvroSerialization to use a DirectBinaryEncoder, which does not buffer, instead of a
BufferedBinaryEncoder.

It could be argued that the real problem is in how MapTask handles the exception. Perhaps
it should discard the key and value serializers and obtain new ones when handling it. Doing
so would acknowledge that serializers, like the one from SafeAvroSerialization, may be
stateful. I don't see any documentation saying that they must be stateless.
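The failure and both remedies described above can be reproduced in a small, stdlib-only Java model. This is a sketch under stated assumptions, not Crunch, Avro, or Hadoop code: the hypothetical BoundedSink stands in for the map-side sort buffer, RecordTooLargeException for MapBufferTooSmallException, and Encoder for BufferedBinaryEncoder (or, with a zero-size buffer, DirectBinaryEncoder). These names, the 8-byte record limit, and the 4-byte encoder buffer are all illustrative. The key assumption is that the buffered encoder resets its buffer position only after a successful write to the sink, so an exception thrown mid-flush leaves bytes behind.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Simplified model of CRUNCH-316. Every class here is a hypothetical stand-in. */
public class StaleBufferDemo {

    /** Hypothetical stand-in for MapTask's MapBufferTooSmallException. */
    static class RecordTooLargeException extends IOException {
        RecordTooLargeException(String msg) { super(msg); }
    }

    /** Hypothetical stand-in for the sort buffer: rejects a record once it exceeds `capacity` bytes. */
    static class BoundedSink extends OutputStream {
        private final int capacity;
        private final ByteArrayOutputStream current = new ByteArrayOutputStream();
        BoundedSink(int capacity) { this.capacity = capacity; }
        void startRecord() { current.reset(); }
        byte[] currentRecord() { return current.toByteArray(); }
        @Override public void write(int b) throws IOException { write(new byte[] {(byte) b}, 0, 1); }
        @Override public void write(byte[] b, int off, int len) throws IOException {
            if (current.size() + len > capacity) {
                throw new RecordTooLargeException((current.size() + len) + " bytes");
            }
            current.write(b, off, len);
        }
    }

    /** Hypothetical encoder; bufferSize == 0 means "direct": every byte goes straight to the sink. */
    static class Encoder {
        private final OutputStream out;
        private final byte[] buf;
        private int pos = 0;
        Encoder(OutputStream out, int bufferSize) { this.out = out; this.buf = new byte[bufferSize]; }
        void write(byte[] data) throws IOException {
            for (byte b : data) {
                if (buf.length == 0) { out.write(b); continue; } // direct mode: nothing is retained
                if (pos == buf.length) flush();
                buf[pos++] = b;
            }
        }
        void flush() throws IOException {
            if (pos > 0) {
                out.write(buf, 0, pos); // if this throws, pos is not reset, so the bytes stay buffered
                pos = 0;
            }
        }
    }

    enum Strategy { BUFFERED, DIRECT, FRESH_ENCODER_AFTER_OVERFLOW }

    /** Loosely mirrors MapTask.collect(): on overflow, drop the partial record and spill it separately. */
    static List<String> collect(List<String> records, Strategy strategy) throws IOException {
        BoundedSink sink = new BoundedSink(8);
        int bufferSize = strategy == Strategy.DIRECT ? 0 : 4;
        Encoder encoder = new Encoder(sink, bufferSize);
        List<String> collected = new ArrayList<>();
        for (String record : records) {
            sink.startRecord();
            try {
                encoder.write(record.getBytes(StandardCharsets.US_ASCII));
                encoder.flush();
                collected.add(new String(sink.currentRecord(), StandardCharsets.US_ASCII));
            } catch (RecordTooLargeException e) {
                collected.add(record); // modeled spill path: a separate writer stores the record intact
                if (strategy == Strategy.FRESH_ENCODER_AFTER_OVERFLOW) {
                    encoder = new Encoder(sink, bufferSize); // discard any stale buffered bytes
                }
            }
        }
        return collected;
    }

    public static void main(String[] args) throws IOException {
        List<String> input = List.of("0123456789", "ab");
        for (Strategy s : Strategy.values()) {
            System.out.println(s + " -> " + collect(input, s));
        }
    }
}
```

With BUFFERED, the second record comes out as 89ab: the two bytes left in the encoder by the oversized record are prepended to the next one. DIRECT (the DirectBinaryEncoder workaround) and FRESH_ENCODER_AFTER_OVERFLOW (the MapTask-side proposal) both yield the intact record ab.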



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)
