beam-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <>
Subject [jira] [Commented] (BEAM-196) Pipeline options must be available Context in DoFn.startBundle
Date Mon, 18 Apr 2016 16:23:25 GMT


ASF GitHub Bot commented on BEAM-196:

GitHub user mxm opened a pull request:

    [BEAM-196] Pipeline options must be available Context in DoFn.startBundle

    This replaces the custom Java serialization code by serializing the `PipelineOptions`
    to a byte array by default. So far, this has proven to be the most hassle-free method
    for the Flink Runner. To allow code reuse and to avoid deserializing the byte array
    multiple times, the `SerializedPipelineOptions` class has been introduced.
    The changes also make the options accessible through the context of the `DoFn`.
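
    The idea described above can be illustrated with a minimal, self-contained sketch. It is not the `SerializedPipelineOptions` class from this pull request: the class name `SerializedOptionsSketch`, the helper `shipToWorker`, and the use of `java.util.Properties` as a stand-in for `PipelineOptions` are all assumptions made for illustration. The holder carries only a byte array across Java serialization and decodes it lazily, at most once per deserialized copy.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical holder; Properties stands in for PipelineOptions (assumption).
final class SerializedOptionsSketch implements Serializable {
  private static final long serialVersionUID = 1L;

  // The only state shipped with the function: the options encoded as bytes.
  private final byte[] serializedOptions;

  // Decoded options; transient, so each deserialized copy rebuilds it on demand.
  private transient Properties cached;

  SerializedOptionsSketch(Properties options) {
    this.serializedOptions = toBytes(options);
  }

  private static byte[] toBytes(Properties options) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try {
      options.store(out, null);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return out.toByteArray();
  }

  // Decodes the byte array on first use and reuses the result afterwards.
  synchronized Properties getOptions() {
    if (cached == null) {
      Properties decoded = new Properties();
      try {
        decoded.load(new ByteArrayInputStream(serializedOptions));
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
      cached = decoded;
    }
    return cached;
  }

  // Simulates shipping the holder to a worker via plain Java serialization.
  static SerializedOptionsSketch shipToWorker(SerializedOptionsSketch holder) {
    try {
      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
      try (ObjectOutputStream oos = new ObjectOutputStream(buffer)) {
        oos.writeObject(holder);
      }
      try (ObjectInputStream ois =
          new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
        return (SerializedOptionsSketch) ois.readObject();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    Properties options = new Properties();
    options.setProperty("project", "demo");

    SerializedOptionsSketch onWorker =
        shipToWorker(new SerializedOptionsSketch(options));

    // Both calls return the same decoded instance; the bytes are parsed once.
    Properties first = onWorker.getOptions();
    Properties second = onWorker.getOptions();
    System.out.println("project = " + first.getProperty("project"));
    System.out.println("decoded once: " + (first == second));
  }
}
```

    In this sketch a function object would hold the wrapper as a field and call `getOptions()` wherever the options are needed, for example when building the context handed to `startBundle`. Keeping the decoded object `transient` means only the byte array has to survive the runner's serialization.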

You can merge this pull request into a Git repository by running:

    $ git pull BEAM-196

Alternatively you can review and apply these changes as the patch at:

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #200

commit 81577b31c2642522f7dd4ba8eba794df48a0ca56
Author: Maximilian Michels <>
Date:   2016-04-18T15:40:38Z

    [BEAM-196] abstraction for PipelineOptions serialization

commit 43b5ec743718e63c2d9d9532e3ca55bc87370290
Author: Maximilian Michels <>
Date:   2016-04-18T15:40:50Z

    [BEAM-196] make use of SerializedPipelineOptions


> Pipeline options must be available Context in DoFn.startBundle
> --------------------------------------------------------------
>                 Key: BEAM-196
>                 URL:
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Mark Shields
>            Assignee: Maximilian Michels
> Our (not yet merged) Java Pubsub implementation has code like this in a DoFn:
>     @Override
>     public void startBundle(Context c) throws Exception {
>       Preconditions.checkState(pubsubClient == null);
>       pubsubClient = PubsubClient.newClient(transportType,
>           timestampLabel, idLabel, c.getPipelineOptions().as(PubsubOptions.class));
>       super.startBundle(c);
>     }
> This fails with an NPE because the pipeline options are not conveyed via the context.

This message was sent by Atlassian JIRA
