crunch-dev mailing list archives

From Gabriel Reid <>
Subject Re: Simulating Avro reads in MemPipeline
Date Wed, 04 Jun 2014 11:27:33 GMT

Hi David,

Yeah, the whole object reuse situation in MapReduce is quite a drag.
By the way, this isn't specific to Avro -- it's just as big an
issue with Writables.

I'm kind of torn on the idea of building this into MemPipeline. On the
one hand, the closer the behavior of the different pipeline
implementations is, the better, and I think MemPipeline is currently
indeed largely used for fast testing of MR workflows.

On the other hand, the MemPipeline can also be seen as an optimized
implementation for running a pipeline faster if everything fits in
memory, so adding in extra serialization/deserialization just to be
similar to the MRPipeline would be unfortunate. Additionally,
there's also the SparkPipeline implementation to consider -- I assume
that Spark doesn't have this same object reuse situation, although I'm
not actually sure.

I think that my general feeling is that I'd rather not add object
reuse into MemPipeline, but I do think it would be great if we could
bring the behavior of the two implementations in line. Any other
thoughts on how we could do this?

- Gabriel

On Wed, Jun 4, 2014 at 11:51 AM, David Whiting <> wrote:
> We are facing some problems where errors that occur on the cluster are not
> caught by MemPipeline tests, due to the way Avro reuses objects for
> reading. If you do a parallelDo on a PGroupedTable, then each element of the
> Iterable of values you get is actually the same object returned to you with
> different contents. If you then store some of these instances to be emitted
> later without taking a copy, you will get unexpected results.
> To try and catch these problems on the local side, we've written a wrapper
> for Iterable<? extends SpecificRecord> which uses the SpecificDatumWriter
> and SpecificDatumReader to write to and from ByteBuffers for each iteration
> and offers the last record for reuse. This allows us to run the offending
> MapFn or DoFn in isolation with a wrapped Iterable as input to identify and
> fix the problem.
> This, however, is a little awkward and it would be much neater if this was
> driven from the Crunch side. At the point in the MapShuffler where the
> Iterable is wrapped in a SingleUseIterable, it could also be wrapped in one
> of these AvroReadSimulatingIterable, meaning that people could find these
> problems before even running them live for the first time. However, this is
> a problem that is specific to Avro so it obviously makes no sense to hack
> it right into HFunction.
> Anyone have any thoughts on how this could be integrated nicely, or indeed
> if it should be integrated at all?
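
For readers following the thread, the object reuse problem described in the
quoted message can be sketched in plain Java with no Avro or Crunch
dependency. Everything here is an assumption made for illustration: Rec is a
hypothetical stand-in for a SpecificRecord or Writable, and ReusingIterable,
simulateReuse, holdReferences and holdCopies are invented names. The wrapper
copies fields into one shared instance instead of round-tripping through
SpecificDatumWriter/SpecificDatumReader, so it is not the wrapper David
describes, only a rough imitation of the same effect.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

final class ReuseSimulation {

    /** Hypothetical mutable record standing in for a SpecificRecord or Writable. */
    static final class Rec {
        String value;
        Rec() {}
        Rec(String value) { this.value = value; }
        Rec copy() { return new Rec(value); }
    }

    /**
     * Hypothetical wrapper: every call to next() overwrites and returns the
     * same shared instance, roughly imitating a reader that reuses objects.
     */
    static final class ReusingIterable<T> implements Iterable<T> {
        private final Iterable<T> source;
        private final Supplier<T> factory;
        private final BiConsumer<T, T> copyInto; // (from, to)

        ReusingIterable(Iterable<T> source, Supplier<T> factory, BiConsumer<T, T> copyInto) {
            this.source = source;
            this.factory = factory;
            this.copyInto = copyInto;
        }

        @Override
        public Iterator<T> iterator() {
            final Iterator<T> it = source.iterator();
            final T shared = factory.get(); // one instance per iteration pass
            return new Iterator<T>() {
                @Override public boolean hasNext() { return it.hasNext(); }
                @Override public T next() {
                    copyInto.accept(it.next(), shared); // overwrite shared contents
                    return shared;
                }
            };
        }
    }

    static Iterable<Rec> simulateReuse(List<Rec> data) {
        return new ReusingIterable<>(data, Rec::new, (from, to) -> to.value = from.value);
    }

    /** Buggy pattern: keeps references to whatever the Iterable hands out. */
    static List<String> holdReferences(Iterable<Rec> values) {
        List<Rec> held = new ArrayList<>();
        for (Rec r : values) {
            held.add(r);
        }
        return valuesOf(held);
    }

    /** Fixed pattern: takes a copy before holding on to a record. */
    static List<String> holdCopies(Iterable<Rec> values) {
        List<Rec> held = new ArrayList<>();
        for (Rec r : values) {
            held.add(r.copy());
        }
        return valuesOf(held);
    }

    private static List<String> valuesOf(List<Rec> recs) {
        List<String> out = new ArrayList<>();
        for (Rec r : recs) {
            out.add(r.value);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> data = Arrays.asList(new Rec("a"), new Rec("b"), new Rec("c"));
        System.out.println(holdReferences(data));                // [a, b, c]: bug hidden in memory
        System.out.println(holdReferences(simulateReuse(data))); // [c, c, c]: bug exposed
        System.out.println(holdCopies(simulateReuse(data)));     // [a, b, c]: copy fixes it
    }
}
```

With a plain in-memory list the buggy pattern appears to work, which is the
gap between MemPipeline tests and cluster runs discussed in this thread; only
the reusing wrapper makes the held references collapse to the last value.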
