beam-user mailing list archives

From Gwilym Evans <gwilym.ev...@bigcommerce.com>
Subject Re: Enriching stream messages based on external data
Date Fri, 02 Jun 2017 03:37:21 GMT
Thanks to both of you for your assistance.

A closer examination of the stack trace above revealed the true source:

Caused by: java.io.NotSerializableException: org.apache.beam.sdk.options.ProxyInvocationHandler

It turns out the "options" class in Beam (PipelineOptions) cannot be serialized.

Moving this particular DoFn out to its own class let me inject the serializable
configs it needs, rather than capturing the options object, and now I'm back
on track.
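
For the record, the working shape is roughly this. It's a sketch with
illustrative names (EnrichFn, projectId, instanceId), not my exact code:

    import java.io.IOException;
    import java.util.Map;

    import com.google.cloud.bigtable.config.BigtableOptions;
    import com.google.cloud.bigtable.grpc.BigtableSession;
    import com.google.protobuf.ByteString;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
    import org.apache.beam.sdk.transforms.DoFn;

    // A top-level class, so there is no hidden reference to the enclosing
    // pipeline class (and so no attempt to serialize its options field).
    public class EnrichFn extends DoFn<PubsubMessage, Map<String, ByteString>> {
        // Plain Strings serialize fine, unlike the PipelineOptions proxy.
        private final String projectId;
        private final String instanceId;
        private transient BigtableSession session;

        public EnrichFn(String projectId, String instanceId) {
            this.projectId = projectId;
            this.instanceId = instanceId;
        }

        @Setup
        public void setUp() throws IOException {
            session = new BigtableSession(new BigtableOptions.Builder()
                    .setProjectId(projectId)
                    .setInstanceId(instanceId)
                    .build());
        }

        @ProcessElement
        public void processElement(ProcessContext c) {
            // Bigtable lookup via this.session, same as before...
        }
    }

The pipeline then passes the values in, so only the Strings are serialized
(calling .get() here assumes the options are available at pipeline
construction, which they are in my case):

    .apply(ParDo.of(new EnrichFn(
            options.getSourceProjectId().get(),
            options.getSourceInstanceId().get())))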


On 1 June 2017 at 22:48, Gwilym Evans <gwilym.evans@bigcommerce.com> wrote:

> Csabi, I will try that, thank you.
>
> Eugene, sorry, I should have mentioned that I've already tried that and it
> still fails. I've also tried annotating it with @JsonIgnore. Thanks, though.
>
> On 1 June 2017 at 22:46, Eugene Kirpichov <kirpichov@google.com> wrote:
>
>> It's probably because of the BigtableSession variable - mark it transient.
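>>
>> For example (a quick sketch; "opts" is the BigtableOptions you already build in setUp):
>>
>>     private transient BigtableSession session;  // transient: skipped by Java serialization
>>
>>     @Setup
>>     public void setUp() throws IOException {
>>         // rebuild the session on each worker instead of shipping it with the DoFn
>>         session = new BigtableSession(opts);
>>     }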
>>
>> On Thu, Jun 1, 2017 at 3:33 PM Csaba Kassai <csaba.kassai@doctusoft.com>
>> wrote:
>>
>>> Hi Gwilym,
>>>
>>> try to extract the DoFn into a separate static inner class or into a
>>> separate file as a top-level class, instead of declaring it as an
>>> anonymous inner class. In Java, an anonymous inner class has an
>>> implicit reference to the outer enclosing class, and I suspect that the
>>> serialiser is not able to serialise the fields of this enclosing instance.
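>>>
>>> A minimal sketch of the difference (illustrative names only):
>>>
>>>     public class Outer {
>>>         private final Object notSerializable = new Object();  // e.g. an options proxy
>>>
>>>         // Anonymous inner class: it carries a hidden Outer.this reference, so
>>>         // serialising it also tries to serialise notSerializable above.
>>>         DoFn<String, String> anonymousFn = new DoFn<String, String>() {
>>>             @ProcessElement
>>>             public void process(ProcessContext c) { c.output(c.element()); }
>>>         };
>>>
>>>         // Static nested class: no hidden reference, serialises cleanly.
>>>         static class SafeFn extends DoFn<String, String> {
>>>             @ProcessElement
>>>             public void process(ProcessContext c) { c.output(c.element()); }
>>>         }
>>>     }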
>>>
>>> Regards,
>>> Csabi
>>>
>>> On Thu, 1 Jun 2017 at 23:23 Gwilym Evans <gwilym.evans@bigcommerce.com>
>>> wrote:
>>>
>>>> Here's what I have in my history; if you need the "... X more" frames
>>>> expanded, I can look into that:
>>>>
>>>> 2017-06-01 05:23:05 INFO  DataflowPipelineOptions$StagingLocationFactory:127 - No stagingLocation provided, falling back to gcpTempLocation
>>>> 2017-06-01 05:23:06 INFO  DataflowRunner:229 - PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 111 files. Enable logging at DEBUG level to see which files will be staged.
>>>> [WARNING]
>>>> java.lang.reflect.InvocationTargetException
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>> at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>> Caused by: java.lang.IllegalArgumentException: unable to serialize org.apache.beam.examples.AthenaPubsubOrderNotificationsHandler$2@604b2279
>>>> at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
>>>> at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
>>>> at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:569)
>>>> at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:434)
>>>> at org.apache.beam.examples.AthenaPubsubOrderNotificationsHandler.main(AthenaPubsubOrderNotificationsHandler.java:138)
>>>> ... 6 more
>>>> Caused by: java.io.NotSerializableException: org.apache.beam.sdk.options.ProxyInvocationHandler
>>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>> at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
>>>> ... 10 more
>>>>
>>>> The way I'm trying to use this in the ParDo/DoFn is:
>>>>
>>>> (line 138 starts here)
>>>>                 .apply(ParDo.of(new DoFn<PubsubMessage, Map<String, ByteString>>() {
>>>>                     private BigtableSession session;
>>>>
>>>>                     @Setup
>>>>                     public void setUp() throws IOException {
>>>>                         BigtableOptions opts = new BigtableOptions.Builder()
>>>>                                 .setProjectId(options.getSourceProjectId().get())
>>>>                                 .setInstanceId(options.getSourceInstanceId().get())
>>>>                                 .build();
>>>>
>>>>                         session = new BigtableSession(opts);
>>>>                     }
>>>>                     ...
>>>>
>>>> With a ReadRowsRequest built up during processElement and executed like:
>>>>
>>>>                     ResultScanner<FlatRow> results = session.getDataClient().readFlatRows(request.build());
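>>>>
>>>> In case it helps, the request is built roughly like this (a sketch; the
>>>> table path and the key variable are placeholders, not my real names):
>>>>
>>>>                     ReadRowsRequest.Builder request = ReadRowsRequest.newBuilder()
>>>>                             .setTableName("projects/<project>/instances/<instance>/tables/<table>")
>>>>                             .setRows(RowSet.newBuilder()
>>>>                                     .addRowKeys(ByteString.copyFromUtf8(keyFromMessage)));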
>>>>
>>>> Thanks,
>>>> Gwilym
>>>>
>>>>
>>>> On 1 June 2017 at 15:10, Lukasz Cwik <lcwik@google.com> wrote:
>>>>
>>>>> Combining PubSub + Bigtable is common.
>>>>>
>>>>> You should try to use the BigtableSession approach, because the HBase
>>>>> approach adds a lot of dependencies (leading to dependency conflicts).
>>>>> You should use the same version of Bigtable libraries that Apache Beam
>>>>> is using (Apache Beam 2.0.0 uses Bigtable 0.9.6.2).
>>>>>
>>>>> Can you provide the full stack trace for the exception you're seeing?
>>>>>
>>>>> On Wed, May 31, 2017 at 10:51 PM, Gwilym Evans <
>>>>> gwilym.evans@bigcommerce.com> wrote:
>>>>>
>>>>>> Hi list,
>>>>>>
>>>>>> I'm trying to figure out if Beam is intended to do the following and,
>>>>>> if so, what's the best approach?
>>>>>>
>>>>>> I'm using Java, Beam 2.0.0 on GCP Dataflow. Note: I'm relatively new
>>>>>> to Java, so if there's any known solution for this, a code example
>>>>>> would be greatly appreciated.
>>>>>>
>>>>>> I have an unbounded stream of short messages (coming from PubSub).
>>>>>>
>>>>>> For each message, I want to get a number of rows from an external
>>>>>> database (rows within Bigtable, always the same table) based on the
>>>>>> contents of the message, and use the rows when producing output for the
>>>>>> final write apply.
>>>>>>
>>>>>> I've tried various means of connecting out to Bigtable from within the
>>>>>> DoFn which is handling the PubSub inputs, but so far everything I've
>>>>>> tried has resulted in Beam refusing to run the job due to:
>>>>>>
>>>>>> java.io.NotSerializableException: org.apache.beam.sdk.options.ProxyInvocationHandler
>>>>>>
>>>>>> (methods I've tried: manually using a BigtableSession, manually using
>>>>>> the Bigtable HBase libs)
>>>>>>
>>>>>> So is this something that Beam was designed to do? If so, what's the
>>>>>> recommended approach?
>>>>>>
>>>>>> I considered dynamically constructing a PCollection, but I wasn't
>>>>>> sure if that would make use of connection pooling to Bigtable.
>>>>>>
>>>>>> Thanks,
>>>>>> Gwilym
>>>>>>
>>>>>>
>>>>>
>>>>
>
