crunch-dev mailing list archives

From Josh Wills <jwi...@cloudera.com>
Subject Re: Controlling Avro Output file number
Date Tue, 11 Nov 2014 18:19:32 GMT
Hey Cristian,

To control the number of output files, you'll need to run an identity
reduce, which you can do with the org.apache.crunch.lib.Shard.shard
method. For your example, it would look like this:

import org.apache.crunch.lib.Shard;

...

int numOutputFiles = ...;
Shard.shard(TokenizedData, numOutputFiles).write(To.avroFile(outputDir));
PipelineResult result = pipeline.done();
return result.succeeded() ? 0 : 1;
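For context, here is a sketch of how that shard step slots into your main
method, reusing the names from your code below (FnTokenizeCollection and
the schema/path variables are yours; numOutputFiles is whatever target
count you pick, and the enclosing class and driver boilerplate are
omitted):

```java
import org.apache.avro.generic.GenericData.Record;
import org.apache.crunch.PCollection;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.io.From;
import org.apache.crunch.io.To;
import org.apache.crunch.lib.Shard;
import org.apache.crunch.types.avro.Avros;

// Read and tokenize exactly as before.
PCollection<Record> avroCreditRecords =
    pipeline.read(From.avroFile(avroFile, avroObject));

PCollection<Record> tokenizedData =
    avroCreditRecords.parallelDo(
        new FnTokenizeCollection(tokenizerSchema.toString()),
        Avros.generics(dataSchema));

// Shard.shard forces an identity reduce with numOutputFiles reducers,
// so the job writes exactly numOutputFiles Avro files instead of one
// file per map task.
int numOutputFiles = 4;
Shard.shard(tokenizedData, numOutputFiles).write(To.avroFile(outputDir));

PipelineResult result = pipeline.done();
return result.succeeded() ? 0 : 1;
```

Note that the map task count itself is driven by the input splits (one
task per HDFS block), which is why large inputs fan out into many
mappers; the shard only consolidates the output side.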

Josh

On Tue, Nov 11, 2014 at 10:09 AM, Cristian Giha <Cristian.Giha@equifax.com>
wrote:

> Hi all,
>
> I am working with Apache Crunch 0.9.0 and Hadoop YARN.
> I am using a DoFn to read an Avro file, change some values of an Avro
> GenericRecord, and return it via the emitter object.
> After the DoFn call I use the Pipeline to write the final collection as
> Avro into HDFS.
>
> My problem is that I am processing many Avro files of 2 or 3 GB each,
> but for each processed file Crunch generates a large number of mappers.
> For example, for 2 files of roughly 2.5 GB each, Crunch generates 40 map
> tasks, and the final output is 40 files in HDFS.
>
> My code does something like this:
>
> DoFn process code:
>
>     @Override
>     public void process(Record record, Emitter<Record> emitter) {
>         avroProtector.protect(record);
>         emitter.emit(record);
>     }
>
> MAIN CODE:
>
>     // Initialize objects
>     PCollection<Record> avroCreditRecords =
>         pipeline.read(From.avroFile(avroFile, avroObject));
>
>     FnTokenizeCollection toTokenizedColl =
>         new FnTokenizeCollection(tokenizerSchema.toString());
>
>     PCollection<Record> TokenizedData =
>         avroCreditRecords.parallelDo(toTokenizedColl, Avros.generics(dataSchema));
>
>     // TokenizedData.write(To.avroFile(outputDir));
>     pipeline.write(TokenizedData, To.avroFile(outputDir));
>
>     PipelineResult result = pipeline.done();
>
>     return result.succeeded() ? 0 : 1;
>
>
>
> Can someone help me with that?
> Regards,
>
> Cristian Giha Sepúlveda | Development Engineer Intermediate | Data &
> Analytic Team
> Office: +1 866 2  444 72 19 | cristian.giha@equifax.com
> Equifax Chile | Isidora Goyenechea 2800, Las Condes, Santiago.
>
>
>


-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
