crunch-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Kristoffer Sjögren <sto...@gmail.com>
Subject Re: Avro+Parquet
Date Wed, 04 Jun 2014 14:38:01 GMT
Sorry for the late reply.

It was my fault. A refactoring changed the java package name without
changing Avro schema namespace in the $SCHEMA field, which caused Avro
to fallback on generic records. Works find now!

On Tue, Jun 3, 2014 at 1:15 PM, Kristoffer Sjögren <stoffe@gmail.com> wrote:
> Thanks for the quick answer!  My initial test still fail, but I may
> have done something wrong here. I will do a more thorough test asap.
>
> On Mon, Jun 2, 2014 at 2:53 PM, Micah Whitacre <mkwhitacre@gmail.com> wrote:
>> I don't believe it is a known issue.  I modified an AvroParquetPipelineIT[1]
>> to verify the output to a target using a source..
>>
>>   @Test
>>   public void toAvroParquetFileTargetFromParquet() throws Exception {
>>     GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
>>     savedRecord.put("name", "John Doe");
>>     savedRecord.put("age", 42);
>>     savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
>>     populateGenericParquetFile(Lists.newArrayList(savedRecord),
>> Person.SCHEMA$);
>>
>>     Pipeline pipeline = new MRPipeline(AvroParquetPipelineIT.class,
>> tmpDir.getDefaultConfiguration());
>>     PCollection<Person> genericCollection = pipeline.read(
>>         new AvroParquetFileSource<Person>(new
>> Path(avroFile.getAbsolutePath()), Avros.records(Person.class)));
>>     File outputFile = tmpDir.getFile("output");
>>     Target parquetFileTarget = new
>> AvroParquetFileTarget(outputFile.getAbsolutePath());
>>     pipeline.write(genericCollection, parquetFileTarget);
>>     pipeline.run();
>>
>>     Person person = genericCollection.materialize().iterator().next();
>>
>>     PCollection<Person> persistedCollection = pipeline.read(
>>         new AvroParquetFileSource<Person>(new
>> Path(outputFile.getAbsolutePath()), Avros.records(Person.class)));
>>     Person persistedPerson =
>> persistedCollection.materialize().iterator().next();
>>
>>     Path parquetFile = new Path(new File(outputFile,
>> "part-m-00000.parquet").getPath());
>>
>>     AvroParquetReader<Person> reader = new
>> AvroParquetReader<Person>(parquetFile);
>>
>>     try {
>>       Person readPerson = reader.read();
>>       assertThat(readPerson, is(person));
>>       assertThat(readPerson, is(persistedPerson));
>>     } finally {
>>       reader.close();
>>     }
>>   }
>>
>> The tests passes without any issues.  There have been an number of fixes
>> since the 0.8.0-cdh4.3.0 version.  You might try upgrading to the latest
>> version available (0.8.2+71-cdh4.6.0) and see if the problem still exists.
>> If it does still exist a junit/integration test would be helpful to debug
>> this issue.
>>
>>
>> [1] -
>> https://github.com/apache/crunch/blob/1d9b6cf3db6daa1ee6e0fa48dfd5966e821c71a3/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetPipelineIT.java#L120
>>
>>
>> On Mon, Jun 2, 2014 at 6:53 AM, Kristoffer Sjögren <stoffe@gmail.com> wrote:
>>>
>>> Hi
>>>
>>> Im trying to read and write data using the avro+parquet combo that
>>> ships with crunch 0.8.0-cdh4.3.0.
>>>
>>> - The writer job looks like this.
>>>
>>> PCollection<String> lines = ...
>>> PCollection<User> p = lines.parallelDo(new DoFn<String, User>() {
>>>   @Override
>>>   public void process(String input, Emitter<User> emitter) {
>>>     User user = User.newBuilder().setName(input).build();
>>>     emitter.emit(user);
>>>   }
>>> }, Avros.records(User.class));
>>>
>>> AvroParquetFileSourceTarget fout = new
>>> AvroParquetFileSourceTarget<User>(out, Avros.records(User.class));
>>> pipeline.write(p, fout);
>>>
>>> - The reader job looks like this.
>>>
>>> AvroParquetFileSource<User> file = new
>>> AvroParquetFileSource<User>(out, Avros.records(User.class));
>>> PCollection<User> users = pipeline.read(file);
>>> // this line fails with a ClassCastException
>>> PCollection<String> lines = users.parallelDo(new DoFn<User, String>()
{
>>>  @Override
>>>  public void process(User user, Emitter<String> emitter) {
>>>     emitter.emit(user.getName().toString());
>>>   }
>>> }, Writables.strings());
>>>
>>>
>>> However, the reader fails with a java.lang.ClassCastException? Is this
>>> a know issue or am I doing something wrong?
>>>
>>> Cheers,
>>> -Kristoffer
>>>
>>>
>>> java.lang.ClassCastException:
>>> org.apache.avro.generic.GenericData$Record cannot be cast to
>>> mapred.jobs.User
>>> at mapred.jobs.ParquetReaderJob$1.process(ParquetReaderJob.java:22)
>>> at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
>>> at
>>> org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>>> at org.apache.crunch.MapFn.process(MapFn.java:34)
>>> at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
>>> at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:110)
>>> at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
>>> at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
>>> at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
>>> at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
>>> at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
>>> at java.security.AccessController.doPrivileged(Native Method)
>>> at javax.security.auth.Subject.doAs(Subject.java:396)
>>
>>

Mime
View raw message