beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [1/2] beam git commit: [BEAM-1948] Defend against absent Aggregators
Date Thu, 20 Apr 2017 15:51:28 GMT
Repository: beam
Updated Branches:
  refs/heads/master 104f98235 -> 4e0c8333c


[BEAM-1948] Defend against absent Aggregators

Add protection against a NullPointerException when the aggregator key is not
present in aggregatorSteps.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b8434acf
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b8434acf
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b8434acf

Branch: refs/heads/master
Commit: b8434acf6081c1e411e9818dc64d6329db6c729e
Parents: 104f982
Author: Etienne Chauchot <echauchot@gmail.com>
Authored: Tue Apr 18 15:38:05 2017 +0200
Committer: Thomas Groh <tgroh@google.com>
Committed: Thu Apr 20 08:50:33 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunner.java       | 14 ++++----
 .../beam/runners/direct/DirectRunnerTest.java   | 37 ++++++++++++++++++++
 2 files changed, 45 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b8434acf/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 43147a0..45a04a7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -383,12 +383,14 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
       AggregatorContainer aggregators = evaluationContext.getAggregatorContainer();
       Collection<PTransform<?, ?>> steps = aggregatorSteps.get(aggregator);
       final Map<String, T> stepValues = new HashMap<>();
-      for (AppliedPTransform<?, ?, ?> transform : evaluationContext.getSteps()) {
-        if (steps.contains(transform.getTransform())) {
-          T aggregate = aggregators.getAggregate(
-              evaluationContext.getStepName(transform), aggregator.getName());
-          if (aggregate != null) {
-            stepValues.put(transform.getFullName(), aggregate);
+      if (steps != null) {
+        for (AppliedPTransform<?, ?, ?> transform : evaluationContext.getSteps()) {
+          if (steps.contains(transform.getTransform())) {
+            T aggregate = aggregators
+                .getAggregate(evaluationContext.getStepName(transform), aggregator.getName());
+            if (aggregate != null) {
+              stepValues.put(transform.getFullName(), aggregate);
+            }
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/b8434acf/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index ed19be2..246c111 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -40,6 +40,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
+import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineResult.State;
@@ -47,6 +48,7 @@ import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -58,6 +60,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -66,9 +69,13 @@ import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.IllegalMutationException;
+import org.apache.beam.sdk.util.state.StateSpec;
+import org.apache.beam.sdk.util.state.StateSpecs;
+import org.apache.beam.sdk.util.state.ValueState;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
@@ -525,6 +532,36 @@ public class DirectRunnerTest implements Serializable {
     p.run();
   }
 
+
+  @Test
+  public void testAggregatorNotPresentInGraph() throws AggregatorRetrievalException {
+    Pipeline p = getPipeline();
+    IdentityDoFn identityDoFn = new IdentityDoFn();
+    p.apply(Create.of(KV.of("key", "element1"), KV.of("key", "element2"), KV.of("key", "element3")))
+        .apply(ParDo.of(identityDoFn));
+    PipelineResult pipelineResult = p.run();
+    pipelineResult.getAggregatorValues(identityDoFn.getCounter()).getValues();
+  }
+
+  private static class IdentityDoFn extends DoFn<KV<String, String>, String> {
+    private final Aggregator<Long, Long> counter = createAggregator("counter", Sum.ofLongs());
+    private static final String STATE_ID = "state";
+    @StateId(STATE_ID)
+    private static final StateSpec<Object, ValueState<String>> stateSpec =
+        StateSpecs.value(StringUtf8Coder.of());
+
+    @ProcessElement
+    public void processElement(ProcessContext context, @StateId(STATE_ID) ValueState<String> state){
+      state.write("state content");
+      counter.addValue(1L);
+      context.output(context.element().getValue());
+    }
+
+    public Aggregator<Long, Long> getCounter() {
+      return counter;
+    }
+  }
+
   private static class LongNoDecodeCoder extends AtomicCoder<Long> {
     @Override
     public void encode(


Mime
View raw message