Subject: Re: Avro+Parquet
From: Kristoffer Sjögren
To: user@crunch.apache.org
Date: Wed, 4 Jun 2014 16:38:01 +0200

Sorry for the late reply. It was my fault. A refactoring changed the Java
package name without changing the Avro schema namespace in the $SCHEMA
field, which caused Avro to fall back on generic records. Works fine now!

On Tue, Jun 3, 2014 at 1:15 PM, Kristoffer Sjögren wrote:
> Thanks for the quick answer! My initial test still fails, but I may
> have done something wrong here. I will do a more thorough test asap.
>
> On Mon, Jun 2, 2014 at 2:53 PM, Micah Whitacre wrote:
>> I don't believe it is a known issue. I modified an AvroParquetPipelineIT[1]
>> to verify the output to a target using a source:
>>
>> @Test
>> public void toAvroParquetFileTargetFromParquet() throws Exception {
>>   GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
>>   savedRecord.put("name", "John Doe");
>>   savedRecord.put("age", 42);
>>   savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
>>   populateGenericParquetFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
>>
>>   Pipeline pipeline = new MRPipeline(AvroParquetPipelineIT.class,
>>       tmpDir.getDefaultConfiguration());
>>   PCollection<Person> genericCollection = pipeline.read(
>>       new AvroParquetFileSource<Person>(
>>           new Path(avroFile.getAbsolutePath()), Avros.records(Person.class)));
>>   File outputFile = tmpDir.getFile("output");
>>   Target parquetFileTarget = new AvroParquetFileTarget(outputFile.getAbsolutePath());
>>   pipeline.write(genericCollection, parquetFileTarget);
>>   pipeline.run();
>>
>>   Person person = genericCollection.materialize().iterator().next();
>>
>>   PCollection<Person> persistedCollection = pipeline.read(
>>       new AvroParquetFileSource<Person>(
>>           new Path(outputFile.getAbsolutePath()), Avros.records(Person.class)));
>>   Person persistedPerson = persistedCollection.materialize().iterator().next();
>>
>>   Path parquetFile = new Path(new File(outputFile, "part-m-00000.parquet").getPath());
>>
>>   AvroParquetReader<Person> reader = new AvroParquetReader<Person>(parquetFile);
>>
>>   try {
>>     Person readPerson = reader.read();
>>     assertThat(readPerson, is(person));
>>     assertThat(readPerson, is(persistedPerson));
>>   } finally {
>>     reader.close();
>>   }
>> }
>>
>> The test passes without any issues. There have been a number of fixes
>> since the 0.8.0-cdh4.3.0 version. You might try upgrading to the latest
>> version available (0.8.2+71-cdh4.6.0) and see if the problem still exists.
>> If it does still exist, a junit/integration test would be helpful to debug
>> this issue.
>> >> >> [1] - >> https://github.com/apache/crunch/blob/1d9b6cf3db6daa1ee6e0fa48dfd5966e82= 1c71a3/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetPipe= lineIT.java#L120 >> >> >> On Mon, Jun 2, 2014 at 6:53 AM, Kristoffer Sj=C3=B6gren wrote: >>> >>> Hi >>> >>> Im trying to read and write data using the avro+parquet combo that >>> ships with crunch 0.8.0-cdh4.3.0. >>> >>> - The writer job looks like this. >>> >>> PCollection lines =3D ... >>> PCollection p =3D lines.parallelDo(new DoFn() { >>> @Override >>> public void process(String input, Emitter emitter) { >>> User user =3D User.newBuilder().setName(input).build(); >>> emitter.emit(user); >>> } >>> }, Avros.records(User.class)); >>> >>> AvroParquetFileSourceTarget fout =3D new >>> AvroParquetFileSourceTarget(out, Avros.records(User.class)); >>> pipeline.write(p, fout); >>> >>> - The reader job looks like this. >>> >>> AvroParquetFileSource file =3D new >>> AvroParquetFileSource(out, Avros.records(User.class)); >>> PCollection users =3D pipeline.read(file); >>> // this line fails with a ClassCastException >>> PCollection lines =3D users.parallelDo(new DoFn()= { >>> @Override >>> public void process(User user, Emitter emitter) { >>> emitter.emit(user.getName().toString()); >>> } >>> }, Writables.strings()); >>> >>> >>> However, the reader fails with a java.lang.ClassCastException? Is this >>> a know issue or am I doing something wrong? 
>>>
>>> Cheers,
>>> -Kristoffer
>>>
>>>
>>> java.lang.ClassCastException:
>>> org.apache.avro.generic.GenericData$Record cannot be cast to
>>> mapred.jobs.User
>>> at mapred.jobs.ParquetReaderJob$1.process(ParquetReaderJob.java:22)
>>> at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
>>> at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
>>> at org.apache.crunch.MapFn.process(MapFn.java:34)
>>> at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
>>> at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:110)
>>> at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
>>> at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
>>> at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
>>> at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
>>> at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
>>> at java.security.AccessController.doPrivileged(Native Method)
>>> at javax.security.auth.Subject.doAs(Subject.java:396)
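The resolution at the top of the thread (a Java package rename that was not mirrored in the Avro schema namespace held by the generated class's SCHEMA$ field, so Avro fell back to GenericData.Record) can be illustrated with a minimal, self-contained sketch. This is a hypothetical stand-in, not Avro code: the schema literal, package names, and the NamespaceCheck helper are all made up for illustration; in a real generated class the namespace would come from User.SCHEMA$.getNamespace().

```java
// Hypothetical sketch of the failure mode, with no Avro dependency.
// Avro can only materialize specific records when the schema namespace
// matches the Java package of the generated class; otherwise readers get
// GenericData.Record and downstream casts throw ClassCastException.
public class NamespaceCheck {

    // Crude stand-in for User.SCHEMA$.getNamespace(): pulls the
    // "namespace" value out of a schema JSON literal.
    static String schemaNamespace(String schemaJson) {
        int key = schemaJson.indexOf("\"namespace\"");
        int start = schemaJson.indexOf('"', schemaJson.indexOf(':', key) + 1) + 1;
        return schemaJson.substring(start, schemaJson.indexOf('"', start));
    }

    public static void main(String[] args) {
        // Schema embedded at code-generation time, before the refactoring.
        String schema = "{\"type\":\"record\",\"name\":\"User\","
            + "\"namespace\":\"old.pkg\",\"fields\":[]}";
        // Package the class lives in after the rename (as in the stack trace).
        String javaPackage = "mapred.jobs";

        String ns = schemaNamespace(schema);
        System.out.println(ns.equals(javaPackage)
            ? "ok: specific records will be used"
            : "mismatch: Avro will fall back to GenericData.Record");
    }
}
```

Regenerating the schema (or editing its namespace field) so that it matches the new package restores specific-record resolution, which is the fix Kristoffer describes.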