beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [2/4] beam git commit: Move TestDataflowRunner into dataflow package
Date Sun, 07 May 2017 23:12:18 GMT
http://git-wip-us.apache.org/repos/asf/beam/blob/b46e7b9b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
deleted file mode 100644
index eb068e6..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
+++ /dev/null
@@ -1,655 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.testing;
-
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import com.google.api.services.dataflow.model.JobMessage;
-import com.google.api.services.dataflow.model.JobMetrics;
-import com.google.api.services.dataflow.model.MetricStructuredName;
-import com.google.api.services.dataflow.model.MetricUpdate;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.dataflow.DataflowClient;
-import org.apache.beam.runners.dataflow.DataflowPipelineJob;
-import org.apache.beam.runners.dataflow.DataflowRunner;
-import org.apache.beam.runners.dataflow.util.MonitoringUtil;
-import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler;
-import org.apache.beam.runners.dataflow.util.TimeUtil;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.PipelineResult.State;
-import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.SerializableMatcher;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestPipelineOptions;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.util.NoopPathValidator;
-import org.apache.beam.sdk.util.Transport;
-import org.apache.beam.sdk.values.PCollection;
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-/** Tests for {@link TestDataflowRunner}. */
-@RunWith(JUnit4.class)
-public class TestDataflowRunnerTest {
-  @Rule public ExpectedException expectedException = ExpectedException.none();
-  @Mock private DataflowClient mockClient;
-
-  private TestDataflowPipelineOptions options;
-
-  @Before
-  public void setUp() throws Exception {
-    MockitoAnnotations.initMocks(this);
-
-    options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
-    options.setAppName("TestAppName");
-    options.setProject("test-project");
-    options.setTempLocation("gs://test/temp/location");
-    options.setTempRoot("gs://test");
-    options.setGcpCredential(new TestCredential());
-    options.setRunner(TestDataflowRunner.class);
-    options.setPathValidatorClass(NoopPathValidator.class);
-  }
-
-  @Test
-  public void testToString() {
-    assertEquals("TestDataflowRunner#TestAppName",
-        TestDataflowRunner.fromOptions(options).toString());
-  }
-
-  @Test
-  public void testRunBatchJobThatSucceeds() throws Exception {
-    Pipeline p = Pipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getState()).thenReturn(State.DONE);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    when(mockClient.getJobMetrics(anyString()))
-        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
-    assertEquals(mockJob, runner.run(p, mockRunner));
-  }
-
-  @Test
-  public void testRunBatchJobThatFails() throws Exception {
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getState()).thenReturn(State.FAILED);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    when(mockClient.getJobMetrics(anyString()))
-        .thenReturn(generateMockMetricResponse(false /* success */, false /* tentative */));
-    try {
-      runner.run(p, mockRunner);
-    } catch (AssertionError expected) {
-      return;
-    }
-    // Note that fail throws an AssertionError which is why it is placed out here
-    // instead of inside the try-catch block.
-    fail("AssertionError expected");
-  }
-
-  @Test
-  public void testBatchPipelineFailsIfException() throws Exception {
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getState()).thenReturn(State.RUNNING);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
-        .thenAnswer(new Answer<State>() {
-          @Override
-          public State answer(InvocationOnMock invocation) {
-            JobMessage message = new JobMessage();
-            message.setMessageText("FooException");
-            message.setTime(TimeUtil.toCloudTime(Instant.now()));
-            message.setMessageImportance("JOB_MESSAGE_ERROR");
-            ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[1])
-                .process(Arrays.asList(message));
-            return State.CANCELLED;
-          }
-        });
-
-    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    when(mockClient.getJobMetrics(anyString()))
-        .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */));
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    try {
-      runner.run(p, mockRunner);
-    } catch (AssertionError expected) {
-      assertThat(expected.getMessage(), containsString("FooException"));
-      verify(mockJob, never()).cancel();
-      return;
-    }
-    // Note that fail throws an AssertionError which is why it is placed out here
-    // instead of inside the try-catch block.
-    fail("AssertionError expected");
-  }
-
-  /**
-   * A streaming job that terminates with no error messages is a success.
-   */
-  @Test
-  public void testRunStreamingJobUsingPAssertThatSucceeds() throws Exception {
-    options.setStreaming(true);
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getState()).thenReturn(State.DONE);
-    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
-        .thenReturn(State.DONE);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    when(mockClient.getJobMetrics(anyString()))
-        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    runner.run(p, mockRunner);
-  }
-
-  @Test
-  public void testRunStreamingJobNotUsingPAssertThatSucceeds() throws Exception {
-    options.setStreaming(true);
-    Pipeline p = TestPipeline.create(options);
-    p.apply(Create.of(1, 2, 3));
-
-    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getState()).thenReturn(State.DONE);
-    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
-        .thenReturn(State.DONE);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    when(mockClient.getJobMetrics(anyString()))
-        .thenReturn(generateMockStreamingMetricResponse(ImmutableMap.<String, BigDecimal>of()));
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    runner.run(p, mockRunner);
-  }
-
-  /**
-   * Tests that a streaming job with a false {@link PAssert} fails.
-   *
-   * <p>Currently, this failure is indistinguishable from a non-{@link PAssert} failure, because it
-   * is detected only by failure job messages. With fuller metric support, this can detect a PAssert
-   * failure via metrics and raise an {@link AssertionError} in just that case.
-   */
-  @Test
-  public void testRunStreamingJobThatFails() throws Exception {
-    testStreamingPipelineFailsIfException();
-  }
-
-  private JobMetrics generateMockMetricResponse(boolean success, boolean tentative)
-      throws Exception {
-    List<MetricUpdate> metrics = generateMockMetrics(success, tentative);
-    return buildJobMetrics(metrics);
-  }
-
-  private List<MetricUpdate> generateMockMetrics(boolean success, boolean tentative) {
-    MetricStructuredName name = new MetricStructuredName();
-    name.setName(success ? "PAssertSuccess" : "PAssertFailure");
-    name.setContext(
-        tentative ? ImmutableMap.of("tentative", "") : ImmutableMap.<String, String>of());
-
-    MetricUpdate metric = new MetricUpdate();
-    metric.setName(name);
-    metric.setScalar(BigDecimal.ONE);
-    return Lists.newArrayList(metric);
-  }
-
-  private JobMetrics generateMockStreamingMetricResponse(Map<String,
-      BigDecimal> metricMap) throws IOException {
-    return buildJobMetrics(generateMockStreamingMetrics(metricMap));
-  }
-
-  private List<MetricUpdate> generateMockStreamingMetrics(Map<String, BigDecimal> metricMap) {
-    List<MetricUpdate> metrics = Lists.newArrayList();
-    for (Map.Entry<String, BigDecimal> entry : metricMap.entrySet()) {
-      MetricStructuredName name = new MetricStructuredName();
-      name.setName(entry.getKey());
-
-      MetricUpdate metric = new MetricUpdate();
-      metric.setName(name);
-      metric.setScalar(entry.getValue());
-      metrics.add(metric);
-    }
-    return metrics;
-  }
-
-  private JobMetrics buildJobMetrics(List<MetricUpdate> metricList) {
-    JobMetrics jobMetrics = new JobMetrics();
-    jobMetrics.setMetrics(metricList);
-    // N.B. Setting the factory is necessary in order to get valid JSON.
-    jobMetrics.setFactory(Transport.getJsonFactory());
-    return jobMetrics;
-  }
-
-  /**
-   * Tests that a tentative {@code true} from metrics indicates that every {@link PAssert} has
-   * succeeded.
-   */
-  @Test
-  public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    when(mockClient.getJobMetrics(anyString()))
-        .thenReturn(buildJobMetrics(generateMockMetrics(true /* success */, true /* tentative */)));
-
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    doReturn(State.DONE).when(job).getState();
-    assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.of(true)));
-  }
-
-  /**
-   * Tests that when we just see a tentative failure for a {@link PAssert} it is considered a
-   * conclusive failure.
-   */
-  @Test
-  public void testCheckingForSuccessWhenPAssertFails() throws Exception {
-    DataflowPipelineJob job =
-        spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    when(mockClient.getJobMetrics(anyString()))
-        .thenReturn(
-            buildJobMetrics(generateMockMetrics(false /* success */, true /* tentative */)));
-
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    doReturn(State.DONE).when(job).getState();
-    assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.of(false)));
-  }
-
-  @Test
-  public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception {
-    DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    when(mockClient.getJobMetrics(anyString()))
-        .thenReturn(
-            buildJobMetrics(generateMockMetrics(true /* success */, false /* tentative */)));
-
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    runner.updatePAssertCount(p);
-    doReturn(State.RUNNING).when(job).getState();
-    assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.<Boolean>absent()));
-  }
-
-  /**
-   * Tests that if a streaming pipeline terminates with FAIL that the check for PAssert
-   * success is a conclusive failure.
-   */
-  @Test
-  public void testStreamingPipelineFailsIfServiceFails() throws Exception {
-    DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    doReturn(State.FAILED).when(job).getState();
-    assertThat(runner.checkForPAssertSuccess(job), equalTo(Optional.of(false)));
-  }
-
-  /**
-   * Tests that if a streaming pipeline crash loops for a non-assertion reason that the test run
-   * throws an {@link AssertionError}.
-   *
-   * <p>This is a known limitation/bug of the runner that it does not distinguish the two modes of
-   * failure.
-   */
-  @Test
-  public void testStreamingPipelineFailsIfException() throws Exception {
-    options.setStreaming(true);
-    Pipeline pipeline = TestPipeline.create(options);
-    PCollection<Integer> pc = pipeline.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getState()).thenReturn(State.RUNNING);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
-        .thenAnswer(new Answer<State>() {
-          @Override
-          public State answer(InvocationOnMock invocation) {
-            JobMessage message = new JobMessage();
-            message.setMessageText("FooException");
-            message.setTime(TimeUtil.toCloudTime(Instant.now()));
-            message.setMessageImportance("JOB_MESSAGE_ERROR");
-            ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[1])
-                .process(Arrays.asList(message));
-            return State.CANCELLED;
-          }
-        });
-
-    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    when(mockClient.getJobMetrics(anyString()))
-        .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */));
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-
-    try {
-      runner.run(pipeline, mockRunner);
-    } catch (AssertionError exc) {
-      return;
-    }
-    fail("AssertionError expected");
-  }
-
-  @Test
-  public void testGetJobMetricsThatSucceeds() throws Exception {
-    DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
-    Pipeline p = TestPipeline.create(options);
-    p.apply(Create.of(1, 2, 3));
-
-    when(mockClient.getJobMetrics(anyString()))
-        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    JobMetrics metrics = runner.getJobMetrics(job);
-
-    assertEquals(1, metrics.getMetrics().size());
-    assertEquals(generateMockMetrics(true /* success */, true /* tentative */),
-        metrics.getMetrics());
-  }
-
-  @Test
-  public void testGetJobMetricsThatFailsForException() throws Exception {
-    DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, "test-job", options, null));
-    Pipeline p = TestPipeline.create(options);
-    p.apply(Create.of(1, 2, 3));
-
-    when(mockClient.getJobMetrics(anyString())).thenThrow(new IOException());
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    assertNull(runner.getJobMetrics(job));
-  }
-
-  @Test
-  public void testBatchOnCreateMatcher() throws Exception {
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getState()).thenReturn(State.DONE);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    options.as(TestPipelineOptions.class).setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
-
-    when(mockClient.getJobMetrics(anyString()))
-        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
-    runner.run(p, mockRunner);
-  }
-
-  @Test
-  public void testStreamingOnCreateMatcher() throws Exception {
-    options.setStreaming(true);
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getState()).thenReturn(State.DONE);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    options.as(TestPipelineOptions.class).setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
-
-    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
-        .thenReturn(State.DONE);
-
-    when(mockClient.getJobMetrics(anyString()))
-        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */
-        ));
-    runner.run(p, mockRunner);
-  }
-
-  @Test
-  public void testBatchOnSuccessMatcherWhenPipelineSucceeds() throws Exception {
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getState()).thenReturn(State.DONE);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
-
-    when(mockClient.getJobMetrics(anyString()))
-        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
-    runner.run(p, mockRunner);
-  }
-
-  /**
-   * Tests that when a streaming pipeline terminates and doesn't fail due to {@link PAssert} that
-   * the {@link TestPipelineOptions#setOnSuccessMatcher(SerializableMatcher) on success matcher} is
-   * invoked.
-   */
-  @Test
-  public void testStreamingOnSuccessMatcherWhenPipelineSucceeds() throws Exception {
-    options.setStreaming(true);
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getState()).thenReturn(State.DONE);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
-
-    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
-        .thenReturn(State.DONE);
-
-    when(mockClient.getJobMetrics(anyString()))
-        .thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
-    runner.run(p, mockRunner);
-  }
-
-  @Test
-  public void testBatchOnSuccessMatcherWhenPipelineFails() throws Exception {
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getState()).thenReturn(State.FAILED);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestFailureMatcher());
-
-    when(mockClient.getJobMetrics(anyString()))
-        .thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */));
-    try {
-      runner.run(p, mockRunner);
-    } catch (AssertionError expected) {
-      verify(mockJob, Mockito.times(1)).waitUntilFinish(
-          any(Duration.class), any(JobMessagesHandler.class));
-      return;
-    }
-    fail("Expected an exception on pipeline failure.");
-  }
-
-  /**
-   * Tests that when a streaming pipeline terminates in FAIL that the {@link
-   * TestPipelineOptions#setOnSuccessMatcher(SerializableMatcher) on success matcher} is not
-   * invoked.
-   */
-  @Test
-  public void testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception {
-    options.setStreaming(true);
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
-    when(mockJob.getState()).thenReturn(State.FAILED);
-    when(mockJob.getProjectId()).thenReturn("test-project");
-    when(mockJob.getJobId()).thenReturn("test-job");
-
-    DataflowRunner mockRunner = Mockito.mock(DataflowRunner.class);
-    when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
-
-    TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestFailureMatcher());
-
-    when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
-        .thenReturn(State.FAILED);
-
-    runner.run(p, mockRunner);
-    // If the onSuccessMatcher were invoked, it would have crashed here.
-  }
-
-  static class TestSuccessMatcher extends BaseMatcher<PipelineResult> implements
-      SerializableMatcher<PipelineResult> {
-    private final DataflowPipelineJob mockJob;
-    private final int called;
-
-    public TestSuccessMatcher(DataflowPipelineJob job, int times) {
-      this.mockJob = job;
-      this.called = times;
-    }
-
-    @Override
-    public boolean matches(Object o) {
-      if (!(o instanceof PipelineResult)) {
-        fail(String.format("Expected PipelineResult but received %s", o));
-      }
-      try {
-        verify(mockJob, Mockito.times(called)).waitUntilFinish(
-            any(Duration.class), any(JobMessagesHandler.class));
-      } catch (IOException | InterruptedException e) {
-        throw new AssertionError(e);
-      }
-      assertSame(mockJob, o);
-      return true;
-    }
-
-    @Override
-    public void describeTo(Description description) {
-    }
-  }
-
-  static class TestFailureMatcher extends BaseMatcher<PipelineResult> implements
-      SerializableMatcher<PipelineResult> {
-    @Override
-    public boolean matches(Object o) {
-      fail("OnSuccessMatcher should not be called on pipeline failure.");
-      return false;
-    }
-
-    @Override
-    public void describeTo(Description description) {
-    }
-  }
-}


Mime
View raw message