beam-dev mailing list archives

From Maximilian Michels <...@apache.org>
Subject Re: Flink Runner logging FAILED_TO_UNCOMPRESS
Date Tue, 17 Sep 2019 18:33:29 GMT
+dev

The beam_fn_api flag, and the way it is set automatically, is error-prone.
Is there anything that prevents us from removing it? I understand that
some Runners, e.g. the Dataflow Runner, have two modes of executing Python
pipelines (legacy and portable), but at this point it seems clear that
portability mode should be the default.

Cheers,
Max

On September 14, 2019 7:50:52 PM PDT, Yu Watanabe 
<yu.w.tennis@gmail.com> wrote:

    Kyle

    Thank you for the assistance.

    By specifying "experiments" in PipelineOptions,
    ==========================================
        options = PipelineOptions([
            "--runner=FlinkRunner",
            "--flink_version=1.8",
            "--flink_master_url=localhost:8081",
            "--experiments=beam_fn_api"
        ])
    ==========================================

    I was able to submit the job successfully.
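
A minimal sketch of the workaround above, assuming Beam 2.15 as in this thread. The helper `with_experiment` is hypothetical (it is not part of Beam); it appends the experiments flag only when it is missing, so the same option list stays valid if the flag is already present. The flag values are copied from the message, and the PipelineOptions call is shown only as a comment so the snippet runs without apache-beam installed.

```python
def with_experiment(args, experiment="beam_fn_api"):
    """Return a copy of args with --experiments=<experiment> appended if absent.

    Hypothetical helper for illustration; not part of the Beam SDK.
    """
    flag = "--experiments=" + experiment
    if flag in args:
        return list(args)
    return list(args) + [flag]


# Options from the original report, which lacked the experiments flag.
flink_args = with_experiment([
    "--runner=FlinkRunner",
    "--flink_version=1.8",
    "--flink_master_url=localhost:8081",
])

# With apache-beam installed, the list would be passed on unchanged:
#   from apache_beam.options.pipeline_options import PipelineOptions
#   options = PipelineOptions(flink_args)
```

Calling the helper a second time on `flink_args` leaves the list unchanged, which avoids passing the flag twice.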

    [grpc-default-executor-0] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
    [grpc-default-executor-0] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting job invocation BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
    [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to Flink program.
    [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Batch Execution Environment.
    [flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink Master URL localhost:8081.
    [flink-runner-job-invoker] WARN org.apache.beam.runners.flink.FlinkExecutionEnvironments - No default parallelism could be found. Defaulting to parallelism 1. Please set an explicit parallelism with --parallelism
    [flink-runner-job-invoker] INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
    [flink-runner-job-invoker] INFO org.apache.flink.configuration.Configuration - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
    [flink-runner-job-invoker] INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
    [flink-runner-job-invoker] INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 4e055a8878dda3f564a7b7c84d48510d (detached: false).

    Thanks,
    Yu Watanabe

    On Sun, Sep 15, 2019 at 3:01 AM Kyle Weaver <kcweaver@google.com> wrote:

        Try adding "--experiments=beam_fn_api" to your pipeline options.
        (This is a known issue with Beam 2.15 that will be fixed in 2.16.)

        Kyle Weaver | Software Engineer | github.com/ibzib | kcweaver@google.com


        On Sat, Sep 14, 2019 at 12:52 AM Yu Watanabe <yu.w.tennis@gmail.com> wrote:

            Hello.

            I am trying to spin up the Flink runner, but it looks like data
            serialization is failing.
            I would like to ask for help getting past this error.

            ========================================================================
            [flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-ywatanabe-0914074210-2fcf987a_3dc1d4dc-4754-470a-9a23-eb8a68903016.
            java.lang.IllegalArgumentException: unable to deserialize BoundedSource
                     at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
                     at org.apache.beam.runners.core.construction.ReadTranslation.boundedSourceFromProto(ReadTranslation.java:94)
                     at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translateRead(FlinkBatchPortablePipelineTranslator.java:573)
                     at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:278)
                     at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:120)
                     at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:84)
                     at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:63)
                     at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:74)
                     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
                     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
                     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
                     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                     at java.lang.Thread.run(Thread.java:748)
            Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
                     at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
                     at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
                     at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
                     at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
                     at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
                     at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
                     at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
                     at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
                     ... 13 more
            ========================================================================

            My beam version is below.

            =======================================================================
            (python) ywatanabe@debian-09-00:~$ pip3 freeze | grep apache-beam
            apache-beam==2.15.0
            =======================================================================

            I have my harness container ready on the registry.

            =======================================================================
            ywatanabe@debian-09-00:~$ docker search ywatanabe-docker-apache.bintray.io/python3
            NAME                DESCRIPTION         STARS               OFFICIAL            AUTOMATED
            beam/python3                            0
            =======================================================================

            Flink is ready on a separate cluster.

            =======================================================================
            (python) ywatanabe@debian-09-00:~$ ss -atunp | grep 8081
            tcp    LISTEN     0      128      :::8081         :::*
            =======================================================================


            My Debian version.

            =======================================================================

            (python) ywatanabe@debian-09-00:~$ cat /etc/debian_version
            9.11
            =======================================================================


            My code snippet is below.

            =======================================================================

                options = PipelineOptions([
                    "--runner=FlinkRunner",
                    "--flink_version=1.8",
                    "--flink_master_url=localhost:8081"
                ])

                with beam.Pipeline(options=options) as p:
                    (p | beam.Create(["Hello World"]))
            =======================================================================


            Are there any other settings I should look at?

            Thanks,
            Yu Watanabe

            -- 
            Yu Watanabe
            Weekend Freelancer who loves to challenge building data
            platform
            yu.w.tennis@gmail.com
            LinkedIn <https://www.linkedin.com/in/yuwatanabe1>
            Twitter <https://twitter.com/yuwtennis>



    -- 
    Yu Watanabe
    Weekend Freelancer who loves to challenge building data platform
    yu.w.tennis@gmail.com
    LinkedIn <https://www.linkedin.com/in/yuwatanabe1>
    Twitter <https://twitter.com/yuwtennis>

