beam-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Lukasz Cwik <lc...@google.com>
Subject Re: Beam and ParquetIO
Date Mon, 27 Aug 2018 16:10:28 GMT
I'm not sure. You will need to take a closer look at the Parquet
documentation to verify what it can do.

Apache Beam's integration seems to be a pretty thin adapter
over org.apache.parquet:parquet-avro:1.10.0 so it should be easy to see how
the two interact and add any missing integration points between the adapter
and the backing implementation.

On Mon, Aug 27, 2018 at 12:09 AM Kelsey RIDER <kelsey.rider@ineat-conseil.fr>
wrote:

> I discovered that it works if I build the schema, and change the namespace.
>
> In my example, the automatically-generated schema produces the namespace
> “pva.beam.Main$” and name “TP”.
>
> Apparently, when reading, if Avro can find the Class from the schema’s
> namespace + name, it will instantiate that instead of using GenericRecord.
> Or is there a constraint on the sorts of objects that can be stored – do
> they have to implement GenericRecord? (If so, is this documented somewhere?)
>
>
>
>
>
> *From:* Lukasz Cwik <lcwik@google.com>
> *Sent:* vendredi 24 août 2018 22:04
> *To:* user@beam.apache.org
> *Subject:* Re: Beam and ParquetIO
>
>
>
> Does it work if you define the schema manually instead of using
> ReflectData?
>
>
>
> All the Parquet tests[1] seem to explicitly build the schema and this
> could be a current limitation of the ParquetIO implementation. I would need
> someone with more Parquet knowledge related to the implementation though to
> speak up.
>
>
>
> 1:
> https://github.com/apache/beam/blob/88b3556ad99e16b3a63064d7243d1675df501ef5/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOIT.java#L97
>
>
>
> On Thu, Aug 23, 2018 at 11:57 PM Kelsey RIDER <
> kelsey.rider@ineat-conseil.fr> wrote:
>
> Hi,
>
>
>
> Sorry, missed that bit of code – the SchemaConverter does exactly the same
> thing, so they are indeed both created in the same way.
>
>
>
> *From:* Lukasz Cwik <lcwik@google.com>
> *Sent:* vendredi 24 août 2018 01:15
> *To:* user@beam.apache.org
> *Subject:* Re: Beam and ParquetIO
>
>
>
> I believe you need to be consistent in how you are creating your schema
> since
>
> Schema tpSchema = ReflectData.get().getSchema(TP.class)   !=   new
> SchemaConverter().convert(TP.class);
>
>
>
> Either use Schema tpSchema = ReflectData.get().getSchema(TP.class)  OR
>  new SchemaConverter().convert(TP.class);
>
>
>
>
>
> On Thu, Aug 23, 2018 at 8:41 AM Kelsey RIDER <
> kelsey.rider@ineat-conseil.fr> wrote:
>
> Hello,
>
>
>
> I’m trying to use ParquetIO to both read and write records.
>
> However, I’m unable to read back the records that were previously written.
>
> I wrote a simple test:
>
>
>
> private static class TP implements Serializable {
>
>                public String str;
>
> }
>
>
>
> private void testWriteParquet(String[] args) {
>
>                final PVAOptions options = getOptions(args);
>
>
>
>                final Pipeline pipeline = Pipeline.create(options);
>
>
>
>                Schema tpSchema = ReflectData.get().getSchema(TP.class);
>
>                GenericData.Record record = new
> GenericData.Record(tpSchema);
>
>                record.put("str", "All your data are belong to us");
>
>
>
>
> pipeline.getCoderRegistry().registerCoderForClass(GenericData.Record.class,
> AvroCoder.of(tpSchema));
>
>
>
>                pipeline.apply(Create.of((GenericRecord)record))
>
>
> .apply(FileIO.<GenericRecord>write().via(ParquetIO.sink(tpSchema)).to(options.getDestinationDirectory()
> + "parquettest"));
>
>
>
>                pipeline.run();
>
> }
>
>
>
> private void testReadParquet(String[] args) {
>
>                final PVAOptions options = getOptions(args);
>
>
>
>                final Pipeline pipeline = Pipeline.create(options);
>
>
>
>                Schema tpSchema = new SchemaConverter().convert(TP.class);
>
>
>
>
> pipeline.apply(ParquetIO.read(tpSchema).from(options.getDestinationDirectory()
> + "parquettest/*"))
>
>                               .apply(ParDo.of(new DoFn<GenericRecord,
> Void>() {
>
>                                             @ProcessElement
>
>                                             public void pe(ProcessContext
> c, @Element GenericRecord record) {
>
>
> System.out.println("----------------------------------- found record str =
> " + record.get("str"));
>
>                                             }
>
>                               }));
>
>
>
>                pipeline.run();
>
> }
>
>
>
>
>
> The result I get is:
>
>
>
> java.lang.reflect.InvocationTargetException
>
>     *at* sun.reflect.NativeMethodAccessorImpl.invoke0 (*Native Method*)
>
>     *at* sun.reflect.NativeMethodAccessorImpl.invoke (
> *NativeMethodAccessorImpl.java:62*)
>
>     *at* sun.reflect.DelegatingMethodAccessorImpl.invoke (
> *DelegatingMethodAccessorImpl.java:43*)
>
>     *at* java.lang.reflect.Method.invoke (*Method.java:498*)
>
>     *at* org.codehaus.mojo.exec.ExecJavaMojo$1.run (
> *ExecJavaMojo.java:294*)
>
>     *at* java.lang.Thread.run (*Thread.java:748*)
>
> *Caused by*: org.apache.beam.sdk.Pipeline$PipelineExecutionException: org.apache.parquet.io.ParquetDecodingException:
> Can not read value at 1 in block 0 in file
> org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$BeamParquetInputFile@2b7a4775
>
>     *at*
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish
> (*DirectRunner.java:349*)
>
>     *at*
> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish
> (*DirectRunner.java:319*)
>
>     *at* org.apache.beam.runners.direct.DirectRunner.run (
> *DirectRunner.java:210*)
>
>     *at* org.apache.beam.runners.direct.DirectRunner.run (
> *DirectRunner.java:66*)
>
>     *at* org.apache.beam.sdk.Pipeline.run (*Pipeline.java:311*)
>
>     *at* org.apache.beam.sdk.Pipeline.run (*Pipeline.java:297*)
>
>     *at* pva.beam.Main.testReadParquet (*Main.java:67*)
>
>     *at* pva.beam.Main.main (*Main.java:75*)
>
>     *at* sun.reflect.NativeMethodAccessorImpl.invoke0 (*Native Method*)
>
>     *at* sun.reflect.NativeMethodAccessorImpl.invoke (
> *NativeMethodAccessorImpl.java:62*)
>
>     *at* sun.reflect.DelegatingMethodAccessorImpl.invoke (
> *DelegatingMethodAccessorImpl.java:43*)
>
>     *at* java.lang.reflect.Method.invoke (*Method.java:498*)
>
>     *at* org.codehaus.mojo.exec.ExecJavaMojo$1.run (
> *ExecJavaMojo.java:294*)
>
>     *at* java.lang.Thread.run (*Thread.java:748*)
>
> *Caused by*: org.apache.parquet.io.ParquetDecodingException: Can not read
> value at 1 in block 0 in file
> org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$BeamParquetInputFile@2b7a4775
>
>     *at*
> org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue (
> *InternalParquetRecordReader.java:251*)
>
>     *at* org.apache.parquet.hadoop.ParquetReader.read (
> *ParquetReader.java:132*)
>
>     *at* org.apache.parquet.hadoop.ParquetReader.read (
> *ParquetReader.java:136*)
>
>     *at*
> org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement (
> *ParquetIO.java:226*)
>
> *Caused by*: java.lang.ClassCastException: pva.beam.Main$TP cannot be
> cast to org.apache.avro.generic.IndexedRecord
>
>     *at* org.apache.avro.generic.GenericData.setField (
> *GenericData.java:690*)
>
>     *at* org.apache.parquet.avro.AvroRecordConverter.set (
> *AvroRecordConverter.java:393*)
>
>     *at* org.apache.parquet.avro.AvroRecordConverter$2.add (
> *AvroRecordConverter.java:136*)
>
>     *at* org.apache.parquet.avro.AvroConverters$BinaryConverter.addBinary
> (*AvroConverters.java:62*)
>
>     *at* org.apache.parquet.column.impl.ColumnReaderImpl$2$6.writeValue (
> *ColumnReaderImpl.java:317*)
>
>     *at*
> org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter
> (*ColumnReaderImpl.java:367*)
>
>     *at* org.apache.parquet.io.RecordReaderImplementation.read (
> *RecordReaderImplementation.java:406*)
>
>     *at*
> org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue (
> *InternalParquetRecordReader.java:226*)
>
>     *at* org.apache.parquet.hadoop.ParquetReader.read (
> *ParquetReader.java:132*)
>
>     *at* org.apache.parquet.hadoop.ParquetReader.read (
> *ParquetReader.java:136*)
>
>     *at*
> org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement (
> *ParquetIO.java:226*)
>
>     *at*
> org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn$DoFnInvoker.invokeProcessElement
> (*Unknown Source*)
>
>     *at*
> org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement
> (*SimpleDoFnRunner.java:185*)
>
>     *at*
> org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.processElement
> (*SimpleDoFnRunner.java:149*)
>
>     *at*
> org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows
> (*SimplePushbackSideInputDoFnRunner.java:78*)
>
>     *at* org.apache.beam.runners.direct.ParDoEvaluator.processElement (
> *ParDoEvaluator.java:189*)
>
>     *at*
> org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement
> (*DoFnLifecycleManagerRemovingTransformEvaluator.java:55*)
>
>     *at*
> org.apache.beam.runners.direct.DirectTransformExecutor.processElements (
> *DirectTransformExecutor.java:161*)
>
>    *at* org.apache.beam.runners.direct.DirectTransformExecutor.run (
> *DirectTransformExecutor.java:125*)
>
>     *at* java.util.concurrent.Executors$RunnableAdapter.call (
> *Executors.java:511*)
>
>     *at* java.util.concurrent.FutureTask.run (*FutureTask.java:266*)
>
>     *at* java.util.concurrent.ThreadPoolExecutor.runWorker (
> *ThreadPoolExecutor.java:1149*)
>
>     *at* java.util.concurrent.ThreadPoolExecutor$Worker.run (
> *ThreadPoolExecutor.java:624*)
>
>     *at* java.lang.Thread.run (*Thread.java:748*)
>
>
>
> How can I read back my records with Beam?
>
> *Suite à l’évolution des dispositifs de réglementation du travail, si vous
> recevez ce mail avant 7h00, en soirée, durant le week-end ou vos congés
> merci, sauf cas d’urgence exceptionnelle, de ne pas le traiter ni d’y
> répondre immédiatement.*
>
>

Mime
View raw message