crunch-user mailing list archives

From Josh Wills <josh.wi...@gmail.com>
Subject Re: Avro+Parquet
Date Wed, 04 Jun 2014 14:49:18 GMT
Ah, that's a good one. Glad it's working now.


On Wed, Jun 4, 2014 at 7:38 AM, Kristoffer Sjögren <stoffe@gmail.com> wrote:

> Sorry for the late reply.
>
> It was my fault. A refactoring changed the Java package name without
> changing the Avro schema namespace in the SCHEMA$ field, which caused
> Avro to fall back to generic records. Works fine now!
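>
> For anyone hitting the same thing, here is a minimal sketch of what the
> mismatch looked like (the "old.pkg" namespace is hypothetical; the class
> is the User record from the jobs quoted below):
>
>   package mapred.jobs;  // the class was moved here by the refactoring
>
>   import org.apache.avro.Schema;
>   import org.apache.avro.specific.SpecificRecordBase;
>
>   public class User extends SpecificRecordBase {
>     // SCHEMA$ still carried the old namespace, so Avro could not resolve
>     // "old.pkg.User" to a class and fell back to GenericData$Record.
>     public static final Schema SCHEMA$ = new Schema.Parser().parse(
>         "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"old.pkg\","
>         + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");
>
>     private CharSequence name;
>
>     @Override public Schema getSchema() { return SCHEMA$; }
>     @Override public Object get(int i) { return name; }
>     @Override public void put(int i, Object v) { name = (CharSequence) v; }
>   }
>
> Regenerating the class so the schema namespace matches the Java package
> (or fixing the namespace in the .avsc) restores the specific-record reads.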
>
> On Tue, Jun 3, 2014 at 1:15 PM, Kristoffer Sjögren <stoffe@gmail.com> wrote:
> > Thanks for the quick answer!  My initial test still fails, but I may
> > have done something wrong here. I will do a more thorough test asap.
> >
> > On Mon, Jun 2, 2014 at 2:53 PM, Micah Whitacre <mkwhitacre@gmail.com> wrote:
> >> I don't believe it is a known issue.  I modified AvroParquetPipelineIT[1]
> >> to verify the output to a target using a source.
> >>
> >>   @Test
> >>   public void toAvroParquetFileTargetFromParquet() throws Exception {
> >>     GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
> >>     savedRecord.put("name", "John Doe");
> >>     savedRecord.put("age", 42);
> >>     savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
> >>     populateGenericParquetFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
> >>
> >>     Pipeline pipeline = new MRPipeline(AvroParquetPipelineIT.class, tmpDir.getDefaultConfiguration());
> >>     PCollection<Person> genericCollection = pipeline.read(
> >>         new AvroParquetFileSource<Person>(new Path(avroFile.getAbsolutePath()), Avros.records(Person.class)));
> >>     File outputFile = tmpDir.getFile("output");
> >>     Target parquetFileTarget = new AvroParquetFileTarget(outputFile.getAbsolutePath());
> >>     pipeline.write(genericCollection, parquetFileTarget);
> >>     pipeline.run();
> >>
> >>     Person person = genericCollection.materialize().iterator().next();
> >>
> >>     PCollection<Person> persistedCollection = pipeline.read(
> >>         new AvroParquetFileSource<Person>(new Path(outputFile.getAbsolutePath()), Avros.records(Person.class)));
> >>     Person persistedPerson = persistedCollection.materialize().iterator().next();
> >>
> >>     Path parquetFile = new Path(new File(outputFile, "part-m-00000.parquet").getPath());
> >>
> >>     AvroParquetReader<Person> reader = new AvroParquetReader<Person>(parquetFile);
> >>
> >>     try {
> >>       Person readPerson = reader.read();
> >>       assertThat(readPerson, is(person));
> >>       assertThat(readPerson, is(persistedPerson));
> >>     } finally {
> >>       reader.close();
> >>     }
> >>   }
> >>
> >> The test passes without any issues.  There have been a number of fixes
> >> since the 0.8.0-cdh4.3.0 version.  You might try upgrading to the latest
> >> version available (0.8.2+71-cdh4.6.0) and see if the problem still exists.
> >> If it does still exist, a junit/integration test would be helpful to debug
> >> this issue.
> >>
> >>
> >> [1] - https://github.com/apache/crunch/blob/1d9b6cf3db6daa1ee6e0fa48dfd5966e821c71a3/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetPipelineIT.java#L120
> >>
> >>
> >> On Mon, Jun 2, 2014 at 6:53 AM, Kristoffer Sjögren <stoffe@gmail.com> wrote:
> >>>
> >>> Hi
> >>>
> >>> I'm trying to read and write data using the Avro+Parquet combo that
> >>> ships with Crunch 0.8.0-cdh4.3.0.
> >>>
> >>> - The writer job looks like this.
> >>>
> >>> PCollection<String> lines = ...
> >>> PCollection<User> p = lines.parallelDo(new DoFn<String, User>() {
> >>>   @Override
> >>>   public void process(String input, Emitter<User> emitter) {
> >>>     User user = User.newBuilder().setName(input).build();
> >>>     emitter.emit(user);
> >>>   }
> >>> }, Avros.records(User.class));
> >>>
> >>> AvroParquetFileSourceTarget<User> fout =
> >>>     new AvroParquetFileSourceTarget<User>(out, Avros.records(User.class));
> >>> pipeline.write(p, fout);
> >>>
> >>> - The reader job looks like this.
> >>>
> >>> AvroParquetFileSource<User> file =
> >>>     new AvroParquetFileSource<User>(out, Avros.records(User.class));
> >>> PCollection<User> users = pipeline.read(file);
> >>> // this line fails with a ClassCastException
> >>> PCollection<String> lines = users.parallelDo(new DoFn<User, String>() {
> >>>   @Override
> >>>   public void process(User user, Emitter<String> emitter) {
> >>>     emitter.emit(user.getName().toString());
> >>>   }
> >>> }, Writables.strings());
> >>>
> >>>
> >>> However, the reader fails with a java.lang.ClassCastException. Is this
> >>> a known issue or am I doing something wrong?
> >>>
> >>> Cheers,
> >>> -Kristoffer
> >>>
> >>>
> >>> java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to mapred.jobs.User
> >>> at mapred.jobs.ParquetReaderJob$1.process(ParquetReaderJob.java:22)
> >>> at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
> >>> at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
> >>> at org.apache.crunch.MapFn.process(MapFn.java:34)
> >>> at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
> >>> at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:110)
> >>> at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
> >>> at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
> >>> at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
> >>> at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
> >>> at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
> >>> at java.security.AccessController.doPrivileged(Native Method)
> >>> at javax.security.auth.Subject.doAs(Subject.java:396)
> >>
> >>
>
