beam-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BEAM-1149) Side input access fails in direct runner (possibly others too) when input element in multiple windows
Date Wed, 14 Dec 2016 19:35:58 GMT

    [ https://issues.apache.org/jira/browse/BEAM-1149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15749260#comment-15749260 ]

ASF GitHub Bot commented on BEAM-1149:
--------------------------------------

GitHub user kennknowles opened a pull request:

    https://github.com/apache/incubator-beam/pull/1616

    [BEAM-1149] SimpleDoFnRunner observes window if SideInputReader is nonempty

    Be sure to do all of the following to help us incorporate your contribution
    quickly and easily:
    
     - [x] Make sure the PR title is formatted like:
       `[BEAM-<Jira issue #>] Description of pull request`
     - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
           Travis-CI on your fork and ensure the whole test matrix passes).
     - [x] Replace `<Jira issue #>` in the title with the actual Jira issue
           number, if there is one.
     - [x] If this contribution is large, please file an Apache
           [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).
    
    ---
    
    R: @amitsela 
    
    This is a quick fix. There is some redundancy with `PushbackSideInputDoFnRunner`.
    
    Confirmed that the following run, which only runs in postcommit, fails on `master`
    and succeeds with this PR:
    
    ```
    mvn --batch-mode --errors verify -pl runners/spark \
        -Prunnable-on-service-tests -Plocal-runnable-on-service-tests \
        -D test=ParDoTest \
        -D mdep.analyze.skip=true \
        -D failIfNoTests=false \
        -D forkCount=0 \
        -D runnableOnServicePipelineOptions='[
          "--runner=TestSparkRunner",
          "--streaming=false",
          "--enableSparkMetricSinks=false"
        ]'
    ```


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kennknowles/incubator-beam observesWindow

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-beam/pull/1616.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1616
    
----
commit 9fac4ac1abed954136bb4ed5b6e9c1471c2d3c3c
Author: Kenneth Knowles <klk@google.com>
Date:   2016-12-14T19:26:27Z

    SimpleDoFnRunner observes window if SideInputReader is nonempty

----
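The PR title and the exception quoted below point at the fix: side input values are looked up per window, so when a DoFn has side inputs the runner has to "observe" the window. In practice that means processing a main-input element once for each window it belongs to, instead of once for all of its windows together. What follows is a minimal sketch of that idea under simplifying assumptions. It is not the code in the PR. `ObserveWindowSketch`, `Windowed`, `Context`, and `process` are hypothetical names, windows are plain strings, and side inputs are a map keyed directly by the main-input window. Real Beam first maps the main-input window to a side-input window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, simplified model of the fix; NOT Beam's SimpleDoFnRunner,
// SideInputReader, or WindowedValue API. Windows are plain strings here.
public class ObserveWindowSketch {

  // A value together with every window it is assigned to.
  record Windowed(String value, List<String> windows) {}

  // What the user fn can see while processing one element.
  static final class Context {
    private final Windowed current;
    private final Map<String, Integer> sideInputByWindow;

    Context(Windowed current, Map<String, Integer> sideInputByWindow) {
      this.current = current;
      this.sideInputByWindow = sideInputByWindow;
    }

    String element() {
      return current.value();
    }

    // Side inputs are per window, so a lookup is only well-defined for exactly one window.
    // The message mirrors the one in the issue's stack trace.
    int sideInput() {
      if (current.windows().size() != 1) {
        throw new IllegalStateException(
            "sideInput called when main input element is in multiple windows");
      }
      String window = current.windows().get(0);
      Integer value = sideInputByWindow.get(window);
      if (value == null) {
        throw new IllegalStateException("no side input value for window " + window);
      }
      return value;
    }
  }

  // Explodes the element into single-window calls only when side inputs are present,
  // in the spirit of "observes window if SideInputReader is nonempty".
  static List<Windowed> process(
      Windowed input, Map<String, Integer> sideInputByWindow, Function<Context, String> fn) {
    List<Windowed> outputs = new ArrayList<>();
    if (sideInputByWindow.isEmpty()) {
      // No side inputs: a single call; the output keeps all of the input's windows.
      String out = fn.apply(new Context(input, sideInputByWindow));
      outputs.add(new Windowed(out, input.windows()));
      return outputs;
    }
    for (String window : input.windows()) {
      // One call per window; each output lives only in the window whose side input it read.
      Windowed single = new Windowed(input.value(), List.of(window));
      String out = fn.apply(new Context(single, sideInputByWindow));
      outputs.add(new Windowed(out, List.of(window)));
    }
    return outputs;
  }

  public static void main(String[] args) {
    Windowed a = new Windowed("a", List.of("w0", "w1", "w2"));
    Map<String, Integer> side = Map.of("w0", 1, "w1", 1, "w2", 1);
    List<Windowed> result = process(a, side, c -> c.element() + ":" + c.sideInput());
    System.out.println(result.size()); // 3
  }
}
```

Running `main` prints `3`: the three-window element yields one `a:1` output per window. If a multi-window element reached `Context.sideInput()` without being exploded first, the size check would throw. That check models the failure reported in the issue.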


> Side input access fails in direct runner (possibly others too) when input element in multiple windows
> -----------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-1149
>                 URL: https://issues.apache.org/jira/browse/BEAM-1149
>             Project: Beam
>          Issue Type: Bug
>            Reporter: Eugene Kirpichov
>            Assignee: Kenneth Knowles
>            Priority: Blocker
>             Fix For: 0.4.0-incubating
>
>
> {code:java}
>   private static class FnWithSideInputs extends DoFn<String, String> {
>     private final PCollectionView<Integer> view;
>     private FnWithSideInputs(PCollectionView<Integer> view) {
>       this.view = view;
>     }
>     @ProcessElement
>     public void processElement(ProcessContext c) {
>       c.output(c.element() + ":" + c.sideInput(view));
>     }
>   }
>   @Test
>   public void testSideInputsWithMultipleWindows() {
>     Pipeline p = TestPipeline.create();
>     MutableDateTime mutableNow = Instant.now().toMutableDateTime();
>     mutableNow.setMillisOfSecond(0);
>     Instant now = mutableNow.toInstant();
>     SlidingWindows windowFn =
>         SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1));
>     PCollectionView<Integer> view = p.apply(Create.of(1)).apply(View.<Integer>asSingleton());
>     PCollection<String> res =
>         p.apply(Create.timestamped(TimestampedValue.of("a", now)))
>             .apply(Window.<String>into(windowFn))
>             .apply(ParDo.of(new FnWithSideInputs(view)).withSideInputs(view));
>     PAssert.that(res).containsInAnyOrder("a:1");
>     p.run();
>   }
> {code}
> This fails with the following exception:
> {code}
> org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: sideInput called when main input element is in multiple windows
> 	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:343)
> 	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:1)
> 	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:176)
> 	at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:112)
> 	at ....
> Caused by: java.lang.IllegalStateException: sideInput called when main input element is in multiple windows
> 	at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:514)
> 	at org.apache.beam.sdk.transforms.ParDoTest$FnWithSideInputs.processElement(ParDoTest.java:738)
> {code}
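The element in the repro lands in several windows because of how sliding windows are assigned. With a 5-second size and a 1-second period, each timestamp is covered by 5 / 1 = 5 overlapping windows. The sketch below reproduces that arithmetic. It uses hypothetical names (`SlidingWindowsSketch`, `assignWindows`), millisecond `long`s, and a zero offset. It is an illustration only, not Beam's `SlidingWindows` implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of sliding-window assignment arithmetic (size and period in millis, zero offset).
// Illustrative only; NOT Beam's SlidingWindows implementation.
public class SlidingWindowsSketch {

  // Returns the [start, end) bounds of every sliding window that contains timestampMillis,
  // latest start first.
  static List<long[]> assignWindows(long timestampMillis, long sizeMillis, long periodMillis) {
    List<long[]> windows = new ArrayList<>();
    // Latest period-aligned window start at or before the timestamp.
    long lastStart = timestampMillis - Math.floorMod(timestampMillis, periodMillis);
    // Step back one period at a time while the window still covers the timestamp.
    for (long start = lastStart; start > timestampMillis - sizeMillis; start -= periodMillis) {
      windows.add(new long[] {start, start + sizeMillis});
    }
    return windows;
  }

  public static void main(String[] args) {
    long now = 1_481_744_158_000L; // an arbitrary whole-second timestamp, as in the test
    List<long[]> windows = assignWindows(now, 5_000, 1_000);
    System.out.println(windows.size()); // 5
  }
}
```

When the period equals the size (fixed windows), the same loop yields exactly one window. In that case a per-window side input lookup is unambiguous.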



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
