crunch-user mailing list archives

From Micah Whitacre <mkwhita...@gmail.com>
Subject Re: Avro+Parquet
Date Mon, 02 Jun 2014 12:53:52 GMT
I don't believe it is a known issue.  I modified an AvroParquetPipelineIT [1]
test to verify writing the output to a target and reading it back from a source:

  @Test
  public void toAvroParquetFileTargetFromParquet() throws Exception {
    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
    savedRecord.put("name", "John Doe");
    savedRecord.put("age", 42);
    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
    populateGenericParquetFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);

    Pipeline pipeline = new MRPipeline(AvroParquetPipelineIT.class, tmpDir.getDefaultConfiguration());
    PCollection<Person> genericCollection = pipeline.read(
        new AvroParquetFileSource<Person>(new Path(avroFile.getAbsolutePath()), Avros.records(Person.class)));
    File outputFile = tmpDir.getFile("output");
    Target parquetFileTarget = new AvroParquetFileTarget(outputFile.getAbsolutePath());
    pipeline.write(genericCollection, parquetFileTarget);
    pipeline.run();

    Person person = genericCollection.materialize().iterator().next();

    PCollection<Person> persistedCollection = pipeline.read(
        new AvroParquetFileSource<Person>(new Path(outputFile.getAbsolutePath()), Avros.records(Person.class)));
    Person persistedPerson = persistedCollection.materialize().iterator().next();

    Path parquetFile = new Path(new File(outputFile, "part-m-00000.parquet").getPath());

    AvroParquetReader<Person> reader = new AvroParquetReader<Person>(parquetFile);

    try {
      Person readPerson = reader.read();
      assertThat(readPerson, is(person));
      assertThat(readPerson, is(persistedPerson));
    } finally {
      reader.close();
    }
  }
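
For reference, the populateGenericParquetFile helper used above simply writes
the generic records to the test's avroFile with an AvroParquetWriter, roughly
like this sketch (not the exact code in the IT):

  private void populateGenericParquetFile(List<GenericRecord> records, Schema schema)
      throws IOException {
    // avroFile is the temp file the test reads back through the source.
    AvroParquetWriter<GenericRecord> writer =
        new AvroParquetWriter<GenericRecord>(new Path(avroFile.getPath()), schema);
    try {
      for (GenericRecord record : records) {
        writer.write(record);
      }
    } finally {
      writer.close();
    }
  }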

The test passes without any issues.  There have been a number of fixes
since the 0.8.0-cdh4.3.0 version.  You might try upgrading to the latest
available version (0.8.2+71-cdh4.6.0) and see if the problem still exists.
If it does, a JUnit/integration test would be helpful for debugging the
issue.
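
For example, a rough sketch of such a test based on your reader and writer
snippets (the method name, file name, and the "John Doe" value are just
placeholders; ParquetReaderJob is taken from your stack trace; tmpDir is
Crunch's TemporaryPath rule; and the test class would need to implement
Serializable so the anonymous DoFn can be serialized):

  @Test
  public void userRoundTripThroughParquet() throws Exception {
    // Write a single User record with AvroParquetWriter so the job has input to read.
    File parquetFile = tmpDir.getFile("users.parquet");
    AvroParquetWriter<User> writer =
        new AvroParquetWriter<User>(new Path(parquetFile.getPath()), User.SCHEMA$);
    try {
      writer.write(User.newBuilder().setName("John Doe").build());
    } finally {
      writer.close();
    }

    // Read it back through Crunch and map to strings, which is where the
    // ClassCastException reportedly shows up.
    Pipeline pipeline = new MRPipeline(ParquetReaderJob.class, tmpDir.getDefaultConfiguration());
    PCollection<User> users = pipeline.read(
        new AvroParquetFileSource<User>(new Path(parquetFile.getPath()), Avros.records(User.class)));
    PCollection<String> names = users.parallelDo(new DoFn<User, String>() {
      @Override
      public void process(User user, Emitter<String> emitter) {
        emitter.emit(user.getName().toString());
      }
    }, Writables.strings());

    // Materializing forces the pipeline to run and read the mapped output.
    assertThat(names.materialize().iterator().next(), is("John Doe"));
  }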


[1] -
https://github.com/apache/crunch/blob/1d9b6cf3db6daa1ee6e0fa48dfd5966e821c71a3/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetPipelineIT.java#L120


On Mon, Jun 2, 2014 at 6:53 AM, Kristoffer Sjögren <stoffe@gmail.com> wrote:

> Hi
>
> I'm trying to read and write data using the Avro+Parquet combo that
> ships with Crunch 0.8.0-cdh4.3.0.
>
> - The writer job looks like this:
>
> PCollection<String> lines = ...
> PCollection<User> p = lines.parallelDo(new DoFn<String, User>() {
>   @Override
>   public void process(String input, Emitter<User> emitter) {
>     User user = User.newBuilder().setName(input).build();
>     emitter.emit(user);
>   }
> }, Avros.records(User.class));
>
> AvroParquetFileSourceTarget<User> fout =
>     new AvroParquetFileSourceTarget<User>(out, Avros.records(User.class));
> pipeline.write(p, fout);
>
> - The reader job looks like this:
>
> AvroParquetFileSource<User> file =
>     new AvroParquetFileSource<User>(out, Avros.records(User.class));
> PCollection<User> users = pipeline.read(file);
> // this line fails with a ClassCastException
> PCollection<String> lines = users.parallelDo(new DoFn<User, String>() {
>   @Override
>   public void process(User user, Emitter<String> emitter) {
>     emitter.emit(user.getName().toString());
>   }
> }, Writables.strings());
>
>
> However, the reader fails with a java.lang.ClassCastException.  Is this
> a known issue, or am I doing something wrong?
>
> Cheers,
> -Kristoffer
>
>
> java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record
> cannot be cast to mapred.jobs.User
> at mapred.jobs.ParquetReaderJob$1.process(ParquetReaderJob.java:22)
> at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
> at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
> at org.apache.crunch.MapFn.process(MapFn.java:34)
> at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
> at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:110)
> at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
> at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
> at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
> at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
> at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:396)
>
