beam-user mailing list archives

From Lukasz Cwik <lc...@google.com>
Subject Re: Enriching stream messages based on external data
Date Fri, 02 Jun 2017 16:19:58 GMT
If a DoFn needs information from PipelineOptions, it should get it from the
runtime context (StartBundleContext, FinishBundleContext, ProcessContext) via
getPipelineOptions().
PipelineOptions is specifically designed in this way to prevent users from
relying on their own serialized version and missing out on:
* Value provider / template integration
* Runner provided information (credentials/...)
* Execution environment specific information (logging/host information)
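A rough sketch of that pattern (the SourceOptions interface and its getters are hypothetical stand-ins for whatever the pipeline defines, and plain Strings are used instead of ValueProviders to keep it short; not compiled against Beam here):

```java
import java.io.IOException;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.cloud.bigtable.grpc.BigtableSession;

// Hypothetical options interface for this sketch.
interface SourceOptions extends PipelineOptions {
  String getSourceProjectId();
  void setSourceProjectId(String v);
  String getSourceInstanceId();
  void setSourceInstanceId(String v);
}

class EnrichFn extends DoFn<String, String> {
  private transient BigtableSession session;

  @StartBundle
  public void startBundle(StartBundleContext c) throws IOException {
    // Ask the runtime for the options instead of capturing/serializing them.
    SourceOptions opts = c.getPipelineOptions().as(SourceOptions.class);
    session = new BigtableSession(new BigtableOptions.Builder()
        .setProjectId(opts.getSourceProjectId())
        .setInstanceId(opts.getSourceInstanceId())
        .build());
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(c.element());  // Bigtable lookup against `session` would go here
  }
}
```

Because the options object never lives in a serialized field, the proxy is never handed to the Java serializer, and runner-provided values (templates, credentials) stay visible.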




On Thu, Jun 1, 2017 at 8:37 PM, Gwilym Evans <gwilym.evans@bigcommerce.com>
wrote:

> Thanks to both of you for your assistance.
>
> It turned out that a closer examination of the stack trace above revealed
> the true source:
>
> Caused by: java.io.NotSerializableException: org.apache.beam.sdk.options.ProxyInvocationHandler
>
> It turns out the "options" class for Beam cannot be serialized.
>
> Moving this particular DoFn out to its own class let me inject the needed
> serializable configs rather than passing in the options, and now I'm back
> on track.
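For anyone hitting the same thing, the shape of that fix is roughly the following (class and field names are illustrative, not the actual code; not compiled against Beam here):

```java
import java.io.IOException;
import org.apache.beam.sdk.transforms.DoFn;
import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.cloud.bigtable.grpc.BigtableSession;

// Top-level (or static nested) DoFn: no hidden reference to the pipeline
// class, and only plain serializable Strings are captured as fields.
class BigtableEnrichFn extends DoFn<String, String> {
  private final String projectId;
  private final String instanceId;
  private transient BigtableSession session;  // rebuilt per worker, never serialized

  BigtableEnrichFn(String projectId, String instanceId) {
    this.projectId = projectId;
    this.instanceId = instanceId;
  }

  @Setup
  public void setUp() throws IOException {
    session = new BigtableSession(new BigtableOptions.Builder()
        .setProjectId(projectId)
        .setInstanceId(instanceId)
        .build());
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(c.element());  // Bigtable lookup against `session` would go here
  }
}
```

Constructed as `new BigtableEnrichFn(options.getSourceProjectId().get(), options.getSourceInstanceId().get())`, the String values are extracted at pipeline-construction time and the options proxy itself is never serialized.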
>
>
> On 1 June 2017 at 22:48, Gwilym Evans <gwilym.evans@bigcommerce.com>
> wrote:
>
>> Csabi, I will try that, thank you.
>>
>> Eugene, sorry I should have mentioned that I've already tried that and it
>> still fails. I've also tried annotating it with @JsonIgnore. Thanks, though.
>>
>> On 1 June 2017 at 22:46, Eugene Kirpichov <kirpichov@google.com> wrote:
>>
>>> It's probably because of the BigtableSession variable - mark it
>>> transient.
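The effect of `transient` can be seen with plain JDK serialization (a `StringBuilder` stands in here for a non-serializable client like BigtableSession):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientDemo implements Serializable {
    final String instanceId;          // serialized with the object
    transient StringBuilder session;  // skipped by serialization entirely

    TransientDemo(String instanceId) {
        this.instanceId = instanceId;
        this.session = new StringBuilder("connected");
    }

    // Serialize to bytes and read the object back, as a runner would.
    static TransientDemo roundTrip(TransientDemo in)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(in);
        }
        try (ObjectInputStream oin = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            return (TransientDemo) oin.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        TransientDemo copy = roundTrip(new TransientDemo("my-instance"));
        System.out.println(copy.instanceId);      // my-instance
        System.out.println(copy.session == null); // true
    }
}
```

The transient field comes back null after deserialization, which is exactly why the session should be re-created in @Setup rather than carried along.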
>>>
>>> On Thu, Jun 1, 2017 at 3:33 PM Csaba Kassai <csaba.kassai@doctusoft.com>
>>> wrote:
>>>
>>>> Hi Gwilym,
>>>>
>>>> try to extract the DoFn into a separate static inner class or into a
>>>> separate file as a top level class, instead of declaring as an
>>>> anonymous inner class. In java the anonymous inner class has an
>>>> implicit reference to the outer enclosing class, and I suspect that the
>>>> serialiser is not able to serialise the fields of this enclosing instance.
>>>>
>>>> Regards,
>>>> Csabi
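That implicit reference is easy to demonstrate with plain JDK serialization (no Beam involved; the names here are made up for the demo):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {
    // Stand-in for a pipeline main class that is NOT Serializable itself.
    final String tableId = "orders";

    Serializable anonymousFn() {
        // Anonymous inner class: touching `tableId` makes javac keep a hidden
        // reference to the enclosing CaptureDemo instance.
        return new Serializable() {
            @Override
            public String toString() {
                return tableId;
            }
        };
    }

    // Static nested class: no hidden reference to the enclosing instance.
    static class StandaloneFn implements Serializable {}

    static boolean serializes(Serializable s) {
        try (ObjectOutputStream out =
                new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(s);
            return true;
        } catch (NotSerializableException e) {
            return false;  // the hidden outer reference could not be serialized
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new CaptureDemo().anonymousFn())); // false
        System.out.println(serializes(new StandaloneFn()));              // true
    }
}
```

Serializing the anonymous class drags the whole enclosing instance (and every field on it) into the serialized graph; the static nested class carries nothing extra.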
>>>>
>>>> On Thu, 1 Jun 2017 at 23:23 Gwilym Evans <gwilym.evans@bigcommerce.com>
>>>> wrote:
>>>>
>>>>> Here's what I have in my history, if you need the "... X more"
>>>>> expanded I can look into that:
>>>>>
>>>>> 2017-06-01 05:23:05 INFO  DataflowPipelineOptions$StagingLocationFactory:127
>>>>> - No stagingLocation provided, falling back to gcpTempLocation
>>>>> 2017-06-01 05:23:06 INFO  DataflowRunner:229 -
>>>>> PipelineOptions.filesToStage was not specified. Defaulting to files from
>>>>> the classpath: will stage 111 files. Enable logging at DEBUG level to see
>>>>> which files will be staged.
>>>>> [WARNING]
>>>>> java.lang.reflect.InvocationTargetException
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>> at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: java.lang.IllegalArgumentException: unable to serialize
>>>>> org.apache.beam.examples.AthenaPubsubOrderNotificationsHandler$2@604b2279
>>>>> at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
>>>>> at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
>>>>> at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:569)
>>>>> at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:434)
>>>>> at org.apache.beam.examples.AthenaPubsubOrderNotificationsHandler.main(AthenaPubsubOrderNotificationsHandler.java:138)
>>>>> ... 6 more
>>>>> Caused by: java.io.NotSerializableException: org.apache.beam.sdk.options.ProxyInvocationHandler
>>>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>>> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>>> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>>> at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
>>>>> ... 10 more
>>>>>
>>>>> The way I'm trying to use this in the ParDo/DoFn is:
>>>>>
>>>>> (line 138 starts here)
>>>>>                 .apply(ParDo.of(new DoFn<PubsubMessage, Map<String, ByteString>>() {
>>>>>                     private BigtableSession session;
>>>>>
>>>>>                     @Setup
>>>>>                     public void setUp() throws IOException {
>>>>>                         BigtableOptions opts = new BigtableOptions.Builder()
>>>>>                                 .setProjectId(options.getSourceProjectId().get())
>>>>>                                 .setInstanceId(options.getSourceInstanceId().get())
>>>>>                                 .build();
>>>>>
>>>>>                         session = new BigtableSession(opts);
>>>>>                     }
>>>>>                     ...
>>>>>
>>>>> With a ReadRowsRequest built up during the processElement and executed
>>>>> like:
>>>>>
>>>>>                     ResultScanner<FlatRow> results =
>>>>>                             session.getDataClient().readFlatRows(request.build());
>>>>>
>>>>> Thanks,
>>>>> Gwilym
>>>>>
>>>>>
>>>>> On 1 June 2017 at 15:10, Lukasz Cwik <lcwik@google.com> wrote:
>>>>>
>>>>>> Combining PubSub + Bigtable is common.
>>>>>>
>>>>>> You should try to use the BigtableSession approach because the hbase
>>>>>> approach adds a lot of dependencies (leading to dependency conflicts).
>>>>>> You should use the same version of Bigtable libraries that Apache
>>>>>> Beam is using (Apache Beam 2.0.0 uses Bigtable 0.9.6.2).
>>>>>>
>>>>>> Can you provide the full stack trace for the exception you're seeing?
>>>>>>
>>>>>> On Wed, May 31, 2017 at 10:51 PM, Gwilym Evans <
>>>>>> gwilym.evans@bigcommerce.com> wrote:
>>>>>>
>>>>>>> Hi list,
>>>>>>>
>>>>>>> I'm trying to figure out if Beam is intended to do the following
>>>>>>> and, if so, what's the best approach?
>>>>>>>
>>>>>>> I'm using Java, Beam 2.0.0 on GCP Dataflow. Note: I'm relatively new
>>>>>>> to Java, so if there's any known solution for this a code example would
>>>>>>> be greatly appreciated.
>>>>>>>
>>>>>>> I have an unbound stream of short messages (coming from PubSub).
>>>>>>>
>>>>>>> For each message, I want to get a number of rows from an external
>>>>>>> database (rows within Bigtable, always the same table) based on the
>>>>>>> contents of the message, and use the rows when producing output for the
>>>>>>> final write apply.
>>>>>>>
>>>>>>> I've tried various means of connecting out to Bigtable from within
>>>>>>> the DoFn which is handling the PubSub inputs, but so far everything I've
>>>>>>> tried has resulted in Beam refusing to run the job due to:
>>>>>>>
>>>>>>> java.io.NotSerializableException: org.apache.beam.sdk.options.ProxyInvocationHandler
>>>>>>>
>>>>>>> (methods I've tried: manually using a BigtableSession, manually
>>>>>>> using the Bigtable HBase libs)
>>>>>>>
>>>>>>> So is this something that Beam was designed to do? If so, what's the
>>>>>>> recommended approach?
>>>>>>>
>>>>>>> I considered dynamically constructing a PCollection, but I wasn't
>>>>>>> sure if that would make use of connection pooling to Bigtable.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Gwilym
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>
>
