avro-user mailing list archives

From: Scott Carey <scottca...@apache.org>
Subject: Re: Can spill to disk be in compressed Avro format to reduce I/O?
Date: Thu, 12 Jan 2012 22:09:09 GMT
The Recodec tool may be useful, and the source code is a good reference.

java -jar avro-tools-<VERSION>.jar
http://svn.apache.org/viewvc/avro/tags/release-1.6.1/lang/java/tools/src/main/java/org/apache/avro/tool/RecodecTool.java?view=co

https://issues.apache.org/jira/browse/AVRO-684
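
The same recompression can also be done directly with the data file APIs;
a rough sketch (generic data, the local file names are only placeholders):

	// uses org.apache.avro.file.*, org.apache.avro.generic.*, java.io.*
	DataFileStream<Object> in = new DataFileStream<Object>(
		new FileInputStream("input.avro"), new GenericDatumReader<Object>());

	DataFileWriter<Object> out =
		new DataFileWriter<Object>(new GenericDatumWriter<Object>());
	out.setCodec(CodecFactory.deflateCodec(6));	// target codec/level
	out.create(in.getSchema(), new FileOutputStream("recompressed.avro"));

	// recompress = true: blocks are re-encoded with the writer's codec,
	// but individual records are never deserialized
	out.appendAllFrom(in, true);
	out.close();
	in.close();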



On 1/12/12 12:53 PM, "Scott Carey" <scottcarey@apache.org> wrote:


>
>
>On 1/12/12 12:35 PM, "Frank Grimes" <frankgrimes97@gmail.com> wrote:
>
>
>>So I decided to try writing my own AvroStreamCombiner utility and it
>>seems to choke when passing multiple input files:
>>
>>
>>hadoop dfs -cat hdfs://hadoop/machine1.log.avro hdfs://hadoop/machine2.log.avro | ./deliveryLogAvroStreamCombiner.sh > combined.log.avro
>>
>>
>>
>>
>>Exception in thread "main" java.io.IOException: Invalid sync!
>>	at org.apache.avro.file.DataFileStream.nextRawBlock(DataFileStream.java:293)
>>	at org.apache.avro.file.DataFileWriter.appendAllFrom(DataFileWriter.java:329)
>>	at DeliveryLogAvroStreamCombiner.main(Unknown Source)
>>
>>
>>
>>
>>Here's the code in question:
>>
>>public class DeliveryLogAvroStreamCombiner {
>>	
>>
>>	/**
>>	 * @param args
>>	 */
>>	public static void main(String[] args) throws Exception {
>>		DataFileStream<DeliveryLogEvent> dfs = null;
>>		DataFileWriter<DeliveryLogEvent> dfw = null;
>>		
>>
>>		try {
>>			dfs = new DataFileStream<DeliveryLogEvent>(System.in,
>>				new SpecificDatumReader<DeliveryLogEvent>());
>>			
>>
>>			OutputStream stdout = System.out;
>>			
>>
>>			dfw = new DataFileWriter<DeliveryLogEvent>(
>>				new SpecificDatumWriter<DeliveryLogEvent>());
>>			dfw.setCodec(CodecFactory.deflateCodec(9));
>>			dfw.setSyncInterval(1024 * 256);
>>			dfw.create(DeliveryLogEvent.SCHEMA$, stdout);
>>
>>			dfw.appendAllFrom(dfs, false);
>>
>>
>>
>
>dfs is from System.in, which contains multiple files one after the other.
>Each file needs its own DataFileStream (each has its own header and
>metadata).
>
>In Java you could get the list of files, open each one as a stream via
>the HDFS API, and append its contents to your single output file.
>In bash you could loop over the source files and append them one at a
>time (the piped approach above fails as soon as it reaches the second
>file).  However, the only API for appending to the end of a pre-existing
>file currently takes a File, not a seekable stream, so Avro would need a
>patch to allow that over HDFS (and only an HDFS version that supports
>appends would work).
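>
>A minimal sketch of that Java approach, assuming the standard Hadoop
>FileSystem API and your DeliveryLogEvent schema (the paths are
>placeholders, error handling omitted):
>
>	// uses org.apache.avro.file.*, org.apache.avro.specific.*,
>	// org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.*
>	FileSystem fs = FileSystem.get(new Configuration());
>
>	DataFileWriter<DeliveryLogEvent> writer =
>		new DataFileWriter<DeliveryLogEvent>(
>			new SpecificDatumWriter<DeliveryLogEvent>());
>	writer.setCodec(CodecFactory.deflateCodec(6));
>	writer.create(DeliveryLogEvent.SCHEMA$,
>		fs.create(new Path("/logs/combined.log.avro")));
>
>	// one DataFileStream per input file, since each has its own header
>	for (FileStatus status : fs.globStatus(new Path("/logs/machine*.log.avro"))) {
>		DataFileStream<DeliveryLogEvent> in =
>			new DataFileStream<DeliveryLogEvent>(fs.open(status.getPath()),
>				new SpecificDatumReader<DeliveryLogEvent>());
>		writer.appendAllFrom(in, false);	// copy blocks, no deserialization
>		in.close();
>	}
>	writer.close();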
>
>Other things of note:
>You will probably gain more total compression from a larger sync
>interval (1 MB to 4 MB) than from deflate level 9.  Deflate 9 is VERY
>slow and almost never compresses more than 1% better than deflate 6,
>which is much faster.  I suggest experimenting with the 'recodec' tool
>on some of your files to see what the best size / performance tradeoff
>is.  I doubt that 256K (pre-compression) blocks with level 9 compression
>is the way to go.
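>
>Applied to your job configuration that would look something like the
>following (the level and interval are starting points to measure against
>your data, not hard recommendations):
>
>	AvroOutputFormat.setDeflateLevel(conf, 6);
>	AvroOutputFormat.setSyncInterval(conf, 2 * 1024 * 1024);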
>
>For reference: http://tukaani.org/lzma/benchmarks.html
>(gzip uses deflate compression)
>
>-Scott
>
>
>
>>		}
>>		finally {
>>			if (dfs != null) try { dfs.close(); } catch (Exception e) { e.printStackTrace(); }
>>			if (dfw != null) try { dfw.close(); } catch (Exception e) { e.printStackTrace(); }
>>		}
>>	}
>>
>>}
>>
>>
>>Is there any way this could be made to work without needing to download
>>the individual files to disk and calling append for each of them?
>>
>>Thanks,
>>
>>Frank Grimes
>>
>>
>>On 2012-01-12, at 2:24 PM, Frank Grimes wrote:
>>
>>
>>Hi Scott,
>>
>>If I have a map-only job, would I want only one mapper running to pull
>>all the records from the source input files and stream/append them to
>>the target avro file?
>>Would that be no different (or more efficient) than doing "hadoop dfs
>>-cat file1 file2 file3" and piping the output to append to a "hadoop dfs
>>-put combinedFile"?
>>In that case, my only question is how would I combine the avro files
>>into a new file without deserializing them?
>>
>>Thanks,
>>
>>Frank Grimes
>>
>>
>>On 2012-01-12, at 1:14 PM, Scott Carey wrote:
>>
>>
>>
>>
>>On 1/12/12 8:27 AM, "Frank Grimes" <frankgrimes97@gmail.com> wrote:
>>
>>
>>>Hi All,
>>>We have Avro data files in HDFS which are compressed using the Deflate
>>>codec.
>>>We have written an M/R job using the Avro Mapred API to combine those
>>>files.
>>>
>>>It seems to be working fine, however when we run it we notice that the
>>>temporary work area (spills, etc) seem to be uncompressed.
>>>We're thinking we might see a speedup due to reduced I/O if the
>>>temporary files are compressed as well.
>>>
>>>
>>
>>If all you want to do is combine the files, there is no reason to
>>deserialize and reserialize the contents, and a map-only job could
>>suffice.
>>If this is the case, you might want to consider one of two options:
>>1.  Use a map-only job with a combined file input.  This will produce
>>one file per mapper and no intermediate data.
>>2.  Use the Avro data file API to append to a file.  I am not sure if
>>this will work with HDFS without some modifications to Avro, but it
>>should be possible since the data file APIs can take
>>InputStream/OutputStream.  The data file API has the ability to append
>>data blocks from another file if the schemas are an exact match.  This
>>can be done without deserialization, and optionally can change the
>>compression level or leave it alone.
>>
>>>
>>>Is there a way to enable "mapred.compress.map.output" in such a way
>>>that those temporary files are compressed as Avro/Deflate?
>>>I tried simply setting conf.setBoolean("mapred.compress.map.output",
>>>true); but it didn't seem to have any effect.
>>>
>>>
>>
>>I am not sure, as I haven't tried it myself.  However, the Avro M/R
>>should be able to leverage all of the Hadoop compressed intermediate
>>forms.  LZO/Snappy are fast and in our cluster Snappy is the default.
>>Deflate can be a lot slower but much more compact.
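>>
>>If you want to experiment with it, the old-API settings would be roughly
>>the following (assuming the Snappy codec is available on your cluster):
>>
>>	conf.setCompressMapOutput(true);	// same as "mapred.compress.map.output"
>>	conf.setMapOutputCompressorClass(
>>		org.apache.hadoop.io.compress.SnappyCodec.class);
>>
>>Whether the Avro M/R intermediate output actually picks this up is what
>>you would be verifying.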
>>
>>>
>>>Note that in order to avoid unnecessary sorting overhead, I made each
>>>key a constant (1L) so that the logs are combined but ordering isn't
>>>necessarily preserved. (we don't care about ordering)
>>>
>>>
>>>
>>
>>In that case, I think you can use a map-only job.  There may be some
>>work needed to get a single mapper to read many files, however.
>>
>>>
>>>FYI, here are my mapper and reducer.
>>>
>>>
>>>	public static class AvroReachMapper
>>>			extends AvroMapper<DeliveryLogEvent, Pair<Long, DeliveryLogEvent>> {
>>>
>>>		public void map(DeliveryLogEvent levent,
>>>				AvroCollector<Pair<Long, DeliveryLogEvent>> collector,
>>>				Reporter reporter) throws IOException {
>>>
>>>			collector.collect(new Pair<Long, DeliveryLogEvent>(1L, levent));
>>>		}
>>>	}
>>>	
>>>
>>>	public static class Reduce
>>>			extends AvroReducer<Long, DeliveryLogEvent, DeliveryLogEvent> {
>>>
>>>		@Override
>>>		public void reduce(Long key, Iterable<DeliveryLogEvent> values,
>>>				AvroCollector<DeliveryLogEvent> collector, Reporter reporter)
>>>				throws IOException {
>>>
>>>			for (DeliveryLogEvent event : values) {
>>>				collector.collect(event);
>>>			}
>>>		}
>>>
>>>	}
>>>
>>>
>>>
>>>
>>>	AvroJob.setInputSchema(conf, DeliveryLogEvent.SCHEMA$);
>>>	AvroJob.setMapperClass(conf, Mapper.class);
>>>	AvroJob.setMapOutputSchema(conf, SCHEMA);
>>>		
>>>
>>>	AvroJob.setOutputSchema(conf, DeliveryLogEvent.SCHEMA$);
>>>	AvroJob.setOutputCodec(conf, DataFileConstants.DEFLATE_CODEC);
>>>	AvroOutputFormat.setDeflateLevel(conf, 9);
>>>	AvroOutputFormat.setSyncInterval(conf, 1024 * 256);
>>>
>>>	AvroJob.setReducerClass(conf, Reducer.class);
>>>		
>>>
>>>	JobClient.runJob(conf);
>>>
>>>
>>>Thanks,
>>>
>>>Frank Grimes
>>>
>>>
>>>


