avro-user mailing list archives

From Frank Grimes <frankgrime...@gmail.com>
Subject Re: Can spill to disk be in compressed Avro format to reduce I/O?
Date Thu, 12 Jan 2012 19:24:41 GMT
Hi Scott,

If I have a map-only job, would I want only one mapper running to pull all the records from
the source input files and stream/append them to the target avro file?
Would that be no different (or more efficient) than doing "hadoop dfs -cat file1 file2 file3"
and piping the output to append to a "hadoop dfs -put combinedFile"?
In that case, my only question is how would I combine the avro files into a new file without
deserializing them?

Thanks,

Frank Grimes


On 2012-01-12, at 1:14 PM, Scott Carey wrote:

> 
> 
> On 1/12/12 8:27 AM, "Frank Grimes" <frankgrimes97@gmail.com> wrote:
> 
>> Hi All,
>> 
>> We have Avro data files in HDFS which are compressed using the Deflate codec.
>> We have written an M/R job using the Avro Mapred API to combine those files.
>> 
>> It seems to be working fine, however when we run it we notice that the temporary
>> work area (spills, etc) seem to be uncompressed.
>> We're thinking we might see a speedup due to reduced I/O if the temporary files are
>> compressed as well.
> 
> 
> If all you want to do is combine the files, there is no reason to deserialize and reserialize
> the contents, and a map-only job could suffice.
> If this is the case, you might want to consider one of two options:
> 1.  Use a map-only job, with a combined file input.  This will produce one file per mapper
> and no intermediate data.
> 2.  Use the Avro data file API to append to a file.  I am not sure if this will work
> with HDFS without some modifications to Avro, but it should be possible since the data file
> APIs can take InputStream/OutputStream.  The data file API has the ability to append data
> blocks from another file if the schemas are an exact match.  This can be done without deserialization,
> and optionally can change the compression level or leave it alone.
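A minimal sketch of option 2, assuming the block-level append Scott describes is `DataFileWriter.appendAllFrom` in the Avro Java API. The class name `AvroConcat` and its `concat` method are hypothetical, and the sketch uses plain `InputStream`/`OutputStream`; whether streams obtained from HDFS behave the same way is untested here, as Scott notes.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

// Hypothetical helper: concatenates Avro data files block by block.
public class AvroConcat {

    // Appends every block of each input to one output data file.
    // The output schema is taken from the first input; appendAllFrom
    // throws an IOException if a later input has a different schema.
    public static void concat(List<InputStream> inputs, OutputStream out, int deflateLevel)
            throws IOException {
        DataFileWriter<GenericRecord> writer = null;
        try {
            for (InputStream in : inputs) {
                DataFileStream<GenericRecord> reader =
                    new DataFileStream<GenericRecord>(in, new GenericDatumReader<GenericRecord>());
                try {
                    if (writer == null) {
                        Schema schema = reader.getSchema();
                        writer = new DataFileWriter<GenericRecord>(
                            new GenericDatumWriter<GenericRecord>(schema));
                        writer.setCodec(CodecFactory.deflateCodec(deflateLevel));
                        writer.create(schema, out);
                    }
                    // recompress=false: when the input is also deflate-compressed,
                    // blocks are copied as raw bytes; otherwise they are
                    // decompressed and recompressed, but records are never decoded.
                    writer.appendAllFrom(reader, false);
                } finally {
                    reader.close();
                }
            }
        } finally {
            if (writer != null) {
                writer.close(); // also closes the underlying output stream
            }
        }
    }
}
```

Passing `true` as the second argument to `appendAllFrom` would force the blocks to be recompressed with the writer's codec, which is one way to change the compression level during the merge.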
> 
>> 
>> Is there a way to enable "mapred.compress.map.output" in such a way that those temporary
>> files are compressed as Avro/Deflate?
>> I tried simply setting conf.setBoolean("mapred.compress.map.output", true); but it
>> didn't seem to have any effect.
> 
> 
> I am not sure, as I haven't tried it myself.  However, the Avro M/R should be able to
> leverage all of the Hadoop compressed intermediate forms.  LZO/Snappy are fast and in our
> cluster Snappy is the default.  Deflate can be a lot slower but much more compact.
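For reference, a sketch of how map output compression is usually switched on through the old `org.apache.hadoop.mapred` API, setting the codec explicitly as well as the boolean flag. The class name `MapOutputCompression` is hypothetical, and `DefaultCodec` (zlib/deflate) is only one choice of codec. This compresses the intermediate data with a Hadoop codec rather than as an Avro data file, and whether it changes the behaviour reported above has not been verified.

```java
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapred.JobConf;

// Hypothetical helper: enables compression of intermediate map output.
public class MapOutputCompression {

    public static void enable(JobConf conf) {
        // Same effect as setting the map output compression flag by hand
        // ("mapred.compress.map.output" in Hadoop 1.x).
        conf.setCompressMapOutput(true);
        // Chooses the codec used for the intermediate data
        // ("mapred.map.output.compression.codec" in Hadoop 1.x).
        conf.setMapOutputCompressorClass(DefaultCodec.class);
    }
}
```

A faster codec such as Snappy could be substituted for `DefaultCodec` if it is installed on the cluster.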
> 
>> 
>> Note that in order to avoid unnecessary sorting overhead, I made each key a constant
>> (1L) so that the logs are combined but ordering isn't necessarily preserved. (we don't care
>> about ordering)
> 
> 
> In that case, I think you can use a map only job.  There may be some work to get a single
> mapper to read many files however.
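A rough sketch of what such a map-only job could look like with the Avro mapred API, assuming that setting the number of reduce tasks to zero makes the mapper output go straight to the Avro output format. The names `MapOnlyCombine` and `PassThroughMapper` are hypothetical. Unlike a block-level append, this still decodes and re-encodes every record, and it does not address getting a single mapper to read many files.

```java
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.AvroMapper;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical map-only job setup: no key, no sort, no reducer.
public class MapOnlyCombine {

    // Emits each input record unchanged.
    public static class PassThroughMapper<T> extends AvroMapper<T, T> {
        @Override
        public void map(T datum, AvroCollector<T> collector, Reporter reporter)
                throws IOException {
            collector.collect(datum);
        }
    }

    // Input and output share one schema, since records are only copied.
    public static void configure(JobConf conf, Schema schema) {
        AvroJob.setInputSchema(conf, schema);
        AvroJob.setOutputSchema(conf, schema);
        AvroJob.setMapperClass(conf, PassThroughMapper.class);
        conf.setNumReduceTasks(0); // map-only: no shuffle and no spill files
    }
}
```

With this setup the job writes one output file per mapper, so the number of output files follows the number of input splits.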
> 
>> 
>> FYI, here are my mapper and reducer.
>> 
>> 
>> 	public static class AvroReachMapper extends AvroMapper<DeliveryLogEvent, Pair<Long, DeliveryLogEvent>> {
>> 		public void map(DeliveryLogEvent levent, AvroCollector<Pair<Long, DeliveryLogEvent>> collector, Reporter reporter)
>> 			throws IOException {
>> 			
>> 			collector.collect(new Pair<Long, DeliveryLogEvent>(1L, levent));
>> 		}
>> 	}
>> 	
>> 	public static class Reduce extends AvroReducer<Long, DeliveryLogEvent, DeliveryLogEvent> {
>> 
>> 		@Override
>> 		public void reduce(Long key, Iterable<DeliveryLogEvent> values,
>> 				AvroCollector<DeliveryLogEvent> collector, Reporter reporter)
>> 				throws IOException {
>> 
>> 			for (DeliveryLogEvent event : values) {
>> 				collector.collect(event);
>> 			}
>> 		}
>> 
>> 	}
>> 
>> Also, I'm setting the following:
>> 
>> 	AvroJob.setInputSchema(conf, DeliveryLogEvent.SCHEMA$);
>> 	AvroJob.setMapperClass(conf, AvroReachMapper.class);
>> 	AvroJob.setMapOutputSchema(conf, SCHEMA);
>> 		
>> 	AvroJob.setOutputSchema(conf, DeliveryLogEvent.SCHEMA$);
>> 	AvroJob.setOutputCodec(conf, DataFileConstants.DEFLATE_CODEC);
>> 	AvroOutputFormat.setDeflateLevel(conf, 9);
>> 	AvroOutputFormat.setSyncInterval(conf, 1024 * 256);
>> 
>> 	AvroJob.setReducerClass(conf, Reduce.class);
>> 		
>> 	JobClient.runJob(conf);
>> 
>> 
>> Thanks,
>> 
>> Frank Grimes

