beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamesmal...@apache.org
Subject [28/50] [abbrv] incubator-beam git commit: Handle PCollectionList.empty() in FlattenEvaluatorFactory
Date Fri, 26 Feb 2016 22:55:05 GMT
Handle PCollectionList.empty() in FlattenEvaluatorFactory

PCollectionList.empty() is a valid argument to a Flatten#pCollections
PTransform. It should succeed and produce no output.

----Release Notes----
[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=115455733


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d15d924d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d15d924d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d15d924d

Branch: refs/heads/master
Commit: d15d924d5dae18a07067cc3a71ba3b50431fe3d7
Parents: f7fc939
Author: tgroh <tgroh@google.com>
Authored: Wed Feb 24 08:42:55 2016 -0800
Committer: Davor Bonaci <davorbonaci@users.noreply.github.com>
Committed: Thu Feb 25 23:58:27 2016 -0800

----------------------------------------------------------------------
 .../inprocess/FlattenEvaluatorFactory.java      |  8 ++++++-
 .../inprocess/FlattenEvaluatorFactoryTest.java  | 23 ++++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d15d924d/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java
index d8b5312..1442888 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2015 Google Inc.
+ * Copyright (C) 2016 Google Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
  * use this file except in compliance with the License. You may obtain a copy of
@@ -46,6 +46,12 @@ class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
           application,
       final CommittedBundle<InputT> inputBundle,
       final InProcessEvaluationContext evaluationContext) {
+    if (inputBundle == null) {
+      // it is impossible to call processElement on a flatten with no input bundle. A Flatten with
+      // no input bundle occurs as an output of Flatten.pcollections(PCollectionList.empty())
+      return new FlattenEvaluator<>(
+          null, StepTransformResult.withoutHold(application).build());
+    }
     final UncommittedBundle<InputT> outputBundle =
         evaluationContext.createBundle(inputBundle, application.getOutput());
     final InProcessTransformResult result =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d15d924d/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java
index c2b9995..dac42b6 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java
@@ -16,6 +16,7 @@
 package com.google.cloud.dataflow.sdk.runners.inprocess;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.emptyIterable;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -112,4 +113,26 @@ public class FlattenEvaluatorFactoryTest {
             WindowedValue.timestampedValueInGlobalWindow(-4, new Instant(-4096)),
             WindowedValue.valueInGlobalWindow(-1)));
   }
+
+  @Test
+  public void testFlattenInMemoryEvaluatorWithEmptyPCollectionList() throws Exception {
+    TestPipeline p = TestPipeline.create();
+    PCollectionList<Integer> list = PCollectionList.empty(p);
+
+    PCollection<Integer> flattened = list.apply(Flatten.<Integer>pCollections());
+
+    InProcessEvaluationContext context = mock(InProcessEvaluationContext.class);
+
+    FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory();
+    TransformEvaluator<Integer> emptyEvaluator =
+        factory.forApplication(flattened.getProducingTransformInternal(), null, context);
+
+    InProcessTransformResult leftSideResult = emptyEvaluator.finishBundle();
+
+    assertThat(leftSideResult.getOutputBundles(), emptyIterable());
+    assertThat(
+        leftSideResult.getTransform(),
+        Matchers.<AppliedPTransform<?, ?, ?>>equalTo(flattened.getProducingTransformInternal()));
+  }
+
 }


Mime
View raw message