beam-commits mailing list archives

From dhalp...@apache.org
Subject [07/21] incubator-beam git commit: Reorganize Java packages in the sources of the Google Cloud Dataflow runner
Date Wed, 27 Apr 2016 01:08:39 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
new file mode 100644
index 0000000..3a39e41
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -0,0 +1,967 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import static org.apache.beam.sdk.util.Structs.addObject;
+import static org.apache.beam.sdk.util.Structs.getDictionary;
+import static org.apache.beam.sdk.util.Structs.getString;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.runners.RecordingPipelineVisitor;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.OutputReference;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.util.Structs;
+import org.apache.beam.sdk.util.TestCredential;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TupleTag;
+
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.dataflow.model.DataflowPackage;
+import com.google.api.services.dataflow.model.Job;
+import com.google.api.services.dataflow.model.Step;
+import com.google.api.services.dataflow.model.WorkerPool;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.internal.matchers.ThrowableMessageMatcher;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentMatcher;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for DataflowPipelineTranslator.
+ */
+@RunWith(JUnit4.class)
+public class DataflowPipelineTranslatorTest implements Serializable {
+
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+  // A custom Mockito matcher for an initial Job that checks that all
+  // expected fields are set.
+  private static class IsValidCreateRequest extends ArgumentMatcher<Job> {
+    @Override
+    public boolean matches(Object o) {
+      Job job = (Job) o;
+      return job.getId() == null
+          && job.getProjectId() == null
+          && job.getName() != null
+          && job.getType() != null
+          && job.getEnvironment() != null
+          && job.getSteps() != null
+          && job.getCurrentState() == null
+          && job.getCurrentStateTime() == null
+          && job.getExecutionInfo() == null
+          && job.getCreateTime() == null;
+    }
+  }
+
+  private Pipeline buildPipeline(DataflowPipelineOptions options) {
+    options.setRunner(DataflowPipelineRunner.class);
+    Pipeline p = Pipeline.create(options);
+
+    p.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/object"))
+        .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/object"));
+
+    return p;
+  }
+
+  private static Dataflow buildMockDataflow(
+      ArgumentMatcher<Job> jobMatcher) throws IOException {
+    Dataflow mockDataflowClient = mock(Dataflow.class);
+    Dataflow.Projects mockProjects = mock(Dataflow.Projects.class);
+    Dataflow.Projects.Jobs mockJobs = mock(Dataflow.Projects.Jobs.class);
+    Dataflow.Projects.Jobs.Create mockRequest = mock(
+        Dataflow.Projects.Jobs.Create.class);
+
+    when(mockDataflowClient.projects()).thenReturn(mockProjects);
+    when(mockProjects.jobs()).thenReturn(mockJobs);
+    when(mockJobs.create(eq("someProject"), argThat(jobMatcher)))
+        .thenReturn(mockRequest);
+
+    Job resultJob = new Job();
+    resultJob.setId("newid");
+    when(mockRequest.execute()).thenReturn(resultJob);
+    return mockDataflowClient;
+  }
+
+  private static DataflowPipelineOptions buildPipelineOptions() throws IOException {
+    GcsUtil mockGcsUtil = mock(GcsUtil.class);
+    when(mockGcsUtil.expand(any(GcsPath.class))).then(new Answer<List<GcsPath>>() {
+      @Override
+      public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable {
+        return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
+      }
+    });
+    when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(true);
+    when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod();
+
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setRunner(DataflowPipelineRunner.class);
+    options.setGcpCredential(new TestCredential());
+    options.setJobName("some-job-name");
+    options.setProject("some-project");
+    options.setTempLocation(GcsPath.fromComponents("somebucket", "some/path").toString());
+    options.setFilesToStage(new LinkedList<String>());
+    options.setDataflowClient(buildMockDataflow(new IsValidCreateRequest()));
+    options.setGcsUtil(mockGcsUtil);
+    return options;
+  }
+
+  @Test
+  public void testSettingOfSdkPipelineOptions() throws IOException {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setRunner(DataflowPipelineRunner.class);
+
+    Pipeline p = buildPipeline(options);
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    Job job =
+        DataflowPipelineTranslator.fromOptions(options)
+            .translate(
+                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    // Note that the contents of this materialized map may be changed by the act of reading an
+    // option, which will cause the default to get materialized whereas it would otherwise be
+    // left absent. It is permissible to simply alter this test to reflect current behavior.
+    Map<String, Object> settings = new HashMap<>();
+    settings.put("appName", "DataflowPipelineTranslatorTest");
+    settings.put("project", "some-project");
+    settings.put("pathValidatorClass",
+        "org.apache.beam.runners.dataflow.util.DataflowPathValidator");
+    settings.put("runner", "org.apache.beam.runners.dataflow.DataflowPipelineRunner");
+    settings.put("jobName", "some-job-name");
+    settings.put("tempLocation", "gs://somebucket/some/path");
+    settings.put("stagingLocation", "gs://somebucket/some/path/staging");
+    settings.put("stableUniqueNames", "WARNING");
+    settings.put("streaming", false);
+    settings.put("numberOfWorkerHarnessThreads", 0);
+    settings.put("experiments", null);
+
+    Map<String, Object> sdkPipelineOptions = job.getEnvironment().getSdkPipelineOptions();
+    assertThat(sdkPipelineOptions, hasKey("options"));
+    assertEquals(settings, sdkPipelineOptions.get("options"));
+  }
+
+  @Test
+  public void testNetworkConfig() throws IOException {
+    final String testNetwork = "test-network";
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setNetwork(testNetwork);
+
+    Pipeline p = buildPipeline(options);
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    Job job =
+        DataflowPipelineTranslator.fromOptions(options)
+            .translate(
+                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    assertEquals(1, job.getEnvironment().getWorkerPools().size());
+    assertEquals(testNetwork,
+        job.getEnvironment().getWorkerPools().get(0).getNetwork());
+  }
+
+  @Test
+  public void testNetworkConfigMissing() throws IOException {
+    DataflowPipelineOptions options = buildPipelineOptions();
+
+    Pipeline p = buildPipeline(options);
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    Job job =
+        DataflowPipelineTranslator.fromOptions(options)
+            .translate(
+                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    assertEquals(1, job.getEnvironment().getWorkerPools().size());
+    assertNull(job.getEnvironment().getWorkerPools().get(0).getNetwork());
+  }
+
+  @Test
+  public void testSubnetworkConfig() throws IOException {
+    final String testSubnetwork = "regions/REGION/subnetworks/SUBNETWORK";
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setSubnetwork(testSubnetwork);
+
+    Pipeline p = buildPipeline(options);
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    Job job =
+        DataflowPipelineTranslator.fromOptions(options)
+            .translate(
+                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    assertEquals(1, job.getEnvironment().getWorkerPools().size());
+    assertEquals(testSubnetwork,
+        job.getEnvironment().getWorkerPools().get(0).getSubnetwork());
+  }
+
+  @Test
+  public void testSubnetworkConfigMissing() throws IOException {
+    DataflowPipelineOptions options = buildPipelineOptions();
+
+    Pipeline p = buildPipeline(options);
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    Job job =
+        DataflowPipelineTranslator.fromOptions(options)
+            .translate(
+                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    assertEquals(1, job.getEnvironment().getWorkerPools().size());
+    assertNull(job.getEnvironment().getWorkerPools().get(0).getSubnetwork());
+  }
+
+  @Test
+  public void testScalingAlgorithmMissing() throws IOException {
+    DataflowPipelineOptions options = buildPipelineOptions();
+
+    Pipeline p = buildPipeline(options);
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    Job job =
+        DataflowPipelineTranslator.fromOptions(options)
+            .translate(
+                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    assertEquals(1, job.getEnvironment().getWorkerPools().size());
+    // Autoscaling settings are always set.
+    assertNull(
+        job
+            .getEnvironment()
+            .getWorkerPools()
+            .get(0)
+            .getAutoscalingSettings()
+            .getAlgorithm());
+    assertEquals(
+        0,
+        job
+            .getEnvironment()
+            .getWorkerPools()
+            .get(0)
+            .getAutoscalingSettings()
+            .getMaxNumWorkers()
+            .intValue());
+  }
+
+  @Test
+  public void testScalingAlgorithmNone() throws IOException {
+    final DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType noScaling =
+        DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.NONE;
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setAutoscalingAlgorithm(noScaling);
+
+    Pipeline p = buildPipeline(options);
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    Job job =
+        DataflowPipelineTranslator.fromOptions(options)
+            .translate(
+                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    assertEquals(1, job.getEnvironment().getWorkerPools().size());
+    assertEquals(
+        "AUTOSCALING_ALGORITHM_NONE",
+        job
+            .getEnvironment()
+            .getWorkerPools()
+            .get(0)
+            .getAutoscalingSettings()
+            .getAlgorithm());
+    assertEquals(
+        0,
+        job
+            .getEnvironment()
+            .getWorkerPools()
+            .get(0)
+            .getAutoscalingSettings()
+            .getMaxNumWorkers()
+            .intValue());
+  }
+
+  @Test
+  public void testMaxNumWorkersIsPassedWhenNoAlgorithmIsSet() throws IOException {
+    final DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType noScaling = null;
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setMaxNumWorkers(42);
+    options.setAutoscalingAlgorithm(noScaling);
+
+    Pipeline p = buildPipeline(options);
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    Job job =
+        DataflowPipelineTranslator.fromOptions(options)
+            .translate(
+                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    assertEquals(1, job.getEnvironment().getWorkerPools().size());
+    assertNull(
+        job
+            .getEnvironment()
+            .getWorkerPools()
+            .get(0)
+            .getAutoscalingSettings()
+            .getAlgorithm());
+    assertEquals(
+        42,
+        job
+            .getEnvironment()
+            .getWorkerPools()
+            .get(0)
+            .getAutoscalingSettings()
+            .getMaxNumWorkers()
+            .intValue());
+  }
+
+  @Test
+  public void testZoneConfig() throws IOException {
+    final String testZone = "test-zone-1";
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setZone(testZone);
+
+    Pipeline p = buildPipeline(options);
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    Job job =
+        DataflowPipelineTranslator.fromOptions(options)
+            .translate(
+                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    assertEquals(1, job.getEnvironment().getWorkerPools().size());
+    assertEquals(testZone,
+        job.getEnvironment().getWorkerPools().get(0).getZone());
+  }
+
+  @Test
+  public void testWorkerMachineTypeConfig() throws IOException {
+    final String testMachineType = "test-machine-type";
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setWorkerMachineType(testMachineType);
+
+    Pipeline p = buildPipeline(options);
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    Job job =
+        DataflowPipelineTranslator.fromOptions(options)
+            .translate(
+                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    assertEquals(1, job.getEnvironment().getWorkerPools().size());
+
+    WorkerPool workerPool = job.getEnvironment().getWorkerPools().get(0);
+    assertEquals(testMachineType, workerPool.getMachineType());
+  }
+
+  @Test
+  public void testDiskSizeGbConfig() throws IOException {
+    final Integer diskSizeGb = 1234;
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setDiskSizeGb(diskSizeGb);
+
+    Pipeline p = buildPipeline(options);
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    Job job =
+        DataflowPipelineTranslator.fromOptions(options)
+            .translate(
+                p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    assertEquals(1, job.getEnvironment().getWorkerPools().size());
+    assertEquals(diskSizeGb,
+        job.getEnvironment().getWorkerPools().get(0).getDiskSizeGb());
+  }
+
+  @Test
+  public void testPredefinedAddStep() throws Exception {
+    DataflowPipelineOptions options = buildPipelineOptions();
+
+    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
+    DataflowPipelineTranslator.registerTransformTranslator(
+        EmbeddedTransform.class, new EmbeddedTranslator());
+
+    // Create a predefined step using another pipeline
+    Step predefinedStep = createPredefinedStep();
+
+    // Create a pipeline that the predefined step will be embedded into
+    Pipeline pipeline = Pipeline.create(options);
+    pipeline.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/in"))
+        .apply(ParDo.of(new NoOpFn()))
+        .apply(new EmbeddedTransform(predefinedStep.clone()))
+        .apply(ParDo.of(new NoOpFn()));
+    Job job =
+        translator
+            .translate(
+                pipeline,
+                (DataflowPipelineRunner) pipeline.getRunner(),
+                Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    List<Step> steps = job.getSteps();
+    assertEquals(4, steps.size());
+
+    // The input to the embedded step should match the output of the step before
+    Map<String, Object> step1Out = getOutputPortReference(steps.get(1));
+    Map<String, Object> step2In = getDictionary(
+        steps.get(2).getProperties(), PropertyNames.PARALLEL_INPUT);
+    assertEquals(step1Out, step2In);
+
+    // The output from the embedded step should match the input of the step after
+    Map<String, Object> step2Out = getOutputPortReference(steps.get(2));
+    Map<String, Object> step3In = getDictionary(
+        steps.get(3).getProperties(), PropertyNames.PARALLEL_INPUT);
+    assertEquals(step2Out, step3In);
+
+    // The step should not have been modified other than remapping the input
+    Step predefinedStepClone = predefinedStep.clone();
+    Step embeddedStepClone = steps.get(2).clone();
+    predefinedStepClone.getProperties().remove(PropertyNames.PARALLEL_INPUT);
+    embeddedStepClone.getProperties().remove(PropertyNames.PARALLEL_INPUT);
+    assertEquals(predefinedStepClone, embeddedStepClone);
+  }
+
+  /**
+   * Construct an OutputReference for the output of the step.
+   */
+  private static OutputReference getOutputPortReference(Step step) throws Exception {
+    // TODO: This should be done via a Structs accessor.
+    @SuppressWarnings("unchecked")
+    List<Map<String, Object>> output =
+        (List<Map<String, Object>>) step.getProperties().get(PropertyNames.OUTPUT_INFO);
+    String outputTagId = getString(Iterables.getOnlyElement(output), PropertyNames.OUTPUT_NAME);
+    return new OutputReference(step.getName(), outputTagId);
+  }
+
+  /**
+   * Returns a Step for a DoFn by creating and translating a pipeline.
+   */
+  private static Step createPredefinedStep() throws Exception {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
+    Pipeline pipeline = Pipeline.create(options);
+    String stepName = "DoFn1";
+    pipeline.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/in"))
+        .apply(ParDo.of(new NoOpFn()).named(stepName))
+        .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/out"));
+    Job job =
+        translator
+            .translate(
+                pipeline,
+                (DataflowPipelineRunner) pipeline.getRunner(),
+                Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    assertEquals(13, job.getSteps().size());
+    Step step = job.getSteps().get(1);
+    assertEquals(stepName, getString(step.getProperties(), PropertyNames.USER_NAME));
+    return step;
+  }
+
+  private static class NoOpFn extends DoFn<String, String> {
+    @Override public void processElement(ProcessContext c) throws Exception {
+      c.output(c.element());
+    }
+  }
+
+  /**
+   * A placeholder transform that will be used to substitute a predefined Step.
+   */
+  private static class EmbeddedTransform
+      extends PTransform<PCollection<String>, PCollection<String>> {
+    private final Step step;
+
+    public EmbeddedTransform(Step step) {
+      this.step = step;
+    }
+
+    @Override
+    public PCollection<String> apply(PCollection<String> input) {
+      return PCollection.createPrimitiveOutputInternal(
+          input.getPipeline(),
+          WindowingStrategy.globalDefault(),
+          input.isBounded());
+    }
+
+    @Override
+    protected Coder<?> getDefaultOutputCoder() {
+      return StringUtf8Coder.of();
+    }
+  }
+
+  /**
+   * A TransformTranslator that adds the predefined Step using
+   * {@link TranslationContext#addStep} and remaps the input port reference.
+   */
+  private static class EmbeddedTranslator
+      implements DataflowPipelineTranslator.TransformTranslator<EmbeddedTransform> {
+    @Override public void translate(EmbeddedTransform transform, TranslationContext context) {
+      addObject(transform.step.getProperties(), PropertyNames.PARALLEL_INPUT,
+          context.asOutputReference(context.getInput(transform)));
+      context.addStep(transform, transform.step);
+    }
+  }
+
+  /**
+   * A composite transform that returns an output that is unrelated to
+   * the input.
+   */
+  private static class UnrelatedOutputCreator
+      extends PTransform<PCollection<Integer>, PCollection<Integer>> {
+
+    @Override
+    public PCollection<Integer> apply(PCollection<Integer> input) {
+      // Apply an operation so that this is a composite transform.
+      input.apply(Count.<Integer>perElement());
+
+      // Return a value unrelated to the input.
+      return input.getPipeline().apply(Create.of(1, 2, 3, 4));
+    }
+
+    @Override
+    protected Coder<?> getDefaultOutputCoder() {
+      return VarIntCoder.of();
+    }
+  }
+
+  /**
+   * A composite transform that returns an output that is unbound.
+   */
+  private static class UnboundOutputCreator
+      extends PTransform<PCollection<Integer>, PDone> {
+
+    @Override
+    public PDone apply(PCollection<Integer> input) {
+      // Apply an operation so that this is a composite transform.
+      input.apply(Count.<Integer>perElement());
+
+      return PDone.in(input.getPipeline());
+    }
+
+    @Override
+    protected Coder<?> getDefaultOutputCoder() {
+      return VoidCoder.of();
+    }
+  }
+
+  /**
+   * A composite transform that returns a partially bound output.
+   *
+   * <p>This is not allowed and will result in a failure.
+   */
+  private static class PartiallyBoundOutputCreator
+      extends PTransform<PCollection<Integer>, PCollectionTuple> {
+
+    public final TupleTag<Integer> sumTag = new TupleTag<>("sum");
+    public final TupleTag<Void> doneTag = new TupleTag<>("done");
+
+    @Override
+    public PCollectionTuple apply(PCollection<Integer> input) {
+      PCollection<Integer> sum = input.apply(Sum.integersGlobally());
+
+      // Fails here when attempting to construct a tuple with an unbound object.
+      return PCollectionTuple.of(sumTag, sum)
+          .and(doneTag, PCollection.<Void>createPrimitiveOutputInternal(
+              input.getPipeline(),
+              WindowingStrategy.globalDefault(),
+              input.isBounded()));
+    }
+  }
+
+  @Test
+  public void testMultiGraphPipelineSerialization() throws IOException {
+    Pipeline p = Pipeline.create(buildPipelineOptions());
+
+    PCollection<Integer> input = p.begin()
+        .apply(Create.of(1, 2, 3));
+
+    input.apply(new UnrelatedOutputCreator());
+    input.apply(new UnboundOutputCreator());
+
+    DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(
+        PipelineOptionsFactory.as(DataflowPipelineOptions.class));
+
+    // Check that translation doesn't fail.
+    t.translate(
+        p, (DataflowPipelineRunner) p.getRunner(), Collections.<DataflowPackage>emptyList());
+  }
+
+  @Test
+  public void testPartiallyBoundFailure() throws IOException {
+    Pipeline p = Pipeline.create(buildPipelineOptions());
+
+    PCollection<Integer> input = p.begin()
+        .apply(Create.of(1, 2, 3));
+
+    thrown.expect(IllegalStateException.class);
+    input.apply(new PartiallyBoundOutputCreator());
+
+    Assert.fail("Failure expected from use of partially bound output");
+  }
+
+  /**
+   * This tests a few corner cases that should not crash.
+   */
+  @Test
+  public void testGoodWildcards() throws Exception {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    Pipeline pipeline = Pipeline.create(options);
+    DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options);
+
+    applyRead(pipeline, "gs://bucket/foo");
+    applyRead(pipeline, "gs://bucket/foo/");
+    applyRead(pipeline, "gs://bucket/foo/*");
+    applyRead(pipeline, "gs://bucket/foo/?");
+    applyRead(pipeline, "gs://bucket/foo/[0-9]");
+    applyRead(pipeline, "gs://bucket/foo/*baz*");
+    applyRead(pipeline, "gs://bucket/foo/*baz?");
+    applyRead(pipeline, "gs://bucket/foo/[0-9]baz?");
+    applyRead(pipeline, "gs://bucket/foo/baz/*");
+    applyRead(pipeline, "gs://bucket/foo/baz/*wonka*");
+    applyRead(pipeline, "gs://bucket/foo/*baz/wonka*");
+    applyRead(pipeline, "gs://bucket/foo*/baz");
+    applyRead(pipeline, "gs://bucket/foo?/baz");
+    applyRead(pipeline, "gs://bucket/foo[0-9]/baz");
+
+    // Check that translation doesn't fail.
+    t.translate(
+        pipeline,
+        (DataflowPipelineRunner) pipeline.getRunner(),
+        Collections.<DataflowPackage>emptyList());
+  }
+
+  private void applyRead(Pipeline pipeline, String path) {
+    pipeline.apply("Read(" + path + ")", TextIO.Read.from(path));
+  }
+
+  /**
+   * Recursive wildcards are not supported.
+   * This tests "**".
+   */
+  @Test
+  public void testBadWildcardRecursive() throws Exception {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    Pipeline pipeline = Pipeline.create(options);
+    DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options);
+
+    pipeline.apply(TextIO.Read.from("gs://bucket/foo**/baz"));
+
+    // Check that translation does fail.
+    thrown.expectCause(Matchers.allOf(
+        instanceOf(IllegalArgumentException.class),
+        ThrowableMessageMatcher.hasMessage(containsString("Unsupported wildcard usage"))));
+    t.translate(
+        pipeline,
+        (DataflowPipelineRunner) pipeline.getRunner(),
+        Collections.<DataflowPackage>emptyList());
+  }
+
+  @Test
+  public void testToSingletonTranslation() throws Exception {
+    // A "change detector" test that makes sure the translation
+    // of getting a PCollectionView<T> does not change
+    // in bad ways during refactoring.
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setExperiments(ImmutableList.of("disable_ism_side_input"));
+    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
+
+    Pipeline pipeline = Pipeline.create(options);
+    pipeline.apply(Create.of(1))
+        .apply(View.<Integer>asSingleton());
+    Job job =
+        translator
+            .translate(
+                pipeline,
+                (DataflowPipelineRunner) pipeline.getRunner(),
+                Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    List<Step> steps = job.getSteps();
+    assertEquals(2, steps.size());
+
+    Step createStep = steps.get(0);
+    assertEquals("ParallelRead", createStep.getKind());
+
+    Step collectionToSingletonStep = steps.get(1);
+    assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
+
+  }
+
+  @Test
+  public void testToIterableTranslation() throws Exception {
+    // A "change detector" test that makes sure the translation
+    // of getting a PCollectionView<Iterable<T>> does not change
+    // in bad ways during refactoring.
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setExperiments(ImmutableList.of("disable_ism_side_input"));
+    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
+
+    Pipeline pipeline = Pipeline.create(options);
+    pipeline.apply(Create.of(1, 2, 3))
+        .apply(View.<Integer>asIterable());
+    Job job =
+        translator
+            .translate(
+                pipeline,
+                (DataflowPipelineRunner) pipeline.getRunner(),
+                Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    List<Step> steps = job.getSteps();
+    assertEquals(2, steps.size());
+
+    Step createStep = steps.get(0);
+    assertEquals("ParallelRead", createStep.getKind());
+
+    Step collectionToSingletonStep = steps.get(1);
+    assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
+  }
+
+  @Test
+  public void testToSingletonTranslationWithIsmSideInput() throws Exception {
+    // A "change detector" test that makes sure the translation
+    // of getting a PCollectionView<T> does not change
+    // in bad ways during refactoring.
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
+
+    Pipeline pipeline = Pipeline.create(options);
+    pipeline.apply(Create.of(1))
+        .apply(View.<Integer>asSingleton());
+    Job job =
+        translator
+            .translate(
+                pipeline,
+                (DataflowPipelineRunner) pipeline.getRunner(),
+                Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    List<Step> steps = job.getSteps();
+    assertEquals(5, steps.size());
+
+    @SuppressWarnings("unchecked")
+    List<Map<String, Object>> toIsmRecordOutputs =
+        (List<Map<String, Object>>) steps.get(3).getProperties().get(PropertyNames.OUTPUT_INFO);
+    assertTrue(
+        Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
+
+    Step collectionToSingletonStep = steps.get(4);
+    assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
+  }
+
+  @Test
+  public void testToIterableTranslationWithIsmSideInput() throws Exception {
+    // A "change detector" test that makes sure the translation
+    // of getting a PCollectionView<Iterable<T>> does not change
+    // in bad ways during refactoring.
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
+
+    Pipeline pipeline = Pipeline.create(options);
+    pipeline.apply(Create.of(1, 2, 3))
+        .apply(View.<Integer>asIterable());
+    Job job =
+        translator
+            .translate(
+                pipeline,
+                (DataflowPipelineRunner) pipeline.getRunner(),
+                Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    List<Step> steps = job.getSteps();
+    assertEquals(3, steps.size());
+
+    @SuppressWarnings("unchecked")
+    List<Map<String, Object>> toIsmRecordOutputs =
+        (List<Map<String, Object>>) steps.get(1).getProperties().get(PropertyNames.OUTPUT_INFO);
+    assertTrue(
+        Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format"));
+
+
+    Step collectionToSingletonStep = steps.get(2);
+    assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
+  }
+
+  @Test
+  public void testStepDisplayData() throws Exception {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
+    Pipeline pipeline = Pipeline.create(options);
+
+    DoFn<Integer, Integer> fn1 = new DoFn<Integer, Integer>() {
+      @Override
+      public void processElement(ProcessContext c) throws Exception {
+        c.output(c.element());
+      }
+
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        builder
+            .add("foo", "bar")
+            .add("foo2", DataflowPipelineTranslatorTest.class)
+                .withLabel("Test Class")
+                .withLinkUrl("http://www.google.com");
+      }
+    };
+
+    DoFn<Integer, Integer> fn2 = new DoFn<Integer, Integer>() {
+      @Override
+      public void processElement(ProcessContext c) throws Exception {
+        c.output(c.element());
+      }
+
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        builder.add("foo3", 1234);
+      }
+    };
+
+    ParDo.Bound<Integer, Integer> parDo1 = ParDo.of(fn1);
+    ParDo.Bound<Integer, Integer> parDo2 = ParDo.of(fn2);
+    pipeline
+      .apply(Create.of(1, 2, 3))
+      .apply(parDo1)
+      .apply(parDo2);
+
+    Job job =
+        translator
+            .translate(
+                pipeline,
+                (DataflowPipelineRunner) pipeline.getRunner(),
+                Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    List<Step> steps = job.getSteps();
+    assertEquals(3, steps.size());
+
+    Map<String, Object> parDo1Properties = steps.get(1).getProperties();
+    Map<String, Object> parDo2Properties = steps.get(2).getProperties();
+    assertThat(parDo1Properties, hasKey("display_data"));
+
+    Collection<Map<String, String>> fn1displayData =
+            (Collection<Map<String, String>>) parDo1Properties.get("display_data");
+    Collection<Map<String, String>> fn2displayData =
+            (Collection<Map<String, String>>) parDo2Properties.get("display_data");
+
+    ImmutableSet<ImmutableMap<String, Object>> expectedFn1DisplayData = ImmutableSet.of(
+        ImmutableMap.<String, Object>builder()
+            .put("key", "foo")
+            .put("type", "STRING")
+            .put("value", "bar")
+            .put("namespace", fn1.getClass().getName())
+            .build(),
+        ImmutableMap.<String, Object>builder()
+            .put("key", "fn")
+            .put("type", "JAVA_CLASS")
+            .put("value", fn1.getClass().getName())
+            .put("shortValue", fn1.getClass().getSimpleName())
+            .put("namespace", parDo1.getClass().getName())
+            .build(),
+        ImmutableMap.<String, Object>builder()
+            .put("key", "foo2")
+            .put("type", "JAVA_CLASS")
+            .put("value", DataflowPipelineTranslatorTest.class.getName())
+            .put("shortValue", DataflowPipelineTranslatorTest.class.getSimpleName())
+            .put("namespace", fn1.getClass().getName())
+            .put("label", "Test Class")
+            .put("linkUrl", "http://www.google.com")
+            .build()
+    );
+
+    ImmutableSet<ImmutableMap<String, Object>> expectedFn2DisplayData = ImmutableSet.of(
+        ImmutableMap.<String, Object>builder()
+            .put("key", "fn")
+            .put("type", "JAVA_CLASS")
+            .put("value", fn2.getClass().getName())
+            .put("shortValue", fn2.getClass().getSimpleName())
+            .put("namespace", parDo2.getClass().getName())
+            .build(),
+        ImmutableMap.<String, Object>builder()
+            .put("key", "foo3")
+            .put("type", "INTEGER")
+            .put("value", 1234L)
+            .put("namespace", fn2.getClass().getName())
+            .build()
+    );
+
+    assertEquals(expectedFn1DisplayData, ImmutableSet.copyOf(fn1displayData));
+    assertEquals(expectedFn2DisplayData, ImmutableSet.copyOf(fn2displayData));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/internal/CustomSourcesTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/internal/CustomSourcesTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/internal/CustomSourcesTest.java
new file mode 100644
index 0000000..ed86be2
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/internal/CustomSourcesTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.internal;
+import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.ExpectedLogs;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Sample;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.base.Preconditions;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests for {@link CustomSources}.
+ */
+@RunWith(JUnit4.class)
+public class CustomSourcesTest {
+  @Rule public ExpectedException expectedException = ExpectedException.none();
+  @Rule public ExpectedLogs logged = ExpectedLogs.none(CustomSources.class);
+
+  static class TestIO {
+    public static Read fromRange(int from, int to) {
+      return new Read(from, to, false);
+    }
+
+    static class Read extends BoundedSource<Integer> {
+      final int from;
+      final int to;
+      final boolean produceTimestamps;
+
+      Read(int from, int to, boolean produceTimestamps) {
+        this.from = from;
+        this.to = to;
+        this.produceTimestamps = produceTimestamps;
+      }
+
+      public Read withTimestampsMillis() {
+        return new Read(from, to, true);
+      }
+
+      @Override
+      public List<Read> splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions options)
+          throws Exception {
+        List<Read> res = new ArrayList<>();
+        DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+        float step = 1.0f * (to - from) / dataflowOptions.getNumWorkers();
+        for (int i = 0; i < dataflowOptions.getNumWorkers(); ++i) {
+          res.add(new Read(
+              Math.round(from + i * step), Math.round(from + (i + 1) * step),
+              produceTimestamps));
+        }
+        return res;
+      }
+
+      @Override
+      public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+        return 8 * (to - from);
+      }
+
+      @Override
+      public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+        return true;
+      }
+
+      @Override
+      public BoundedReader<Integer> createReader(PipelineOptions options) throws IOException {
+        return new RangeReader(this);
+      }
+
+      @Override
+      public void validate() {}
+
+      @Override
+      public String toString() {
+        return "[" + from + ", " + to + ")";
+      }
+
+      @Override
+      public Coder<Integer> getDefaultOutputCoder() {
+        return BigEndianIntegerCoder.of();
+      }
+
+      private static class RangeReader extends BoundedReader<Integer> {
+        // To verify that BasicSerializableSourceFormat calls our methods according to protocol.
+        enum State {
+          UNSTARTED,
+          STARTED,
+          FINISHED
+        }
+        private Read source;
+        private int current = -1;
+        private State state = State.UNSTARTED;
+
+        public RangeReader(Read source) {
+          this.source = source;
+        }
+
+        @Override
+        public boolean start() throws IOException {
+          Preconditions.checkState(state == State.UNSTARTED);
+          state = State.STARTED;
+          current = source.from;
+          return (current < source.to);
+        }
+
+        @Override
+        public boolean advance() throws IOException {
+          Preconditions.checkState(state == State.STARTED);
+          if (current == source.to - 1) {
+            state = State.FINISHED;
+            return false;
+          }
+          current++;
+          return true;
+        }
+
+        @Override
+        public Integer getCurrent() {
+          Preconditions.checkState(state == State.STARTED);
+          return current;
+        }
+
+        @Override
+        public Instant getCurrentTimestamp() {
+          return source.produceTimestamps
+              ? new Instant(current /* as millis */) : BoundedWindow.TIMESTAMP_MIN_VALUE;
+        }
+
+        @Override
+        public void close() throws IOException {
+          Preconditions.checkState(state == State.STARTED || state == State.FINISHED);
+          state = State.FINISHED;
+        }
+
+        @Override
+        public Read getCurrentSource() {
+          return source;
+        }
+
+        @Override
+        public Read splitAtFraction(double fraction) {
+          int proposedIndex = (int) (source.from + fraction * (source.to - source.from));
+          if (proposedIndex <= current) {
+            return null;
+          }
+          Read primary = new Read(source.from, proposedIndex, source.produceTimestamps);
+          Read residual = new Read(proposedIndex, source.to, source.produceTimestamps);
+          this.source = primary;
+          return residual;
+        }
+
+        @Override
+        public Double getFractionConsumed() {
+          return (current == -1)
+              ? 0.0
+              : (1.0 * (1 + current - source.from) / (source.to - source.from));
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testDirectPipelineWithoutTimestamps() throws Exception {
+    Pipeline p = TestPipeline.create();
+    PCollection<Integer> sum = p
+        .apply(Read.from(TestIO.fromRange(10, 20)))
+        .apply(Sum.integersGlobally())
+        .apply(Sample.<Integer>any(1));
+
+    PAssert.thatSingleton(sum).isEqualTo(145);
+    p.run();
+  }
+
+  @Test
+  public void testDirectPipelineWithTimestamps() throws Exception {
+    Pipeline p = TestPipeline.create();
+    PCollection<Integer> sums =
+        p.apply(Read.from(TestIO.fromRange(10, 20).withTimestampsMillis()))
+         .apply(Window.<Integer>into(FixedWindows.of(Duration.millis(3))))
+         .apply(Sum.integersGlobally().withoutDefaults());
+    // Should group into [10 11] [12 13 14] [15 16 17] [18 19].
+    PAssert.that(sums).containsInAnyOrder(21, 37, 39, 48);
+    p.run();
+  }
+
+  @Test
+  public void testRangeProgressAndSplitAtFraction() throws Exception {
+    // Show basic usage of getFractionConsumed and splitAtFraction.
+    // This test only tests TestIO itself, not BasicSerializableSourceFormat.
+
+    DataflowPipelineOptions options =
+        PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
+    TestIO.Read source = TestIO.fromRange(10, 20);
+    try (BoundedSource.BoundedReader<Integer> reader = source.createReader(options)) {
+      assertEquals(0, reader.getFractionConsumed().intValue());
+      assertTrue(reader.start());
+      assertEquals(0.1, reader.getFractionConsumed(), 1e-6);
+      assertTrue(reader.advance());
+      assertEquals(0.2, reader.getFractionConsumed(), 1e-6);
+      // Already past 0.0 and 0.1.
+      assertNull(reader.splitAtFraction(0.0));
+      assertNull(reader.splitAtFraction(0.1));
+
+      {
+        TestIO.Read residual = (TestIO.Read) reader.splitAtFraction(0.5);
+        assertNotNull(residual);
+        TestIO.Read primary = (TestIO.Read) reader.getCurrentSource();
+        assertThat(readFromSource(primary, options), contains(10, 11, 12, 13, 14));
+        assertThat(readFromSource(residual, options), contains(15, 16, 17, 18, 19));
+      }
+
+      // Range is now [10, 15); the reader is at 11, so the next advance() returns 12.
+      {
+        TestIO.Read residual = (TestIO.Read) reader.splitAtFraction(0.8); // give up 14.
+        assertNotNull(residual);
+        TestIO.Read primary = (TestIO.Read) reader.getCurrentSource();
+        assertThat(readFromSource(primary, options), contains(10, 11, 12, 13));
+        assertThat(readFromSource(residual, options), contains(14));
+      }
+
+      assertTrue(reader.advance());
+      assertEquals(12, reader.getCurrent().intValue());
+      assertTrue(reader.advance());
+      assertEquals(13, reader.getCurrent().intValue());
+      assertFalse(reader.advance());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
new file mode 100644
index 0000000..bfc99e8
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.io;
+
+import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.TestCredential;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+
+import com.google.common.collect.ImmutableList;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.List;
+
+/**
+ * {@link DataflowPipelineRunner} specific tests for TextIO Read and Write transforms.
+ */
+@RunWith(JUnit4.class)
+public class DataflowTextIOTest {
+
+  private TestDataflowPipelineOptions buildTestPipelineOptions() {
+    TestDataflowPipelineOptions options =
+        PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
+    options.setGcpCredential(new TestCredential());
+    return options;
+  }
+
+  private GcsUtil buildMockGcsUtil() throws IOException {
+    GcsUtil mockGcsUtil = Mockito.mock(GcsUtil.class);
+
+    // Any request to open gets a new bogus channel
+    Mockito
+        .when(mockGcsUtil.open(Mockito.any(GcsPath.class)))
+        .then(new Answer<SeekableByteChannel>() {
+          @Override
+          public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable {
+            return FileChannel.open(
+                Files.createTempFile("channel-", ".tmp"),
+                StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE);
+          }
+        });
+
+    // Any request for expansion returns a list containing the original GcsPath
+    // This is required to pass validation that occurs in TextIO during apply()
+    Mockito
+        .when(mockGcsUtil.expand(Mockito.any(GcsPath.class)))
+        .then(new Answer<List<GcsPath>>() {
+          @Override
+          public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable {
+            return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
+          }
+        });
+
+    return mockGcsUtil;
+  }
+
+  /**
+   * This tests a few corner cases that should not crash.
+   */
+  @Test
+  public void testGoodWildcards() throws Exception {
+    TestDataflowPipelineOptions options = buildTestPipelineOptions();
+    options.setGcsUtil(buildMockGcsUtil());
+
+    Pipeline pipeline = Pipeline.create(options);
+
+    applyRead(pipeline, "gs://bucket/foo");
+    applyRead(pipeline, "gs://bucket/foo/");
+    applyRead(pipeline, "gs://bucket/foo/*");
+    applyRead(pipeline, "gs://bucket/foo/?");
+    applyRead(pipeline, "gs://bucket/foo/[0-9]");
+    applyRead(pipeline, "gs://bucket/foo/*baz*");
+    applyRead(pipeline, "gs://bucket/foo/*baz?");
+    applyRead(pipeline, "gs://bucket/foo/[0-9]baz?");
+    applyRead(pipeline, "gs://bucket/foo/baz/*");
+    applyRead(pipeline, "gs://bucket/foo/baz/*wonka*");
+    applyRead(pipeline, "gs://bucket/foo/*baz/wonka*");
+    applyRead(pipeline, "gs://bucket/foo*/baz");
+    applyRead(pipeline, "gs://bucket/foo?/baz");
+    applyRead(pipeline, "gs://bucket/foo[0-9]/baz");
+
+    // Check that running doesn't fail.
+    pipeline.run();
+  }
+
+  private void applyRead(Pipeline pipeline, String path) {
+    pipeline.apply("Read(" + path + ")", TextIO.Read.from(path));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptionsTest.java
new file mode 100644
index 0000000..71b6b57
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptionsTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.options;
+
+import static org.hamcrest.Matchers.hasEntry;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link DataflowPipelineDebugOptions}. */
+@RunWith(JUnit4.class)
+public class DataflowPipelineDebugOptionsTest {
+  @Test
+  public void testTransformNameMapping() throws Exception {
+    DataflowPipelineDebugOptions options = PipelineOptionsFactory
+        .fromArgs(new String[]{"--transformNameMapping={\"a\":\"b\",\"foo\":\"\",\"bar\":\"baz\"}"})
+        .as(DataflowPipelineDebugOptions.class);
+    assertEquals(3, options.getTransformNameMapping().size());
+    assertThat(options.getTransformNameMapping(), hasEntry("a", "b"));
+    assertThat(options.getTransformNameMapping(), hasEntry("foo", ""));
+    assertThat(options.getTransformNameMapping(), hasEntry("bar", "baz"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
new file mode 100644
index 0000000..e7db40f
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.options;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.ResetDateTimeProvider;
+import org.apache.beam.sdk.testing.RestoreSystemProperties;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link DataflowPipelineOptions}. */
+@RunWith(JUnit4.class)
+public class DataflowPipelineOptionsTest {
+  @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties();
+  @Rule public ResetDateTimeProvider resetDateTimeProviderRule = new ResetDateTimeProvider();
+
+  @Test
+  public void testJobNameIsSet() {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setJobName("TestJobName");
+    assertEquals("TestJobName", options.getJobName());
+  }
+
+  @Test
+  public void testUserNameIsNotSet() {
+    resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z");
+    System.getProperties().remove("user.name");
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setAppName("TestApplication");
+    assertEquals("testapplication--1208190706", options.getJobName());
+    assertTrue(options.getJobName().length() <= 40);
+  }
+
+  @Test
+  public void testAppNameAndUserNameAreLong() {
+    resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z");
+    System.getProperties().put("user.name", "abcdeabcdeabcdeabcdeabcdeabcde");
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setAppName("1234567890123456789012345678901234567890");
+    assertEquals(
+        "a234567890123456789012345678901234567890-abcdeabcdeabcdeabcdeabcdeabcde-1208190706",
+        options.getJobName());
+  }
+
+  @Test
+  public void testAppNameIsLong() {
+    resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z");
+    System.getProperties().put("user.name", "abcde");
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setAppName("1234567890123456789012345678901234567890");
+    assertEquals("a234567890123456789012345678901234567890-abcde-1208190706", options.getJobName());
+  }
+
+  @Test
+  public void testUserNameIsLong() {
+    resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z");
+    System.getProperties().put("user.name", "abcdeabcdeabcdeabcdeabcdeabcde");
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setAppName("1234567890");
+    assertEquals("a234567890-abcdeabcdeabcdeabcdeabcdeabcde-1208190706", options.getJobName());
+  }
+
+  @Test
+  public void testUtf8UserNameAndApplicationNameIsNormalized() {
+    resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z");
+    System.getProperties().put("user.name", "ði ıntəˈnæʃənəl ");
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setAppName("fəˈnɛtık əsoʊsiˈeıʃn");
+    assertEquals("f00n0t0k00so0si0e00n-0i00nt00n000n0l0-1208190706", options.getJobName());
+  }
+}
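
As the assertions above show, when no explicit job name is set the default is derived from the normalized application name, the normalized user.name system property, and an MMddHHmmss timestamp. A minimal sketch of relying on that default (the app name is illustrative):

    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setAppName("WordCount");
    // Derived as <normalized appName>-<normalized user>-<MMddHHmmss>, as exercised above.
    String jobName = options.getJobName();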

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java
new file mode 100644
index 0000000..58f8514
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.options;
+
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link DataflowProfilingOptions}.
+ */
+@RunWith(JUnit4.class)
+public class DataflowProfilingOptionsTest {
+
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  @Test
+  public void testOptionsObject() throws Exception {
+    DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {
+        "--enableProfilingAgent", "--profilingAgentConfiguration={\"interval\": 21}"})
+        .as(DataflowPipelineOptions.class);
+    assertTrue(options.getEnableProfilingAgent());
+
+    String json = MAPPER.writeValueAsString(options);
+    assertThat(json, Matchers.containsString(
+        "\"profilingAgentConfiguration\":{\"interval\":21}"));
+  }
+}
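
The profiling test drives everything through the command-line parser and then re-serializes the options with Jackson. A minimal sketch of the same parse, viewed through the profiling-specific interface (the interval value is illustrative):

    DataflowPipelineOptions options = PipelineOptionsFactory
        .fromArgs("--enableProfilingAgent",
                  "--profilingAgentConfiguration={\"interval\": 21}")
        .as(DataflowPipelineOptions.class);
    // As the test demonstrates, the flag is readable from DataflowPipelineOptions itself;
    // the same object can also be viewed through the profiling-specific interface:
    DataflowProfilingOptions profiling = options.as(DataflowProfilingOptions.class);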

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptionsTest.java
new file mode 100644
index 0000000..5b12ad5
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptionsTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.options;
+
+import static org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.Level.WARN;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.WorkerLogLevelOverrides;
+
+import com.google.common.collect.ImmutableMap;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link DataflowWorkerLoggingOptions}. */
+@RunWith(JUnit4.class)
+public class DataflowWorkerLoggingOptionsTest {
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+  @Rule public ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void testWorkerLogLevelOverrideWithInvalidLogLevel() {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("Unsupported log level");
+    WorkerLogLevelOverrides.from(ImmutableMap.of("Name", "FakeLevel"));
+  }
+
+  @Test
+  public void testWorkerLogLevelOverrideForClass() throws Exception {
+    assertEquals("{\"org.junit.Test\":\"WARN\"}",
+        MAPPER.writeValueAsString(
+            new WorkerLogLevelOverrides().addOverrideForClass(Test.class, WARN)));
+  }
+
+  @Test
+  public void testWorkerLogLevelOverrideForPackage() throws Exception {
+    assertEquals("{\"org.junit\":\"WARN\"}",
+        MAPPER.writeValueAsString(
+            new WorkerLogLevelOverrides().addOverrideForPackage(Test.class.getPackage(), WARN)));
+  }
+
+  @Test
+  public void testWorkerLogLevelOverrideForName() throws Exception {
+    assertEquals("{\"A\":\"WARN\"}",
+        MAPPER.writeValueAsString(
+            new WorkerLogLevelOverrides().addOverrideForName("A", WARN)));
+  }
+
+  @Test
+  public void testSerializationAndDeserializationOf() throws Exception {
+    String testValue = "{\"A\":\"WARN\"}";
+    assertEquals(testValue,
+        MAPPER.writeValueAsString(
+            MAPPER.readValue(testValue, WorkerLogLevelOverrides.class)));
+  }
+}
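
The logging test covers JSON round-tripping of WorkerLogLevelOverrides. A minimal sketch of attaching such overrides to worker options programmatically; setWorkerLogLevelOverrides is assumed here to be the generated setter for the property these overrides serialize into:

    DataflowWorkerLoggingOptions logging =
        PipelineOptionsFactory.as(DataflowWorkerLoggingOptions.class);
    logging.setWorkerLogLevelOverrides(          // assumed setter for the overrides property
        new WorkerLogLevelOverrides()
            .addOverrideForName("org.apache.beam.sdk.io", Level.WARN));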

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java
new file mode 100644
index 0000000..d6de501
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.testing;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.dataflow.DataflowPipelineJob;
+import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil;
+import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult.State;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.NoopPathValidator;
+import org.apache.beam.sdk.util.TestCredential;
+import org.apache.beam.sdk.util.TimeUtil;
+import org.apache.beam.sdk.util.Transport;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.api.client.http.LowLevelHttpResponse;
+import com.google.api.client.json.Json;
+import com.google.api.client.testing.http.MockHttpTransport;
+import com.google.api.client.testing.http.MockLowLevelHttpRequest;
+import com.google.api.client.testing.http.MockLowLevelHttpResponse;
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.dataflow.model.JobMessage;
+import com.google.api.services.dataflow.model.JobMetrics;
+import com.google.api.services.dataflow.model.MetricStructuredName;
+import com.google.api.services.dataflow.model.MetricUpdate;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+/** Tests for {@link TestDataflowPipelineRunner}. */
+@RunWith(JUnit4.class)
+public class TestDataflowPipelineRunnerTest {
+  @Rule public ExpectedException expectedException = ExpectedException.none();
+  @Mock private MockHttpTransport transport;
+  @Mock private MockLowLevelHttpRequest request;
+  @Mock private GcsUtil mockGcsUtil;
+
+  private TestDataflowPipelineOptions options;
+  private Dataflow service;
+
+  @Before
+  public void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
+    when(transport.buildRequest(anyString(), anyString())).thenReturn(request);
+    doCallRealMethod().when(request).getContentAsString();
+    service = new Dataflow(transport, Transport.getJsonFactory(), null);
+
+    options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
+    options.setAppName("TestAppName");
+    options.setProject("test-project");
+    options.setTempLocation("gs://test/temp/location");
+    options.setTempRoot("gs://test");
+    options.setGcpCredential(new TestCredential());
+    options.setDataflowClient(service);
+    options.setRunner(TestDataflowPipelineRunner.class);
+    options.setPathValidatorClass(NoopPathValidator.class);
+  }
+
+  @Test
+  public void testToString() {
+    assertEquals("TestDataflowPipelineRunner#TestAppName",
+        new TestDataflowPipelineRunner(options).toString());
+  }
+
+  @Test
+  public void testRunBatchJobThatSucceeds() throws Exception {
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getDataflowClient()).thenReturn(service);
+    when(mockJob.getState()).thenReturn(State.DONE);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(true /* success */, true /* tentative */));
+    assertEquals(mockJob, runner.run(p, mockRunner));
+  }
+
+  @Test
+  public void testRunBatchJobThatFails() throws Exception {
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getDataflowClient()).thenReturn(service);
+    when(mockJob.getState()).thenReturn(State.FAILED);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    try {
+      runner.run(p, mockRunner);
+    } catch (AssertionError expected) {
+      return;
+    }
+    // Note that fail throws an AssertionError which is why it is placed out here
+    // instead of inside the try-catch block.
+    fail("AssertionError expected");
+  }
+
+  @Test
+  public void testBatchPipelineFailsIfException() throws Exception {
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getDataflowClient()).thenReturn(service);
+    when(mockJob.getState()).thenReturn(State.RUNNING);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+    when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+        .thenAnswer(new Answer<State>() {
+          @Override
+          public State answer(InvocationOnMock invocation) {
+            JobMessage message = new JobMessage();
+            message.setMessageText("FooException");
+            message.setTime(TimeUtil.toCloudTime(Instant.now()));
+            message.setMessageImportance("JOB_MESSAGE_ERROR");
+            ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2])
+                .process(Arrays.asList(message));
+            return State.CANCELLED;
+          }
+        });
+
+    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(false /* success */, true /* tentative */));
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    try {
+      runner.run(p, mockRunner);
+    } catch (AssertionError expected) {
+      assertThat(expected.getMessage(), containsString("FooException"));
+      verify(mockJob, atLeastOnce()).cancel();
+      return;
+    }
+    // Note that fail throws an AssertionError which is why it is placed out here
+    // instead of inside the try-catch block.
+    fail("AssertionError expected");
+  }
+
+  @Test
+  public void testRunStreamingJobThatSucceeds() throws Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getDataflowClient()).thenReturn(service);
+    when(mockJob.getState()).thenReturn(State.RUNNING);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(true /* success */, true /* tentative */));
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    runner.run(p, mockRunner);
+  }
+
+  @Test
+  public void testRunStreamingJobThatFails() throws Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getDataflowClient()).thenReturn(service);
+    when(mockJob.getState()).thenReturn(State.RUNNING);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+
+    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(false /* success */, true /* tentative */));
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    try {
+      runner.run(p, mockRunner);
+    } catch (AssertionError expected) {
+      return;
+    }
+    // Note that fail throws an AssertionError which is why it is placed out here
+    // instead of inside the try-catch block.
+    fail("AssertionError expected");
+  }
+
+  @Test
+  public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception {
+    DataflowPipelineJob job =
+        spy(new DataflowPipelineJob("test-project", "test-job", service, null));
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(true /* success */, true /* tentative */));
+    doReturn(State.DONE).when(job).getState();
+    assertEquals(Optional.of(true), runner.checkForSuccess(job));
+  }
+
+  @Test
+  public void testCheckingForSuccessWhenPAssertFails() throws Exception {
+    DataflowPipelineJob job =
+        spy(new DataflowPipelineJob("test-project", "test-job", service, null));
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(false /* success */, true /* tentative */));
+    doReturn(State.DONE).when(job).getState();
+    assertEquals(Optional.of(false), runner.checkForSuccess(job));
+  }
+
+  @Test
+  public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception {
+    DataflowPipelineJob job =
+        spy(new DataflowPipelineJob("test-project", "test-job", service, null));
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(true /* success */, false /* tentative */));
+    doReturn(State.RUNNING).when(job).getState();
+    assertEquals(Optional.absent(), runner.checkForSuccess(job));
+  }
+
+  private LowLevelHttpResponse generateMockMetricResponse(boolean success, boolean tentative)
+      throws Exception {
+    MetricStructuredName name = new MetricStructuredName();
+    name.setName(success ? "PAssertSuccess" : "PAssertFailure");
+    name.setContext(
+        tentative ? ImmutableMap.of("tentative", "") : ImmutableMap.<String, String>of());
+
+    MetricUpdate metric = new MetricUpdate();
+    metric.setName(name);
+    metric.setScalar(BigDecimal.ONE);
+
+    MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
+    response.setContentType(Json.MEDIA_TYPE);
+    JobMetrics jobMetrics = new JobMetrics();
+    jobMetrics.setMetrics(Lists.newArrayList(metric));
+    // N.B. Setting the factory is necessary in order to get valid JSON.
+    jobMetrics.setFactory(Transport.getJsonFactory());
+    response.setContent(jobMetrics.toPrettyString());
+    return response;
+  }
+
+  @Test
+  public void testStreamingPipelineFailsIfServiceFails() throws Exception {
+    DataflowPipelineJob job =
+        spy(new DataflowPipelineJob("test-project", "test-job", service, null));
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(true /* success */, false /* tentative */));
+    doReturn(State.FAILED).when(job).getState();
+    assertEquals(Optional.of(false), runner.checkForSuccess(job));
+  }
+
+  @Test
+  public void testStreamingPipelineFailsIfException() throws Exception {
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
+    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
+
+    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
+    when(mockJob.getDataflowClient()).thenReturn(service);
+    when(mockJob.getState()).thenReturn(State.RUNNING);
+    when(mockJob.getProjectId()).thenReturn("test-project");
+    when(mockJob.getJobId()).thenReturn("test-job");
+    when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
+        .thenAnswer(new Answer<State>() {
+          @Override
+          public State answer(InvocationOnMock invocation) {
+            JobMessage message = new JobMessage();
+            message.setMessageText("FooException");
+            message.setTime(TimeUtil.toCloudTime(Instant.now()));
+            message.setMessageImportance("JOB_MESSAGE_ERROR");
+            ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2])
+                .process(Arrays.asList(message));
+            return State.CANCELLED;
+          }
+        });
+
+    DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
+    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
+
+    when(request.execute()).thenReturn(
+        generateMockMetricResponse(false /* success */, true /* tentative */));
+    TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
+    try {
+      runner.run(p, mockRunner);
+    } catch (AssertionError expected) {
+      assertThat(expected.getMessage(), containsString("FooException"));
+      verify(mockJob, atLeastOnce()).cancel();
+      return;
+    }
+    // Note that fail throws an AssertionError which is why it is placed out here
+    // instead of inside the try-catch block.
+    fail("AssertionError expected");
+  }
+}
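
Outside of these mocked scenarios, the runner under test is used the way setUp configures it: a TestPipeline created from TestDataflowPipelineOptions with the runner, project, and temp locations set. A minimal end-to-end sketch (the project and bucket names are illustrative):

    TestDataflowPipelineOptions opts =
        PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
    opts.setRunner(TestDataflowPipelineRunner.class);
    opts.setProject("my-gcp-project");
    opts.setTempRoot("gs://my-bucket/tmp");

    Pipeline p = TestPipeline.create(opts);
    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
    // As the checkForSuccess tests above show, the runner inspects PAssert metrics
    // after the run and fails the test if any assertion did not succeed.
    p.run();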

