avro-user mailing list archives

From: Ryan Tabora <ratabora@gmail.com>
Subject: Re: MapReduce: Using Avro Input/Output Formats without Specifying a schema
Date: Thu, 01 May 2014 21:41:07 GMT
Hey all,

Looks like some transitive dependencies were causing me to pick up the 1.7.1
APIs instead of the 1.7.6 ones.

It appears that this bug was fixed somewhere along the way: you no longer need
to set the AvroJob output key schema when using AvroMultipleOutputs. If you
actually take a look inside of AvroMultipleOutputs, the schema is set on the
AvroKeyOutputFormat (or its value counterpart) in the setSchema method.
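
For reference, a minimal sketch of the 1.7.6-style setup (the method, the
output name, and schemaJson are illustrative, not from my actual job):

import org.apache.avro.Schema;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.hadoop.mapreduce.Job;

// Hypothetical: on 1.7.6 each named output carries its own schema, so no
// job-wide AvroJob.setOutputKeySchema(...) union is needed.
void addNamedOutput(Job job, String schemaJson) {
  Schema schema = new Schema.Parser().parse(schemaJson);
  AvroMultipleOutputs.addNamedOutput(job, "myoutput",
      AvroKeyOutputFormat.class, schema);
}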

Regards,
Ryan Tabora
http://ryantabora.com

On May 1, 2014 at 11:13:08 AM, Ryan Tabora (ratabora@gmail.com) wrote:

Hey Fengyun and all,

It seems like the AvroMultipleOutputs API is broken when trying to use a large
number of schemas (I am using ~55 schemas, ~4MB in total). Why do I have to
specify the output schemas using the AvroJob API if they are already added via
the AvroMultipleOutputs API?

import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.hadoop.fs.FileStatus;

AvroMultipleOutputs API
// Register one named output per schema (named output names must be
// alphanumeric, hence the underscore stripping).
Schema.Parser parser = new Schema.Parser();
List<Schema> schemas = new ArrayList<Schema>();
for (FileStatus schemaStatus : schemasStatus) {
  String schemaContent = readFile(fs, schemaStatus.getPath()); // local helper
  Schema schema = parser.parse(schemaContent);
  AvroMultipleOutputs.addNamedOutput(job,
      schema.getName().replace("_", ""),
      AvroKeyOutputFormat.class, schema);
  schemas.add(schema);
}

AvroJob API
// Additionally set the union of all ~55 schemas as the job-wide output
// key schema.
AvroJob.setOutputKeySchema(job, Schema.createUnion(schemas));

The problem I am running into is an OOM exception in the Reducer when I’m
trying to write to the AvroMultipleOutputs. It seems like the
AvroMultipleOutputs class is putting my giant union schema into memory via a
HashMap.

2014-05-01 10:27:45,337 FATAL org.apache.hadoop.mapred.Child: Error running child : java.lang.OutOfMemoryError:
GC overhead limit exceeded
        at java.util.HashMap.createEntry(HashMap.java:901)
        at java.util.HashMap.addEntry(HashMap.java:888)
        at java.util.HashMap.put(HashMap.java:509)
        at org.apache.avro.Schema$UnionSchema.<init>(Schema.java:824)
        at org.apache.avro.Schema.parse(Schema.java:1221)
        at org.apache.avro.Schema.parse(Schema.java:1148)
        at org.apache.avro.Schema.parse(Schema.java:1148)
        at org.apache.avro.Schema.parse(Schema.java:1220)
        at org.apache.avro.Schema$Parser.parse(Schema.java:981)
        at org.apache.avro.Schema$Parser.parse(Schema.java:971)
        at org.apache.avro.Schema.parse(Schema.java:1020)
        at org.apache.avro.mapreduce.AvroJob.getOutputKeySchema(AvroJob.java:184)
        at org.apache.avro.mapreduce.AvroKeyOutputFormat.getRecordWriter(AvroKeyOutputFormat.java:85)
        at org.apache.avro.mapreduce.AvroMultipleOutputs.getRecordWriter(AvroMultipleOutputs.java:445)
        at org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:407)
        at org.apache.avro.mapreduce.AvroMultipleOutputs.write(AvroMultipleOutputs.java:367)
        at com.ryantabora.mapreduce.SerializerReducer.reduce(SerializerReducer.java:52)
        at com.ryantabora.mapreduce.SerializerReducer.reduce(SerializerReducer.java:20)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:177)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
        at org.apache.hadoop.mapred.Child.main(Child.java:249)

Why is AvroMultipleOutputs even looking at the AvroJob output key schema definition? Can’t
it just pull the schema from my named outputs?
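
For context, the reducer write that triggers this path looks roughly like the
following (amos here stands for an AvroMultipleOutputs instance created in the
reducer’s setup(); the name lookup mirrors the addNamedOutput call above and
is illustrative):

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroMultipleOutputs;
import org.apache.hadoop.io.NullWritable;

// Hypothetical: the first write to each named output creates a RecordWriter,
// which re-parses the job-wide union schema (the HashMap.put in the trace).
void writeRecord(AvroMultipleOutputs amos, GenericRecord record)
    throws Exception {
  String name = record.getSchema().getName().replace("_", "");
  amos.write(name, new AvroKey<GenericRecord>(record), NullWritable.get());
}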

Regards,
Ryan Tabora
http://ryantabora.com

On April 30, 2014 at 9:49:19 PM, Ryan Tabora (ratabora@gmail.com) wrote:

Wow, not sure how I missed this, thank you! :)

Regards,
Ryan Tabora
http://ryantabora.com


On Wed, Apr 30, 2014 at 9:41 PM, Fengyun RAO <raofengyun@gmail.com> wrote:
We also used AvroMultipleOutputs to deal with multiple schemas.

The problem stands the same: you have to set a single mapper output
type (or schema) before submitting the MR job. Since there are
multiple schemas, we used Schema.createUnion(List<Schema> types) as
the mapper output schema.

You could write a method to generate the list of schemas from the
input data before submitting the MR job, as in the sketch below.
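
A minimal sketch, assuming the schema list has already been collected from
the input (only the Avro and Hadoop classes are real; the method itself is
illustrative):

import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.hadoop.mapreduce.Job;

// Hypothetical: set the union of all collected schemas as the map output
// key schema before the job is submitted.
void setUnionSchema(Job job, List<Schema> schemas) {
  AvroJob.setMapOutputKeySchema(job, Schema.createUnion(schemas));
}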

2014-04-30 21:46 GMT+08:00, Ryan Tabora <ratabora@gmail.com>:
> Thanks Rao, I understand how I could do it if I had a single schema across
> all input data. However, my question is what to do if my input data varies
> and one input can have a different schema from another.
>
> My idea would be to use something like MultipleOutputs or partitioning to
> split up the output data by unique schema.
>
> I guess the question still stands: does anyone have any recommendations for
> dynamically generating the schema using Avro output formats?
>
> Thanks,
> Ryan Tabora
> http://ryantabora.com
>
> On April 29, 2014 at 11:41:51 PM, Fengyun RAO (raofengyun@gmail.com) wrote:
>
> Take MapReduce for example, which requires a Runner, a Mapper, and a Reducer.
>
> The Mapper requires outputting a single type (or a single Avro schema).
>
> If you have a set of CSV files with different schemas, what output type
> would you expect?
>
> If all the CSV files share the same schema, you could dynamically create the
> schema in the Runner before submitting the MR job.
> If you look into Schema.java, you will find the create(), createRecord(),
> etc. APIs.
> You could simply read one CSV file header and create the schema using these
> APIs, e.g.
>     AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
> creates a schema that is a single string. A sketch of building a record
> schema from a CSV header is below.
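>
> A minimal sketch of building a record schema from a CSV header line (the
> column parsing is illustrative, and every field is typed as a string for
> simplicity):
>
> import java.util.ArrayList;
> import java.util.List;
>
> import org.apache.avro.Schema;
>
> // Hypothetical: derive a record schema from the first line of a CSV file.
> Schema schemaFromHeader(String headerLine) {
>   List<Schema.Field> fields = new ArrayList<Schema.Field>();
>   for (String column : headerLine.split(",")) {
>     fields.add(new Schema.Field(column,
>         Schema.create(Schema.Type.STRING), null, null));
>   }
>   Schema record = Schema.createRecord("CsvRecord", null, "com.example", false);
>   record.setFields(fields);
>   return record;
> }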
>
>
>
> 2014-04-30 4:56 GMT+08:00 Ryan Tabora <ratabora@gmail.com>:
> Hi all,
>
> Whether you’re using Hive or MapReduce, avro input/output formats require
> you to specify a schema at the beginning of the job or in the table
> definition in order to work with them. Is there any way to configure the
> jobs so that the input/output formats can dynamically determine the schema
> from the data itself?
>
> Think about a job like this: I have a set of CSV files that I want to
> serialize into avro files. These CSV files are self-describing, and each CSV
> file has a unique schema. If I want to write a job that scans over all of
> this data and serializes it into avro, I can’t do that with today’s tools
> (as far as I know). If I can’t specify the schema up front, what can I do?
> Am I forced to write my own avro input/output formats?
>
> The avro schema is stored within the avro data file itself, so why can’t
> these input/output formats be smart enough to figure that out? Am I
> fundamentally doing something against the principles of the avro format? I
> would be surprised if no one has run into this issue before.
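>
> For what it’s worth, reading that embedded schema back out of an existing
> avro data file is straightforward (the generic reader below is one way to
> do it; the helper method is illustrative):
>
> import java.io.File;
> import java.io.IOException;
>
> import org.apache.avro.Schema;
> import org.apache.avro.file.DataFileReader;
> import org.apache.avro.generic.GenericDatumReader;
> import org.apache.avro.generic.GenericRecord;
>
> // Open an Avro container file and pull the schema stored in its header.
> Schema readEmbeddedSchema(File avroFile) throws IOException {
>   DataFileReader<GenericRecord> reader = new DataFileReader<GenericRecord>(
>       avroFile, new GenericDatumReader<GenericRecord>());
>   try {
>     return reader.getSchema();
>   } finally {
>     reader.close();
>   }
> }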
>
> Regards,
> Ryan Tabora
>
>


--
----------------------------------------------------------------
RAO Fengyun
Center for Astrophysics, Tsinghua University
Tel: +86 13810626496
Email: raofengyun@gmail.com
          rfy02@mails.tsinghua.edu.cn
-----------------------------------------------------------------

