avro-user mailing list archives

From Serge Blazhievsky <easyv...@gmail.com>
Subject Re: Order of the schema in Union
Date Tue, 21 Feb 2012 22:49:35 GMT
Hi Scott,

Thanks for looking into this.

I created a small schema and did some experiments.

Here are my findings:

1. If neither schema has a namespace, the MapReduce job works.
2. If both schemas have namespaces, the MapReduce job works.
3. If the first schema in the union does not have a namespace but the
second one does, MapReduce works.
4. If the first schema in the union has a namespace but the second one does
not, MapReduce fails.

For some reason, it assigns the namespace of the first schema to the second
one while running MapReduce.


This feels like a bug somewhere.
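One mechanism consistent with all four cases is a schema round trip in which a record without a namespace picks up the namespace left over from the preceding union branch. This assumes (it is not confirmed in this thread) that the job configuration carries the union as JSON text that each task re-parses. The Java below is a hypothetical, stdlib-only model of such a "leaky" parser, not Avro's actual code; `NamespaceLeakModel`, `Branch` and `leakyParse` are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model (not Avro's parser) of re-parsing union branches when
 * the namespace context is never restored after a record is parsed.
 */
public class NamespaceLeakModel {

    /** One record branch: a simple name plus an optional namespace. */
    public static final class Branch {
        public final String name;
        public final String namespace; // null means "no namespace"

        public Branch(String name, String namespace) {
            this.name = name;
            this.namespace = namespace;
        }

        /** Full name as namespace + "." + name, or just the name if none. */
        public String fullName() {
            return namespace == null ? name : namespace + "." + name;
        }
    }

    /**
     * A branch without an explicit namespace inherits whatever namespace an
     * earlier branch set, because this model never resets the context.
     */
    public static List<Branch> leakyParse(List<Branch> written) {
        List<Branch> parsed = new ArrayList<>();
        String current = null; // leaked namespace context
        for (Branch b : written) {
            if (b.namespace != null) {
                current = b.namespace;
            }
            parsed.add(new Branch(b.name, current));
        }
        return parsed;
    }

    public static void main(String[] args) {
        // Case 4: namespaced record first, namespace-less record second.
        List<Branch> union = List.of(
                new Branch("FacebookUser", "FacebookUser"),
                new Branch("FacebookUser2", null));
        for (Branch b : leakyParse(union)) {
            System.out.println(b.fullName());
        }
    }
}
```

Running `main` prints `FacebookUser.FacebookUser` and then `FacebookUser.FacebookUser2`, the same namespaced second record that MapReduce ends up with. Cases 1 to 3 come out unchanged under this model. A parser that saves and restores the enclosing namespace around each record would leave all four cases intact, since sibling union branches do not enclose one another.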

This is the schema I am setting:

Union schema:
[ {
  "type" : "record",
  "name" : "FacebookUser",
  "namespace" : "FacebookUser",
  "fields" : [ {
    "name" : "name",
    "type" : "string"
  }, {
    "name" : "num_likes",
    "type" : "int"
  }, {
    "name" : "num_photos",
    "type" : "int"
  }, {
    "name" : "num_groups",
    "type" : "int"
  } ]
}, {
  "type" : "record",
  "name" : "FacebookUser2",
  "fields" : [ {
    "name" : "name",
    "type" : "string"
  }, {
    "name" : "num_likes",
    "type" : "int"
  }, {
    "name" : "num_photos",
    "type" : "int"
  }, {
    "name" : "num_groups",
    "type" : "int"
  } ]
} ]


And this is the schema that MapReduce receives:

 [ {
  "type" : "record",
  "name" : "FacebookUser",
  "namespace" : "FacebookUser",
  "fields" : [ {
    "name" : "name",
    "type" : "string"
  }, {
    "name" : "num_likes",
    "type" : "int"
  }, {
    "name" : "num_photos",
    "type" : "int"
  }, {
    "name" : "num_groups",
    "type" : "int"
  } ]
}, {
  "type" : "record",
  "name" : "FacebookUser2",
  "namespace" : "FacebookUser",
  "fields" : [ {
    "name" : "name",
    "type" : "string"
  }, {
    "name" : "num_likes",
    "type" : "int"
  }, {
    "name" : "num_photos",
    "type" : "int"
  }, {
    "name" : "num_groups",
    "type" : "int"
  } ]
} ]


The only difference is the namespace of the second record.
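That single namespace matters because a union branch is identified by its full name (namespace plus name), so a record written as plain `FacebookUser2` presumably no longer matches a reader branch named `FacebookUser.FacebookUser2`. The Java below is a simplified, hypothetical model of that lookup, not Avro's `ResolvingDecoder`; `UnionBranchMatch`, `fullName` and `findBranch` are illustrative names. The second check reuses the record names from the `AvroTypeException` quoted further down.

```java
import java.util.List;
import java.util.OptionalInt;

/**
 * Hypothetical, simplified model of choosing a reader union branch for a
 * writer record by exact full-name comparison.
 */
public class UnionBranchMatch {

    /** Joins namespace and simple name; a null or empty namespace means none. */
    public static String fullName(String namespace, String name) {
        return (namespace == null || namespace.isEmpty()) ? name : namespace + "." + name;
    }

    /** Index of the reader branch whose full name equals the writer's, if any. */
    public static OptionalInt findBranch(String writerFullName, List<String> readerFullNames) {
        for (int i = 0; i < readerFullNames.size(); i++) {
            if (readerFullNames.get(i).equals(writerFullName)) {
                return OptionalInt.of(i);
            }
        }
        return OptionalInt.empty();
    }

    public static void main(String[] args) {
        // Case 4 (presumed setup): file written with the namespace-less record,
        // read against a union whose second branch gained a namespace.
        String writer = fullName(null, "FacebookUser2");
        List<String> reader = List.of(
                fullName("FacebookUser", "FacebookUser"),
                fullName("FacebookUser", "FacebookUser2"));
        System.out.println(findBranch(writer, reader).isPresent()); // false

        // Names taken from the quoted AvroTypeException: the writer record has
        // no namespace, the reader's AVRO_NCP_ICM branch carries the Pair's.
        String traceWriter = fullName(null, "AVRO_NCP_ICM");
        List<String> traceReader = List.of(
                fullName("org.apache.avro.mapred", "Pair"),
                fullName("org.apache.avro.mapred", "AVRO_NCP_ICM"));
        System.out.println(findBranch(traceWriter, traceReader).isPresent()); // false
    }
}
```

In the quoted trace, the reader's `AVRO_NCP_ICM` branch appears to have picked up `org.apache.avro.mapred` from the `Pair` branch before it, which is the same pattern as case 4.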

I would be more than happy to fix it in the code if you could point me to
where to look.

Regards,
Serge



On Tue, Feb 21, 2012 at 9:39 AM, Scott Carey <scottcarey@apache.org> wrote:

> As for why the union does not seem to match:
> The union schemas are not the same as the one in the error; the one in
> the error does not have a namespace. It finds "AVRO_NCP_ICM", but the
> union has only "merced.AVRO_NCP_ICM" and "merced.AVRO_IVR_BY_CALLID".
> The namespace and name must both match.
>
> Is your output schema correct? It looks like you are setting both your
> MapOutputSchema and OutputSchema to a Pair schema. I suspect you only
> want the Pair schema as the map output and reducer input, but I cannot be
> sure from the code below.
>
> From the code below, your reducer must create Pair objects and output
> them, and maybe that is related to the error below. It may also be related
> to the combiner; does it happen without it?
>
>
>
> On 2/12/12 11:01 PM, "Serge Blazhievsky" <easyvoip@gmail.com> wrote:
>
> Hi all,
>
> I am running into an interesting problem with unions. It seems that the
> order of the schemas in the union must match the order of the input paths
> for the different files.
>
> This does not look like the right behavior. The code and exception are below.
>
> The moment I change the order in the union, it works.
>
>
> Thanks
> Serge
>
>
>     public int run(String[] strings) throws Exception {
>
>         JobConf job = new JobConf();
>
>         job.setNumMapTasks(map);
>         job.setNumReduceTasks(reduce);
>
>         // Uncomment to run locally in a single process
>         job.set("mapred.job.tracker", "local");
>
>         File file = new File(input);
>         DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
>         DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file, reader);
>
>         Schema s = dataFileReader.getSchema();
>
>         File lfile = new File(linput);
>         DatumReader<GenericRecord> lreader = new GenericDatumReader<GenericRecord>();
>         DataFileReader<GenericRecord> ldataFileReader = new DataFileReader<GenericRecord>(lfile, lreader);
>
>         Schema s2 = ldataFileReader.getSchema();
>
>         List<Schema> slist = new ArrayList<Schema>();
>
>         slist.add(s2);
>         slist.add(s);
>
>         System.out.println(s.toString(true));
>         System.out.println(s2.toString(true));
>
>         Schema s_union = Schema.createUnion(slist);
>
>         AvroJob.setInputSchema(job, s_union);
>
>         List<Schema.Field> fields = s.getFields();
>
>         List<Schema.Field> outfields = new ArrayList<Schema.Field>();
>
>         for (Schema.Field f : fields) {
>             outfields.add(new Schema.Field(f.name(), Schema.create(Type.STRING), null, null));
>         }
>
>         boolean b = false;
>         Schema outschema = Schema.createRecord("AVRO_IVR_BY_CALLID", "AVRO_IVR_BY_CALLID", "merced", b);
>
>         outschema.setFields(outfields);
>
>         Schema STRING_SCHEMA = Schema.create(Schema.Type.STRING);
>
>         Schema OUT_SCHEMA = new Pair<String, GenericRecord>("", STRING_SCHEMA, new GenericData.Record(outschema), outschema).getSchema();
>
>         AvroJob.setMapOutputSchema(job, OUT_SCHEMA);
>         AvroJob.setOutputSchema(job, OUT_SCHEMA);
>
>         AvroJob.setMapperClass(job, MapImpl.class);
>         AvroJob.setCombinerClass(job, ReduceImpl.class);
>         AvroJob.setReducerClass(job, ReduceImpl.class);
>
>         // FileInputFormat.setInputPaths(job, new Path(input));
>
>         FileInputFormat.addInputPath(job, new Path(linput));
>         FileInputFormat.addInputPath(job, new Path(input));
>
>         // MultipleInputs.addInputPath(job, new Path(input), AvroInputFormat<GenericRecord>.class, MapImpl.class);
>
>         FileOutputFormat.setOutputPath(job, new Path(output));
>         FileOutputFormat.setCompressOutput(job, true);
>
>         int res = 255;
>         RunningJob runJob = JobClient.runJob(job);
>         if (runJob != null) {
>             res = runJob.isSuccessful() ? 0 : 1;
>         }
>         return res;
>     }
>
>
> 2/02/12 22:56:52 WARN mapred.LocalJobRunner: job_local_0001
> org.apache.avro.AvroTypeException: Found {
>   "type" : "record",
>   "name" : "AVRO_NCP_ICM",
>   "fields" : [ {
>     "name" : "DATADATE",
>     "type" : "string"
>   }, {
>     "name" : "ICM_CALLID",
>     "type" : "string"
>   }, {
>     "name" : "AGENT_ELID",
>     "type" : "string"
>   }, {
>     "name" : "AGENT_NAME",
>     "type" : "string"
>   }, {
>     "name" : "AGENT_SITE",
>     "type" : "string"
>   }, {
>     "name" : "AGENT_SVIEW_USER_ID",
>     "type" : "string"
>   }, {
>     "name" : "AGENT_UNIT_ID",
>     "type" : "string"
>   }, {
>     "name" : "ANI",
>     "type" : "string"
>   }, {
>     "name" : "CALL_CTR_UNIT_ID",
>     "type" : "string"
>   }, {
>     "name" : "CALL_FA_ID",
>     "type" : "string"
>   }, {
>     "name" : "CALL_FUNCTIONALAREA",
>     "type" : "string"
>   }, {
>     "name" : "CTI_CALL_IDENTIFIER",
>     "type" : "string"
>   }, {
>     "name" : "CALLDISPOSITION",
>     "type" : "string"
>   }, {
>     "name" : "AGENTPERIPHERALNUMBER",
>     "type" : "string"
>   }, {
>     "name" : "PERIPHERALID",
>     "type" : "string"
>   }, {
>     "name" : "ICM_CALL_START_TIME",
>     "type" : "string"
>   }, {
>     "name" : "ICM_DERIVED_BAN",
>     "type" : "string"
>   }, {
>     "name" : "ICM_DERIVED_PTN",
>     "type" : "string"
>   }, {
>     "name" : "ICM_SESSIONLOOKUPTIME",
>     "type" : "string"
>   }, {
>     "name" : "IVR_CALLID",
>     "type" : "string"
>   }, {
>     "name" : "RECOVERYKEY",
>     "type" : "string"
>   } ]
> }, expecting [ {
>   "type" : "record",
>   "name" : "Pair",
>   "namespace" : "org.apache.avro.mapred",
>   "fields" : [ {
>     "name" : "key",
>     "type" : "string",
>     "doc" : ""
>   }, {
>     "name" : "value",
>     "type" : {
>       "type" : "record",
>       "name" : "AVRO_IVR_BY_CALLID",
>       "namespace" : "merced",
>       "doc" : "AVRO_IVR_BY_CALLID",
>       "fields" : [ {
>         "name" : "CALLID",
>         "type" : "string"
>       }, {
>         "name" : "CALLTS",
>         "type" : "string"
>       }, {
>         "name" : "LANGUAGECODE",
>         "type" : "string"
>       }, {
>         "name" : "MARKETID",
>         "type" : "string"
>       }, {
>         "name" : "ANI",
>         "type" : "string"
>       }, {
>         "name" : "BAN_PTN",
>         "type" : "string"
>       }, {
>         "name" : "BAN",
>         "type" : "string"
>       }, {
>         "name" : "AUDITTRAIL",
>         "type" : "string"
>       }, {
>         "name" : "RECOVERYCALLKEY",
>         "type" : "string"
>       }, {
>         "name" : "TOTALTRANSACTIONS",
>         "type" : "string"
>       }, {
>         "name" : "COMPLETEDTRANSACTIONS",
>         "type" : "string"
>       }, {
>         "name" : "BILLABLETRANSACTIONS",
>         "type" : "string"
>       }, {
>         "name" : "CUSTOMERBILLABLETRANSACTIONS",
>         "type" : "string"
>       }, {
>         "name" : "TNTTOCSR",
>         "type" : "string"
>       }, {
>         "name" : "CSRFIRSTRESOLUTION",
>         "type" : "string"
>       }, {
>         "name" : "CDC",
>         "type" : "string"
>       }, {
>         "name" : "REPEATCALL",
>         "type" : "string"
>       }, {
>         "name" : "ELAPSEDIVRTIME",
>         "type" : "string"
>       }, {
>         "name" : "ELAPSEDCALLTIME",
>         "type" : "string"
>       }, {
>         "name" : "CALLPATTERNID",
>         "type" : "string"
>       }, {
>         "name" : "DIALEDNUMBERSTRING",
>         "type" : "string"
>       }, {
>         "name" : "CALLTYPE",
>         "type" : "string"
>       }, {
>         "name" : "PERIPHERALID",
>         "type" : "string"
>       }, {
>         "name" : "BILLABLE_CALL_FLG",
>         "type" : "string"
>       }, {
>         "name" : "REPEAT_PROGRAM_TYPE",
>         "type" : "string"
>       }, {
>         "name" : "PAST_DUE_FLG",
>         "type" : "string"
>       }, {
>         "name" : "HYBRID_FLG",
>         "type" : "string"
>       }, {
>         "name" : "NETWORK_FLG",
>         "type" : "string"
>       }, {
>         "name" : "AUTHENTICATION_FLG",
>         "type" : "string"
>       }, {
>         "name" : "HOTLINE_FLG",
>         "type" : "string"
>       }, {
>         "name" : "SUBSCRIBER_START_DATE",
>         "type" : "string"
>       }, {
>         "name" : "PREPAID_IND",
>         "type" : "string"
>       }, {
>         "name" : "ACC_TYPE",
>         "type" : "string"
>       }, {
>         "name" : "SUB_TYPE",
>         "type" : "string"
>       }, {
>         "name" : "ASL_FLG",
>         "type" : "string"
>       }, {
>         "name" : "CALL_ENV",
>         "type" : "string"
>       }, {
>         "name" : "CALL_SEQ",
>         "type" : "string"
>       } ]
>     },
>     "doc" : "",
>     "order" : "ignore"
>   } ]
> }, {
>   "type" : "record",
>   "name" : "AVRO_NCP_ICM",
>   "namespace" : "org.apache.avro.mapred",
>   "fields" : [ {
>     "name" : "DATADATE",
>     "type" : "string"
>   }, {
>     "name" : "ICM_CALLID",
>     "type" : "string"
>   }, {
>     "name" : "AGENT_ELID",
>     "type" : "string"
>   }, {
>     "name" : "AGENT_NAME",
>     "type" : "string"
>   }, {
>     "name" : "AGENT_SITE",
>     "type" : "string"
>   }, {
>     "name" : "AGENT_SVIEW_USER_ID",
>     "type" : "string"
>   }, {
>     "name" : "AGENT_UNIT_ID",
>     "type" : "string"
>   }, {
>     "name" : "ANI",
>     "type" : "string"
>   }, {
>     "name" : "CALL_CTR_UNIT_ID",
>     "type" : "string"
>   }, {
>     "name" : "CALL_FA_ID",
>     "type" : "string"
>   }, {
>     "name" : "CALL_FUNCTIONALAREA",
>     "type" : "string"
>   }, {
>     "name" : "CTI_CALL_IDENTIFIER",
>     "type" : "string"
>   }, {
>     "name" : "CALLDISPOSITION",
>     "type" : "string"
>   }, {
>     "name" : "AGENTPERIPHERALNUMBER",
>     "type" : "string"
>   }, {
>     "name" : "PERIPHERALID",
>     "type" : "string"
>   }, {
>     "name" : "ICM_CALL_START_TIME",
>     "type" : "string"
>   }, {
>     "name" : "ICM_DERIVED_BAN",
>     "type" : "string"
>   }, {
>     "name" : "ICM_DERIVED_PTN",
>     "type" : "string"
>   }, {
>     "name" : "ICM_SESSIONLOOKUPTIME",
>     "type" : "string"
>   }, {
>     "name" : "IVR_CALLID",
>     "type" : "string"
>   }, {
>     "name" : "RECOVERYKEY",
>     "type" : "string"
>   } ]
> } ]
>     at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:231)
>     at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>     at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:206)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:148)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:135)
>     at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>     at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:69)
>     at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:34)
>     at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:208)
>     at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:193)
>     at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48)
>     at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
>     at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:210)
> 12/02/12 22:56:53 INFO mapred.JobClient:  map 0% reduce 0%
> 12/02/12 22:56:53 INFO mapred.JobClient: Job complete: job_local_0001
> Job failed with exception:java.io.IOException: Job failed!
>
>
