beam-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BEAM-1255) java.io.NotSerializableException in flink on UnboundedSource
Date Thu, 12 Jan 2017 06:51:52 GMT

    [ https://issues.apache.org/jira/browse/BEAM-1255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15820299#comment-15820299 ]

ASF GitHub Bot commented on BEAM-1255:
--------------------------------------

GitHub user xhumanoid opened a pull request:

    https://github.com/apache/beam/pull/1770

    [BEAM-1255] java.io.NotSerializableException in flink on UnboundedSource

    Hi, this commit fixes a serialization issue in UnboundedSource.
    Without it, every UnboundedSource is broken on the Flink runner.
    
    [BEAM-1255] java.io.NotSerializableException in flink on UnboundedSource
    fix javadoc for BoundedSourceWrapper
    
    @aljoscha @mxm Could you review and merge?


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xhumanoid/incubator-beam BEAM-1255-java.io.NotSerializableException-in-flink-on-UnboundedSource

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/beam/pull/1770.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1770
    
----
commit f5a3d8606ca4a6ad5d0b69b7852273d9fa7cff15
Author: Alexey Diomin <diominay@gmail.com>
Date:   2017-01-12T06:44:43Z

    [BEAM-1255] java.io.NotSerializableException in flink on UnboundedSource
    fix javadoc for BoundedSourceWrapper

----


> java.io.NotSerializableException in flink on UnboundedSource
> ------------------------------------------------------------
>
>                 Key: BEAM-1255
>                 URL: https://issues.apache.org/jira/browse/BEAM-1255
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 0.5.0
>            Reporter: Alexey Diomin
>            Assignee: Alexey Diomin
>
> After new Coders with TypeDescriptor were introduced, we get the following error on the Flink runner:
> {code}
> Caused by: java.io.NotSerializableException: sun.reflect.generics.reflectiveObjects.TypeVariableImpl
> 	- element of array (index: 0)
> 	- array (class "[Ljava.lang.Object;", size: 2)
> 	- field (class "com.google.common.collect.ImmutableList$SerializedForm", name: "elements", type: "class [Ljava.lang.Object;")
> 	- object (class "com.google.common.collect.ImmutableList$SerializedForm", com.google.common.collect.ImmutableList$SerializedForm@30af5b6b)
> 	- field (class "com.google.common.reflect.Types$ParameterizedTypeImpl", name: "argumentsList", type: "class com.google.common.collect.ImmutableList")
> 	- object (class "com.google.common.reflect.Types$ParameterizedTypeImpl", org.apache.beam.sdk.io.UnboundedSource<OutputT, CheckpointMarkT>)
> 	- field (class "com.google.common.reflect.TypeToken", name: "runtimeType", type: "interface java.lang.reflect.Type")
> 	- object (class "com.google.common.reflect.TypeToken$SimpleTypeToken", org.apache.beam.sdk.io.UnboundedSource<OutputT, CheckpointMarkT>)
> 	- field (class "org.apache.beam.sdk.values.TypeDescriptor", name: "token", type: "class com.google.common.reflect.TypeToken")
> 	- object (class "org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper$1", org.apache.beam.sdk.io.UnboundedSource<OutputT, CheckpointMarkT>)
> 	- field (class "org.apache.beam.sdk.coders.SerializableCoder", name: "typeDescriptor", type: "class org.apache.beam.sdk.values.TypeDescriptor")
> 	- object (class "org.apache.beam.sdk.coders.SerializableCoder", SerializableCoder)
> 	- field (class "org.apache.beam.sdk.coders.KvCoder", name: "keyCoder", type: "interface org.apache.beam.sdk.coders.Coder")
> 	- object (class "org.apache.beam.sdk.coders.KvCoder", KvCoder(SerializableCoder,AvroCoder))
> 	- field (class "org.apache.beam.sdk.coders.IterableLikeCoder", name: "elementCoder", type: "interface org.apache.beam.sdk.coders.Coder")
> 	- object (class "org.apache.beam.sdk.coders.ListCoder", ListCoder(KvCoder(SerializableCoder,AvroCoder)))
> 	- field (class "org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper", name: "checkpointCoder", type: "class org.apache.beam.sdk.coders.ListCoder")
> 	- root object (class "org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper", org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper@b2c5e07)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1182)
> {code}
> The bug was introduced by commit:
> 7b98fa08d14e8121e8885f00a9a9a878b73f81a6
> Pull request:
> https://github.com/apache/beam/pull/1537
> Code to reproduce the error:
> {code}
> import com.google.common.collect.ImmutableList;
> import org.apache.beam.runners.flink.FlinkPipelineOptions;
> import org.apache.beam.runners.flink.FlinkRunner;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.kafka.KafkaIO;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> public class FlinkSerialisationError {
>     public static void main(String[] args) {
>         FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
>         options.setRunner(FlinkRunner.class);
>         options.setStreaming(true);
>         Pipeline pipeline = Pipeline.create(options);
>         pipeline.apply(
>                 KafkaIO.read()
>                     .withBootstrapServers("localhost:9092")
>                     .withTopics(ImmutableList.of("test"))
>                     // set ConsumerGroup
>                     .withoutMetadata());
>         pipeline.run();
>     }
> }
> {code}
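
The innermost frame of the trace above points at a java.lang.reflect type variable that ends up inside an otherwise Serializable object graph. The following is a minimal JDK-only sketch of that failure mode; it is not the fix from the pull request and does not use Beam, Guava, or Flink. The names TypeVariableSerializationSketch, Holder, and trySerialize are hypothetical and exist only for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Type;
import java.util.List;

public class TypeVariableSerializationSketch {

    // Hypothetical stand-in for a Serializable wrapper that keeps a
    // java.lang.reflect.Type in a non-transient field.
    static class Holder implements Serializable {
        private static final long serialVersionUID = 1L;
        final Type type;

        Holder(Type type) {
            this.type = type;
        }
    }

    // Attempts Java serialization and reports the outcome as a string.
    static String trySerialize(Object value) {
        try {
            ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
            out.writeObject(value);
            return "ok";
        } catch (NotSerializableException e) {
            // The exception message is the name of the offending class.
            return "NotSerializableException: " + e.getMessage();
        } catch (IOException e) {
            return "IOException: " + e;
        }
    }

    public static void main(String[] args) {
        // A concrete class token is Serializable.
        System.out.println(trySerialize(new Holder(String.class)));

        // The unresolved type parameter E of List<E> is a TypeVariable,
        // whose JDK implementation class is not Serializable.
        Type typeVariable = List.class.getTypeParameters()[0];
        System.out.println(trySerialize(new Holder(typeVariable)));
    }
}
```

On an OpenJDK-based runtime the second call is expected to fail in the same way as the trace, while the first succeeds. This suggests that a type captured with unresolved parameters such as OutputT and CheckpointMarkT cannot go through Java serialization, whereas a fully resolved class can.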



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
