From: dhalperi@apache.org
To: commits@beam.incubator.apache.org
Date: Wed, 27 Apr 2016 01:08:39 -0000
Subject: [07/21] incubator-beam git commit: Reorganize Java packages in the sources of the Google Cloud Dataflow runner

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
new file mode 100644
index 0000000..3a39e41
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -0,0 +1,967 @@
+/*
+ * Licensed to the Apache Software
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow; + +import static org.apache.beam.sdk.util.Structs.addObject; +import static org.apache.beam.sdk.util.Structs.getDictionary; +import static org.apache.beam.sdk.util.Structs.getString; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext; +import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.runners.RecordingPipelineVisitor; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.GcsUtil; +import org.apache.beam.sdk.util.OutputReference; +import org.apache.beam.sdk.util.PropertyNames; +import org.apache.beam.sdk.util.Structs; +import org.apache.beam.sdk.util.TestCredential; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.gcsfs.GcsPath; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.TupleTag; + +import com.google.api.services.dataflow.Dataflow; +import com.google.api.services.dataflow.model.DataflowPackage; +import com.google.api.services.dataflow.model.Job; +import com.google.api.services.dataflow.model.Step; +import com.google.api.services.dataflow.model.WorkerPool; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import 
com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; + +import org.hamcrest.Matchers; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.internal.matchers.ThrowableMessageMatcher; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentMatcher; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * Tests for DataflowPipelineTranslator. + */ +@RunWith(JUnit4.class) +public class DataflowPipelineTranslatorTest implements Serializable { + + @Rule public transient ExpectedException thrown = ExpectedException.none(); + + // A Custom Mockito matcher for an initial Job that checks that all + // expected fields are set. + private static class IsValidCreateRequest extends ArgumentMatcher { + @Override + public boolean matches(Object o) { + Job job = (Job) o; + return job.getId() == null + && job.getProjectId() == null + && job.getName() != null + && job.getType() != null + && job.getEnvironment() != null + && job.getSteps() != null + && job.getCurrentState() == null + && job.getCurrentStateTime() == null + && job.getExecutionInfo() == null + && job.getCreateTime() == null; + } + } + + private Pipeline buildPipeline(DataflowPipelineOptions options) { + options.setRunner(DataflowPipelineRunner.class); + Pipeline p = Pipeline.create(options); + + p.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/object")) + .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/object")); + + return p; + } + + private static Dataflow buildMockDataflow( + ArgumentMatcher jobMatcher) throws IOException { + Dataflow mockDataflowClient = mock(Dataflow.class); + Dataflow.Projects mockProjects = mock(Dataflow.Projects.class); + Dataflow.Projects.Jobs mockJobs = mock(Dataflow.Projects.Jobs.class); + Dataflow.Projects.Jobs.Create mockRequest = mock( + Dataflow.Projects.Jobs.Create.class); + + when(mockDataflowClient.projects()).thenReturn(mockProjects); + when(mockProjects.jobs()).thenReturn(mockJobs); + when(mockJobs.create(eq("someProject"), argThat(jobMatcher))) + .thenReturn(mockRequest); + + Job resultJob = new Job(); + resultJob.setId("newid"); + when(mockRequest.execute()).thenReturn(resultJob); + return mockDataflowClient; + } + + private static DataflowPipelineOptions buildPipelineOptions() throws IOException { + GcsUtil mockGcsUtil = mock(GcsUtil.class); + when(mockGcsUtil.expand(any(GcsPath.class))).then(new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return ImmutableList.of((GcsPath) invocation.getArguments()[0]); + } + }); + when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(true); + when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod(); + + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setRunner(DataflowPipelineRunner.class); + options.setGcpCredential(new TestCredential()); + options.setJobName("some-job-name"); + options.setProject("some-project"); + options.setTempLocation(GcsPath.fromComponents("somebucket", "some/path").toString()); + options.setFilesToStage(new LinkedList()); + options.setDataflowClient(buildMockDataflow(new 
IsValidCreateRequest())); + options.setGcsUtil(mockGcsUtil); + return options; + } + + @Test + public void testSettingOfSdkPipelineOptions() throws IOException { + DataflowPipelineOptions options = buildPipelineOptions(); + options.setRunner(DataflowPipelineRunner.class); + + Pipeline p = buildPipeline(options); + p.traverseTopologically(new RecordingPipelineVisitor()); + Job job = + DataflowPipelineTranslator.fromOptions(options) + .translate( + p, (DataflowPipelineRunner) p.getRunner(), Collections.emptyList()) + .getJob(); + + // Note that the contents of this materialized map may be changed by the act of reading an + // option, which will cause the default to get materialized whereas it would otherwise be + // left absent. It is permissible to simply alter this test to reflect current behavior. + Map settings = new HashMap<>(); + settings.put("appName", "DataflowPipelineTranslatorTest"); + settings.put("project", "some-project"); + settings.put("pathValidatorClass", + "org.apache.beam.runners.dataflow.util.DataflowPathValidator"); + settings.put("runner", "org.apache.beam.runners.dataflow.DataflowPipelineRunner"); + settings.put("jobName", "some-job-name"); + settings.put("tempLocation", "gs://somebucket/some/path"); + settings.put("stagingLocation", "gs://somebucket/some/path/staging"); + settings.put("stableUniqueNames", "WARNING"); + settings.put("streaming", false); + settings.put("numberOfWorkerHarnessThreads", 0); + settings.put("experiments", null); + + Map sdkPipelineOptions = job.getEnvironment().getSdkPipelineOptions(); + assertThat(sdkPipelineOptions, hasKey("options")); + assertEquals(settings, sdkPipelineOptions.get("options")); + } + + @Test + public void testNetworkConfig() throws IOException { + final String testNetwork = "test-network"; + + DataflowPipelineOptions options = buildPipelineOptions(); + options.setNetwork(testNetwork); + + Pipeline p = buildPipeline(options); + p.traverseTopologically(new RecordingPipelineVisitor()); + Job job = + DataflowPipelineTranslator.fromOptions(options) + .translate( + p, (DataflowPipelineRunner) p.getRunner(), Collections.emptyList()) + .getJob(); + + assertEquals(1, job.getEnvironment().getWorkerPools().size()); + assertEquals(testNetwork, + job.getEnvironment().getWorkerPools().get(0).getNetwork()); + } + + @Test + public void testNetworkConfigMissing() throws IOException { + DataflowPipelineOptions options = buildPipelineOptions(); + + Pipeline p = buildPipeline(options); + p.traverseTopologically(new RecordingPipelineVisitor()); + Job job = + DataflowPipelineTranslator.fromOptions(options) + .translate( + p, (DataflowPipelineRunner) p.getRunner(), Collections.emptyList()) + .getJob(); + + assertEquals(1, job.getEnvironment().getWorkerPools().size()); + assertNull(job.getEnvironment().getWorkerPools().get(0).getNetwork()); + } + + @Test + public void testSubnetworkConfig() throws IOException { + final String testSubnetwork = "regions/REGION/subnetworks/SUBNETWORK"; + + DataflowPipelineOptions options = buildPipelineOptions(); + options.setSubnetwork(testSubnetwork); + + Pipeline p = buildPipeline(options); + p.traverseTopologically(new RecordingPipelineVisitor()); + Job job = + DataflowPipelineTranslator.fromOptions(options) + .translate( + p, (DataflowPipelineRunner) p.getRunner(), Collections.emptyList()) + .getJob(); + + assertEquals(1, job.getEnvironment().getWorkerPools().size()); + assertEquals(testSubnetwork, + job.getEnvironment().getWorkerPools().get(0).getSubnetwork()); + } + + @Test + public void 
testSubnetworkConfigMissing() throws IOException { + DataflowPipelineOptions options = buildPipelineOptions(); + + Pipeline p = buildPipeline(options); + p.traverseTopologically(new RecordingPipelineVisitor()); + Job job = + DataflowPipelineTranslator.fromOptions(options) + .translate( + p, (DataflowPipelineRunner) p.getRunner(), Collections.emptyList()) + .getJob(); + + assertEquals(1, job.getEnvironment().getWorkerPools().size()); + assertNull(job.getEnvironment().getWorkerPools().get(0).getSubnetwork()); + } + + @Test + public void testScalingAlgorithmMissing() throws IOException { + DataflowPipelineOptions options = buildPipelineOptions(); + + Pipeline p = buildPipeline(options); + p.traverseTopologically(new RecordingPipelineVisitor()); + Job job = + DataflowPipelineTranslator.fromOptions(options) + .translate( + p, (DataflowPipelineRunner) p.getRunner(), Collections.emptyList()) + .getJob(); + + assertEquals(1, job.getEnvironment().getWorkerPools().size()); + // Autoscaling settings are always set. + assertNull( + job + .getEnvironment() + .getWorkerPools() + .get(0) + .getAutoscalingSettings() + .getAlgorithm()); + assertEquals( + 0, + job + .getEnvironment() + .getWorkerPools() + .get(0) + .getAutoscalingSettings() + .getMaxNumWorkers() + .intValue()); + } + + @Test + public void testScalingAlgorithmNone() throws IOException { + final DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType noScaling = + DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.NONE; + + DataflowPipelineOptions options = buildPipelineOptions(); + options.setAutoscalingAlgorithm(noScaling); + + Pipeline p = buildPipeline(options); + p.traverseTopologically(new RecordingPipelineVisitor()); + Job job = + DataflowPipelineTranslator.fromOptions(options) + .translate( + p, (DataflowPipelineRunner) p.getRunner(), Collections.emptyList()) + .getJob(); + + assertEquals(1, job.getEnvironment().getWorkerPools().size()); + assertEquals( + "AUTOSCALING_ALGORITHM_NONE", + job + .getEnvironment() + .getWorkerPools() + .get(0) + .getAutoscalingSettings() + .getAlgorithm()); + assertEquals( + 0, + job + .getEnvironment() + .getWorkerPools() + .get(0) + .getAutoscalingSettings() + .getMaxNumWorkers() + .intValue()); + } + + @Test + public void testMaxNumWorkersIsPassedWhenNoAlgorithmIsSet() throws IOException { + final DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType noScaling = null; + DataflowPipelineOptions options = buildPipelineOptions(); + options.setMaxNumWorkers(42); + options.setAutoscalingAlgorithm(noScaling); + + Pipeline p = buildPipeline(options); + p.traverseTopologically(new RecordingPipelineVisitor()); + Job job = + DataflowPipelineTranslator.fromOptions(options) + .translate( + p, (DataflowPipelineRunner) p.getRunner(), Collections.emptyList()) + .getJob(); + + assertEquals(1, job.getEnvironment().getWorkerPools().size()); + assertNull( + job + .getEnvironment() + .getWorkerPools() + .get(0) + .getAutoscalingSettings() + .getAlgorithm()); + assertEquals( + 42, + job + .getEnvironment() + .getWorkerPools() + .get(0) + .getAutoscalingSettings() + .getMaxNumWorkers() + .intValue()); + } + + @Test + public void testZoneConfig() throws IOException { + final String testZone = "test-zone-1"; + + DataflowPipelineOptions options = buildPipelineOptions(); + options.setZone(testZone); + + Pipeline p = buildPipeline(options); + p.traverseTopologically(new RecordingPipelineVisitor()); + Job job = + DataflowPipelineTranslator.fromOptions(options) + .translate( + p, (DataflowPipelineRunner) 
p.getRunner(), Collections.emptyList()) + .getJob(); + + assertEquals(1, job.getEnvironment().getWorkerPools().size()); + assertEquals(testZone, + job.getEnvironment().getWorkerPools().get(0).getZone()); + } + + @Test + public void testWorkerMachineTypeConfig() throws IOException { + final String testMachineType = "test-machine-type"; + + DataflowPipelineOptions options = buildPipelineOptions(); + options.setWorkerMachineType(testMachineType); + + Pipeline p = buildPipeline(options); + p.traverseTopologically(new RecordingPipelineVisitor()); + Job job = + DataflowPipelineTranslator.fromOptions(options) + .translate( + p, (DataflowPipelineRunner) p.getRunner(), Collections.emptyList()) + .getJob(); + + assertEquals(1, job.getEnvironment().getWorkerPools().size()); + + WorkerPool workerPool = job.getEnvironment().getWorkerPools().get(0); + assertEquals(testMachineType, workerPool.getMachineType()); + } + + @Test + public void testDiskSizeGbConfig() throws IOException { + final Integer diskSizeGb = 1234; + + DataflowPipelineOptions options = buildPipelineOptions(); + options.setDiskSizeGb(diskSizeGb); + + Pipeline p = buildPipeline(options); + p.traverseTopologically(new RecordingPipelineVisitor()); + Job job = + DataflowPipelineTranslator.fromOptions(options) + .translate( + p, (DataflowPipelineRunner) p.getRunner(), Collections.emptyList()) + .getJob(); + + assertEquals(1, job.getEnvironment().getWorkerPools().size()); + assertEquals(diskSizeGb, + job.getEnvironment().getWorkerPools().get(0).getDiskSizeGb()); + } + + @Test + public void testPredefinedAddStep() throws Exception { + DataflowPipelineOptions options = buildPipelineOptions(); + + DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); + DataflowPipelineTranslator.registerTransformTranslator( + EmbeddedTransform.class, new EmbeddedTranslator()); + + // Create a predefined step using another pipeline + Step predefinedStep = createPredefinedStep(); + + // Create a pipeline that the predefined step will be embedded into + Pipeline pipeline = Pipeline.create(options); + pipeline.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/in")) + .apply(ParDo.of(new NoOpFn())) + .apply(new EmbeddedTransform(predefinedStep.clone())) + .apply(ParDo.of(new NoOpFn())); + Job job = + translator + .translate( + pipeline, + (DataflowPipelineRunner) pipeline.getRunner(), + Collections.emptyList()) + .getJob(); + + List steps = job.getSteps(); + assertEquals(4, steps.size()); + + // The input to the embedded step should match the output of the step before + Map step1Out = getOutputPortReference(steps.get(1)); + Map step2In = getDictionary( + steps.get(2).getProperties(), PropertyNames.PARALLEL_INPUT); + assertEquals(step1Out, step2In); + + // The output from the embedded step should match the input of the step after + Map step2Out = getOutputPortReference(steps.get(2)); + Map step3In = getDictionary( + steps.get(3).getProperties(), PropertyNames.PARALLEL_INPUT); + assertEquals(step2Out, step3In); + + // The step should not have been modified other than remapping the input + Step predefinedStepClone = predefinedStep.clone(); + Step embeddedStepClone = steps.get(2).clone(); + predefinedStepClone.getProperties().remove(PropertyNames.PARALLEL_INPUT); + embeddedStepClone.getProperties().remove(PropertyNames.PARALLEL_INPUT); + assertEquals(predefinedStepClone, embeddedStepClone); + } + + /** + * Construct a OutputReference for the output of the step. 
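+ * Assumes the step declares exactly one output in its output info and pairs the step name with that output's name.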
+ */ + private static OutputReference getOutputPortReference(Step step) throws Exception { + // TODO: This should be done via a Structs accessor. + @SuppressWarnings("unchecked") + List> output = + (List>) step.getProperties().get(PropertyNames.OUTPUT_INFO); + String outputTagId = getString(Iterables.getOnlyElement(output), PropertyNames.OUTPUT_NAME); + return new OutputReference(step.getName(), outputTagId); + } + + /** + * Returns a Step for a DoFn by creating and translating a pipeline. + */ + private static Step createPredefinedStep() throws Exception { + DataflowPipelineOptions options = buildPipelineOptions(); + DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); + Pipeline pipeline = Pipeline.create(options); + String stepName = "DoFn1"; + pipeline.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/in")) + .apply(ParDo.of(new NoOpFn()).named(stepName)) + .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/out")); + Job job = + translator + .translate( + pipeline, + (DataflowPipelineRunner) pipeline.getRunner(), + Collections.emptyList()) + .getJob(); + + assertEquals(13, job.getSteps().size()); + Step step = job.getSteps().get(1); + assertEquals(stepName, getString(step.getProperties(), PropertyNames.USER_NAME)); + return step; + } + + private static class NoOpFn extends DoFn { + @Override public void processElement(ProcessContext c) throws Exception { + c.output(c.element()); + } + } + + /** + * A placeholder transform that will be used to substitute a predefined Step. + */ + private static class EmbeddedTransform + extends PTransform, PCollection> { + private final Step step; + + public EmbeddedTransform(Step step) { + this.step = step; + } + + @Override + public PCollection apply(PCollection input) { + return PCollection.createPrimitiveOutputInternal( + input.getPipeline(), + WindowingStrategy.globalDefault(), + input.isBounded()); + } + + @Override + protected Coder getDefaultOutputCoder() { + return StringUtf8Coder.of(); + } + } + + /** + * A TransformTranslator that adds the predefined Step using + * {@link TranslationContext#addStep} and remaps the input port reference. + */ + private static class EmbeddedTranslator + implements DataflowPipelineTranslator.TransformTranslator { + @Override public void translate(EmbeddedTransform transform, TranslationContext context) { + addObject(transform.step.getProperties(), PropertyNames.PARALLEL_INPUT, + context.asOutputReference(context.getInput(transform))); + context.addStep(transform, transform.step); + } + } + + /** + * A composite transform that returns an output that is unrelated to + * the input. + */ + private static class UnrelatedOutputCreator + extends PTransform, PCollection> { + + @Override + public PCollection apply(PCollection input) { + // Apply an operation so that this is a composite transform. + input.apply(Count.perElement()); + + // Return a value unrelated to the input. + return input.getPipeline().apply(Create.of(1, 2, 3, 4)); + } + + @Override + protected Coder getDefaultOutputCoder() { + return VarIntCoder.of(); + } + } + + /** + * A composite transform that returns an output that is unbound. + */ + private static class UnboundOutputCreator + extends PTransform, PDone> { + + @Override + public PDone apply(PCollection input) { + // Apply an operation so that this is a composite transform. 
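+ // (The result of the Count is deliberately ignored; it exists only to make this transform composite.)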
+ input.apply(Count.perElement()); + + return PDone.in(input.getPipeline()); + } + + @Override + protected Coder getDefaultOutputCoder() { + return VoidCoder.of(); + } + } + + /** + * A composite transform that returns a partially bound output. + * + *

This is not allowed and will result in a failure. + */ + private static class PartiallyBoundOutputCreator + extends PTransform, PCollectionTuple> { + + public final TupleTag sumTag = new TupleTag<>("sum"); + public final TupleTag doneTag = new TupleTag<>("done"); + + @Override + public PCollectionTuple apply(PCollection input) { + PCollection sum = input.apply(Sum.integersGlobally()); + + // Fails here when attempting to construct a tuple with an unbound object. + return PCollectionTuple.of(sumTag, sum) + .and(doneTag, PCollection.createPrimitiveOutputInternal( + input.getPipeline(), + WindowingStrategy.globalDefault(), + input.isBounded())); + } + } + + @Test + public void testMultiGraphPipelineSerialization() throws IOException { + Pipeline p = Pipeline.create(buildPipelineOptions()); + + PCollection input = p.begin() + .apply(Create.of(1, 2, 3)); + + input.apply(new UnrelatedOutputCreator()); + input.apply(new UnboundOutputCreator()); + + DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions( + PipelineOptionsFactory.as(DataflowPipelineOptions.class)); + + // Check that translation doesn't fail. + t.translate( + p, (DataflowPipelineRunner) p.getRunner(), Collections.emptyList()); + } + + @Test + public void testPartiallyBoundFailure() throws IOException { + Pipeline p = Pipeline.create(buildPipelineOptions()); + + PCollection input = p.begin() + .apply(Create.of(1, 2, 3)); + + thrown.expect(IllegalStateException.class); + input.apply(new PartiallyBoundOutputCreator()); + + Assert.fail("Failure expected from use of partially bound output"); + } + + /** + * This tests a few corner cases that should not crash. + */ + @Test + public void testGoodWildcards() throws Exception { + DataflowPipelineOptions options = buildPipelineOptions(); + Pipeline pipeline = Pipeline.create(options); + DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options); + + applyRead(pipeline, "gs://bucket/foo"); + applyRead(pipeline, "gs://bucket/foo/"); + applyRead(pipeline, "gs://bucket/foo/*"); + applyRead(pipeline, "gs://bucket/foo/?"); + applyRead(pipeline, "gs://bucket/foo/[0-9]"); + applyRead(pipeline, "gs://bucket/foo/*baz*"); + applyRead(pipeline, "gs://bucket/foo/*baz?"); + applyRead(pipeline, "gs://bucket/foo/[0-9]baz?"); + applyRead(pipeline, "gs://bucket/foo/baz/*"); + applyRead(pipeline, "gs://bucket/foo/baz/*wonka*"); + applyRead(pipeline, "gs://bucket/foo/*baz/wonka*"); + applyRead(pipeline, "gs://bucket/foo*/baz"); + applyRead(pipeline, "gs://bucket/foo?/baz"); + applyRead(pipeline, "gs://bucket/foo[0-9]/baz"); + + // Check that translation doesn't fail. + t.translate( + pipeline, + (DataflowPipelineRunner) pipeline.getRunner(), + Collections.emptyList()); + } + + private void applyRead(Pipeline pipeline, String path) { + pipeline.apply("Read(" + path + ")", TextIO.Read.from(path)); + } + + /** + * Recursive wildcards are not supported. + * This tests "**". + */ + @Test + public void testBadWildcardRecursive() throws Exception { + DataflowPipelineOptions options = buildPipelineOptions(); + Pipeline pipeline = Pipeline.create(options); + DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options); + + pipeline.apply(TextIO.Read.from("gs://bucket/foo**/baz")); + + // Check that translation does fail. 
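+ // The unsupported pattern surfaces during translation, wrapped as the cause of the failure, hence expectCause rather than expect.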
+ thrown.expectCause(Matchers.allOf( + instanceOf(IllegalArgumentException.class), + ThrowableMessageMatcher.hasMessage(containsString("Unsupported wildcard usage")))); + t.translate( + pipeline, + (DataflowPipelineRunner) pipeline.getRunner(), + Collections.emptyList()); + } + + @Test + public void testToSingletonTranslation() throws Exception { + // A "change detector" test that makes sure the translation + // of getting a PCollectionView does not change + // in bad ways during refactor + + DataflowPipelineOptions options = buildPipelineOptions(); + options.setExperiments(ImmutableList.of("disable_ism_side_input")); + DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); + + Pipeline pipeline = Pipeline.create(options); + pipeline.apply(Create.of(1)) + .apply(View.asSingleton()); + Job job = + translator + .translate( + pipeline, + (DataflowPipelineRunner) pipeline.getRunner(), + Collections.emptyList()) + .getJob(); + + List steps = job.getSteps(); + assertEquals(2, steps.size()); + + Step createStep = steps.get(0); + assertEquals("ParallelRead", createStep.getKind()); + + Step collectionToSingletonStep = steps.get(1); + assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind()); + + } + + @Test + public void testToIterableTranslation() throws Exception { + // A "change detector" test that makes sure the translation + // of getting a PCollectionView> does not change + // in bad ways during refactor + + DataflowPipelineOptions options = buildPipelineOptions(); + options.setExperiments(ImmutableList.of("disable_ism_side_input")); + DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); + + Pipeline pipeline = Pipeline.create(options); + pipeline.apply(Create.of(1, 2, 3)) + .apply(View.asIterable()); + Job job = + translator + .translate( + pipeline, + (DataflowPipelineRunner) pipeline.getRunner(), + Collections.emptyList()) + .getJob(); + + List steps = job.getSteps(); + assertEquals(2, steps.size()); + + Step createStep = steps.get(0); + assertEquals("ParallelRead", createStep.getKind()); + + Step collectionToSingletonStep = steps.get(1); + assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind()); + } + + @Test + public void testToSingletonTranslationWithIsmSideInput() throws Exception { + // A "change detector" test that makes sure the translation + // of getting a PCollectionView does not change + // in bad ways during refactor + + DataflowPipelineOptions options = buildPipelineOptions(); + DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); + + Pipeline pipeline = Pipeline.create(options); + pipeline.apply(Create.of(1)) + .apply(View.asSingleton()); + Job job = + translator + .translate( + pipeline, + (DataflowPipelineRunner) pipeline.getRunner(), + Collections.emptyList()) + .getJob(); + + List steps = job.getSteps(); + assertEquals(5, steps.size()); + + @SuppressWarnings("unchecked") + List> toIsmRecordOutputs = + (List>) steps.get(3).getProperties().get(PropertyNames.OUTPUT_INFO); + assertTrue( + Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format")); + + Step collectionToSingletonStep = steps.get(4); + assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind()); + } + + @Test + public void testToIterableTranslationWithIsmSideInput() throws Exception { + // A "change detector" test that makes sure the translation + // of getting a PCollectionView> does not change + // in bad ways during refactor + 
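+ // Unlike the tests above, no "disable_ism_side_input" experiment is set, so the default indexed (Ism) side input path is exercised.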
+ DataflowPipelineOptions options = buildPipelineOptions(); + DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); + + Pipeline pipeline = Pipeline.create(options); + pipeline.apply(Create.of(1, 2, 3)) + .apply(View.asIterable()); + Job job = + translator + .translate( + pipeline, + (DataflowPipelineRunner) pipeline.getRunner(), + Collections.emptyList()) + .getJob(); + + List steps = job.getSteps(); + assertEquals(3, steps.size()); + + @SuppressWarnings("unchecked") + List> toIsmRecordOutputs = + (List>) steps.get(1).getProperties().get(PropertyNames.OUTPUT_INFO); + assertTrue( + Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format")); + + + Step collectionToSingletonStep = steps.get(2); + assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind()); + } + + @Test + public void testStepDisplayData() throws Exception { + DataflowPipelineOptions options = buildPipelineOptions(); + DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); + Pipeline pipeline = Pipeline.create(options); + + DoFn fn1 = new DoFn() { + @Override + public void processElement(ProcessContext c) throws Exception { + c.output(c.element()); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .add("foo", "bar") + .add("foo2", DataflowPipelineTranslatorTest.class) + .withLabel("Test Class") + .withLinkUrl("http://www.google.com"); + } + }; + + DoFn fn2 = new DoFn() { + @Override + public void processElement(ProcessContext c) throws Exception { + c.output(c.element()); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add("foo3", 1234); + } + }; + + ParDo.Bound parDo1 = ParDo.of(fn1); + ParDo.Bound parDo2 = ParDo.of(fn2); + pipeline + .apply(Create.of(1, 2, 3)) + .apply(parDo1) + .apply(parDo2); + + Job job = + translator + .translate( + pipeline, + (DataflowPipelineRunner) pipeline.getRunner(), + Collections.emptyList()) + .getJob(); + + List steps = job.getSteps(); + assertEquals(3, steps.size()); + + Map parDo1Properties = steps.get(1).getProperties(); + Map parDo2Properties = steps.get(2).getProperties(); + assertThat(parDo1Properties, hasKey("display_data")); + + Collection> fn1displayData = + (Collection>) parDo1Properties.get("display_data"); + Collection> fn2displayData = + (Collection>) parDo2Properties.get("display_data"); + + ImmutableSet> expectedFn1DisplayData = ImmutableSet.of( + ImmutableMap.builder() + .put("key", "foo") + .put("type", "STRING") + .put("value", "bar") + .put("namespace", fn1.getClass().getName()) + .build(), + ImmutableMap.builder() + .put("key", "fn") + .put("type", "JAVA_CLASS") + .put("value", fn1.getClass().getName()) + .put("shortValue", fn1.getClass().getSimpleName()) + .put("namespace", parDo1.getClass().getName()) + .build(), + ImmutableMap.builder() + .put("key", "foo2") + .put("type", "JAVA_CLASS") + .put("value", DataflowPipelineTranslatorTest.class.getName()) + .put("shortValue", DataflowPipelineTranslatorTest.class.getSimpleName()) + .put("namespace", fn1.getClass().getName()) + .put("label", "Test Class") + .put("linkUrl", "http://www.google.com") + .build() + ); + + ImmutableSet> expectedFn2DisplayData = ImmutableSet.of( + ImmutableMap.builder() + .put("key", "fn") + .put("type", "JAVA_CLASS") + .put("value", fn2.getClass().getName()) + .put("shortValue", fn2.getClass().getSimpleName()) + .put("namespace", parDo2.getClass().getName()) + .build(), + ImmutableMap.builder() + 
.put("key", "foo3") + .put("type", "INTEGER") + .put("value", 1234L) + .put("namespace", fn2.getClass().getName()) + .build() + ); + + assertEquals(expectedFn1DisplayData, ImmutableSet.copyOf(fn1displayData)); + assertEquals(expectedFn2DisplayData, ImmutableSet.copyOf(fn2displayData)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/internal/CustomSourcesTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/internal/CustomSourcesTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/internal/CustomSourcesTest.java new file mode 100644 index 0000000..ed86be2 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/internal/CustomSourcesTest.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.internal; +import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.ExpectedLogs; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Sample; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.PCollection; + +import com.google.common.base.Preconditions; + +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Tests for {@link CustomSources}. 
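+ * The nested {@code TestIO} source produces the integers in a half-open range and supports dynamic splitting via {@code splitAtFraction}.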
+ */ +@RunWith(JUnit4.class) +public class CustomSourcesTest { + @Rule public ExpectedException expectedException = ExpectedException.none(); + @Rule public ExpectedLogs logged = ExpectedLogs.none(CustomSources.class); + + static class TestIO { + public static Read fromRange(int from, int to) { + return new Read(from, to, false); + } + + static class Read extends BoundedSource { + final int from; + final int to; + final boolean produceTimestamps; + + Read(int from, int to, boolean produceTimestamps) { + this.from = from; + this.to = to; + this.produceTimestamps = produceTimestamps; + } + + public Read withTimestampsMillis() { + return new Read(from, to, true); + } + + @Override + public List splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions options) + throws Exception { + List res = new ArrayList<>(); + DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); + float step = 1.0f * (to - from) / dataflowOptions.getNumWorkers(); + for (int i = 0; i < dataflowOptions.getNumWorkers(); ++i) { + res.add(new Read( + Math.round(from + i * step), Math.round(from + (i + 1) * step), + produceTimestamps)); + } + return res; + } + + @Override + public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { + return 8 * (to - from); + } + + @Override + public boolean producesSortedKeys(PipelineOptions options) throws Exception { + return true; + } + + @Override + public BoundedReader createReader(PipelineOptions options) throws IOException { + return new RangeReader(this); + } + + @Override + public void validate() {} + + @Override + public String toString() { + return "[" + from + ", " + to + ")"; + } + + @Override + public Coder getDefaultOutputCoder() { + return BigEndianIntegerCoder.of(); + } + + private static class RangeReader extends BoundedReader { + // To verify that BasicSerializableSourceFormat calls our methods according to protocol. + enum State { + UNSTARTED, + STARTED, + FINISHED + } + private Read source; + private int current = -1; + private State state = State.UNSTARTED; + + public RangeReader(Read source) { + this.source = source; + } + + @Override + public boolean start() throws IOException { + Preconditions.checkState(state == State.UNSTARTED); + state = State.STARTED; + current = source.from; + return (current < source.to); + } + + @Override + public boolean advance() throws IOException { + Preconditions.checkState(state == State.STARTED); + if (current == source.to - 1) { + state = State.FINISHED; + return false; + } + current++; + return true; + } + + @Override + public Integer getCurrent() { + Preconditions.checkState(state == State.STARTED); + return current; + } + + @Override + public Instant getCurrentTimestamp() { + return source.produceTimestamps + ? 
new Instant(current /* as millis */) : BoundedWindow.TIMESTAMP_MIN_VALUE; + } + + @Override + public void close() throws IOException { + Preconditions.checkState(state == State.STARTED || state == State.FINISHED); + state = State.FINISHED; + } + + @Override + public Read getCurrentSource() { + return source; + } + + @Override + public Read splitAtFraction(double fraction) { + int proposedIndex = (int) (source.from + fraction * (source.to - source.from)); + if (proposedIndex <= current) { + return null; + } + Read primary = new Read(source.from, proposedIndex, source.produceTimestamps); + Read residual = new Read(proposedIndex, source.to, source.produceTimestamps); + this.source = primary; + return residual; + } + + @Override + public Double getFractionConsumed() { + return (current == -1) + ? 0.0 + : (1.0 * (1 + current - source.from) / (source.to - source.from)); + } + } + } + } + + @Test + public void testDirectPipelineWithoutTimestamps() throws Exception { + Pipeline p = TestPipeline.create(); + PCollection sum = p + .apply(Read.from(TestIO.fromRange(10, 20))) + .apply(Sum.integersGlobally()) + .apply(Sample.any(1)); + + PAssert.thatSingleton(sum).isEqualTo(145); + p.run(); + } + + @Test + public void testDirectPipelineWithTimestamps() throws Exception { + Pipeline p = TestPipeline.create(); + PCollection sums = + p.apply(Read.from(TestIO.fromRange(10, 20).withTimestampsMillis())) + .apply(Window.into(FixedWindows.of(Duration.millis(3)))) + .apply(Sum.integersGlobally().withoutDefaults()); + // Should group into [10 11] [12 13 14] [15 16 17] [18 19]. + PAssert.that(sums).containsInAnyOrder(21, 37, 39, 48); + p.run(); + } + + @Test + public void testRangeProgressAndSplitAtFraction() throws Exception { + // Show basic usage of getFractionConsumed and splitAtFraction. + // This test only tests TestIO itself, not BasicSerializableSourceFormat. + + DataflowPipelineOptions options = + PipelineOptionsFactory.create().as(DataflowPipelineOptions.class); + TestIO.Read source = TestIO.fromRange(10, 20); + try (BoundedSource.BoundedReader reader = source.createReader(options)) { + assertEquals(0, reader.getFractionConsumed().intValue()); + assertTrue(reader.start()); + assertEquals(0.1, reader.getFractionConsumed(), 1e-6); + assertTrue(reader.advance()); + assertEquals(0.2, reader.getFractionConsumed(), 1e-6); + // Already past 0.0 and 0.1. + assertNull(reader.splitAtFraction(0.0)); + assertNull(reader.splitAtFraction(0.1)); + + { + TestIO.Read residual = (TestIO.Read) reader.splitAtFraction(0.5); + assertNotNull(residual); + TestIO.Read primary = (TestIO.Read) reader.getCurrentSource(); + assertThat(readFromSource(primary, options), contains(10, 11, 12, 13, 14)); + assertThat(readFromSource(residual, options), contains(15, 16, 17, 18, 19)); + } + + // Range is now [10, 15) and we are at 12. + { + TestIO.Read residual = (TestIO.Read) reader.splitAtFraction(0.8); // give up 14. 
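+ // 0.8 of [10, 15) resolves to (int) (10 + 0.8 * (15 - 10)) = 14, so the primary keeps [10, 14) and the residual covers just 14.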
+ assertNotNull(residual); + TestIO.Read primary = (TestIO.Read) reader.getCurrentSource(); + assertThat(readFromSource(primary, options), contains(10, 11, 12, 13)); + assertThat(readFromSource(residual, options), contains(14)); + } + + assertTrue(reader.advance()); + assertEquals(12, reader.getCurrent().intValue()); + assertTrue(reader.advance()); + assertEquals(13, reader.getCurrent().intValue()); + assertFalse(reader.advance()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java new file mode 100644 index 0000000..bfc99e8 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.io; + +import org.apache.beam.runners.dataflow.DataflowPipelineRunner; +import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.util.GcsUtil; +import org.apache.beam.sdk.util.TestCredential; +import org.apache.beam.sdk.util.gcsfs.GcsPath; + +import com.google.common.collect.ImmutableList; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.channels.SeekableByteChannel; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; +import java.util.List; + +/** + * {@link DataflowPipelineRunner} specific tests for TextIO Read and Write transforms. 
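+ * GCS access is stubbed with a mock {@link GcsUtil} so that the validation TextIO performs during apply() can run without real storage.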
+ */ +@RunWith(JUnit4.class) +public class DataflowTextIOTest { + + private TestDataflowPipelineOptions buildTestPipelineOptions() { + TestDataflowPipelineOptions options = + PipelineOptionsFactory.as(TestDataflowPipelineOptions.class); + options.setGcpCredential(new TestCredential()); + return options; + } + + private GcsUtil buildMockGcsUtil() throws IOException { + GcsUtil mockGcsUtil = Mockito.mock(GcsUtil.class); + + // Any request to open gets a new bogus channel + Mockito + .when(mockGcsUtil.open(Mockito.any(GcsPath.class))) + .then(new Answer() { + @Override + public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable { + return FileChannel.open( + Files.createTempFile("channel-", ".tmp"), + StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE); + } + }); + + // Any request for expansion returns a list containing the original GcsPath + // This is required to pass validation that occurs in TextIO during apply() + Mockito + .when(mockGcsUtil.expand(Mockito.any(GcsPath.class))) + .then(new Answer>() { + @Override + public List answer(InvocationOnMock invocation) throws Throwable { + return ImmutableList.of((GcsPath) invocation.getArguments()[0]); + } + }); + + return mockGcsUtil; + } + + /** + * This tests a few corner cases that should not crash. + */ + @Test + public void testGoodWildcards() throws Exception { + TestDataflowPipelineOptions options = buildTestPipelineOptions(); + options.setGcsUtil(buildMockGcsUtil()); + + Pipeline pipeline = Pipeline.create(options); + + applyRead(pipeline, "gs://bucket/foo"); + applyRead(pipeline, "gs://bucket/foo/"); + applyRead(pipeline, "gs://bucket/foo/*"); + applyRead(pipeline, "gs://bucket/foo/?"); + applyRead(pipeline, "gs://bucket/foo/[0-9]"); + applyRead(pipeline, "gs://bucket/foo/*baz*"); + applyRead(pipeline, "gs://bucket/foo/*baz?"); + applyRead(pipeline, "gs://bucket/foo/[0-9]baz?"); + applyRead(pipeline, "gs://bucket/foo/baz/*"); + applyRead(pipeline, "gs://bucket/foo/baz/*wonka*"); + applyRead(pipeline, "gs://bucket/foo/*baz/wonka*"); + applyRead(pipeline, "gs://bucket/foo*/baz"); + applyRead(pipeline, "gs://bucket/foo?/baz"); + applyRead(pipeline, "gs://bucket/foo[0-9]/baz"); + + // Check that running doesn't fail. + pipeline.run(); + } + + private void applyRead(Pipeline pipeline, String path) { + pipeline.apply("Read(" + path + ")", TextIO.Read.from(path)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptionsTest.java new file mode 100644 index 0000000..71b6b57 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptionsTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.options; + +import static org.hamcrest.Matchers.hasEntry; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +import org.apache.beam.sdk.options.PipelineOptionsFactory; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link DataflowPipelineDebugOptions}. */ +@RunWith(JUnit4.class) +public class DataflowPipelineDebugOptionsTest { + @Test + public void testTransformNameMapping() throws Exception { + DataflowPipelineDebugOptions options = PipelineOptionsFactory + .fromArgs(new String[]{"--transformNameMapping={\"a\":\"b\",\"foo\":\"\",\"bar\":\"baz\"}"}) + .as(DataflowPipelineDebugOptions.class); + assertEquals(3, options.getTransformNameMapping().size()); + assertThat(options.getTransformNameMapping(), hasEntry("a", "b")); + assertThat(options.getTransformNameMapping(), hasEntry("foo", "")); + assertThat(options.getTransformNameMapping(), hasEntry("bar", "baz")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java new file mode 100644 index 0000000..e7db40f --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.options; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.ResetDateTimeProvider; +import org.apache.beam.sdk.testing.RestoreSystemProperties; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestRule; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link DataflowPipelineOptions}. 
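In particular, verifies how the default job name is derived from the normalized application name, the user name, and the current date and time.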
*/ +@RunWith(JUnit4.class) +public class DataflowPipelineOptionsTest { + @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties(); + @Rule public ResetDateTimeProvider resetDateTimeProviderRule = new ResetDateTimeProvider(); + + @Test + public void testJobNameIsSet() { + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setJobName("TestJobName"); + assertEquals("TestJobName", options.getJobName()); + } + + @Test + public void testUserNameIsNotSet() { + resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z"); + System.getProperties().remove("user.name"); + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setAppName("TestApplication"); + assertEquals("testapplication--1208190706", options.getJobName()); + assertTrue(options.getJobName().length() <= 40); + } + + @Test + public void testAppNameAndUserNameAreLong() { + resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z"); + System.getProperties().put("user.name", "abcdeabcdeabcdeabcdeabcdeabcde"); + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setAppName("1234567890123456789012345678901234567890"); + assertEquals( + "a234567890123456789012345678901234567890-abcdeabcdeabcdeabcdeabcdeabcde-1208190706", + options.getJobName()); + } + + @Test + public void testAppNameIsLong() { + resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z"); + System.getProperties().put("user.name", "abcde"); + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setAppName("1234567890123456789012345678901234567890"); + assertEquals("a234567890123456789012345678901234567890-abcde-1208190706", options.getJobName()); + } + + @Test + public void testUserNameIsLong() { + resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z"); + System.getProperties().put("user.name", "abcdeabcdeabcdeabcdeabcdeabcde"); + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setAppName("1234567890"); + assertEquals("a234567890-abcdeabcdeabcdeabcdeabcdeabcde-1208190706", options.getJobName()); + } + + @Test + public void testUtf8UserNameAndApplicationNameIsNormalized() { + resetDateTimeProviderRule.setDateTimeFixed("2014-12-08T19:07:06.698Z"); + System.getProperties().put("user.name", "ði ıntəˈnæʃənəl "); + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setAppName("fəˈnɛtık əsoʊsiˈeıʃn"); + assertEquals("f00n0t0k00so0si0e00n-0i00nt00n000n0l0-1208190706", options.getJobName()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java new file mode 100644 index 0000000..58f8514 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowProfilingOptionsTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.options; + +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.sdk.options.PipelineOptionsFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.hamcrest.Matchers; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link DataflowProfilingOptions}. + */ +@RunWith(JUnit4.class) +public class DataflowProfilingOptionsTest { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + @Test + public void testOptionsObject() throws Exception { + DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] { + "--enableProfilingAgent", "--profilingAgentConfiguration={\"interval\": 21}"}) + .as(DataflowPipelineOptions.class); + assertTrue(options.getEnableProfilingAgent()); + + String json = MAPPER.writeValueAsString(options); + assertThat(json, Matchers.containsString( + "\"profilingAgentConfiguration\":{\"interval\":21}")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptionsTest.java new file mode 100644 index 0000000..5b12ad5 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptionsTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.options; + +import static org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.Level.WARN; + +import static org.junit.Assert.assertEquals; + +import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.WorkerLogLevelOverrides; + +import com.google.common.collect.ImmutableMap; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link DataflowWorkerLoggingOptions}. */ +@RunWith(JUnit4.class) +public class DataflowWorkerLoggingOptionsTest { + private static final ObjectMapper MAPPER = new ObjectMapper(); + @Rule public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testWorkerLogLevelOverrideWithInvalidLogLevel() { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Unsupported log level"); + WorkerLogLevelOverrides.from(ImmutableMap.of("Name", "FakeLevel")); + } + + @Test + public void testWorkerLogLevelOverrideForClass() throws Exception { + assertEquals("{\"org.junit.Test\":\"WARN\"}", + MAPPER.writeValueAsString( + new WorkerLogLevelOverrides().addOverrideForClass(Test.class, WARN))); + } + + @Test + public void testWorkerLogLevelOverrideForPackage() throws Exception { + assertEquals("{\"org.junit\":\"WARN\"}", + MAPPER.writeValueAsString( + new WorkerLogLevelOverrides().addOverrideForPackage(Test.class.getPackage(), WARN))); + } + + @Test + public void testWorkerLogLevelOverrideForName() throws Exception { + assertEquals("{\"A\":\"WARN\"}", + MAPPER.writeValueAsString( + new WorkerLogLevelOverrides().addOverrideForName("A", WARN))); + } + + @Test + public void testSerializationAndDeserializationOf() throws Exception { + String testValue = "{\"A\":\"WARN\"}"; + assertEquals(testValue, + MAPPER.writeValueAsString( + MAPPER.readValue(testValue, WorkerLogLevelOverrides.class))); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/02190985/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java new file mode 100644 index 0000000..d6de501 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowPipelineRunnerTest.java @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.testing; + +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.beam.runners.dataflow.DataflowPipelineJob; +import org.apache.beam.runners.dataflow.DataflowPipelineRunner; +import org.apache.beam.runners.dataflow.util.MonitoringUtil; +import org.apache.beam.runners.dataflow.util.MonitoringUtil.JobMessagesHandler; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult.State; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.util.GcsUtil; +import org.apache.beam.sdk.util.NoopPathValidator; +import org.apache.beam.sdk.util.TestCredential; +import org.apache.beam.sdk.util.TimeUtil; +import org.apache.beam.sdk.util.Transport; +import org.apache.beam.sdk.values.PCollection; + +import com.google.api.client.http.LowLevelHttpResponse; +import com.google.api.client.json.Json; +import com.google.api.client.testing.http.MockHttpTransport; +import com.google.api.client.testing.http.MockLowLevelHttpRequest; +import com.google.api.client.testing.http.MockLowLevelHttpResponse; +import com.google.api.services.dataflow.Dataflow; +import com.google.api.services.dataflow.model.JobMessage; +import com.google.api.services.dataflow.model.JobMetrics; +import com.google.api.services.dataflow.model.MetricStructuredName; +import com.google.api.services.dataflow.model.MetricUpdate; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; + +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +/** Tests for {@link TestDataflowPipelineRunner}. 
*/ +@RunWith(JUnit4.class) +public class TestDataflowPipelineRunnerTest { + @Rule public ExpectedException expectedException = ExpectedException.none(); + @Mock private MockHttpTransport transport; + @Mock private MockLowLevelHttpRequest request; + @Mock private GcsUtil mockGcsUtil; + + private TestDataflowPipelineOptions options; + private Dataflow service; + + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + when(transport.buildRequest(anyString(), anyString())).thenReturn(request); + doCallRealMethod().when(request).getContentAsString(); + service = new Dataflow(transport, Transport.getJsonFactory(), null); + + options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class); + options.setAppName("TestAppName"); + options.setProject("test-project"); + options.setTempLocation("gs://test/temp/location"); + options.setTempRoot("gs://test"); + options.setGcpCredential(new TestCredential()); + options.setDataflowClient(service); + options.setRunner(TestDataflowPipelineRunner.class); + options.setPathValidatorClass(NoopPathValidator.class); + } + + @Test + public void testToString() { + assertEquals("TestDataflowPipelineRunner#TestAppName", + new TestDataflowPipelineRunner(options).toString()); + } + + @Test + public void testRunBatchJobThatSucceeds() throws Exception { + Pipeline p = TestPipeline.create(options); + PCollection pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getDataflowClient()).thenReturn(service); + when(mockJob.getState()).thenReturn(State.DONE); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + + DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + when(request.execute()).thenReturn( + generateMockMetricResponse(true /* success */, true /* tentative */)); + assertEquals(mockJob, runner.run(p, mockRunner)); + } + + @Test + public void testRunBatchJobThatFails() throws Exception { + Pipeline p = TestPipeline.create(options); + PCollection pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getDataflowClient()).thenReturn(service); + when(mockJob.getState()).thenReturn(State.FAILED); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + + DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + try { + runner.run(p, mockRunner); + } catch (AssertionError expected) { + return; + } + // Note that fail throws an AssertionError which is why it is placed out here + // instead of inside the try-catch block. 
+ fail("AssertionError expected"); + } + + @Test + public void testBatchPipelineFailsIfException() throws Exception { + Pipeline p = TestPipeline.create(options); + PCollection pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getDataflowClient()).thenReturn(service); + when(mockJob.getState()).thenReturn(State.RUNNING); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class))) + .thenAnswer(new Answer() { + @Override + public State answer(InvocationOnMock invocation) { + JobMessage message = new JobMessage(); + message.setMessageText("FooException"); + message.setTime(TimeUtil.toCloudTime(Instant.now())); + message.setMessageImportance("JOB_MESSAGE_ERROR"); + ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2]) + .process(Arrays.asList(message)); + return State.CANCELLED; + } + }); + + DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + when(request.execute()).thenReturn( + generateMockMetricResponse(false /* success */, true /* tentative */)); + TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + try { + runner.run(p, mockRunner); + } catch (AssertionError expected) { + assertThat(expected.getMessage(), containsString("FooException")); + verify(mockJob, atLeastOnce()).cancel(); + return; + } + // Note that fail throws an AssertionError which is why it is placed out here + // instead of inside the try-catch block. + fail("AssertionError expected"); + } + + @Test + public void testRunStreamingJobThatSucceeds() throws Exception { + options.setStreaming(true); + Pipeline p = TestPipeline.create(options); + PCollection pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getDataflowClient()).thenReturn(service); + when(mockJob.getState()).thenReturn(State.RUNNING); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + + DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + when(request.execute()).thenReturn( + generateMockMetricResponse(true /* success */, true /* tentative */)); + TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + runner.run(p, mockRunner); + } + + @Test + public void testRunStreamingJobThatFails() throws Exception { + options.setStreaming(true); + Pipeline p = TestPipeline.create(options); + PCollection pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getDataflowClient()).thenReturn(service); + when(mockJob.getState()).thenReturn(State.RUNNING); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + + DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + when(request.execute()).thenReturn( + generateMockMetricResponse(false /* success */, true /* tentative */)); + TestDataflowPipelineRunner runner = 
(TestDataflowPipelineRunner) p.getRunner(); + try { + runner.run(p, mockRunner); + } catch (AssertionError expected) { + return; + } + // Note that fail throws an AssertionError which is why it is placed out here + // instead of inside the try-catch block. + fail("AssertionError expected"); + } + + @Test + public void testCheckingForSuccessWhenPAssertSucceeds() throws Exception { + DataflowPipelineJob job = + spy(new DataflowPipelineJob("test-project", "test-job", service, null)); + Pipeline p = TestPipeline.create(options); + PCollection pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + when(request.execute()).thenReturn( + generateMockMetricResponse(true /* success */, true /* tentative */)); + doReturn(State.DONE).when(job).getState(); + assertEquals(Optional.of(true), runner.checkForSuccess(job)); + } + + @Test + public void testCheckingForSuccessWhenPAssertFails() throws Exception { + DataflowPipelineJob job = + spy(new DataflowPipelineJob("test-project", "test-job", service, null)); + Pipeline p = TestPipeline.create(options); + PCollection pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + when(request.execute()).thenReturn( + generateMockMetricResponse(false /* success */, true /* tentative */)); + doReturn(State.DONE).when(job).getState(); + assertEquals(Optional.of(false), runner.checkForSuccess(job)); + } + + @Test + public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception { + DataflowPipelineJob job = + spy(new DataflowPipelineJob("test-project", "test-job", service, null)); + Pipeline p = TestPipeline.create(options); + PCollection pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + when(request.execute()).thenReturn( + generateMockMetricResponse(true /* success */, false /* tentative */)); + doReturn(State.RUNNING).when(job).getState(); + assertEquals(Optional.absent(), runner.checkForSuccess(job)); + } + + private LowLevelHttpResponse generateMockMetricResponse(boolean success, boolean tentative) + throws Exception { + MetricStructuredName name = new MetricStructuredName(); + name.setName(success ? "PAssertSuccess" : "PAssertFailure"); + name.setContext( + tentative ? ImmutableMap.of("tentative", "") : ImmutableMap.of()); + + MetricUpdate metric = new MetricUpdate(); + metric.setName(name); + metric.setScalar(BigDecimal.ONE); + + MockLowLevelHttpResponse response = new MockLowLevelHttpResponse(); + response.setContentType(Json.MEDIA_TYPE); + JobMetrics jobMetrics = new JobMetrics(); + jobMetrics.setMetrics(Lists.newArrayList(metric)); + // N.B. Setting the factory is necessary in order to get valid JSON. 
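+      // (toPrettyString() delegates to the JsonFactory configured on the model object,
+      // which is why the factory must be set before serializing the mock response.)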
+ jobMetrics.setFactory(Transport.getJsonFactory()); + response.setContent(jobMetrics.toPrettyString()); + return response; + } + + @Test + public void testStreamingPipelineFailsIfServiceFails() throws Exception { + DataflowPipelineJob job = + spy(new DataflowPipelineJob("test-project", "test-job", service, null)); + Pipeline p = TestPipeline.create(options); + PCollection pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + when(request.execute()).thenReturn( + generateMockMetricResponse(true /* success */, false /* tentative */)); + doReturn(State.FAILED).when(job).getState(); + assertEquals(Optional.of(false), runner.checkForSuccess(job)); + } + + @Test + public void testStreamingPipelineFailsIfException() throws Exception { + options.setStreaming(true); + Pipeline p = TestPipeline.create(options); + PCollection pc = p.apply(Create.of(1, 2, 3)); + PAssert.that(pc).containsInAnyOrder(1, 2, 3); + + DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); + when(mockJob.getDataflowClient()).thenReturn(service); + when(mockJob.getState()).thenReturn(State.RUNNING); + when(mockJob.getProjectId()).thenReturn("test-project"); + when(mockJob.getJobId()).thenReturn("test-job"); + when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class))) + .thenAnswer(new Answer() { + @Override + public State answer(InvocationOnMock invocation) { + JobMessage message = new JobMessage(); + message.setMessageText("FooException"); + message.setTime(TimeUtil.toCloudTime(Instant.now())); + message.setMessageImportance("JOB_MESSAGE_ERROR"); + ((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2]) + .process(Arrays.asList(message)); + return State.CANCELLED; + } + }); + + DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class); + when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob); + + when(request.execute()).thenReturn( + generateMockMetricResponse(false /* success */, true /* tentative */)); + TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner(); + try { + runner.run(p, mockRunner); + } catch (AssertionError expected) { + assertThat(expected.getMessage(), containsString("FooException")); + verify(mockJob, atLeastOnce()).cancel(); + return; + } + // Note that fail throws an AssertionError which is why it is placed out here + // instead of inside the try-catch block. + fail("AssertionError expected"); + } +}
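
For readers skimming this commit, the tests above protect the TestDataflowPipelineRunner convention of reporting PAssert results through the "PAssertSuccess" / "PAssertFailure" job metrics (with a "tentative" context while the job is still running). The following is only an illustrative sketch of the usage pattern being tested, not part of the commit: it assumes TestDataflowPipelineOptions and TestDataflowPipelineRunner live in org.apache.beam.runners.dataflow.testing as reorganized here, and the class name, project, and GCS paths are placeholders.

    import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
    import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.testing.PAssert;
    import org.apache.beam.sdk.testing.TestPipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PCollection;

    /** Hypothetical integration-style sketch; names and GCS locations are placeholders. */
    public class ExampleDataflowIT {
      public static void main(String[] args) {
        TestDataflowPipelineOptions options =
            PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
        options.setRunner(TestDataflowPipelineRunner.class);
        options.setProject("my-gcp-project");        // placeholder project
        options.setTempRoot("gs://my-bucket/temp");  // placeholder temp root

        // TestPipeline wires the options into a Pipeline, as in the tests above.
        Pipeline p = TestPipeline.create(options);
        PCollection<Integer> numbers = p.apply(Create.of(1, 2, 3));

        // PAssert is what ultimately produces the PAssertSuccess / PAssertFailure
        // metrics that checkForSuccess() inspects after the job is submitted.
        PAssert.that(numbers).containsInAnyOrder(1, 2, 3);

        p.run();
      }
    }

In this sketch the runner behaves as exercised above: a batch job must reach DONE with only success metrics to pass, while a streaming job is polled until the tentative metrics report success, an error message arrives, or the service fails.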