Date: Wed, 22 Feb 2012 17:14:53 -0800
Subject: Re: Order of the schema in Union
From: Scott Carey
To: "user@avro.apache.org"

This may be related to https://issues.apache.org/jira/browse/AVRO-1023
If not, open a new ticket. If so, please comment there.

What you describe below seems to have something to do with how
Schema.Parser tracks a namespace across multiple parse runs.

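To make the suspected mechanism concrete, here is a toy model of a resolver that remembers the last namespace it saw. It is not Avro's Schema.Parser: the class ToyNameResolver and its resolve method are hypothetical, and the "leftover namespace" behavior is only an assumption chosen to reproduce the four cases reported in the quoted message below.

```java
/** Toy model only. This is NOT Avro's Schema.Parser; all names here are hypothetical. */
final class ToyNameResolver {
    // Namespace remembered from the most recent record that declared one (assumed state).
    private String defaultNamespace = null;

    /** Resolves a record's full name, letting state from earlier calls leak in. */
    String resolve(String name, String namespace) {
        if (namespace != null) {
            // A declared namespace becomes the default for later records.
            defaultNamespace = namespace;
        }
        String effective = (namespace != null) ? namespace : defaultNamespace;
        return (effective == null) ? name : effective + "." + name;
    }

    public static void main(String[] args) {
        // Namespaced record first: the second record picks up the leftover namespace.
        ToyNameResolver leaky = new ToyNameResolver();
        System.out.println(leaky.resolve("FacebookUser", "FacebookUser"));
        System.out.println(leaky.resolve("FacebookUser2", null));

        // Reverse order: nothing has been remembered yet, so the name stays unqualified.
        ToyNameResolver clean = new ToyNameResolver();
        System.out.println(clean.resolve("FacebookUser2", null));
        System.out.println(clean.resolve("FacebookUser", "FacebookUser"));
    }
}
```

In this model only the "namespaced first, un-namespaced second" ordering changes a name, which is the one ordering reported to fail. Whether the real parser or the schema serialization behaves this way is exactly what the JIRA issue would need to confirm.
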
On 2/21/12 2:49 PM, "Serge Blazhievsky" wrote:

> Hi Scott,
>
> Thanks for looking into this.
>
> I created a small schema and did some experiments.
>
> Here are my findings:
>
> 1. If neither schema has a namespace, the MapReduce job works.
> 2. If both schemas have namespaces, the MapReduce job works.
> 3. If the first schema in the union does not have a namespace but the
>    second one does, the MapReduce job works.
> 4. If the first schema in the union has a namespace but the second one
>    does not, the MapReduce job fails.
>
> For some reason, the namespace from the first schema is assigned to the
> second while running MapReduce.
>
> This feels like a bug somewhere.
>
> This is the schema I am setting:
>
> Union schema:
> [ {
>   "type" : "record",
>   "name" : "FacebookUser",
>   "namespace" : "FacebookUser",
>   "fields" : [ {
>     "name" : "name",
>     "type" : "string"
>   }, {
>     "name" : "num_likes",
>     "type" : "int"
>   }, {
>     "name" : "num_photos",
>     "type" : "int"
>   }, {
>     "name" : "num_groups",
>     "type" : "int"
>   } ]
> }, {
>   "type" : "record",
>   "name" : "FacebookUser2",
>   "fields" : [ {
>     "name" : "name",
>     "type" : "string"
>   }, {
>     "name" : "num_likes",
>     "type" : "int"
>   }, {
>     "name" : "num_photos",
>     "type" : "int"
>   }, {
>     "name" : "num_groups",
>     "type" : "int"
>   } ]
> } ]
>
> and this is the schema that MapReduce gets:
>
> [ {
>   "type" : "record",
>   "name" : "FacebookUser",
>   "namespace" : "FacebookUser",
>   "fields" : [ {
>     "name" : "name",
>     "type" : "string"
>   }, {
>     "name" : "num_likes",
>     "type" : "int"
>   }, {
>     "name" : "num_photos",
>     "type" : "int"
>   }, {
>     "name" : "num_groups",
>     "type" : "int"
>   } ]
> }, {
>   "type" : "record",
>   "name" : "FacebookUser2",
>   "namespace" : "FacebookUser",
>   "fields" : [ {
>     "name" : "name",
>     "type" : "string"
>   }, {
>     "name" : "num_likes",
>     "type" : "int"
>   }, {
>     "name" : "num_photos",
>     "type" : "int"
>   }, {
>     "name" : "num_groups",
>     "type" : "int"
>   } ]
> } ]
>
> The difference is the namespace on the second record.
>
> I would be more than happy to fix this in the code if you could point me
> to where to look.
>
> Regards,
> Serge
>
> On Tue, Feb 21, 2012 at 9:39 AM, Scott Carey wrote:
>> As for why the union does not seem to match:
>> The Union schemas are not the same as the one in the error: the one in
>> the error does not have a namespace. It finds "AVRO_NCP_ICM" but the
>> union has only "merced.AVRO_NCP_ICM" and "merced.AVRO_IVR_BY_CALLID".
>> The namespace and name must both match.
>>
>> Is your output schema correct? It looks like you are setting both your
>> MapOutputSchema and OutputSchema to be a Pair schema. I suspect you only
>> want the Pair schema as a map output and reducer input, but cannot be
>> sure from the below.
>>
>> From the below, your reducer must create Pair objects and output them,
>> and maybe that is related to the error below. It may also be related to
>> the combiner; does it happen without it?
>>
>> On 2/12/12 11:01 PM, "Serge Blazhievsky" wrote:
>>
>>> Hi all,
>>>
>>> I am running into an interesting problem with Union. It seems that the
>>> schemas in the union must be listed in the same order as the input
>>> paths for the different files.
>>>
>>> This does not look like the right behavior. The code and exception are
>>> below.
>>>
>>> The moment I change the order in the union, it works.
>>>
>>> Thanks
>>> Serge
>>>
>>>     public int run(String[] strings) throws Exception {
>>>
>>>         JobConf job = new JobConf();
>>>
>>>         job.setNumMapTasks(map);
>>>         job.setNumReduceTasks(reduce);
>>>
>>>         // Run locally in a single process
>>>         job.set("mapred.job.tracker", "local");
>>>
>>>         // Read the schema of the first input file
>>>         File file = new File(input);
>>>         DatumReader<GenericRecord> reader =
>>>                 new GenericDatumReader<GenericRecord>();
>>>         DataFileReader<GenericRecord> dataFileReader =
>>>                 new DataFileReader<GenericRecord>(file, reader);
>>>
>>>         Schema s = dataFileReader.getSchema();
>>>
>>>         // Read the schema of the second input file
>>>         File lfile = new File(linput);
>>>         DatumReader<GenericRecord> lreader =
>>>                 new GenericDatumReader<GenericRecord>();
>>>         DataFileReader<GenericRecord> ldataFileReader =
>>>                 new DataFileReader<GenericRecord>(lfile, lreader);
>>>
>>>         Schema s2 = ldataFileReader.getSchema();
>>>
>>>         List<Schema> slist = new ArrayList<Schema>();
>>>
>>>         slist.add(s2);
>>>         slist.add(s);
>>>
>>>         System.out.println(s.toString(true));
>>>         System.out.println(s2.toString(true));
>>>
>>>         Schema s_union = Schema.createUnion(slist);
>>>
>>>         AvroJob.setInputSchema(job, s_union);
>>>
>>>         List<Schema.Field> fields = s.getFields();
>>>
>>>         List<Schema.Field> outfields = new ArrayList<Schema.Field>();
>>>
>>>         for (Schema.Field f : fields) {
>>>             outfields.add(new Schema.Field(f.name(),
>>>                     Schema.create(Type.STRING), null, null));
>>>         }
>>>
>>>         boolean b = false;
>>>         Schema outschema = Schema.createRecord("AVRO_IVR_BY_CALLID",
>>>                 "AVRO_IVR_BY_CALLID", "merced", b);
>>>
>>>         outschema.setFields(outfields);
>>>
>>>         Schema STRING_SCHEMA = Schema.create(Schema.Type.STRING);
>>>
>>>         Schema OUT_SCHEMA = new Pair<String, GenericRecord>("",
>>>                 STRING_SCHEMA, new GenericData.Record(outschema),
>>>                 outschema).getSchema();
>>>
>>>         AvroJob.setMapOutputSchema(job, OUT_SCHEMA);
>>>         AvroJob.setOutputSchema(job, OUT_SCHEMA);
>>>
>>>         AvroJob.setMapperClass(job, MapImpl.class);
>>>         AvroJob.setCombinerClass(job, ReduceImpl.class);
>>>         AvroJob.setReducerClass(job, ReduceImpl.class);
>>>
>>>         // FileInputFormat.setInputPaths(job, new Path(input));
>>>
>>>         FileInputFormat.addInputPath(job, new Path(linput));
>>>         FileInputFormat.addInputPath(job, new Path(input));
>>>
>>>         // MultipleInputs.addInputPath(job, new Path(input),
>>>         //         AvroInputFormat<GenericRecord>.class, MapImpl.class);
>>>
>>>         FileOutputFormat.setOutputPath(job, new Path(output));
>>>         FileOutputFormat.setCompressOutput(job, true);
>>>
>>>         int res = 255;
>>>         RunningJob runJob = JobClient.runJob(job);
>>>         if (runJob != null) {
>>>             res = runJob.isSuccessful() ? 0 : 1;
>>>         }
>>>         return res;
>>>     }
>>>
>>> 2/02/12 22:56:52 WARN mapred.LocalJobRunner: job_local_0001
>>> org.apache.avro.AvroTypeException: Found {
>>>   "type" : "record",
>>>   "name" : "AVRO_NCP_ICM",
>>>   "fields" : [ {
>>>     "name" : "DATADATE",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ICM_CALLID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENT_ELID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENT_NAME",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENT_SITE",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENT_SVIEW_USER_ID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENT_UNIT_ID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ANI",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "CALL_CTR_UNIT_ID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "CALL_FA_ID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "CALL_FUNCTIONALAREA",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "CTI_CALL_IDENTIFIER",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "CALLDISPOSITION",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENTPERIPHERALNUMBER",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "PERIPHERALID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ICM_CALL_START_TIME",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ICM_DERIVED_BAN",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ICM_DERIVED_PTN",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ICM_SESSIONLOOKUPTIME",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "IVR_CALLID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "RECOVERYKEY",
>>>     "type" : "string"
>>>   } ]
>>> }, expecting [ {
>>>   "type" : "record",
>>>   "name" : "Pair",
>>>   "namespace" : "org.apache.avro.mapred",
>>>   "fields" : [ {
>>>     "name" : "key",
>>>     "type" : "string",
>>>     "doc" : ""
>>>   }, {
>>>     "name" : "value",
>>>     "type" : {
>>>       "type" : "record",
>>>       "name" : "AVRO_IVR_BY_CALLID",
>>>       "namespace" : "merced",
>>>       "doc" : "AVRO_IVR_BY_CALLID",
>>>       "fields" : [ {
>>>         "name" : "CALLID",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "CALLTS",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "LANGUAGECODE",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "MARKETID",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "ANI",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "BAN_PTN",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "BAN",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "AUDITTRAIL",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "RECOVERYCALLKEY",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "TOTALTRANSACTIONS",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "COMPLETEDTRANSACTIONS",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "BILLABLETRANSACTIONS",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "CUSTOMERBILLABLETRANSACTIONS",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "TNTTOCSR",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "CSRFIRSTRESOLUTION",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "CDC",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "REPEATCALL",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "ELAPSEDIVRTIME",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "ELAPSEDCALLTIME",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "CALLPATTERNID",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "DIALEDNUMBERSTRING",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "CALLTYPE",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "PERIPHERALID",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "BILLABLE_CALL_FLG",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "REPEAT_PROGRAM_TYPE",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "PAST_DUE_FLG",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "HYBRID_FLG",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "NETWORK_FLG",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "AUTHENTICATION_FLG",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "HOTLINE_FLG",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "SUBSCRIBER_START_DATE",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "PREPAID_IND",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "ACC_TYPE",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "SUB_TYPE",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "ASL_FLG",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "CALL_ENV",
>>>         "type" : "string"
>>>       }, {
>>>         "name" : "CALL_SEQ",
>>>         "type" : "string"
>>>       } ]
>>>     },
>>>     "doc" : "",
>>>     "order" : "ignore"
>>>   } ]
>>> }, {
>>>   "type" : "record",
>>>   "name" : "AVRO_NCP_ICM",
>>>   "namespace" : "org.apache.avro.mapred",
>>>   "fields" : [ {
>>>     "name" : "DATADATE",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ICM_CALLID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENT_ELID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENT_NAME",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENT_SITE",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENT_SVIEW_USER_ID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENT_UNIT_ID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ANI",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "CALL_CTR_UNIT_ID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "CALL_FA_ID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "CALL_FUNCTIONALAREA",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "CTI_CALL_IDENTIFIER",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "CALLDISPOSITION",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "AGENTPERIPHERALNUMBER",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "PERIPHERALID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ICM_CALL_START_TIME",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ICM_DERIVED_BAN",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ICM_DERIVED_PTN",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "ICM_SESSIONLOOKUPTIME",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "IVR_CALLID",
>>>     "type" : "string"
>>>   }, {
>>>     "name" : "RECOVERYKEY",
>>>     "type" : "string"
>>>   } ]
>>> } ]
>>>     at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:231)
>>>     at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>>>     at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:206)
>>>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:148)
>>>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:135)
>>>     at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>>>     at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:69)
>>>     at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:34)
>>>     at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:208)
>>>     at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:193)
>>>     at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48)
>>>     at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
>>>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
>>>     at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:210)
>>> 12/02/12 22:56:53 INFO mapred.JobClient:  map 0% reduce 0%
>>> 12/02/12 22:56:53 INFO mapred.JobClient: Job complete: job_local_0001
>>> Job failed with exception:java.io.IOException: Job failed!
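
The rule stated in the thread, that the namespace and the name must both match, can be illustrated with a small lookup over full names. This is a simplified sketch, not Avro's resolution code: the class UnionBranchLookup and its methods are hypothetical, and the branch names are copied from the "expecting" union in the exception above.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified model of matching a record against union branches by full name. Not Avro's resolver. */
final class UnionBranchLookup {

    /** Joins namespace and name with a dot; a null or empty namespace means the name is unqualified. */
    static String fullName(String namespace, String name) {
        return (namespace == null || namespace.isEmpty()) ? name : namespace + "." + name;
    }

    /** Returns the index of the first branch whose full name equals the writer's, or -1 if none does. */
    static int findBranch(List<String> branchFullNames, String writerFullName) {
        return branchFullNames.indexOf(writerFullName);
    }

    public static void main(String[] args) {
        // Branches of the union the reader expected, as printed in the exception.
        List<String> union = Arrays.asList(
                fullName("org.apache.avro.mapred", "Pair"),
                fullName("org.apache.avro.mapred", "AVRO_NCP_ICM"));

        // The record found in the data file has no namespace, so no branch matches.
        System.out.println(findBranch(union, fullName(null, "AVRO_NCP_ICM"))); // prints -1

        // The same name with the matching namespace resolves to the second branch.
        System.out.println(findBranch(union, fullName("org.apache.avro.mapred", "AVRO_NCP_ICM"))); // prints 1
    }
}
```

Under this model the failure in the log is a plain name mismatch: the data file's record is "AVRO_NCP_ICM", while the union only offers "org.apache.avro.mapred.AVRO_NCP_ICM".
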