beam-user mailing list archives

From Carlos Alonso <car...@mrcalonso.com>
Subject Re: Trying to understand Unable to encode element exceptions
Date Fri, 19 Jan 2018 16:22:36 GMT
You mean replacing the Map[String, String] in the case class with a
java.util.Map<String, String>? And then, how would I set that
MapCoder<String, String> for that field?

Sorry if these are newbie questions, but this is my first experience
with Beam...

Thanks!

On Fri, Jan 19, 2018 at 5:19 PM Neville Li <neville.lyh@gmail.com> wrote:

> In this case it's probably easiest to map the Scala `Map[K, V]` into a
> `java.util.Map<K, V>` and explicitly set a `MapCoder<K, V>` so you don't
> have to deal with internal coder inference.
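A variant of this suggestion, which also answers "how does one specify a coder for a case class?", is a hand-written coder that encodes each field with Beam's built-in coders. This is a minimal sketch, assuming Beam 2.x's `CustomCoder`, `StringUtf8Coder` and `MapCoder`, plus Scala 2.11/2.12's `JavaConverters`. The class name `MessageWithAttributesCoder` is hypothetical, not part of Beam or Scio. Unlike the suggestion above, it keeps the Scala `Map` in the case class and converts to and from `java.util.Map` only inside the coder.

```scala
import java.io.{InputStream, OutputStream}
import org.apache.beam.sdk.coders.{CustomCoder, MapCoder, StringUtf8Coder}
import scala.collection.JavaConverters._

// Case class as described in the original question.
case class MessageWithAttributes(content: String, attrs: Map[String, String])

// Hypothetical coder: encodes the case class field by field with Beam's
// built-in coders instead of relying on Java serialization (SerializableCoder).
class MessageWithAttributesCoder extends CustomCoder[MessageWithAttributes] {
  private val stringCoder = StringUtf8Coder.of()
  // MapCoder works on java.util.Map, so the Scala Map is converted at the boundary.
  private val attrsCoder = MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())

  override def encode(value: MessageWithAttributes, outStream: OutputStream): Unit = {
    // Both calls use the nested context, so each field is length-delimited.
    stringCoder.encode(value.content, outStream)
    attrsCoder.encode(value.attrs.asJava, outStream)
  }

  override def decode(inStream: InputStream): MessageWithAttributes = {
    // Fields are read back in the same order they were written.
    val content = stringCoder.decode(inStream)
    val attrs = attrsCoder.decode(inStream).asScala.toMap
    MessageWithAttributes(content, attrs)
  }
}
```

In the pipeline, the registry lookup in the `setCoder` call quoted further down could then be replaced with `.setCoder(KvCoder.of(StringUtf8Coder.of(), new MessageWithAttributesCoder))`, so the value is never handed to `SerializableCoder`.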
>
>
> On Fri, Jan 19, 2018 at 11:03 AM Neville Li <neville.lyh@gmail.com> wrote:
>
>> That happens when you mix Beam transforms into Scio, which defeats the
>> safety we have in place. Map the values into something Beam-serializable
>> first, or rewrite the transform with a Scio built-in that takes care of
>> KvCoder.
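One reading of "map the values into something Beam-serializable first", sketched under the assumption that the element still ends up encoded by `SerializableCoder` (plain Java serialization): rebuild `attrs` with the standard immutable `Map` factory, so the case class no longer holds the `JMapWrapper` instance named in the stack trace. `SerializableCopy`, `toSerializable` and `roundTrip` are hypothetical helpers, not Scio or Beam API; `roundTrip` only imitates the Java serialization step.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}

// Case class as described in the original question.
case class MessageWithAttributes(content: String, attrs: Map[String, String])

object SerializableCopy {
  // Hypothetical helper: copies `attrs` into a map built by the standard
  // immutable Map factory, dropping any wrapper class that produced the input.
  def toSerializable(msg: MessageWithAttributes): MessageWithAttributes =
    msg.copy(attrs = Map(msg.attrs.toSeq: _*))

  // Hypothetical helper: a Java-serialization round trip, roughly what
  // SerializableCoder does when it encodes and decodes an element.
  def roundTrip[T](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val loader = getClass.getClassLoader
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)) {
      // Resolve classes through this object's loader so the round trip also
      // works when the code is compiled by a REPL or script runner.
      override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
        try Class.forName(desc.getName, false, loader)
        catch { case _: ClassNotFoundException => super.resolveClass(desc) }
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

The copy would be applied in a `map` right after the attributes are read, before the element reaches the stateful step; the explicit coder approach avoids Java serialization altogether and is usually the sturdier choice.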
>>
>> On Fri, Jan 19, 2018, 10:56 AM Carlos Alonso <carlos@mrcalonso.com>
>> wrote:
>>
>>> I'm following this example:
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L60
>>>
>>> because I'm building something very similar to a group-into-batches
>>> transform. If I don't set the coder manually, this exception arises:
>>> https://pastebin.com/xxdDMXSf
>>>
>>> Thanks!
>>>
>>> On Fri, Jan 19, 2018 at 4:35 PM Neville Li <neville.lyh@gmail.com>
>>> wrote:
>>>
>>>> You shouldn't set the coder manually in most cases. It defaults to
>>>> KryoAtomicCoder for most Scala types.
>>>> More details:
>>>> https://github.com/spotify/scio/wiki/Scio%2C-Beam-and-Dataflow#coders
>>>>
>>>> On Fri, Jan 19, 2018, 10:27 AM Carlos Alonso <carlos@mrcalonso.com>
>>>> wrote:
>>>>
>>>>> Could it be because I'm using
>>>>> .setCoder(KvCoder.of(StringUtf8Coder.of(),
>>>>> CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])))
>>>>> at some point in the pipeline
>>>>> (CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])
>>>>> returns a SerializableCoder)?
>>>>>
>>>>> This is something I've always wondered. How does one specify a coder
>>>>> for a case class?
>>>>>
>>>>> Regards
>>>>>
>>>>> On Fri, 19 Jan 2018 at 15:51, Neville Li <neville.lyh@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Not sure why it falls back to SerializableCoder. Can you file a GH
>>>>>> issue, ideally with a snippet that reproduces the problem?
>>>>>>
>>>>>> On Fri, Jan 19, 2018, 7:43 AM Carlos Alonso <carlos@mrcalonso.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi everyone!!
>>>>>>>
>>>>>>> I'm building a pipeline to store items from a Google PubSub
>>>>>>> subscription into GCS buckets. To do so I'm using both stateful
>>>>>>> and timely processing, and after building and testing the project
>>>>>>> locally I tried to run it on Google Dataflow and started getting
>>>>>>> these errors.
>>>>>>>
>>>>>>> The full stack trace is here: https://pastebin.com/LqecPhsq
>>>>>>>
>>>>>>> The item I'm trying to serialize is a KV[String,
>>>>>>> MessageWithAttributes], and MessageWithAttributes is a case class
>>>>>>> defined as (content: String, attrs: Map[String, String]).
>>>>>>>
>>>>>>> The underlying cause is java.io.NotSerializableException:
>>>>>>> com.spotify.scio.util.JMapWrapper$$anon$2 (yes, I'm using Spotify's
>>>>>>> Scio as well), which may suggest that the issue is in serializing the
>>>>>>> Map, but to be honest, I don't know what it means or how to fix it.
>>>>>>>
>>>>>>> Can anyone help me, please?
>>>>>>> Thanks!
>>>>>>>
>>>>>>
