beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [01/12] incubator-beam git commit: [BEAM-151] Break out Dataflow runner dependency to separate test file
Date Thu, 07 Apr 2016 18:21:11 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 7f0a2f78a -> 6467afb74


[BEAM-151] Break out Dataflow runner dependency to separate test file

This allows the Dataflow-specific portion of the tests to be moved to the Dataflow runner
Maven module.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/05f7aaa4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/05f7aaa4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/05f7aaa4

Branch: refs/heads/master
Commit: 05f7aaa491e714d91f8a4740cbea48638f2d40ee
Parents: a18ab87
Author: Luke Cwik <lcwik@google.com>
Authored: Fri Mar 25 16:19:54 2016 -0700
Committer: Luke Cwik <lcwik@google.com>
Committed: Thu Apr 7 11:18:32 2016 -0700

----------------------------------------------------------------------
 .../sdk/transforms/DataflowViewTest.java        | 205 +++++++++++++++++++
 .../cloud/dataflow/sdk/transforms/ViewTest.java | 126 ------------
 2 files changed, 205 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/05f7aaa4/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowViewTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowViewTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowViewTest.java
new file mode 100644
index 0000000..b67bfcf
--- /dev/null
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowViewTest.java
@@ -0,0 +1,205 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.sdk.transforms;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.InvalidWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.util.NoopPathValidator;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PBegin;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionView;
+import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
+
+import org.hamcrest.Matchers;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.internal.matchers.ThrowableMessageMatcher;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link View} for a {@link DataflowPipelineRunner}. */
+@RunWith(JUnit4.class)
+public class DataflowViewTest {
+  @Rule
+  public transient ExpectedException thrown = ExpectedException.none();
+
+  private Pipeline createTestBatchRunner() {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setRunner(DataflowPipelineRunner.class);
+    options.setProject("someproject");
+    options.setStagingLocation("gs://staging");
+    options.setPathValidatorClass(NoopPathValidator.class);
+    options.setDataflowClient(null);
+    return Pipeline.create(options);
+  }
+
+  private Pipeline createTestStreamingRunner() {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setRunner(DataflowPipelineRunner.class);
+    options.setStreaming(true);
+    options.setProject("someproject");
+    options.setStagingLocation("gs://staging");
+    options.setPathValidatorClass(NoopPathValidator.class);
+    options.setDataflowClient(null);
+    return Pipeline.create(options);
+  }
+
+  private void testViewUnbounded(
+      Pipeline pipeline,
+      PTransform<PCollection<KV<String, Integer>>, ? extends PCollectionView<?>> view) {
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("Unable to create a side-input view from input");
+    thrown.expectCause(
+        ThrowableMessageMatcher.hasMessage(Matchers.containsString("non-bounded PCollection")));
+    pipeline
+        .apply(
+            new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
+              @Override
+              public PCollection<KV<String, Integer>> apply(PBegin input) {
+                return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
+                        input.getPipeline(),
+                        WindowingStrategy.globalDefault(),
+                        PCollection.IsBounded.UNBOUNDED)
+                    .setTypeDescriptorInternal(new TypeDescriptor<KV<String, Integer>>() {});
+              }
+            })
+        .apply(view);
+  }
+
+  private void testViewNonmerging(
+      Pipeline pipeline,
+      PTransform<PCollection<KV<String, Integer>>, ? extends PCollectionView<?>>
view) {
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("Unable to create a side-input view from input");
+    thrown.expectCause(
+        ThrowableMessageMatcher.hasMessage(Matchers.containsString("Consumed by GroupByKey")));
+    pipeline.apply(Create.<KV<String, Integer>>of(KV.of("hello", 5)))
+        .apply(Window.<KV<String, Integer>>into(new InvalidWindows<>(
+            "Consumed by GroupByKey", FixedWindows.of(Duration.standardHours(1)))))
+        .apply(view);
+  }
+
+  @Test
+  public void testViewUnboundedAsSingletonBatch() {
+    testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asSingleton());
+  }
+
+  @Test
+  public void testViewUnboundedAsSingletonStreaming() {
+    testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asSingleton());
+  }
+
+  @Test
+  public void testViewUnboundedAsIterableBatch() {
+    testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asIterable());
+  }
+
+  @Test
+  public void testViewUnboundedAsIterableStreaming() {
+    testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asIterable());
+  }
+
+  @Test
+  public void testViewUnboundedAsListBatch() {
+    testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asList());
+  }
+
+  @Test
+  public void testViewUnboundedAsListStreaming() {
+    testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asList());
+  }
+
+  @Test
+  public void testViewUnboundedAsMapBatch() {
+    testViewUnbounded(createTestBatchRunner(), View.<String, Integer>asMap());
+  }
+
+  @Test
+  public void testViewUnboundedAsMapStreaming() {
+    testViewUnbounded(createTestStreamingRunner(), View.<String, Integer>asMap());
+  }
+
+  @Test
+  public void testViewUnboundedAsMultimapBatch() {
+    testViewUnbounded(createTestBatchRunner(), View.<String, Integer>asMultimap());
+  }
+
+  @Test
+  public void testViewUnboundedAsMultimapStreaming() {
+    testViewUnbounded(createTestStreamingRunner(), View.<String, Integer>asMultimap());
+  }
+
+  @Test
+  public void testViewNonmergingAsSingletonBatch() {
+    testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asSingleton());
+  }
+
+  @Test
+  public void testViewNonmergingAsSingletonStreaming() {
+    testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asSingleton());
+  }
+
+  @Test
+  public void testViewNonmergingAsIterableBatch() {
+    testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asIterable());
+  }
+
+  @Test
+  public void testViewNonmergingAsIterableStreaming() {
+    testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asIterable());
+  }
+
+  @Test
+  public void testViewNonmergingAsListBatch() {
+    testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asList());
+  }
+
+  @Test
+  public void testViewNonmergingAsListStreaming() {
+    testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asList());
+  }
+
+  @Test
+  public void testViewNonmergingAsMapBatch() {
+    testViewNonmerging(createTestBatchRunner(), View.<String, Integer>asMap());
+  }
+
+  @Test
+  public void testViewNonmergingAsMapStreaming() {
+    testViewNonmerging(createTestStreamingRunner(), View.<String, Integer>asMap());
+  }
+
+  @Test
+  public void testViewNonmergingAsMultimapBatch() {
+    testViewNonmerging(createTestBatchRunner(), View.<String, Integer>asMultimap());
+  }
+
+  @Test
+  public void testViewNonmergingAsMultimapStreaming() {
+    testViewNonmerging(createTestStreamingRunner(), View.<String, Integer>asMultimap());
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/05f7aaa4/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/ViewTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/ViewTest.java
index 64ec176..69a4d2e 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/ViewTest.java
@@ -33,10 +33,8 @@ import com.google.cloud.dataflow.sdk.coders.NullableCoder;
 import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
 import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
 import com.google.cloud.dataflow.sdk.coders.VoidCoder;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
 import com.google.cloud.dataflow.sdk.options.DirectPipelineOptions;
 import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
 import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
 import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
 import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
@@ -45,7 +43,6 @@ import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
 import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
 import com.google.cloud.dataflow.sdk.transforms.windowing.InvalidWindows;
 import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.util.NoopPathValidator;
 import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
 import com.google.cloud.dataflow.sdk.values.KV;
 import com.google.cloud.dataflow.sdk.values.PBegin;
@@ -1333,27 +1330,6 @@ public class ViewTest implements Serializable {
     assertEquals("View.AsMultimap", View.<String, Integer>asMultimap().getName());
   }
 
-  private Pipeline createTestBatchRunner() {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setRunner(DataflowPipelineRunner.class);
-    options.setProject("someproject");
-    options.setStagingLocation("gs://staging");
-    options.setPathValidatorClass(NoopPathValidator.class);
-    options.setDataflowClient(null);
-    return Pipeline.create(options);
-  }
-
-  private Pipeline createTestStreamingRunner() {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setRunner(DataflowPipelineRunner.class);
-    options.setStreaming(true);
-    options.setProject("someproject");
-    options.setStagingLocation("gs://staging");
-    options.setPathValidatorClass(NoopPathValidator.class);
-    options.setDataflowClient(null);
-    return Pipeline.create(options);
-  }
-
   private Pipeline createTestDirectRunner() {
     DirectPipelineOptions options = PipelineOptionsFactory.as(DirectPipelineOptions.class);
     options.setRunner(DirectPipelineRunner.class);
@@ -1396,152 +1372,50 @@ public class ViewTest implements Serializable {
   }
 
   @Test
-  public void testViewUnboundedAsSingletonBatch() {
-    testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asSingleton());
-  }
-
-  @Test
-  public void testViewUnboundedAsSingletonStreaming() {
-    testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asSingleton());
-  }
-
-  @Test
   public void testViewUnboundedAsSingletonDirect() {
     testViewUnbounded(createTestDirectRunner(), View.<KV<String, Integer>>asSingleton());
   }
 
   @Test
-  public void testViewUnboundedAsIterableBatch() {
-    testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asIterable());
-  }
-
-  @Test
-  public void testViewUnboundedAsIterableStreaming() {
-    testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asIterable());
-  }
-
-  @Test
   public void testViewUnboundedAsIterableDirect() {
     testViewUnbounded(createTestDirectRunner(), View.<KV<String, Integer>>asIterable());
   }
 
   @Test
-  public void testViewUnboundedAsListBatch() {
-    testViewUnbounded(createTestBatchRunner(), View.<KV<String, Integer>>asList());
-  }
-
-  @Test
-  public void testViewUnboundedAsListStreaming() {
-    testViewUnbounded(createTestStreamingRunner(), View.<KV<String, Integer>>asList());
-  }
-
-  @Test
   public void testViewUnboundedAsListDirect() {
     testViewUnbounded(createTestDirectRunner(), View.<KV<String, Integer>>asList());
   }
 
   @Test
-  public void testViewUnboundedAsMapBatch() {
-    testViewUnbounded(createTestBatchRunner(), View.<String, Integer>asMap());
-  }
-
-  @Test
-  public void testViewUnboundedAsMapStreaming() {
-    testViewUnbounded(createTestStreamingRunner(), View.<String, Integer>asMap());
-  }
-
-  @Test
   public void testViewUnboundedAsMapDirect() {
     testViewUnbounded(createTestDirectRunner(), View.<String, Integer>asMap());
   }
 
-
-  @Test
-  public void testViewUnboundedAsMultimapBatch() {
-    testViewUnbounded(createTestBatchRunner(), View.<String, Integer>asMultimap());
-  }
-
-  @Test
-  public void testViewUnboundedAsMultimapStreaming() {
-    testViewUnbounded(createTestStreamingRunner(), View.<String, Integer>asMultimap());
-  }
-
   @Test
   public void testViewUnboundedAsMultimapDirect() {
     testViewUnbounded(createTestDirectRunner(), View.<String, Integer>asMultimap());
   }
 
   @Test
-  public void testViewNonmergingAsSingletonBatch() {
-    testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asSingleton());
-  }
-
-  @Test
-  public void testViewNonmergingAsSingletonStreaming() {
-    testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asSingleton());
-  }
-
-  @Test
   public void testViewNonmergingAsSingletonDirect() {
     testViewNonmerging(createTestDirectRunner(), View.<KV<String, Integer>>asSingleton());
   }
 
   @Test
-  public void testViewNonmergingAsIterableBatch() {
-    testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asIterable());
-  }
-
-  @Test
-  public void testViewNonmergingAsIterableStreaming() {
-    testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asIterable());
-  }
-
-  @Test
   public void testViewNonmergingAsIterableDirect() {
     testViewNonmerging(createTestDirectRunner(), View.<KV<String, Integer>>asIterable());
   }
 
   @Test
-  public void testViewNonmergingAsListBatch() {
-    testViewNonmerging(createTestBatchRunner(), View.<KV<String, Integer>>asList());
-  }
-
-  @Test
-  public void testViewNonmergingAsListStreaming() {
-    testViewNonmerging(createTestStreamingRunner(), View.<KV<String, Integer>>asList());
-  }
-
-  @Test
   public void testViewNonmergingAsListDirect() {
     testViewNonmerging(createTestDirectRunner(), View.<KV<String, Integer>>asList());
   }
 
   @Test
-  public void testViewNonmergingAsMapBatch() {
-    testViewNonmerging(createTestBatchRunner(), View.<String, Integer>asMap());
-  }
-
-  @Test
-  public void testViewNonmergingAsMapStreaming() {
-    testViewNonmerging(createTestStreamingRunner(), View.<String, Integer>asMap());
-  }
-
-  @Test
   public void testViewNonmergingAsMapDirect() {
     testViewNonmerging(createTestDirectRunner(), View.<String, Integer>asMap());
   }
 
-
-  @Test
-  public void testViewNonmergingAsMultimapBatch() {
-    testViewNonmerging(createTestBatchRunner(), View.<String, Integer>asMultimap());
-  }
-
-  @Test
-  public void testViewNonmergingAsMultimapStreaming() {
-    testViewNonmerging(createTestStreamingRunner(), View.<String, Integer>asMultimap());
-  }
-
   @Test
   public void testViewNonmergingAsMultimapDirect() {
     testViewNonmerging(createTestDirectRunner(), View.<String, Integer>asMultimap());


Mime
View raw message