beam-user mailing list archives

From Gwilym Evans <gwilym.ev...@bigcommerce.com>
Subject Re: Enriching stream messages based on external data
Date Fri, 02 Jun 2017 23:58:05 GMT
Very nice to know, thank you.

For what it's worth, my job is now up and running perfectly.

Thank you all again.

On 2 June 2017 at 16:19, Lukasz Cwik <lcwik@google.com> wrote:

> If a DoFn needs information from PipelineOptions, it should really get
> them from the runtime context (StartBundleContext, FinishBundleContext,
> ProcessContext) with getPipelineOptions.
> PipelineOptions is specifically designed in this way to prevent users from
> relying on their own serialized version and missing out on:
> * Value provider / template integration
> * Runner provided information (credentials/...)
> * Execution environment specific information (logging/host information)
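The pattern Lukasz describes can be modelled in plain Java without Beam on the classpath. This is a sketch under stated assumptions: `Options`, `Context` and `EnrichFn` are hypothetical stand-ins for `PipelineOptions`, `ProcessContext` and a `DoFn`, not Beam's API. The function keeps no options field and reads them from the context passed to each call, so serializing the function never reaches the options object.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Plain-JDK model only: Options, Context and EnrichFn are hypothetical stand-ins,
// not Beam classes. In Beam the real call is getPipelineOptions() on the context.
class ContextOptionsDemo {

    /** Stand-in for a PipelineOptions interface; deliberately NOT Serializable. */
    interface Options {
        String getSourceProjectId();
    }

    /** Stand-in for ProcessContext: supplied by the runner on every call. */
    interface Context {
        Options getPipelineOptions();
        String element();
    }

    /** Keeps no reference to Options, so serializing it never touches them. */
    static final class EnrichFn implements Serializable {
        private static final long serialVersionUID = 1L;

        String process(Context c) {
            // Options are read at execution time from the context, not captured.
            String project = c.getPipelineOptions().getSourceProjectId();
            return project + "/" + c.element();
        }
    }

    /** Serializes the object and returns the byte count; throws if it cannot. */
    static int serializedSize(Object o) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    /** Builds a fake context the way a runner would hand one to the function. */
    static Context contextFor(String value) {
        Options opts = () -> "demo-project";
        return new Context() {
            @Override public Options getPipelineOptions() { return opts; }
            @Override public String element() { return value; }
        };
    }

    public static void main(String[] args) {
        EnrichFn fn = new EnrichFn();
        System.out.println("serialized bytes: " + serializedSize(fn));
        System.out.println(fn.process(contextFor("order-42")));
    }
}
```

Besides avoiding the serialization failure, reading options at call time is what lets the runner inject its own runtime information, as listed above.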
>
> On Thu, Jun 1, 2017 at 8:37 PM, Gwilym Evans <gwilym.evans@bigcommerce.com> wrote:
>
>> Thanks to both of you for your assistance.
>>
>> It turned out that a closer examination of the stack trace above revealed
>> the true source:
>>
>> Caused by: java.io.NotSerializableException: org.apache.beam.sdk.options.ProxyInvocationHandler
>>
>> It turns out that Beam's PipelineOptions object (backed by ProxyInvocationHandler) cannot be serialized.
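The root cause can be reproduced with nothing but the JDK. Beam's options objects are dynamic proxies backed by `ProxyInvocationHandler`, and a `java.lang.reflect.Proxy` instance serializes its handler along with it. In this sketch `SourceOptions` and `MapBackedHandler` are hypothetical stand-ins; the handler deliberately does not implement `Serializable`, so writing the proxy fails with `NotSerializableException` naming the handler class, as in the trace above.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Map;

// Plain-JDK reproduction; SourceOptions and MapBackedHandler are hypothetical
// stand-ins for an options interface and Beam's ProxyInvocationHandler.
class ProxySerializationDemo {

    interface SourceOptions {
        String getSourceProjectId();
    }

    /** Answers getter calls from a map. Note: it does NOT implement Serializable. */
    static final class MapBackedHandler implements InvocationHandler {
        private final Map<String, String> values;

        MapBackedHandler(Map<String, String> values) {
            this.values = values;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) {
            return values.get(method.getName());
        }
    }

    static SourceOptions createOptions() {
        return (SourceOptions) Proxy.newProxyInstance(
                SourceOptions.class.getClassLoader(),
                new Class<?>[] {SourceOptions.class},
                new MapBackedHandler(Map.of("getSourceProjectId", "demo-project")));
    }

    /** Returns null if serialization succeeds, else the offending class name. */
    static String trySerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        SourceOptions options = createOptions();
        // The proxy class itself is Serializable, but its handler field is not.
        System.out.println("options proxy: " + trySerialize(options));
        // Copying the value out into a plain String avoids the problem.
        System.out.println("plain value:   " + trySerialize(options.getSourceProjectId()));
    }
}
```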
>>
>> Moving this particular DoFn out to its own class let me inject the needed
>> serializable config values instead of passing in the options, and now I'm
>> back on track.
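The fix described above, sketched in plain Java: a named class whose only fields are plain serializable values passed in through its constructor. `BigtableLookupFn` is a hypothetical name, and the sketch models a `DoFn` rather than extending Beam's class. `roundTrip` serializes and then deserializes the object, roughly what the `SerializableUtils.clone` frame in the stack trace quoted further down does when `ParDo.of` is applied.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Plain-JDK model of the fix; BigtableLookupFn is a hypothetical class name.
class InjectedConfigDemo {

    /** Own class, own fields: only plain Serializable values travel with it. */
    static final class BigtableLookupFn implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String projectId;
        private final String instanceId;

        BigtableLookupFn(String projectId, String instanceId) {
            this.projectId = projectId;
            this.instanceId = instanceId;
        }

        String target() {
            return projectId + "/" + instanceId;
        }
    }

    /** Serialize then deserialize, as a clone-by-serialization would. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // In a pipeline these strings would be read from the options in main()
        // and passed in, instead of the DoFn reaching back to `options`.
        BigtableLookupFn fn = new BigtableLookupFn("demo-project", "demo-instance");
        System.out.println(roundTrip(fn).target());
    }
}
```

For templated pipelines, Beam's `ValueProvider` interface extends `Serializable`, so the providers themselves can usually be passed in the same way rather than calling `get()` while the pipeline is being constructed.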
>>
>>
>> On 1 June 2017 at 22:48, Gwilym Evans <gwilym.evans@bigcommerce.com>
>> wrote:
>>
>>> Csabi, I will try that, thank you.
>>>
>>> Eugene, sorry, I should have mentioned that I've already tried that and
>>> it still fails. I've also tried annotating it with @JsonIgnore. Thanks,
>>> though.
>>>
>>> On 1 June 2017 at 22:46, Eugene Kirpichov <kirpichov@google.com> wrote:
>>>
>>>> It's probably because of the BigtableSession variable - mark it
>>>> transient.
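Eugene's suggestion in plain Java: a `transient` field is skipped by Java serialization, so a live client can sit on the function without breaking serialization and is rebuilt after deserialization. `FakeSession` is a hypothetical stand-in for `BigtableSession`, and `setUp()` plays the role of the `@Setup` method. Note that `transient` only exempts that one field; in this thread the failure came from the captured options object, which is why marking the session transient alone did not help.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Plain-JDK model; FakeSession stands in for a non-serializable client such as
// BigtableSession, and setUp() plays the role of a @Setup method.
class TransientFieldDemo {

    /** Not Serializable, like a live network client. */
    static final class FakeSession {
        final String instanceId;

        FakeSession(String instanceId) {
            this.instanceId = instanceId;
        }
    }

    static final class LookupFn implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String instanceId;
        // transient: skipped by Java serialization, so it arrives as null.
        private transient FakeSession session;

        LookupFn(String instanceId) {
            this.instanceId = instanceId;
        }

        /** Recreate the client after deserialization (the @Setup step). */
        void setUp() {
            session = new FakeSession(instanceId);
        }

        boolean hasSession() {
            return session != null;
        }
    }

    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        LookupFn fn = new LookupFn("demo-instance");
        fn.setUp(); // even with a live session, serialization succeeds
        LookupFn copy = roundTrip(fn);
        System.out.println("session after copy: " + copy.hasSession());
        copy.setUp();
        System.out.println("session after setUp: " + copy.hasSession());
    }
}
```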
>>>>
>>>> On Thu, Jun 1, 2017 at 3:33 PM Csaba Kassai <csaba.kassai@doctusoft.com>
>>>> wrote:
>>>>
>>>>> Hi Gwilym,
>>>>>
>>>>> try to extract the DoFn into a separate static nested class or into a
>>>>> separate file as a top-level class, instead of declaring it as an
>>>>> anonymous inner class. In Java, an anonymous inner class created inside an
>>>>> instance method has an implicit reference to the enclosing instance, and I
>>>>> suspect that the serialiser is not able to serialise the fields of this
>>>>> enclosing instance.
>>>>>
>>>>> Regards,
>>>>> Csabi
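Csabi's point about hidden references can be shown with the JDK alone. In this sketch `PipelineBuilder` is a hypothetical non-serializable class; the anonymous function reads one of its fields and so carries a hidden reference to the enclosing instance, while the static nested `PrefixFn` receives only the value it needs. An anonymous class also captures any local variables it uses, which is how a local `options` variable ends up inside an anonymous `DoFn`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Function;

// Plain-JDK model; PipelineBuilder and PrefixFn are hypothetical names.
class AnonymousCaptureDemo {

    interface SerializableFn extends Function<String, String>, Serializable {}

    /** Stand-in for the class that builds the pipeline; NOT Serializable. */
    static final class PipelineBuilder {
        private final String prefix;

        PipelineBuilder(String prefix) {
            this.prefix = prefix;
        }

        /** Reads this.prefix, so the anonymous class keeps a hidden PipelineBuilder reference. */
        SerializableFn anonymousFn() {
            return new SerializableFn() {
                @Override
                public String apply(String s) {
                    return prefix + s;
                }
            };
        }

        /** Static nested class: receives only the value it needs. */
        SerializableFn nestedFn() {
            return new PrefixFn(prefix);
        }
    }

    static final class PrefixFn implements SerializableFn {
        private static final long serialVersionUID = 1L;
        private final String prefix;

        PrefixFn(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public String apply(String s) {
            return prefix + s;
        }
    }

    /** Returns null if serialization succeeds, else the offending class name. */
    static String trySerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        PipelineBuilder builder = new PipelineBuilder("order-");
        System.out.println("anonymous: " + trySerialize(builder.anonymousFn()));
        System.out.println("nested:    " + trySerialize(builder.nestedFn()));
    }
}
```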
>>>>>
>>>>> On Thu, 1 Jun 2017 at 23:23 Gwilym Evans <gwilym.evans@bigcommerce.com>
>>>>> wrote:
>>>>>
>>>>>> Here's what I have in my history, if you need the "... X more"
>>>>>> expanded I can look into that:
>>>>>>
>>>>>> 2017-06-01 05:23:05 INFO  DataflowPipelineOptions$StagingLocationFactory:127 - No stagingLocation provided, falling back to gcpTempLocation
>>>>>> 2017-06-01 05:23:06 INFO  DataflowRunner:229 - PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 111 files. Enable logging at DEBUG level to see which files will be staged.
>>>>>> [WARNING]
>>>>>> java.lang.reflect.InvocationTargetException
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>> at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>> Caused by: java.lang.IllegalArgumentException: unable to serialize org.apache.beam.examples.AthenaPubsubOrderNotificationsHandler$2@604b2279
>>>>>> at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
>>>>>> at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
>>>>>> at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:569)
>>>>>> at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:434)
>>>>>> at org.apache.beam.examples.AthenaPubsubOrderNotificationsHandler.main(AthenaPubsubOrderNotificationsHandler.java:138)
>>>>>> ... 6 more
>>>>>> Caused by: java.io.NotSerializableException: org.apache.beam.sdk.options.ProxyInvocationHandler
>>>>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>>> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>>>> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>>> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>>>> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>>> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>>>> at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
>>>>>> ... 10 more
>>>>>>
>>>>>> The way I'm trying to use this in the ParDo/DoFn is:
>>>>>>
>>>>>> (line 138 starts here)
>>>>>>                 .apply(ParDo.of(new DoFn<PubsubMessage, Map<String, ByteString>>() {
>>>>>>                     private BigtableSession session;
>>>>>>
>>>>>>                     @Setup
>>>>>>                     public void setUp() throws IOException {
>>>>>>                         BigtableOptions opts = new BigtableOptions.Builder()
>>>>>>                                 .setProjectId(options.getSourceProjectId().get())
>>>>>>                                 .setInstanceId(options.getSourceInstanceId().get())
>>>>>>                                 .build();
>>>>>>
>>>>>>                         session = new BigtableSession(opts);
>>>>>>                     }
>>>>>>                     ...
>>>>>>
>>>>>> With a ReadRowsRequest built up in processElement and
>>>>>> executed like:
>>>>>>
>>>>>>                     ResultScanner<FlatRow> results =
>>>>>>                             session.getDataClient().readFlatRows(request.build());
>>>>>>
>>>>>> Thanks,
>>>>>> Gwilym
>>>>>>
>>>>>>
>>>>>> On 1 June 2017 at 15:10, Lukasz Cwik <lcwik@google.com> wrote:
>>>>>>
>>>>>>> Combining PubSub + Bigtable is common.
>>>>>>>
>>>>>>> You should try to use the BigtableSession approach because the HBase
>>>>>>> approach adds a lot of dependencies (leading to dependency conflicts).
>>>>>>> You should use the same version of Bigtable libraries that Apache
>>>>>>> Beam is using (Apache Beam 2.0.0 uses Bigtable 0.9.6.2).
>>>>>>>
>>>>>>> Can you provide the full stack trace for the exception you're seeing?
>>>>>>>
>>>>>> On Wed, May 31, 2017 at 10:51 PM, Gwilym Evans <gwilym.evans@bigcommerce.com> wrote:
>>>>>>>
>>>>>>>> Hi list,
>>>>>>>>
>>>>>>>> I'm trying to figure out if Beam is intended to do the following
>>>>>>>> and, if so, what's the best approach?
>>>>>>>>
>>>>>>>> I'm using Java and Beam 2.0.0 on GCP Dataflow. Note: I'm relatively
>>>>>>>> new to Java, so if there's a known solution for this, a code example
>>>>>>>> would be greatly appreciated.
>>>>>>>>
>>>>>>>> I have an unbounded stream of short messages (coming from PubSub).
>>>>>>>>
>>>>>>>> For each message, I want to get a number of rows from an external
>>>>>>>> database (rows within Bigtable, always the same table) based on the
>>>>>>>> contents of the message, and use the rows when producing output for the
>>>>>>>> final write apply.
>>>>>>>>
>>>>>>>> I've tried various means of connecting out to Bigtable from within
>>>>>>>> the DoFn which is handling the PubSub inputs, but so far everything I've
>>>>>>>> tried has resulted in Beam refusing to run the job due to:
>>>>>>>>
>>>>>>>> java.io.NotSerializableException: org.apache.beam.sdk.options.ProxyInvocationHandler
>>>>>>>>
>>>>>>>> (methods I've tried: manually using a BigtableSession, manually
>>>>>>>> using the Bigtable HBase libs)
>>>>>>>>
>>>>>>>> So is this something that Beam was designed to do? If so, what's
>>>>>>>> the recommended approach?
>>>>>>>>
>>>>>>>> I considered dynamically constructing a PCollection, but I wasn't
>>>>>>>> sure if that would make use of connection pooling to Bigtable.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Gwilym
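On the connection-pooling question: one way to avoid opening a separate connection for every function instance is to keep the client in static state, which Java serialization never touches, and hand the same instance to each function in the JVM. This is a hedged sketch, not a Beam or Bigtable API: `FakeClient`, `ClientHolder` and `LookupFn` are hypothetical, sharing assumes the real client is thread-safe (check its documentation), and the sketch never closes the shared client.

```java
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-JDK model of sharing one client per JVM; FakeClient, ClientHolder and
// LookupFn are hypothetical. Thread safety of a real client is an assumption.
class SharedClientDemo {

    static final AtomicInteger CLIENTS_CREATED = new AtomicInteger();

    static final class FakeClient {
        final String target;

        FakeClient(String target) {
            this.target = target;
            CLIENTS_CREATED.incrementAndGet();
        }
    }

    /** Static state is never serialized, so every function instance in this JVM sees it. */
    static final class ClientHolder {
        private static FakeClient client;

        // Simplification: the first caller's target wins; a real holder might key by target.
        static synchronized FakeClient get(String target) {
            if (client == null) {
                client = new FakeClient(target);
            }
            return client;
        }
    }

    static final class LookupFn implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String target;
        private transient FakeClient client;

        LookupFn(String target) {
            this.target = target;
        }

        /** The @Setup step: borrow the shared client instead of opening a new one. */
        void setUp() {
            client = ClientHolder.get(target);
        }

        FakeClient client() {
            return client;
        }
    }

    public static void main(String[] args) {
        LookupFn a = new LookupFn("demo-project/demo-instance");
        LookupFn b = new LookupFn("demo-project/demo-instance");
        a.setUp();
        b.setUp();
        System.out.println("same client: " + (a.client() == b.client()));
        System.out.println("clients created: " + CLIENTS_CREATED.get());
    }
}
```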
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>
>>
>
