beam-user mailing list archives

From Kelsey RIDER <kelsey.ri...@ineat-conseil.fr>
Subject RE: Beam and ParquetIO
Date Mon, 27 Aug 2018 07:08:56 GMT
I discovered that it works if I build the schema manually and change the namespace.
In my example, the automatically generated schema has the namespace “pva.beam.Main$”
and the name “TP”.
Apparently, when reading, if Avro can resolve a class from the schema’s namespace + name,
it will instantiate that class instead of using GenericRecord. So is there a constraint on the
sorts of objects that can be stored? Do they have to implement IndexedRecord, as the
ClassCastException below suggests? (If so, is this documented somewhere?)
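
Concretely, something along these lines works for me (a sketch using Avro's SchemaBuilder; the namespace "pva.beam.generic" is an arbitrary value I made up, and anything that does not resolve to a real class on the classpath should behave the same):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

// Hand-built schema whose namespace does not correspond to any class on the
// classpath, so the Avro reader falls back to GenericRecord instead of
// trying to instantiate pva.beam.Main$TP.
Schema tpSchema = SchemaBuilder.record("TP")
        .namespace("pva.beam.generic") // arbitrary; must not resolve to a real class
        .fields()
        .requiredString("str")
        .endRecord();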


From: Lukasz Cwik <lcwik@google.com>
Sent: Friday, August 24, 2018 22:04
To: user@beam.apache.org
Subject: Re: Beam and ParquetIO

Does it work if you define the schema manually instead of using ReflectData?

All the Parquet tests[1] seem to build the schema explicitly, so this could be a current limitation
of the ParquetIO implementation. Someone with more knowledge of the Parquet side of the
implementation would need to speak up, though.

1: https://github.com/apache/beam/blob/88b3556ad99e16b3a63064d7243d1675df501ef5/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOIT.java#L97
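
For example (sketch only; the field list just mirrors your TP class), the schema can be declared explicitly with Avro's Schema.Parser:

import org.apache.avro.Schema;

// Explicit schema definition that does not depend on any Java class.
String schemaJson = "{\"type\": \"record\", \"name\": \"TP\","
        + " \"fields\": [{\"name\": \"str\", \"type\": \"string\"}]}";
Schema tpSchema = new Schema.Parser().parse(schemaJson);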

On Thu, Aug 23, 2018 at 11:57 PM Kelsey RIDER <kelsey.rider@ineat-conseil.fr> wrote:
Hi,

Sorry, missed that bit of code – the SchemaConverter does exactly the same thing, so they
are indeed both created in the same way.

From: Lukasz Cwik <lcwik@google.com>
Sent: Friday, August 24, 2018 01:15
To: user@beam.apache.org
Subject: Re: Beam and ParquetIO

I believe you need to be consistent in how you create your schema, since

Schema tpSchema = ReflectData.get().getSchema(TP.class)   !=   new SchemaConverter().convert(TP.class)

Either use ReflectData.get().getSchema(TP.class) in both places, or new SchemaConverter().convert(TP.class) in both, as sketched below.
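
Something like this (sketch only, reusing the names from your snippets; whichever of the two schema sources you pick, the point is that both pipelines share it):

import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;

// Sketch: derive the schema once and reuse the same definition on both sides.
public class Schemas {
    public static final Schema TP_SCHEMA = ReflectData.get().getSchema(TP.class);
}

// write side: ParquetIO.sink(Schemas.TP_SCHEMA)
// read side:  ParquetIO.read(Schemas.TP_SCHEMA)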


On Thu, Aug 23, 2018 at 8:41 AM Kelsey RIDER <kelsey.rider@ineat-conseil.fr> wrote:
Hello,

I’m trying to use ParquetIO to both read and write records.
However, I’m unable to read back the records that were previously written.
I wrote a simple test:

private static class TP implements Serializable {
    public String str;
}

private void testWriteParquet(String[] args) {
    final PVAOptions options = getOptions(args);

    final Pipeline pipeline = Pipeline.create(options);

    // Schema generated by reflection over TP.
    Schema tpSchema = ReflectData.get().getSchema(TP.class);
    GenericData.Record record = new GenericData.Record(tpSchema);
    record.put("str", "All your data are belong to us");

    pipeline.getCoderRegistry().registerCoderForClass(
            GenericData.Record.class, AvroCoder.of(tpSchema));

    pipeline.apply(Create.of((GenericRecord) record))
            .apply(FileIO.<GenericRecord>write()
                    .via(ParquetIO.sink(tpSchema))
                    .to(options.getDestinationDirectory() + "parquettest"));

    pipeline.run();
}

private void testReadParquet(String[] args) {
    final PVAOptions options = getOptions(args);

    final Pipeline pipeline = Pipeline.create(options);

    // Schema produced by the custom SchemaConverter (not ReflectData).
    Schema tpSchema = new SchemaConverter().convert(TP.class);

    pipeline.apply(ParquetIO.read(tpSchema)
                    .from(options.getDestinationDirectory() + "parquettest/*"))
            .apply(ParDo.of(new DoFn<GenericRecord, Void>() {
                @ProcessElement
                public void pe(ProcessContext c, @Element GenericRecord record) {
                    System.out.println(
                            "----------------------------------- found record str = "
                                    + record.get("str"));
                }
            }));

    pipeline.run();
}


The result I get is:

java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
    at java.lang.Thread.run (Thread.java:748)
Caused by: org.apache.beam.sdk.Pipeline$PipelineExecutionException: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$BeamParquetInputFile@2b7a4775
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:349)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish (DirectRunner.java:319)
    at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:210)
    at org.apache.beam.runners.direct.DirectRunner.run (DirectRunner.java:66)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:311)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:297)
    at pva.beam.Main.testReadParquet (Main.java:67)
    at pva.beam.Main.main (Main.java:75)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
    at java.lang.Thread.run (Thread.java:748)
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$BeamParquetInputFile@2b7a4775
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue (InternalParquetRecordReader.java:251)
    at org.apache.parquet.hadoop.ParquetReader.read (ParquetReader.java:132)
    at org.apache.parquet.hadoop.ParquetReader.read (ParquetReader.java:136)
    at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement (ParquetIO.java:226)
Caused by: java.lang.ClassCastException: pva.beam.Main$TP cannot be cast to org.apache.avro.generic.IndexedRecord
    at org.apache.avro.generic.GenericData.setField (GenericData.java:690)
    at org.apache.parquet.avro.AvroRecordConverter.set (AvroRecordConverter.java:393)
    at org.apache.parquet.avro.AvroRecordConverter$2.add (AvroRecordConverter.java:136)
    at org.apache.parquet.avro.AvroConverters$BinaryConverter.addBinary (AvroConverters.java:62)
    at org.apache.parquet.column.impl.ColumnReaderImpl$2$6.writeValue (ColumnReaderImpl.java:317)
    at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter (ColumnReaderImpl.java:367)
    at org.apache.parquet.io.RecordReaderImplementation.read (RecordReaderImplementation.java:406)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue (InternalParquetRecordReader.java:226)
    at org.apache.parquet.hadoop.ParquetReader.read (ParquetReader.java:132)
    at org.apache.parquet.hadoop.ParquetReader.read (ParquetReader.java:136)
    at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement (ParquetIO.java:226)
    at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn$DoFnInvoker.invokeProcessElement (Unknown Source)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement (SimpleDoFnRunner.java:185)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.processElement (SimpleDoFnRunner.java:149)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows (SimplePushbackSideInputDoFnRunner.java:78)
    at org.apache.beam.runners.direct.ParDoEvaluator.processElement (ParDoEvaluator.java:189)
    at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement (DoFnLifecycleManagerRemovingTransformEvaluator.java:55)
    at org.apache.beam.runners.direct.DirectTransformExecutor.processElements (DirectTransformExecutor.java:161)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:125)
    at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
    at java.util.concurrent.FutureTask.run (FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
    at java.lang.Thread.run (Thread.java:748)

How can I read back my records with Beam?