beam-commits mailing list archives

From Ismaël Mejía (JIRA) <j...@apache.org>
Subject [jira] [Resolved] (BEAM-1948) Null pointer exception in DirectRunner.DirectPipelineResult.getAggregatorValues()
Date Fri, 21 Apr 2017 17:02:04 GMT

     [ https://issues.apache.org/jira/browse/BEAM-1948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismaël Mejía resolved BEAM-1948.
--------------------------------
       Resolution: Fixed
    Fix Version/s: Not applicable

> Null pointer exception in DirectRunner.DirectPipelineResult.getAggregatorValues()
> ---------------------------------------------------------------------------------
>
>                 Key: BEAM-1948
>                 URL: https://issues.apache.org/jira/browse/BEAM-1948
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct
>            Reporter: Etienne Chauchot
>            Assignee: Etienne Chauchot
>            Priority: Minor
>             Fix For: Not applicable
>
>
> The null pointer exception is caused by an {{Aggregator}} not being present in {{aggregatorSteps}} (possibly because it is not present in the DAG).
> It can be reproduced with a simple pipeline that uses an {{Aggregator}} and a {{State}}, like this one:
> {code}
>     // Pipeline that triggers the exception
>     IdentityDoFn identityDoFn = new IdentityDoFn();
>     p.apply(Create.of(KV.of("key", "element1"), KV.of("key", "element2"), KV.of("key", "element3")))
>         .apply(ParDo.of(identityDoFn));
>     PipelineResult pipelineResult = p.run();
>     // The NullPointerException is thrown by this call
>     pipelineResult.getAggregatorValues(identityDoFn.getCounter()).getValues();
>
>   // DoFn that uses both an Aggregator and a State
>   private static class IdentityDoFn extends DoFn<KV<String, String>, String> {
>     private final Aggregator<Long, Long> counter = createAggregator("counter", Sum.ofLongs());
>     private static final String STATE_ID = "state";
>     @StateId(STATE_ID)
>     private static final StateSpec<Object, ValueState<String>> stateSpec =
>         StateSpecs.value(StringUtf8Coder.of());
>     @ProcessElement
>     public void processElement(ProcessContext context, @StateId(STATE_ID) ValueState<String> state) {
>       state.write("state content");
>       counter.addValue(1L);
>       context.output(context.element().getValue());
>     }
>     public Aggregator<Long, Long> getCounter() {
>       return counter;
>     }
>   }
> {code}
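
The failure mode described in the issue (a lookup in {{aggregatorSteps}} for an {{Aggregator}} that was never registered) can be illustrated in isolation. The sketch below is not Beam code and does not show how the issue was actually resolved: the class AggregatorLookupSketch, its methods, and the String-keyed map are hypothetical stand-ins, assumed only for illustration. It shows an unguarded map lookup throwing a NullPointerException and one possible null-safe alternative.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in, not the DirectRunner implementation.
class AggregatorLookupSketch {
    // Assumed shape: aggregator name -> names of the steps that use it.
    private final Map<String, Collection<String>> aggregatorSteps = new HashMap<>();

    public void register(String aggregatorName, Collection<String> steps) {
        aggregatorSteps.put(aggregatorName, steps);
    }

    // Unguarded lookup: Map.get returns null for an unknown key,
    // so dereferencing the result throws NullPointerException.
    public int countStepsUnguarded(String aggregatorName) {
        Collection<String> steps = aggregatorSteps.get(aggregatorName);
        return steps.size();
    }

    // One possible guard: treat an unregistered aggregator as having no steps.
    public int countStepsGuarded(String aggregatorName) {
        Collection<String> steps = aggregatorSteps.get(aggregatorName);
        return steps == null ? 0 : steps.size();
    }

    public static void main(String[] args) {
        AggregatorLookupSketch sketch = new AggregatorLookupSketch();
        sketch.register("registered", Arrays.asList("stepA", "stepB"));

        System.out.println(sketch.countStepsGuarded("registered"));
        System.out.println(sketch.countStepsGuarded("counter"));
        try {
            sketch.countStepsUnguarded("counter");
        } catch (NullPointerException e) {
            System.out.println("NullPointerException for unregistered aggregator");
        }
    }
}
```

Whether returning an empty result or raising a descriptive exception is the better behavior depends on the caller; the guard above is only one option.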



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
