avro-user mailing list archives

From Scott Carey <scottca...@apache.org>
Subject Re: Order of the schema in Union
Date Thu, 23 Feb 2012 01:14:53 GMT
This may be related to
https://issues.apache.org/jira/browse/AVRO-1023

If not, open a new ticket.  If so, please comment there.

What you describe below seems to be related to how Schema.Parser tracks the
current namespace across multiple parse runs.
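
As a rough illustration of why case 4 below fails, here is a toy model of
matching a written record against union branches by full name. It is plain
Java, not the Avro API: the class FullNameMatch and the methods fullName and
findBranch are made up for this sketch, and it assumes the second record
really does pick up the first record's namespace as the output below suggests.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model of full-name matching in a union; NOT the Avro library. */
final class FullNameMatch {

    /** Builds "namespace.name", or just "name" when there is no namespace. */
    static String fullName(String namespace, String name) {
        return (namespace == null || namespace.isEmpty()) ? name : namespace + "." + name;
    }

    /** Returns the index of the branch whose full name equals the writer's, or -1. */
    static int findBranch(List<String> unionFullNames, String writerFullName) {
        return unionFullNames.indexOf(writerFullName);
    }

    public static void main(String[] args) {
        // The data file was written with a record that has no namespace.
        String writer = fullName(null, "FacebookUser2");

        // Union as configured: only the first record has a namespace.
        List<String> intended = Arrays.asList(
                fullName("FacebookUser", "FacebookUser"),
                fullName(null, "FacebookUser2"));

        // Union as the job reports it: the first namespace appears on the second record.
        List<String> observed = Arrays.asList(
                fullName("FacebookUser", "FacebookUser"),
                fullName("FacebookUser", "FacebookUser2"));

        System.out.println(findBranch(intended, writer)); // prints 1
        System.out.println(findBranch(observed, writer)); // prints -1
    }
}
```

In this model the writer's "FacebookUser2" no longer matches any branch once
the second record is named "FacebookUser.FacebookUser2", which is consistent
with the job failing only in case 4.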

On 2/21/12 2:49 PM, "Serge Blazhievsky" <easyvoip@gmail.com> wrote:

> Hi Scott,
> 
> Thanks for looking into this.
> 
> I created a small schema and did some experiments.
> 
> Here are my findings:
> 
> 1. If neither schema has a namespace, the MapReduce job works.
> 2. If both schemas have namespaces, the MapReduce job works.
> 3. If the first schema in the union has no namespace but the second one
> does, the MapReduce job works.
> 4. If the first schema in the union has a namespace but the second one does
> not, the MapReduce job fails.
> 
> For some reason, it assigns namespace from the first schema to the second
> while running MapReduce.
> 
> 
> This feels like a bug somewhere.
> 
> This is the schema I am setting:
> 
> Union schema:
> [ {
>   "type" : "record",
>   "name" : "FacebookUser",
>   "namespace" : "FacebookUser",
>   "fields" : [ {
>     "name" : "name",
>     "type" : "string"
>   }, {
>     "name" : "num_likes",
>     "type" : "int"
>   }, {
>     "name" : "num_photos",
>     "type" : "int"
>   }, {
>     "name" : "num_groups",
>     "type" : "int"
>   } ]
> }, {
>   "type" : "record",
>   "name" : "FacebookUser2",
>   "fields" : [ {
>     "name" : "name",
>     "type" : "string"
>   }, {
>     "name" : "num_likes",
>     "type" : "int"
>   }, {
>     "name" : "num_photos",
>     "type" : "int"
>   }, {
>     "name" : "num_groups",
>     "type" : "int"
>   } ]
> } ]
> 
> 
> and this is the schema that MapReduce gets:
> 
>  [ {
>   "type" : "record",
>   "name" : "FacebookUser",
>   "namespace" : "FacebookUser",
>   "fields" : [ {
>     "name" : "name",
>     "type" : "string"
>   }, {
>     "name" : "num_likes",
>     "type" : "int"
>   }, {
>     "name" : "num_photos",
>     "type" : "int"
>   }, {
>     "name" : "num_groups",
>     "type" : "int"
>   } ]
> }, {
>   "type" : "record",
>   "name" : "FacebookUser2",
>   "namespace" : "FacebookUser",
>   "fields" : [ {
>     "name" : "name",
>     "type" : "string"
>   }, {
>     "name" : "num_likes",
>     "type" : "int"
>   }, {
>     "name" : "num_photos",
>     "type" : "int"
>   }, {
>     "name" : "num_groups",
>     "type" : "int"
>   } ]
> } ]
>  
> 
> The difference is the second namespace.
> 
> I would be more than happy to fix this in the code if you could point me to
> where to look.
> 
> Regards,
> Serge
> 
> 
> 
> On Tue, Feb 21, 2012 at 9:39 AM, Scott Carey <scottcarey@apache.org> wrote:
>> As for why the union does not seem to match:
>> The union schemas are not the same as the one in the error: the one in the
>> error does not have a namespace.  It finds "AVRO_NCP_ICM", but the union has
>> only "merced.AVRO_NCP_ICM" and "merced.AVRO_IVR_BY_CALLID".
>> The namespace and name must both match.
>> 
>> Is your output schema correct?  It looks like you are setting both your
>> MapOutputSchema and OutputSchema to be a Pair schema.  I suspect you only
>> want the Pair schema as a map output and reducer input, but cannot be sure
>> from the below.
>> 
>> From the below, your reducer must create Pair objects and output them, and
>> maybe that is related to the error below.  It may also be related to the
>> combiner. Does it happen without it?
>> 
>> 
>> 
>> On 2/12/12 11:01 PM, "Serge Blazhievsky" <easyvoip@gmail.com> wrote:
>> 
>>> Hi all,
>>> 
>>> I am running into an interesting problem with unions. It seems that the
>>> schemas in the union must be listed in the same order as the input paths
>>> for the corresponding files.
>>> 
>>> This does not look like the right behavior. The code and exception are below.
>>> 
>>> The moment I change the order in union it works.
>>> 
>>> 
>>> Thanks
>>> Serge
>>> 
>>> 
>>>    public int run(String[] strings) throws Exception {
>>> 
>>>         JobConf job = new JobConf();
>>> 
>>> 
>>>         job.setNumMapTasks(map);
>>>         job.setNumReduceTasks(reduce);
>>> 
>>> 
>>>         // Run locally in a single process (comment out to submit to a cluster)
>>>         job.set("mapred.job.tracker", "local");
>>> 
>>>         File file = new File(input);
>>>         DatumReader<GenericRecord> reader = new
>>> GenericDatumReader<GenericRecord>();
>>>         DataFileReader<GenericRecord> dataFileReader = new
>>> DataFileReader<GenericRecord>(file, reader);
>>> 
>>>         Schema s = dataFileReader.getSchema();
>>>         
>>>         
>>> 
>>>       
>>>         
>>>         File lfile = new File(linput);
>>>         DatumReader<GenericRecord> lreader = new
>>> GenericDatumReader<GenericRecord>();
>>>         DataFileReader<GenericRecord> ldataFileReader = new
>>> DataFileReader<GenericRecord>(lfile, lreader);
>>> 
>>>         Schema s2 = ldataFileReader.getSchema();
>>>         
>>>       
>>>        
>>>        List<Schema> slist= new ArrayList<Schema>();
>>>        
>>>        slist.add(s2);
>>>        slist.add(s);
>>>        
>>>        
>>>        
>>>        System.out.println(s.toString(true));
>>>        System.out.println(s2.toString(true));
>>>        
>>>        
>>>        
>>>         Schema s_union=Schema.createUnion(slist);
>>>           
>>>         
>>>        
>>>         AvroJob.setInputSchema(job, s_union);
>>> 
>>> 
>>> 
>>>         List<Schema.Field> fields = s.getFields();
>>> 
>>>         List<Schema.Field> outfields = new ArrayList<Schema.Field>();
>>> 
>>> 
>>>         for (Schema.Field f : fields) {
>>> 
>>>             outfields.add(new Schema.Field(f.name(),
>>> Schema.create(Type.STRING), null, null));
>>>         }
>>> 
>>>         boolean b = false;
>>>         Schema outschema = Schema.createRecord("AVRO_IVR_BY_CALLID",
>>> "AVRO_IVR_BY_CALLID", "merced", b);
>>> 
>>>         outschema.setFields(outfields);
>>> 
>>>         
>>> 
>>>         Schema STRING_SCHEMA = Schema.create(Schema.Type.STRING);
>>> 
>>> 
>>>         Schema OUT_SCHEMA = new Pair<String, GenericRecord>("",
>>> STRING_SCHEMA, new GenericData.Record(outschema), outschema).getSchema();
>>> 
>>>        
>>>         AvroJob.setMapOutputSchema(job, OUT_SCHEMA);
>>>         AvroJob.setOutputSchema(job, OUT_SCHEMA);
>>> 
>>>         AvroJob.setMapperClass(job, MapImpl.class);
>>>         AvroJob.setCombinerClass(job, ReduceImpl.class);
>>>         AvroJob.setReducerClass(job, ReduceImpl.class);
>>> 
>>>        // FileInputFormat.setInputPaths(job, new Path(input));
>>>         
>>>         
>>>         FileInputFormat.addInputPath(job, new Path(linput));
>>>         FileInputFormat.addInputPath(job, new Path(input));
>>>         
>>>         
>>>         
>>>         
>>>        // MultipleInputs.addInputPath(job, new Path(input),
>>> AvroInputFormat<GenericRecord>.class, MapImpl.class);
>>>         
>>>         FileOutputFormat.setOutputPath(job, new Path(output));
>>>         FileOutputFormat.setCompressOutput(job, true);
>>> 
>>>         int res = 255;
>>>         RunningJob runJob = JobClient.runJob(job);
>>>         if (runJob != null) {
>>>             res = runJob.isSuccessful() ? 0 : 1;
>>>         }
>>>         return res;
>>>     }
>>> 
>>> 
>>> 2/02/12 22:56:52 WARN mapred.LocalJobRunner: job_local_0001
>>> org.apache.avro.AvroTypeException: Found {
>>>   "type" : "record",
>>>   "name" : "AVRO_NCP_ICM",
>>>   "fields" : [ {
>>>     "name" : "DATADATE",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ICM_CALLID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENT_ELID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENT_NAME",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENT_SITE",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENT_SVIEW_USER_ID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENT_UNIT_ID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ANI",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "CALL_CTR_UNIT_ID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "CALL_FA_ID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "CALL_FUNCTIONALAREA",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "CTI_CALL_IDENTIFIER",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "CALLDISPOSITION",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENTPERIPHERALNUMBER",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "PERIPHERALID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ICM_CALL_START_TIME",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ICM_DERIVED_BAN",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ICM_DERIVED_PTN",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ICM_SESSIONLOOKUPTIME",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "IVR_CALLID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "RECOVERYKEY",
>>>     "type" : "string"
>>>   } ]
>>> }, expecting [ {
>>>   "type" : "record",
>>>   "name" : "Pair",
>>>   "namespace" : "org.apache.avro.mapred",
>>>   "fields" : [ {
>>>     "name" : "key",
>>>     "type" : "string",
>>>     "doc" : ""
>>>   }, {
>>>     "name" : "value",
>>>     "type" : {
>>>       "type" : "record",
>>>       "name" : "AVRO_IVR_BY_CALLID",
>>>       "namespace" : "merced",
>>>       "doc" : "AVRO_IVR_BY_CALLID",
>>>       "fields" : [ {
>>>         "name" : "CALLID",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "CALLTS",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "LANGUAGECODE",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "MARKETID",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "ANI",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "BAN_PTN",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "BAN",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "AUDITTRAIL",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "RECOVERYCALLKEY",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "TOTALTRANSACTIONS",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "COMPLETEDTRANSACTIONS",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "BILLABLETRANSACTIONS",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "CUSTOMERBILLABLETRANSACTIONS",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "TNTTOCSR",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "CSRFIRSTRESOLUTION",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "CDC",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "REPEATCALL",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "ELAPSEDIVRTIME",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "ELAPSEDCALLTIME",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "CALLPATTERNID",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "DIALEDNUMBERSTRING",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "CALLTYPE",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "PERIPHERALID",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "BILLABLE_CALL_FLG",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "REPEAT_PROGRAM_TYPE",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "PAST_DUE_FLG",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "HYBRID_FLG",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "NETWORK_FLG",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "AUTHENTICATION_FLG",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "HOTLINE_FLG",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "SUBSCRIBER_START_DATE",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "PREPAID_IND",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "ACC_TYPE",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "SUB_TYPE",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "ASL_FLG",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "CALL_ENV",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "CALL_SEQ",
>>>         "type" : "string"
>>>       } ]
>>>     },
>>>     "doc" : "",
>>>     "order" : "ignore"
>>>   } ]
>>> }, {
>>>   "type" : "record",
>>>   "name" : "AVRO_NCP_ICM",
>>>   "namespace" : "org.apache.avro.mapred",
>>>   "fields" : [ {
>>>     "name" : "DATADATE",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ICM_CALLID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENT_ELID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENT_NAME",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENT_SITE",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENT_SVIEW_USER_ID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENT_UNIT_ID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ANI",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "CALL_CTR_UNIT_ID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "CALL_FA_ID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "CALL_FUNCTIONALAREA",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "CTI_CALL_IDENTIFIER",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "CALLDISPOSITION",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENTPERIPHERALNUMBER",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "PERIPHERALID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ICM_CALL_START_TIME",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ICM_DERIVED_BAN",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ICM_DERIVED_PTN",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ICM_SESSIONLOOKUPTIME",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "IVR_CALLID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "RECOVERYKEY",
>>>     "type" : "string"
>>>   } ]
>>> } ]
>>>     at 
>>> org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:231)
>>>     at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>>>     at 
>>> org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:206)
>>>     at 
>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:148)
>>>     at 
>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:135)
>>>     at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>>>     at 
>>> org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:69)
>>>     at 
>>> org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:34)
>>>     at 
>>> org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:208)
>>>     at 
>>> org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:193)
>>>     at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48)
>>>     at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
>>>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
>>>     at 
>>> org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:210)
>>> 12/02/12 22:56:53 INFO mapred.JobClient:  map 0% reduce 0%
>>> 12/02/12 22:56:53 INFO mapred.JobClient: Job complete: job_local_0001
>>> Job failed with exception:java.io.IOException: Job failed!
> 


