avro-user mailing list archives

From Scott Carey <scottca...@apache.org>
Subject Re: Hadoop and org.apache.avro.file.DataFileReader sez "Not an Avro data file"
Date Thu, 21 Jul 2011 00:38:34 GMT
An Avro data file is not created with a FileOutputStream.  That will write
Avro binary data to a file, but not in the Avro file format (which is
splittable and contains header metadata).
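
One quick way to tell the two kinds of file apart is to look at the first
four bytes.  Avro data files start with the magic bytes 'O', 'b', 'j', 1;
a raw binary-encoded record generally will not.  The following is only a
rough sketch of that check using plain java.io; the class and method names
(AvroMagicCheck, looksLikeAvroDataFile) are made up for illustration and
are not part of the Avro API.

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;

public class AvroMagicCheck {
    // Avro data (object container) files begin with 'O', 'b', 'j', 1.
    static final byte[] MAGIC = { 'O', 'b', 'j', 1 };

    /** Returns true if the file starts with the Avro data file magic bytes. */
    static boolean looksLikeAvroDataFile(String path) throws IOException {
        byte[] head = new byte[MAGIC.length];
        try (DataInputStream in = new DataInputStream(new FileInputStream(path))) {
            in.readFully(head);          // throws EOFException if the file is too short
        } catch (EOFException tooShort) {
            return false;
        }
        return Arrays.equals(MAGIC, head);
    }

    public static void main(String[] args) throws IOException {
        for (String path : args) {
            System.out.println(path + ": "
                + (looksLikeAvroDataFile(path) ? "has Avro magic" : "no Avro magic"));
        }
    }
}
```

A file that fails this check will also be rejected by DataFileReader with
"Not an Avro data file".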

The API for Avro Data Files is here:
http://avro.apache.org/docs/current/api/java/org/apache/avro/file/package-summary.html
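
As a rough sketch (not tested against your setup), a write() method that
produces a real data file could use DataFileWriter from that package
instead of a bare Encoder.  The class name AvroDataFileExample and the
one-field schema in SCHEMA_JSON are placeholders for illustration; use
your own Account schema.

```java
import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class AvroDataFileExample {
    // Placeholder schema, for illustration only.
    static final String SCHEMA_JSON =
        "{\"type\":\"record\",\"name\":\"Account\",\"fields\":"
        + "[{\"name\":\"timestamp\",\"type\":\"string\"}]}";

    /** Writes one record as an Avro data file (magic, schema header, sync markers). */
    public static void write(String file, GenericRecord record, Schema schema)
            throws IOException {
        DataFileWriter<GenericRecord> writer = new DataFileWriter<GenericRecord>(
            new GenericDatumWriter<GenericRecord>(schema));
        writer.create(schema, new File(file));   // writes the header up front
        try {
            writer.append(record);
        } finally {
            writer.close();                      // flushes the last block
        }
    }

    public static void main(String[] args) throws IOException {
        Schema schema = new Schema.Parser().parse(SCHEMA_JSON);
        GenericRecord record = new GenericData.Record(schema);
        record.put("timestamp", "2011-07-20 14:35");
        write("account.avro", record, schema);

        // Read it back the same way the Hadoop input format does.
        DataFileReader<GenericRecord> reader = new DataFileReader<GenericRecord>(
            new File("account.avro"), new GenericDatumReader<GenericRecord>());
        try {
            while (reader.hasNext()) {
                System.out.println(reader.next());
            }
        } finally {
            reader.close();
        }
    }
}
```

Files written this way carry their schema in the header, so
DataFileReader (and therefore AvroJob) should be able to open them.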




On 7/20/11 2:35 PM, "Peter Wolf" <opus111@gmail.com> wrote:


>    Hello, anyone out there know about AVRO file formats and/or Hadoop
>    support?
>    
>    My Hadoop AvroJob code does not recognize the AVRO files created by
>    my other code.  It seems that the MAGIC number is wrong.
>    
>    What is going on?  How many different ways of encoding AVRO files
>    are there, and how do I make sure they match?
>    
>    I am creating the input files like this...
>    
>        static public void write(String file, GenericRecord record, Schema schema)
>                throws IOException {
>            OutputStream o = new FileOutputStream(file);
>            GenericDatumWriter w = new GenericDatumWriter(schema);
>            Encoder e = EncoderFactory.get().binaryEncoder(o, null);
>            w.write(record, e);
>            e.flush();
>        }
>    
>    Hadoop is reading them using org.apache.avro.file.DataFileReader
>    
>    Here is where it breaks.  I checked, and it really is trying to read
>    the right file...
>    
>        /** Open a reader for a file. */
>        public static <D> FileReader<D> openReader(SeekableInput in,
>                                                   DatumReader<D> reader)
>          throws IOException {
>          if (in.length() < MAGIC.length)
>            throw new IOException("Not an Avro data file");
>
>          // read magic header
>          byte[] magic = new byte[MAGIC.length];
>          in.seek(0);
>          for (int c = 0; c < magic.length; c = in.read(magic, c, magic.length-c)) {}
>          in.seek(0);
>
>          if (Arrays.equals(MAGIC, magic))              // current format
>            return new DataFileReader<D>(in, reader);
>          if (Arrays.equals(DataFileReader12.MAGIC, magic)) // 1.2 format
>            return new DataFileReader12<D>(in, reader);
>
>      >>>    throw new IOException("Not an Avro data file");    <<<
>        }
>    
>
>    
>    Some background...
>    
>    I am trying to write my first AVRO Hadoop application.  I am using
>    Hadoop Cloudera 20.2-737 and AVRO 1.5.1
>    
>    I followed the instructions here...
>    
>       
>    http://avro.apache.org/docs/current/api/java/org/apache/avro/mapred/package-summary.html#package_description
>    
>    
>    The sample code here...
>    
>       
>    http://svn.apache.org/viewvc/avro/tags/release-1.5.1/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWordCount.java?view=markup
>    
>    Here is my code which breaks with a "Not an Avro data file" error.
>    
>    
>        public static class MapImpl extends AvroMapper<Account, Pair<Utf8, Long>> {
>            @Override
>            public void map(Account account,
>                            AvroCollector<Pair<Utf8, Long>> collector,
>                            Reporter reporter) throws IOException {
>                StringTokenizer tokens =
>                    new StringTokenizer(account.timestamp.toString());
>                while (tokens.hasMoreTokens())
>                    collector.collect(
>                        new Pair<Utf8, Long>(new Utf8(tokens.nextToken()), 1L));
>            }
>        }
>    
>        public static class ReduceImpl
>                extends AvroReducer<Utf8, Long, Pair<Utf8, Long>> {
>            @Override
>            public void reduce(Utf8 word, Iterable<Long> counts,
>                               AvroCollector<Pair<Utf8, Long>> collector,
>                               Reporter reporter) throws IOException {
>                long sum = 0;
>                for (long count : counts)
>                    sum += count;
>                collector.collect(new Pair<Utf8, Long>(word, sum));
>            }
>        }
>    
>        public int run(String[] args) throws Exception {
>    
>            if (args.length != 2) {
>                System.err.println("Usage: " + getClass().getName()
>                        + " <input> <output>");
>                System.exit(2);
>            }
>    
>            JobConf job = new JobConf(this.getClass());
>            Path outputPath = new Path(args[1]);
>    
>            outputPath.getFileSystem(job).delete(outputPath);
>            //WordCountUtil.writeLinesFile();
>    
>            job.setJobName(this.getClass().getName());
>    
>            AvroJob.setInputSchema(job, Account.schema); //Schema.create(Schema.Type.STRING));
>            AvroJob.setOutputSchema(job,
>                    new Pair<Utf8, Long>(new Utf8(""), 0L).getSchema());
>    
>            AvroJob.setMapperClass(job, MapImpl.class);
>            AvroJob.setCombinerClass(job, ReduceImpl.class);
>            AvroJob.setReducerClass(job, ReduceImpl.class);
>    
>            FileInputFormat.setInputPaths(job, new Path(args[0]));
>            FileOutputFormat.setOutputPath(job, outputPath);
>            //FileOutputFormat.setCompressOutput(job, true);
>    
>            //WordCountUtil.setMeta(job);
>    
>            JobClient.runJob(job);
>    
>            //WordCountUtil.validateCountsFile();
>    
>            return 0;
>        }
>    
>    
>    
>  


