crunch-user mailing list archives

From Kristoffer Sjögren <sto...@gmail.com>
Subject Re: Avro+Parquet
Date Tue, 03 Jun 2014 11:15:47 GMT
Thanks for the quick answer!  My initial test still fails, but I may
have done something wrong here. I will do a more thorough test ASAP.

On Mon, Jun 2, 2014 at 2:53 PM, Micah Whitacre <mkwhitacre@gmail.com> wrote:
> I don't believe it is a known issue.  I modified a test in AvroParquetPipelineIT[1]
> to verify the output to a target using a source.
>
>   @Test
>   public void toAvroParquetFileTargetFromParquet() throws Exception {
>     GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
>     savedRecord.put("name", "John Doe");
>     savedRecord.put("age", 42);
>     savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
>     populateGenericParquetFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
>
>     Pipeline pipeline = new MRPipeline(AvroParquetPipelineIT.class,
>         tmpDir.getDefaultConfiguration());
>     PCollection<Person> genericCollection = pipeline.read(
>         new AvroParquetFileSource<Person>(new Path(avroFile.getAbsolutePath()),
>             Avros.records(Person.class)));
>     File outputFile = tmpDir.getFile("output");
>     Target parquetFileTarget = new AvroParquetFileTarget(outputFile.getAbsolutePath());
>     pipeline.write(genericCollection, parquetFileTarget);
>     pipeline.run();
>
>     Person person = genericCollection.materialize().iterator().next();
>
>     PCollection<Person> persistedCollection = pipeline.read(
>         new AvroParquetFileSource<Person>(new Path(outputFile.getAbsolutePath()),
>             Avros.records(Person.class)));
>     Person persistedPerson = persistedCollection.materialize().iterator().next();
>
>     Path parquetFile = new Path(new File(outputFile, "part-m-00000.parquet").getPath());
>
>     AvroParquetReader<Person> reader = new AvroParquetReader<Person>(parquetFile);
>
>     try {
>       Person readPerson = reader.read();
>       assertThat(readPerson, is(person));
>       assertThat(readPerson, is(persistedPerson));
>     } finally {
>       reader.close();
>     }
>   }
>
> The test passes without any issues.  There have been a number of fixes
> since the 0.8.0-cdh4.3.0 version.  You might try upgrading to the latest
> version available (0.8.2+71-cdh4.6.0) and see if the problem still exists.
> If it does, a junit/integration test would be helpful for debugging
> this issue.
>
>
> [1] -
> https://github.com/apache/crunch/blob/1d9b6cf3db6daa1ee6e0fa48dfd5966e821c71a3/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetPipelineIT.java#L120
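
[Editor's note: in a Maven build, the suggested upgrade is a one-line version
change. This is a hypothetical pom.xml fragment; the groupId/artifactId are the
usual Apache Crunch coordinates and are an assumption, not taken from this
thread. The version string is the one quoted in the email above.]

```xml
<!-- Hypothetical fragment; coordinates are assumed, version is from the email. -->
<dependency>
  <groupId>org.apache.crunch</groupId>
  <artifactId>crunch-core</artifactId>
  <version>0.8.2+71-cdh4.6.0</version>
</dependency>
```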
>
>
> On Mon, Jun 2, 2014 at 6:53 AM, Kristoffer Sjögren <stoffe@gmail.com> wrote:
>>
>> Hi
>>
>> Im trying to read and write data using the avro+parquet combo that
>> ships with crunch 0.8.0-cdh4.3.0.
>>
>> - The writer job looks like this.
>>
>> PCollection<String> lines = ...
>> PCollection<User> p = lines.parallelDo(new DoFn<String, User>() {
>>   @Override
>>   public void process(String input, Emitter<User> emitter) {
>>     User user = User.newBuilder().setName(input).build();
>>     emitter.emit(user);
>>   }
>> }, Avros.records(User.class));
>>
>> AvroParquetFileSourceTarget<User> fout =
>>     new AvroParquetFileSourceTarget<User>(out, Avros.records(User.class));
>> pipeline.write(p, fout);
>>
>> - The reader job looks like this.
>>
>> AvroParquetFileSource<User> file =
>>     new AvroParquetFileSource<User>(out, Avros.records(User.class));
>> PCollection<User> users = pipeline.read(file);
>> // this line fails with a ClassCastException
>> PCollection<String> lines = users.parallelDo(new DoFn<User, String>() {
>>   @Override
>>   public void process(User user, Emitter<String> emitter) {
>>     emitter.emit(user.getName().toString());
>>   }
>> }, Writables.strings());
>>
>>
>> However, the reader fails with a java.lang.ClassCastException. Is this
>> a known issue, or am I doing something wrong?
>>
>> Cheers,
>> -Kristoffer
>>
>>
>> java.lang.ClassCastException:
>> org.apache.avro.generic.GenericData$Record cannot be cast to mapred.jobs.User
>> at mapred.jobs.ParquetReaderJob$1.process(ParquetReaderJob.java:22)
>> at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
>> at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>> at org.apache.crunch.MapFn.process(MapFn.java:34)
>> at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
>> at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:110)
>> at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
>> at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
>> at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
>> at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
>> at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:396)
>
>
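
[Editor's note: the top of the trace shows a generic-vs-specific record
mismatch: at runtime the Parquet source hands the DoFn GenericData$Record
instances, while the DoFn<User, String> signature implies a cast to the
generated User class. That failure mode can be sketched with plain JDK
classes; GenericRecordLike and User below are hypothetical stand-ins for the
Avro generic record class and the generated mapred.jobs.User.]

```java
// JDK-only sketch of the ClassCastException in the stack trace above.
public class CastDemo {
    // Stand-in for org.apache.avro.generic.GenericData$Record:
    static class GenericRecordLike {}
    // Stand-in for the generated specific class mapred.jobs.User:
    static class User extends GenericRecordLike {}

    static String tryCast(Object fromReader) {
        try {
            User u = (User) fromReader; // the cast implied by DoFn<User, String>
            return "ok";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        // The source materializes the generic type, not the specific subclass,
        // so the downcast fails:
        System.out.println(tryCast(new GenericRecordLike())); // prints "ClassCastException"
    }
}
```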
