From: John Pauley <John.Pauley@threattrack.com>
To: "user@hadoop.apache.org" <user@hadoop.apache.org>
Subject: Re: [hadoop] AvroMultipleOutputs org.apache.avro.file.DataFileWriter$AppendWriteException
Date: Tue, 4 Mar 2014 15:25:24 +0000

Outside hadoop: avro-1.7.6
Inside hadoop: avro-mapred-1.7.6-hadoop2


From: Stanley Shi <sshi@gopivotal.com>
Reply-To: "user@hadoop.apache.org" <user@hadoop.apache.org>
Date: Monday, March 3, 2014 at 8:30 PM
To: "user@hadoop.apache.org" <user@hadoop.apache.org>
Subject: Re: [hadoop] AvroMultipleOutputs org.apache.avro.file.DataFileWriter$AppendWriteException

which avro version are you using when running outside of hadoop?

Regards,
Stanley Shi,



On Mon, Mar 3, 2014 at 11:49 PM, John Pauley <John.Pauley@threattrack.com> wrote:

This is cross posted to the avro-user list (http://mail-archives.apache.org/mod_mbox/avro-user/201402.mbox/%3cCF3612F6.94D2%25john.pauley@threattrack.com%3e).

Hello all,

I'm having an issue using AvroMultipleOutputs in a map/reduce job. The issue occurs when a schema has a field whose type is a union of null and a fixed (among other complex types), the field defaults to null, and the actual value is not null. Please find the full stack trace below, along with a sample map/reduce job that generates an Avro container file and uses it as the m/r input. Note that I can serialize/deserialize without issue using GenericDatumWriter/GenericDatumReader outside of hadoop… Any insight would be helpful.
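
For reference, a minimal sketch of the kind of standalone round-trip I mean (the class name and file path here are illustrative, not part of the sample job below):

<standalone_roundtrip>
package com.tts.ox.mapreduce.example.avro;

import java.io.File;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;

public class StandaloneRoundTrip {
    public static void main(String[] args) throws Exception {
        // same shape as the SCHEMA in the m/r job below
        Schema schema = new Schema.Parser().parse(
                "{\"namespace\": \"com.foo.bar\", \"name\": \"simple_schema\", \"type\": \"record\","
                + " \"fields\": ["
                + "   {\"name\": \"foo\", \"type\": {\"name\": \"bar\", \"type\": \"fixed\", \"size\": 2}},"
                + "   {\"name\": \"baz\", \"type\": [\"null\", \"bar\"], \"default\": null}"
                + " ]}");
        Schema barSchema = schema.getField("foo").schema();

        byte[] dummy = {(byte) 0x01, (byte) 0x02};
        GenericRecord record = new GenericRecordBuilder(schema)
                .set("foo", new GenericData.Fixed(barSchema, dummy))
                .set("baz", new GenericData.Fixed(barSchema, dummy)) // non-null baz
                .build();

        // write with GenericDatumWriter: no exception outside hadoop
        File file = new File("/tmp/avrotest/standalone.avro");
        DataFileWriter<GenericRecord> writer =
                new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema));
        writer.create(schema, file);
        writer.append(record);
        writer.close();

        // read back with GenericDatumReader
        DataFileReader<GenericRecord> reader =
                new DataFileReader<GenericRecord>(file, new GenericDatumReader<GenericRecord>(schema));
        while (reader.hasNext()) {
            System.out.println(reader.next());
        }
        reader.close();
    }
}
</standalone_roundtrip>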

Stack trace:
java.lang.Exception: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:404)
Caused by: org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
    at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:296)
    at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:77)
    at org.apache.avro.mapreduce.AvroKeyRecordWriter.write(AvroKeyRecordWriter.java:39)
    at org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:400)
    at org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:378)
    at com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:78)
    at com.tts.ox.mapreduce.example.avro.AvroContainerFileDriver$SampleMapper.map(AvroContainerFileDriver.java:62)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:695)
Caused by: java.lang.NullPointerException: in com.foo.bar.simple_schema in union null of union in field baz of com.foo.bar.simple_schema
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:145)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
    at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:290)
    ... 16 more
Caused by: java.lang.NullPointerException
    at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:457)
    at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:189)
    at org.apache.avro.reflect.ReflectData.isRecord(ReflectData.java:167)
    at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:608)
    at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:265)
    at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:597)
    at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:151)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:114)
    at org.apache.avro.reflect.ReflectDatumWriter.writeField(ReflectDatumWriter.java:175)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:143)

Sample m/r job:
<mr_job>
package com.tts.ox.mapreduce.example.avro;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.File;
import java.io.IOException;

public class AvroContainerFileDriver extends Configured implements Tool {
    //
    // define a schema with a union of null and fixed
    private static final String SCHEMA = "{\n" +
            "    \"namespace\": \"com.foo.bar\",\n" +
            "    \"name\": \"simple_schema\",\n" +
            "    \"type\": \"record\",\n" +
            "    \"fields\": [{\n" +
            "        \"name\": \"foo\",\n" +
            "        \"type\": {\n" +
            "            \"name\": \"bar\",\n" +
            "            \"type\": \"fixed\",\n" +
            "            \"size\": 2\n" +
            "        }\n" +
            "    }, {\n" +
            "        \"name\": \"baz\",\n" +
            "        \"type\": [\"null\", \"bar\"],\n" +
            "        \"default\": null\n" +
            "    }]\n" +
            "}";

    public static class SampleMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, NullWritable, NullWritable> {
        private AvroMultipleOutputs amos;

        @Override
        protected void setup(Context context) {
            amos = new AvroMultipleOutputs(context);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            amos.close();
        }

        @Override
        protected void map(AvroKey<GenericRecord> record, NullWritable ignore, Context context)
                throws IOException, InterruptedException {
            // simply write the record to a container using AvroMultipleOutputs
            amos.write("avro", new AvroKey<GenericRecord>(record.datum()), NullWritable.get());
        }
    }

    @Override
    public int run(final String[] args) throws Exception {
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(SCHEMA);

        //
        // generate avro container file for input to mapper
        byte[] dummy = {(byte) 0x01, (byte) 0x02};
        GenericData.Fixed foo = new GenericData.Fixed(schema.getField("foo").schema(), dummy);
        GenericData.Fixed baz = new GenericData.Fixed(schema.getField("baz").schema().getTypes().get(1), dummy);

        GenericRecordBuilder builder = new GenericRecordBuilder(schema)
                .set(schema.getField("foo"), foo);
        GenericRecord record0 = builder.build(); // baz is null

        builder.set(schema.getField("baz"), baz);
        GenericRecord record1 = builder.build(); // baz is not null, bad news

        File file = new File("/tmp/avrotest/input/test.avro");
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
        dataFileWriter.create(schema, file);
        dataFileWriter.append(record0);
        //
        // HELP: job succeeds when we do not have a record with non-null baz; comment out to succeed
        //
        dataFileWriter.append(record1);
        dataFileWriter.close();

        //
        // configure and run job
        Configuration configuration = new Configuration();
        String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
        Job job = Job.getInstance(configuration, "Sample Avro Map Reduce");

        job.setInputFormatClass(AvroKeyInputFormat.class);
        AvroJob.setInputKeySchema(job, schema);

        job.setMapperClass(SampleMapper.class);
        job.setNumReduceTasks(0);

        AvroJob.setOutputKeySchema(job, schema);
        AvroMultipleOutputs.addNamedOutput(job, "avro", AvroKeyOutputFormat.class, schema);

        FileInputFormat.addInputPath(job, new Path("/tmp/avrotest/input"));
        FileOutputFormat.setOutputPath(job, new Path("/tmp/avrotest/output"));

        return (job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new AvroContainerFileDriver(), args);
        System.exit(exitCode);
    }
}
</mr_job>
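
One detail that may matter: the bottom of the trace resolves the union through ReflectData/SpecificData rather than the plain GenericDatumWriter path I use standalone. If that reflect-based resolution is the trigger, a possible (untested) workaround is to pin the job's data model to GenericData. This assumes avro-mapred 1.7.6's AvroJob exposes setDataModelClass; I have not verified that it changes the outcome:

<workaround_sketch>
package com.tts.ox.mapreduce.example.avro;

import org.apache.avro.generic.GenericData;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class GenericModelJobFactory {
    // Build the Job exactly as in run() above, then pin the data model to
    // GenericData so datum writing does not go through ReflectData (the
    // path shown in the trace). Hypothetical fix, untested.
    public static Job newGenericModelJob(Configuration configuration) throws Exception {
        Job job = Job.getInstance(configuration, "Sample Avro Map Reduce");
        AvroJob.setDataModelClass(job, GenericData.class); // assumed available in 1.7.6
        return job;
    }
}
</workaround_sketch>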

Thanks,
John Pauley
Sr. Software Engineer
ThreatTrack Security

