beam-commits mailing list archives

From "Jeremie Lenfant-Engelmann (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BEAM-1255) java.io.NotSerializableException in flink on UnboundedSource
Date Wed, 11 Jan 2017 22:30:17 GMT

    [ https://issues.apache.org/jira/browse/BEAM-1255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15819386#comment-15819386 ]

Jeremie Lenfant-Engelmann commented on BEAM-1255:
-------------------------------------------------

The issue here is that TypeDescriptor (which relies on TypeToken) is not serializable when
it contains a type variable (the <OutputT, CheckpointMarkT> part).

The simplest fix is to drop the type-variable part:

Coder<? extends UnboundedSource<OutputT, CheckpointMarkT>> sourceCoder =
    (Coder<? extends UnboundedSource<OutputT, CheckpointMarkT>>)
      SerializableCoder.of(new TypeDescriptor<UnboundedSource>() {});

You lose some information at the TypeDescriptor level about the type parameters, but the
descriptor still records that you are encoding an UnboundedSource.
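
Both the failure and the workaround can be reproduced with the JDK alone. The following is
a minimal sketch, not Beam code: Captured is a hypothetical stand-in for TypeDescriptor that
records the type argument of its anonymous subclass, and Source is a hypothetical stand-in
for UnboundedSource. The JDK's own ParameterizedType implementation is not serializable
either (unlike Guava's), so the sketch captures a bare type variable rather than
Source<OutputT, CheckpointMarkT>.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

class CapturedTypeDemo {

  /** Hypothetical stand-in for UnboundedSource<OutputT, CheckpointMarkT>. */
  interface Source<OutputT, CheckpointMarkT> {}

  /** Hypothetical stand-in for TypeDescriptor: records the type argument of its subclass. */
  abstract static class Captured<T> implements Serializable {
    final Type type; // not transient, so Java serialization has to write it

    Captured() {
      ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
      this.type = superType.getActualTypeArguments()[0];
    }
  }

  /** Captures an unresolved type variable, as happens inside generic code. */
  static <OutputT> Captured<OutputT> variableCapture() {
    return new Captured<OutputT>() {};
  }

  /** Captures the raw Source class, mirroring the workaround above. */
  @SuppressWarnings("rawtypes")
  static Captured<Source> rawCapture() {
    return new Captured<Source>() {};
  }

  /** Returns true if Java serialization accepts the object, false on NotSerializableException. */
  static boolean canSerialize(Object value) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(value);
      return true;
    } catch (NotSerializableException e) {
      return false;
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("type variable serializable: " + canSerialize(variableCapture()));
    System.out.println("raw class serializable:     " + canSerialize(rawCapture()));
  }
}
```

The first capture holds a java.lang.reflect.TypeVariable, which does not implement
Serializable, so writing it should fail in the same way as the stack trace in the issue.
The second holds a plain Class object, which Java serialization can always write.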

Alternatively, you could extend SerializableCoder and override getEncodedTypeDescriptor to
return the UnboundedSource type with its type variables resolved, but that gets complicated
quickly. It would look something like:

@Override
public TypeDescriptor<UnboundedSource<OutputT, CheckpointMarkT>> getEncodedTypeDescriptor() {
  return TypeDescriptor.of(getRecordType())
      // You need a TypeDescriptor for OutputT
      .where(new TypeParameter<OutputT>() {}, outputTTypeDescriptor)
      // You need a TypeDescriptor for CheckpointMarkT
      .where(new TypeParameter<CheckpointMarkT>() {}, checkpointMarkTTypeDescriptor);
}

You would need type descriptors for your type parameters (which themselves might not be serializable),
or you could create a type descriptor from the generic type itself, but that wouldn't be very
useful, since it would only say that the type is an OutputT, for example.
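
To illustrate that last point with plain JDK reflection: a type variable taken from the
generic declaration carries only its declared name and bounds, whatever type argument an
instance was created with. This is a small sketch; Wrapper is a hypothetical class standing
in for any class parameterized by OutputT.

```java
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

class TypeVariableNameDemo {

  /** Hypothetical generic class, standing in for one parameterized by OutputT. */
  static class Wrapper<OutputT> {}

  /** Returns the type variable declared on the class of the given object. */
  static TypeVariable<?> declaredVariable(Object instance) {
    return instance.getClass().getTypeParameters()[0];
  }

  public static void main(String[] args) {
    // The instance was created with String, but erasure means reflection
    // only sees the declaration Wrapper<OutputT>.
    TypeVariable<?> variable = declaredVariable(new Wrapper<String>());

    System.out.println("name:  " + variable.getName());
    for (Type bound : variable.getBounds()) {
      System.out.println("bound: " + bound.getTypeName());
    }
  }
}
```

The variable reports the name OutputT and the implicit bound java.lang.Object, with no trace
of String, which is why a descriptor built this way tells a reader little about what is
actually being encoded.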

So I think the first solution is simpler...

> java.io.NotSerializableException in flink on UnboundedSource
> ------------------------------------------------------------
>
>                 Key: BEAM-1255
>                 URL: https://issues.apache.org/jira/browse/BEAM-1255
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 0.5.0
>            Reporter: Alexey Diomin
>            Assignee: Maximilian Michels
>
> After the introduction of new Coders with TypeDescriptor, we have an issue on the Flink runner:
> {code}
> Caused by: java.io.NotSerializableException: sun.reflect.generics.reflectiveObjects.TypeVariableImpl
> 	- element of array (index: 0)
> 	- array (class "[Ljava.lang.Object;", size: 2)
> 	- field (class "com.google.common.collect.ImmutableList$SerializedForm", name: "elements", type: "class [Ljava.lang.Object;")
> 	- object (class "com.google.common.collect.ImmutableList$SerializedForm", com.google.common.collect.ImmutableList$SerializedForm@30af5b6b)
> 	- field (class "com.google.common.reflect.Types$ParameterizedTypeImpl", name: "argumentsList", type: "class com.google.common.collect.ImmutableList")
> 	- object (class "com.google.common.reflect.Types$ParameterizedTypeImpl", org.apache.beam.sdk.io.UnboundedSource<OutputT, CheckpointMarkT>)
> 	- field (class "com.google.common.reflect.TypeToken", name: "runtimeType", type: "interface java.lang.reflect.Type")
> 	- object (class "com.google.common.reflect.TypeToken$SimpleTypeToken", org.apache.beam.sdk.io.UnboundedSource<OutputT, CheckpointMarkT>)
> 	- field (class "org.apache.beam.sdk.values.TypeDescriptor", name: "token", type: "class com.google.common.reflect.TypeToken")
> 	- object (class "org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper$1", org.apache.beam.sdk.io.UnboundedSource<OutputT, CheckpointMarkT>)
> 	- field (class "org.apache.beam.sdk.coders.SerializableCoder", name: "typeDescriptor", type: "class org.apache.beam.sdk.values.TypeDescriptor")
> 	- object (class "org.apache.beam.sdk.coders.SerializableCoder", SerializableCoder)
> 	- field (class "org.apache.beam.sdk.coders.KvCoder", name: "keyCoder", type: "interface org.apache.beam.sdk.coders.Coder")
> 	- object (class "org.apache.beam.sdk.coders.KvCoder", KvCoder(SerializableCoder,AvroCoder))
> 	- field (class "org.apache.beam.sdk.coders.IterableLikeCoder", name: "elementCoder", type: "interface org.apache.beam.sdk.coders.Coder")
> 	- object (class "org.apache.beam.sdk.coders.ListCoder", ListCoder(KvCoder(SerializableCoder,AvroCoder)))
> 	- field (class "org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper", name: "checkpointCoder", type: "class org.apache.beam.sdk.coders.ListCoder")
> 	- root object (class "org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper", org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper@b2c5e07)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1182)
> {code}
> The bug was introduced by commit:
> 7b98fa08d14e8121e8885f00a9a9a878b73f81a6
> Pull request:
> https://github.com/apache/beam/pull/1537
> Code to reproduce the error:
> {code}
> import com.google.common.collect.ImmutableList;
> import org.apache.beam.runners.flink.FlinkPipelineOptions;
> import org.apache.beam.runners.flink.FlinkRunner;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.kafka.KafkaIO;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> public class FlinkSerialisationError {
>     public static void main(String[] args) {
>         FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
>         options.setRunner(FlinkRunner.class);
>         options.setStreaming(true);
>         Pipeline pipeline = Pipeline.create(options);
>         pipeline.apply(
>                 KafkaIO.read()
>                     .withBootstrapServers("localhost:9092")
>                     .withTopics(ImmutableList.of("test"))
>                     // set ConsumerGroup
>                     .withoutMetadata());
>         pipeline.run();
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
