From: davor@apache.org
To: commits@beam.incubator.apache.org
Reply-To: dev@beam.incubator.apache.org
Date: Thu, 14 Apr 2016 04:48:12 -0000
Message-Id: <74d0d68d290a498ab4fce9fd82d4c469@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [25/74] [partial] incubator-beam git commit: Rename com/google/cloud/dataflow->org/apache/beam

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java
new file mode 100644
index 0000000..c6dca2d
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineRunnerTest.java
@@ -0,0 +1,1369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+import static com.google.cloud.dataflow.sdk.util.WindowedValue.valueInGlobalWindow;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.startsWith;
+import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.dataflow.model.DataflowPackage;
+import com.google.api.services.dataflow.model.Job;
+import com.google.api.services.dataflow.model.ListJobsResponse;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.Pipeline.PipelineVisitor;
+import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder;
+import com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
+import com.google.cloud.dataflow.sdk.io.AvroIO;
+import com.google.cloud.dataflow.sdk.io.AvroSource;
+import com.google.cloud.dataflow.sdk.io.BigQueryIO;
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.io.TextIO;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineDebugOptions;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions.CheckEnabled;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.BatchViewAsList;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.BatchViewAsMap;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.BatchViewAsMultimap;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.TransformedMap;
+import com.google.cloud.dataflow.sdk.runners.dataflow.TestCountingSource;
+import com.google.cloud.dataflow.sdk.runners.worker.IsmFormat;
+import com.google.cloud.dataflow.sdk.runners.worker.IsmFormat.IsmRecord;
+import com.google.cloud.dataflow.sdk.runners.worker.IsmFormat.IsmRecordCoder;
+import com.google.cloud.dataflow.sdk.runners.worker.IsmFormat.MetadataKeyCoder;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFnTester;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.util.CoderUtils;
+import com.google.cloud.dataflow.sdk.util.DataflowReleaseInfo;
+import com.google.cloud.dataflow.sdk.util.GcsUtil;
+import com.google.cloud.dataflow.sdk.util.NoopPathValidator;
+import com.google.cloud.dataflow.sdk.util.TestCredential;
+import com.google.cloud.dataflow.sdk.util.UserCodeException;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.util.WindowedValue.FullWindowedValueCoder;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
+import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PDone;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.PValue;
+import com.google.cloud.dataflow.sdk.values.TimestampedValue;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import com.google.cloud.dataflow.sdk.values.TupleTagList;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import org.hamcrest.Description;
+import org.hamcrest.Matchers;
+import org.hamcrest.TypeSafeMatcher;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.internal.matchers.ThrowableMessageMatcher;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for DataflowPipelineRunner.
+ */
+@RunWith(JUnit4.class)
+public class DataflowPipelineRunnerTest {
+
+  private static final String PROJECT_ID = "some-project";
+
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  // Asserts that the given Job has all expected fields set.
+  private static void assertValidJob(Job job) {
+    assertNull(job.getId());
+    assertNull(job.getCurrentState());
+  }
+
+  private DataflowPipeline buildDataflowPipeline(DataflowPipelineOptions options) {
+    options.setStableUniqueNames(CheckEnabled.ERROR);
+    DataflowPipeline p = DataflowPipeline.create(options);
+
+    p.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/object"))
+        .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/object"));
+
+    return p;
+  }
+
+  private static Dataflow buildMockDataflow(
+      final ArgumentCaptor<Job> jobCaptor) throws IOException {
+    Dataflow mockDataflowClient = mock(Dataflow.class);
+    Dataflow.Projects mockProjects = mock(Dataflow.Projects.class);
+    Dataflow.Projects.Jobs mockJobs = mock(Dataflow.Projects.Jobs.class);
+    Dataflow.Projects.Jobs.Create mockRequest =
+        mock(Dataflow.Projects.Jobs.Create.class);
+    Dataflow.Projects.Jobs.List mockList = mock(Dataflow.Projects.Jobs.List.class);
+
+    when(mockDataflowClient.projects()).thenReturn(mockProjects);
+    when(mockProjects.jobs()).thenReturn(mockJobs);
+    when(mockJobs.create(eq(PROJECT_ID), jobCaptor.capture()))
+        .thenReturn(mockRequest);
+    when(mockJobs.list(eq(PROJECT_ID))).thenReturn(mockList);
+    when(mockList.setPageToken(anyString())).thenReturn(mockList);
+    when(mockList.execute())
+        .thenReturn(new ListJobsResponse().setJobs(
+            Arrays.asList(new Job()
+                .setName("oldJobName")
+                .setId("oldJobId")
+                .setCurrentState("JOB_STATE_RUNNING"))));
+
+    Job resultJob = new Job();
+    resultJob.setId("newid");
+    when(mockRequest.execute()).thenReturn(resultJob);
+    return mockDataflowClient;
+  }
+
+  private GcsUtil buildMockGcsUtil(boolean bucketExists) throws IOException {
+    GcsUtil mockGcsUtil = mock(GcsUtil.class);
+    when(mockGcsUtil.create(any(GcsPath.class), anyString()))
+        .then(new Answer<SeekableByteChannel>() {
+          @Override
+          public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable {
+            return FileChannel.open(
+                Files.createTempFile("channel-", ".tmp"),
+                StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE);
+          }
+        });
+
+    when(mockGcsUtil.isGcsPatternSupported(anyString())).thenReturn(true);
+    when(mockGcsUtil.expand(any(GcsPath.class))).then(new Answer<List<GcsPath>>() {
+      @Override
+      public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable {
+        return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
+      }
+    });
+    when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(bucketExists);
+    return mockGcsUtil;
+  }
+
+  private DataflowPipelineOptions buildPipelineOptions() throws IOException {
+    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+    return buildPipelineOptions(jobCaptor);
+  }
+
+  private DataflowPipelineOptions buildPipelineOptions(
+      ArgumentCaptor<Job> jobCaptor) throws IOException {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setProject(PROJECT_ID);
+    options.setTempLocation("gs://somebucket/some/path");
+    // Set FILES_PROPERTY to empty to prevent a default value calculated from the classpath.
+    options.setFilesToStage(new LinkedList<String>());
+    options.setDataflowClient(buildMockDataflow(jobCaptor));
+    options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+    options.setGcpCredential(new TestCredential());
+    return options;
+  }
+
+  @Test
+  public void testRun() throws IOException {
+    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+
+    DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
+    DataflowPipeline p = buildDataflowPipeline(options);
+    DataflowPipelineJob job = p.run();
+    assertEquals("newid", job.getJobId());
+    assertValidJob(jobCaptor.getValue());
+  }
+
+  @Test
+  public void testRunReturnDifferentRequestId() throws IOException {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    Dataflow mockDataflowClient = options.getDataflowClient();
+    Dataflow.Projects.Jobs.Create mockRequest = mock(Dataflow.Projects.Jobs.Create.class);
+    when(mockDataflowClient.projects().jobs().create(eq(PROJECT_ID), any(Job.class)))
+        .thenReturn(mockRequest);
+    Job resultJob = new Job();
+    resultJob.setId("newid");
+    // Return a different request id.
+    resultJob.setClientRequestId("different_request_id");
+    when(mockRequest.execute()).thenReturn(resultJob);
+
+    DataflowPipeline p = buildDataflowPipeline(options);
+    try {
+      p.run();
+      fail("Expected DataflowJobAlreadyExistsException");
+    } catch (DataflowJobAlreadyExistsException expected) {
+      assertThat(expected.getMessage(),
+          containsString("If you want to submit a second job, try again by setting a "
+              + "different name using --jobName."));
+      assertEquals(expected.getJob().getJobId(), resultJob.getId());
+    }
+  }
+
+  @Test
+  public void testUpdate() throws IOException {
+    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+
+    DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
+    options.setUpdate(true);
+    options.setJobName("oldJobName");
+    DataflowPipeline p = buildDataflowPipeline(options);
+    DataflowPipelineJob job = p.run();
+    assertEquals("newid", job.getJobId());
+    assertValidJob(jobCaptor.getValue());
+  }
+
+  @Test
+  public void testUpdateNonExistentPipeline() throws IOException {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Could not find running job named badJobName");
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setUpdate(true);
+    options.setJobName("badJobName");
+    DataflowPipeline p = buildDataflowPipeline(options);
+    p.run();
+  }
+
+  @Test
+  public void testUpdateAlreadyUpdatedPipeline() throws IOException {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setUpdate(true);
+    options.setJobName("oldJobName");
+    Dataflow mockDataflowClient = options.getDataflowClient();
+    Dataflow.Projects.Jobs.Create mockRequest = mock(Dataflow.Projects.Jobs.Create.class);
+    when(mockDataflowClient.projects().jobs().create(eq(PROJECT_ID), any(Job.class)))
+        .thenReturn(mockRequest);
+    final Job resultJob = new Job();
+    resultJob.setId("newid");
+    // Return a different request id.
+    resultJob.setClientRequestId("different_request_id");
+    when(mockRequest.execute()).thenReturn(resultJob);
+
+    DataflowPipeline p = buildDataflowPipeline(options);
+
+    thrown.expect(DataflowJobAlreadyUpdatedException.class);
+    thrown.expect(new TypeSafeMatcher<DataflowJobAlreadyUpdatedException>() {
+      @Override
+      public void describeTo(Description description) {
+        description.appendText("Expected job ID: " + resultJob.getId());
+      }
+
+      @Override
+      protected boolean matchesSafely(DataflowJobAlreadyUpdatedException item) {
+        return resultJob.getId().equals(item.getJob().getJobId());
+      }
+    });
+    thrown.expectMessage("The job named oldjobname with id: oldJobId has already been updated "
+        + "into job id: newid and cannot be updated again.");
+    p.run();
+  }
+
+  @Test
+  public void testRunWithFiles() throws IOException {
+    // Test that the function DataflowPipelineRunner.stageFiles works as
+    // expected.
+    GcsUtil mockGcsUtil = buildMockGcsUtil(true /* bucket exists */);
+    final String gcsStaging = "gs://somebucket/some/path";
+    final String gcsTemp = "gs://somebucket/some/temp/path";
+    final String cloudDataflowDataset = "somedataset";
+
+    // Create some temporary files.
+    File temp1 = File.createTempFile("DataflowPipelineRunnerTest", "txt");
+    temp1.deleteOnExit();
+    File temp2 = File.createTempFile("DataflowPipelineRunnerTest2", "txt");
+    temp2.deleteOnExit();
+
+    String overridePackageName = "alias.txt";
+
+    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setFilesToStage(ImmutableList.of(
+        temp1.getAbsolutePath(),
+        overridePackageName + "=" + temp2.getAbsolutePath()));
+    options.setStagingLocation(gcsStaging);
+    options.setTempLocation(gcsTemp);
+    options.setTempDatasetId(cloudDataflowDataset);
+    options.setProject(PROJECT_ID);
+    options.setJobName("job");
+    options.setDataflowClient(buildMockDataflow(jobCaptor));
+    options.setGcsUtil(mockGcsUtil);
+    options.setGcpCredential(new TestCredential());
+
+    DataflowPipeline p = buildDataflowPipeline(options);
+
+    DataflowPipelineJob job = p.run();
+    assertEquals("newid", job.getJobId());
+
+    Job workflowJob = jobCaptor.getValue();
+    assertValidJob(workflowJob);
+
+    assertEquals(
+        2,
+        workflowJob.getEnvironment().getWorkerPools().get(0).getPackages().size());
+    DataflowPackage workflowPackage1 =
+        workflowJob.getEnvironment().getWorkerPools().get(0).getPackages().get(0);
+    assertThat(workflowPackage1.getName(), startsWith(temp1.getName()));
+    DataflowPackage workflowPackage2 =
+        workflowJob.getEnvironment().getWorkerPools().get(0).getPackages().get(1);
+    assertEquals(overridePackageName, workflowPackage2.getName());
+
+    assertEquals(
+        "storage.googleapis.com/somebucket/some/temp/path",
+        workflowJob.getEnvironment().getTempStoragePrefix());
+    assertEquals(
+        cloudDataflowDataset,
+        workflowJob.getEnvironment().getDataset());
+    assertEquals(
+        DataflowReleaseInfo.getReleaseInfo().getName(),
+        workflowJob.getEnvironment().getUserAgent().get("name"));
+    assertEquals(
+        DataflowReleaseInfo.getReleaseInfo().getVersion(),
+        workflowJob.getEnvironment().getUserAgent().get("version"));
+  }
+
+  @Test
+  public void runWithDefaultFilesToStage() throws Exception {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setFilesToStage(null);
+    DataflowPipelineRunner.fromOptions(options);
+    assertTrue(!options.getFilesToStage().isEmpty());
+  }
+
+  @Test
+  public void detectClassPathResourceWithFileResources() throws Exception {
+    File file = tmpFolder.newFile("file");
+    File file2 = tmpFolder.newFile("file2");
+    URLClassLoader classLoader = new URLClassLoader(new URL[]{
+        file.toURI().toURL(),
+        file2.toURI().toURL()
+    });
+
+    assertEquals(ImmutableList.of(file.getAbsolutePath(), file2.getAbsolutePath()),
+        DataflowPipelineRunner.detectClassPathResourcesToStage(classLoader));
+  }
+
+  @Test
+  public void detectClassPathResourcesWithUnsupportedClassLoader() {
+    ClassLoader mockClassLoader = Mockito.mock(ClassLoader.class);
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Unable to use ClassLoader to detect classpath elements.");
+
+    DataflowPipelineRunner.detectClassPathResourcesToStage(mockClassLoader);
+  }
+
+  @Test
+  public void detectClassPathResourceWithNonFileResources() throws Exception {
+    String url = "http://www.google.com/all-the-secrets.jar";
+    URLClassLoader classLoader = new URLClassLoader(new URL[]{
+        new URL(url)
+    });
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Unable to convert url (" + url + ") to file.");
+
+    DataflowPipelineRunner.detectClassPathResourcesToStage(classLoader);
+  }
+
+  @Test
+  public void testGcsStagingLocationInitialization() throws Exception {
+    // Test that the staging location is initialized correctly.
+    String gcsTemp = "gs://somebucket/some/temp/path";
+
+    // Set the temp location (required), and check that the staging location is set.
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setTempLocation(gcsTemp);
+    options.setProject(PROJECT_ID);
+    options.setGcpCredential(new TestCredential());
+    options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+
+    DataflowPipelineRunner.fromOptions(options);
+
+    assertNotNull(options.getStagingLocation());
+  }
+
+  @Test
+  public void testNonGcsFilePathInReadFailure() throws IOException {
+    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+
+    Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor));
+    p.apply(TextIO.Read.named("ReadMyNonGcsFile").from(tmpFolder.newFile().getPath()));
+
+    thrown.expectCause(Matchers.allOf(
+        instanceOf(IllegalArgumentException.class),
+        ThrowableMessageMatcher.hasMessage(
+            containsString("expected a valid 'gs://' path but was given"))));
+    p.run();
+    assertValidJob(jobCaptor.getValue());
+  }
+
+  @Test
+  public void testNonGcsFilePathInWriteFailure() throws IOException {
+    Pipeline p = buildDataflowPipeline(buildPipelineOptions());
+    PCollection<String> pc = p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object"));
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(containsString("expected a valid 'gs://' path but was given"));
+    pc.apply(TextIO.Write.named("WriteMyNonGcsFile").to("/tmp/file"));
+  }
+
+  @Test
+  public void testMultiSlashGcsFileReadPath() throws IOException {
+    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+
+    Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor));
+    p.apply(TextIO.Read.named("ReadInvalidGcsFile")
+        .from("gs://bucket/tmp//file"));
+
+    thrown.expectCause(Matchers.allOf(
+        instanceOf(IllegalArgumentException.class),
+        ThrowableMessageMatcher.hasMessage(containsString("consecutive slashes"))));
+    p.run();
+    assertValidJob(jobCaptor.getValue());
+  }
+
+  @Test
+  public void testMultiSlashGcsFileWritePath() throws IOException {
+    Pipeline p = buildDataflowPipeline(buildPipelineOptions());
+    PCollection<String> pc = p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object"));
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("consecutive slashes");
+    pc.apply(TextIO.Write.named("WriteInvalidGcsFile").to("gs://bucket/tmp//file"));
+  }
+
+  @Test
+  public void testInvalidTempLocation() throws IOException {
+    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+
+    DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
+    options.setTempLocation("file://temp/location");
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(containsString("expected a valid 'gs://' path but was given"));
+    DataflowPipelineRunner.fromOptions(options);
+    assertValidJob(jobCaptor.getValue());
+  }
+
+  @Test
+  public void testInvalidStagingLocation() throws IOException {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setStagingLocation("file://my/staging/location");
+    try {
+      DataflowPipelineRunner.fromOptions(options);
+      fail("fromOptions should have failed");
+    } catch (IllegalArgumentException e) {
+      assertThat(e.getMessage(), containsString("expected a valid 'gs://' path but was given"));
+    }
+    options.setStagingLocation("my/staging/location");
+    try {
+      DataflowPipelineRunner.fromOptions(options);
+      fail("fromOptions should have failed");
+    } catch (IllegalArgumentException e) {
+      assertThat(e.getMessage(), containsString("expected a valid 'gs://' path but was given"));
+    }
+  }
+
+  @Test
+  public void testNonExistentTempLocation() throws IOException {
+    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+
+    GcsUtil mockGcsUtil = buildMockGcsUtil(false /* bucket exists */);
+    DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
+    options.setGcsUtil(mockGcsUtil);
+    options.setTempLocation("gs://non-existent-bucket/location");
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(containsString(
+        "Output path does not exist or is not writeable: gs://non-existent-bucket/location"));
+    DataflowPipelineRunner.fromOptions(options);
+    assertValidJob(jobCaptor.getValue());
+  }
+
+  @Test
+  public void testNonExistentStagingLocation() throws IOException {
+    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+
+    GcsUtil mockGcsUtil = buildMockGcsUtil(false /* bucket exists */);
+    DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
+    options.setGcsUtil(mockGcsUtil);
+    options.setStagingLocation("gs://non-existent-bucket/location");
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(containsString(
+        "Output path does not exist or is not writeable: gs://non-existent-bucket/location"));
+    DataflowPipelineRunner.fromOptions(options);
+    assertValidJob(jobCaptor.getValue());
+  }
+
+  @Test
+  public void testNoProjectFails() {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+
+    options.setRunner(DataflowPipelineRunner.class);
+    // Explicitly set to null to prevent the default instance factory from reading credentials
+    // from a user's environment, causing this test to fail.
+    options.setProject(null);
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Project id");
+    thrown.expectMessage("when running a Dataflow in the cloud");
+
+    DataflowPipelineRunner.fromOptions(options);
+  }
+
+  @Test
+  public void testProjectId() throws IOException {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setRunner(DataflowPipelineRunner.class);
+    options.setProject("foo-12345");
+
+    options.setStagingLocation("gs://spam/ham/eggs");
+    options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+    options.setGcpCredential(new TestCredential());
+
+    DataflowPipelineRunner.fromOptions(options);
+  }
+
+  @Test
+  public void testProjectPrefix() throws IOException {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setRunner(DataflowPipelineRunner.class);
+    options.setProject("google.com:some-project-12345");
+
+    options.setStagingLocation("gs://spam/ham/eggs");
+    options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+    options.setGcpCredential(new TestCredential());
+
+    DataflowPipelineRunner.fromOptions(options);
+  }
+
+  @Test
+  public void testProjectNumber() throws IOException {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setRunner(DataflowPipelineRunner.class);
+    options.setProject("12345");
+
+    options.setStagingLocation("gs://spam/ham/eggs");
+    options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Project ID");
+    thrown.expectMessage("project number");
+
+    DataflowPipelineRunner.fromOptions(options);
+  }
+
+  @Test
+  public void testProjectDescription() throws IOException {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setRunner(DataflowPipelineRunner.class);
+    options.setProject("some project");
+
+    options.setStagingLocation("gs://spam/ham/eggs");
+    options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Project ID");
+    thrown.expectMessage("project description");
+
+    DataflowPipelineRunner.fromOptions(options);
+  }
+
+  @Test
+  public void testInvalidNumberOfWorkerHarnessThreads() throws IOException {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setRunner(DataflowPipelineRunner.class);
+    options.setProject("foo-12345");
+
+    options.setStagingLocation("gs://spam/ham/eggs");
+    options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+
+    options.as(DataflowPipelineDebugOptions.class).setNumberOfWorkerHarnessThreads(-1);
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Number of worker harness threads");
+    thrown.expectMessage("Please make sure the value is non-negative.");
+
+    DataflowPipelineRunner.fromOptions(options);
+  }
+
+  @Test
+  public void testNoStagingLocationAndNoTempLocationFails() {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setRunner(DataflowPipelineRunner.class);
+    options.setProject("foo-project");
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        "Missing required value: at least one of tempLocation or stagingLocation must be set.");
+
+    DataflowPipelineRunner.fromOptions(options);
+  }
+
+  @Test
+  public void testStagingLocationAndNoTempLocationSucceeds() throws Exception {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setRunner(DataflowPipelineRunner.class);
+    options.setGcpCredential(new TestCredential());
+    options.setProject("foo-project");
+    options.setStagingLocation("gs://spam/ham/eggs");
+    options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+
+    DataflowPipelineRunner.fromOptions(options);
+  }
+
+  @Test
+  public void testTempLocationAndNoStagingLocationSucceeds() throws Exception {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setRunner(DataflowPipelineRunner.class);
+    options.setGcpCredential(new TestCredential());
+    options.setProject("foo-project");
+    options.setTempLocation("gs://spam/ham/eggs");
+    options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */));
+
+    DataflowPipelineRunner.fromOptions(options);
+  }
+
+  @Test
+  public void testInvalidJobName() throws IOException {
+    List<String> invalidNames = Arrays.asList(
+        "invalid_name",
+        "0invalid",
+        "invalid-");
+    List<String> expectedReason = Arrays.asList(
+        "JobName invalid",
+        "JobName invalid",
+        "JobName invalid");
+
+    for (int i = 0; i < invalidNames.size(); ++i) {
+      DataflowPipelineOptions options = buildPipelineOptions();
+      options.setJobName(invalidNames.get(i));
+
+      try {
+        DataflowPipelineRunner.fromOptions(options);
+        fail("Expected IllegalArgumentException for jobName "
+            + options.getJobName());
+      } catch (IllegalArgumentException e) {
+        assertThat(e.getMessage(),
+            containsString(expectedReason.get(i)));
+      }
+    }
+  }
+
+  @Test
+  public void testValidJobName() throws IOException {
+    List<String> names = Arrays.asList("ok", "Ok", "A-Ok", "ok-123",
+        "this-one-is-fairly-long-01234567890123456789");
+
+    for (String name : names) {
+      DataflowPipelineOptions options = buildPipelineOptions();
+      options.setJobName(name);
+
+      DataflowPipelineRunner runner = DataflowPipelineRunner
+          .fromOptions(options);
+      assertNotNull(runner);
+    }
+  }
+
+  /**
+   * A fake PTransform for testing.
+   */
+  public static class TestTransform
+      extends PTransform<PCollection<Integer>, PCollection<Integer>> {
+    public boolean translated = false;
+
+    @Override
+    public PCollection<Integer> apply(PCollection<Integer> input) {
+      return PCollection.createPrimitiveOutputInternal(
+          input.getPipeline(),
+          WindowingStrategy.globalDefault(),
+          input.isBounded());
+    }
+
+    @Override
+    protected Coder<Integer> getDefaultOutputCoder(PCollection<Integer> input) {
+      return input.getCoder();
+    }
+  }
+
+  @Test
+  public void testTransformTranslatorMissing() throws IOException {
+    // Test that we throw if we don't provide a translation.
+    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+
+    DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
+    DataflowPipeline p = DataflowPipeline.create(options);
+
+    p.apply(Create.of(Arrays.asList(1, 2, 3)))
+        .apply(new TestTransform());
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage(Matchers.containsString("no translator registered"));
+    DataflowPipelineTranslator.fromOptions(options)
+        .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList());
+    assertValidJob(jobCaptor.getValue());
+  }
+
+  @Test
+  public void testTransformTranslator() throws IOException {
+    // Test that we can provide a custom translation.
+    DataflowPipelineOptions options = buildPipelineOptions();
+    DataflowPipeline p = DataflowPipeline.create(options);
+    TestTransform transform = new TestTransform();
+
+    p.apply(Create.of(Arrays.asList(1, 2, 3)).withCoder(BigEndianIntegerCoder.of()))
+        .apply(transform);
+
+    DataflowPipelineTranslator translator = DataflowPipelineRunner
+        .fromOptions(options).getTranslator();
+
+    DataflowPipelineTranslator.registerTransformTranslator(
+        TestTransform.class,
+        new DataflowPipelineTranslator.TransformTranslator<TestTransform>() {
+          @SuppressWarnings("unchecked")
+          @Override
+          public void translate(
+              TestTransform transform,
+              DataflowPipelineTranslator.TranslationContext context) {
+            transform.translated = true;
+
+            // Note: This is about the minimum needed to fake out a
+            // translation. This obviously isn't a real translation.
+            context.addStep(transform, "TestTranslate");
+            context.addOutput("output", context.getOutput(transform));
+          }
+        });
+
+    translator.translate(
+        p, p.getRunner(), Collections.<DataflowPackage>emptyList());
+    assertTrue(transform.translated);
+  }
+
+  /** Records all the composite transforms visited within the Pipeline. */
+  private static class CompositeTransformRecorder implements PipelineVisitor {
+    private List<PTransform<?, ?>> transforms = new ArrayList<>();
+
+    @Override
+    public void enterCompositeTransform(TransformTreeNode node) {
+      if (node.getTransform() != null) {
+        transforms.add(node.getTransform());
+      }
+    }
+
+    @Override
+    public void leaveCompositeTransform(TransformTreeNode node) {
+    }
+
+    @Override
+    public void visitTransform(TransformTreeNode node) {
+    }
+
+    @Override
+    public void visitValue(PValue value, TransformTreeNode producer) {
+    }
+
+    public List<PTransform<?, ?>> getCompositeTransforms() {
+      return transforms;
+    }
+  }
+
+  @Test
+  public void testApplyIsScopedToExactClass() throws IOException {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    DataflowPipeline p = DataflowPipeline.create(options);
+
+    Create.TimestampedValues<String> transform =
+        Create.timestamped(Arrays.asList(TimestampedValue.of("TestString", Instant.now())));
+    p.apply(transform);
+
+    CompositeTransformRecorder recorder = new CompositeTransformRecorder();
+    p.traverseTopologically(recorder);
+
+    assertThat("Expected to have seen CreateTimestamped composite transform.",
+        recorder.getCompositeTransforms(),
+        Matchers.<PTransform<?, ?>>contains(transform));
+  }
+
+  @Test
+  public void testToString() {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setJobName("TestJobName");
+    options.setProject("test-project");
+    options.setTempLocation("gs://test/temp/location");
+    options.setGcpCredential(new TestCredential());
+    options.setPathValidatorClass(NoopPathValidator.class);
+    assertEquals("DataflowPipelineRunner#TestJobName",
+        DataflowPipelineRunner.fromOptions(options).toString());
+  }
+
+  private static PipelineOptions makeOptions(boolean streaming) {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setRunner(DataflowPipelineRunner.class);
+    options.setStreaming(streaming);
+    options.setJobName("TestJobName");
+    options.setProject("test-project");
+    options.setTempLocation("gs://test/temp/location");
+    options.setGcpCredential(new TestCredential());
+    options.setPathValidatorClass(NoopPathValidator.class);
+    return options;
+  }
+
+  private void testUnsupportedSource(PTransform<PInput, ?> source, String name, boolean streaming)
+      throws Exception {
+    String mode = streaming ? "streaming" : "batch";
"streaming" : "batch"; + thrown.expect(UnsupportedOperationException.class); + thrown.expectMessage( + "The DataflowPipelineRunner in " + mode + " mode does not support " + name); + + Pipeline p = Pipeline.create(makeOptions(streaming)); + p.apply(source); + p.run(); + } + + @Test + public void testBoundedSourceUnsupportedInStreaming() throws Exception { + testUnsupportedSource( + AvroSource.readFromFileWithClass("foo", String.class), "Read.Bounded", true); + } + + @Test + public void testBigQueryIOSourceUnsupportedInStreaming() throws Exception { + testUnsupportedSource( + BigQueryIO.Read.from("project:bar.baz").withoutValidation(), "BigQueryIO.Read", true); + } + + @Test + public void testAvroIOSourceUnsupportedInStreaming() throws Exception { + testUnsupportedSource( + AvroIO.Read.from("foo"), "AvroIO.Read", true); + } + + @Test + public void testTextIOSourceUnsupportedInStreaming() throws Exception { + testUnsupportedSource(TextIO.Read.from("foo"), "TextIO.Read", true); + } + + @Test + public void testReadBoundedSourceUnsupportedInStreaming() throws Exception { + testUnsupportedSource(Read.from(AvroSource.from("/tmp/test")), "Read.Bounded", true); + } + + @Test + public void testReadUnboundedUnsupportedInBatch() throws Exception { + testUnsupportedSource(Read.from(new TestCountingSource(1)), "Read.Unbounded", false); + } + + private void testUnsupportedSink( + PTransform, PDone> sink, String name, boolean streaming) + throws Exception { + thrown.expect(UnsupportedOperationException.class); + thrown.expectMessage( + "The DataflowPipelineRunner in streaming mode does not support " + name); + + Pipeline p = Pipeline.create(makeOptions(streaming)); + p.apply(Create.of("foo")).apply(sink); + p.run(); + } + + @Test + public void testAvroIOSinkUnsupportedInStreaming() throws Exception { + testUnsupportedSink(AvroIO.Write.to("foo").withSchema(String.class), "AvroIO.Write", true); + } + + @Test + public void testTextIOSinkUnsupportedInStreaming() throws Exception { + testUnsupportedSink(TextIO.Write.to("foo"), "TextIO.Write", true); + } + + @Test + public void testBatchViewAsListToIsmRecordForGlobalWindow() throws Exception { + DoFnTester>> doFnTester = + DoFnTester.of(new BatchViewAsList.ToIsmRecordForGlobalWindowDoFn()); + + // The order of the output elements is important relative to processing order + assertThat(doFnTester.processBatch(ImmutableList.of("a", "b", "c")), contains( + IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE, 0L), valueInGlobalWindow("a")), + IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE, 1L), valueInGlobalWindow("b")), + IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE, 2L), valueInGlobalWindow("c")))); + } + + @Test + public void testBatchViewAsListToIsmRecordForNonGlobalWindow() throws Exception { + DoFnTester>>>, + IsmRecord>> doFnTester = + DoFnTester.of( + new BatchViewAsList.ToIsmRecordForNonGlobalWindowDoFn( + IntervalWindow.getCoder())); + + IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10)); + IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20)); + IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30)); + + Iterable>>>> inputElements = + ImmutableList.of( + KV.of(1, (Iterable>>) ImmutableList.of( + KV.of( + windowA, WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)), + KV.of( + windowA, WindowedValue.of(111L, new Instant(3), windowA, PaneInfo.NO_FIRING)), + KV.of( + windowA, WindowedValue.of(112L, new Instant(4), windowA, PaneInfo.NO_FIRING)), + KV.of( + 
+                    windowB, WindowedValue.of(120L, new Instant(12), windowB, PaneInfo.NO_FIRING)),
+                KV.of(
+                    windowB, WindowedValue.of(121L, new Instant(14), windowB, PaneInfo.NO_FIRING))
+            )),
+            KV.of(2, (Iterable<KV<IntervalWindow, WindowedValue<Long>>>) ImmutableList.of(
+                KV.of(
+                    windowC, WindowedValue.of(210L, new Instant(25), windowC, PaneInfo.NO_FIRING))
+            )));
+
+    // The order of the output elements is important relative to processing order.
+    assertThat(doFnTester.processBatch(inputElements), contains(
+        IsmRecord.of(ImmutableList.of(windowA, 0L),
+            WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)),
+        IsmRecord.of(ImmutableList.of(windowA, 1L),
+            WindowedValue.of(111L, new Instant(3), windowA, PaneInfo.NO_FIRING)),
+        IsmRecord.of(ImmutableList.of(windowA, 2L),
+            WindowedValue.of(112L, new Instant(4), windowA, PaneInfo.NO_FIRING)),
+        IsmRecord.of(ImmutableList.of(windowB, 0L),
+            WindowedValue.of(120L, new Instant(12), windowB, PaneInfo.NO_FIRING)),
+        IsmRecord.of(ImmutableList.of(windowB, 1L),
+            WindowedValue.of(121L, new Instant(14), windowB, PaneInfo.NO_FIRING)),
+        IsmRecord.of(ImmutableList.of(windowC, 0L),
+            WindowedValue.of(210L, new Instant(25), windowC, PaneInfo.NO_FIRING))));
+  }
+
+  @Test
+  public void testToIsmRecordForMapLikeDoFn() throws Exception {
+    TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForSizeTag = new TupleTag<>();
+    TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForEntrySetTag = new TupleTag<>();
+
+    Coder<Long> keyCoder = VarLongCoder.of();
+    Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
+
+    IsmRecordCoder<WindowedValue<Long>> ismCoder = IsmRecordCoder.of(
+        1,
+        2,
+        ImmutableList.<Coder<?>>of(
+            MetadataKeyCoder.of(keyCoder),
+            IntervalWindow.getCoder(),
+            BigEndianLongCoder.of()),
+        FullWindowedValueCoder.of(VarLongCoder.of(), windowCoder));
+
+    DoFnTester<KV<Integer, Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>,
+               IsmRecord<WindowedValue<Long>>> doFnTester =
+        DoFnTester.of(new BatchViewAsMultimap.ToIsmRecordForMapLikeDoFn<Long, Long, IntervalWindow>(
+            outputForSizeTag,
+            outputForEntrySetTag,
+            windowCoder,
+            keyCoder,
+            ismCoder,
+            false /* unique keys */));
+    doFnTester.setSideOutputTags(TupleTagList.of(
+        ImmutableList.<TupleTag<?>>of(outputForSizeTag, outputForEntrySetTag)));
+
+    IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10));
+    IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20));
+    IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30));
+
+    Iterable<KV<Integer, Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>>
+        inputElements = ImmutableList.of(
+            KV.of(1, (Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>) ImmutableList.of(
+                KV.of(KV.of(1L, windowA),
+                    WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)),
+                // same window and same key as the previous element
+                KV.of(KV.of(1L, windowA),
+                    WindowedValue.of(111L, new Instant(2), windowA, PaneInfo.NO_FIRING)),
+                // same window, different key from the previous element
+                KV.of(KV.of(2L, windowA),
+                    WindowedValue.of(120L, new Instant(3), windowA, PaneInfo.NO_FIRING)),
+                // different window, same key as the previous element
+                KV.of(KV.of(2L, windowB),
+                    WindowedValue.of(210L, new Instant(11), windowB, PaneInfo.NO_FIRING)),
+                // different window and different key from the previous element
+                KV.of(KV.of(3L, windowB),
+                    WindowedValue.of(220L, new Instant(12), windowB, PaneInfo.NO_FIRING)))),
+            KV.of(2, (Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>) ImmutableList.of(
+                // different shard
+                KV.of(KV.of(4L, windowC),
+                    WindowedValue.of(330L, new Instant(21), windowC, PaneInfo.NO_FIRING)))));
+
+    // The order of the output elements is important relative to processing order.
+    assertThat(doFnTester.processBatch(inputElements), contains(
+        IsmRecord.of(
+            ImmutableList.of(1L, windowA, 0L),
+            WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)),
+        IsmRecord.of(
+            ImmutableList.of(1L, windowA, 1L),
+            WindowedValue.of(111L, new Instant(2), windowA, PaneInfo.NO_FIRING)),
+        IsmRecord.of(
+            ImmutableList.of(2L, windowA, 0L),
+            WindowedValue.of(120L, new Instant(3), windowA, PaneInfo.NO_FIRING)),
+        IsmRecord.of(
+            ImmutableList.of(2L, windowB, 0L),
+            WindowedValue.of(210L, new Instant(11), windowB, PaneInfo.NO_FIRING)),
+        IsmRecord.of(
+            ImmutableList.of(3L, windowB, 0L),
+            WindowedValue.of(220L, new Instant(12), windowB, PaneInfo.NO_FIRING)),
+        IsmRecord.of(
+            ImmutableList.of(4L, windowC, 0L),
+            WindowedValue.of(330L, new Instant(21), windowC, PaneInfo.NO_FIRING))));
+
+    // Verify the number of unique keys per window.
+    assertThat(doFnTester.takeSideOutputElements(outputForSizeTag), contains(
+        KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowA)),
+            KV.of(windowA, 2L)),
+        KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)),
+            KV.of(windowB, 2L)),
+        KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowC)),
+            KV.of(windowC, 1L))
+        ));
+
+    // Verify the output for the unique keys.
+    assertThat(doFnTester.takeSideOutputElements(outputForEntrySetTag), contains(
+        KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowA)),
+            KV.of(windowA, 1L)),
+        KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowA)),
+            KV.of(windowA, 2L)),
+        KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)),
+            KV.of(windowB, 2L)),
+        KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)),
+            KV.of(windowB, 3L)),
+        KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowC)),
+            KV.of(windowC, 4L))
+        ));
+  }
+
+  @Test
+  public void testToIsmRecordForMapLikeDoFnWithoutUniqueKeysThrowsException() throws Exception {
+    TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForSizeTag = new TupleTag<>();
+    TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForEntrySetTag = new TupleTag<>();
+
+    Coder<Long> keyCoder = VarLongCoder.of();
+    Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
+
+    IsmRecordCoder<WindowedValue<Long>> ismCoder = IsmRecordCoder.of(
+        1,
+        2,
+        ImmutableList.<Coder<?>>of(
+            MetadataKeyCoder.of(keyCoder),
+            IntervalWindow.getCoder(),
+            BigEndianLongCoder.of()),
+        FullWindowedValueCoder.of(VarLongCoder.of(), windowCoder));
+
+    DoFnTester<KV<Integer, Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>,
+               IsmRecord<WindowedValue<Long>>> doFnTester =
+        DoFnTester.of(new BatchViewAsMultimap.ToIsmRecordForMapLikeDoFn<Long, Long, IntervalWindow>(
+            outputForSizeTag,
+            outputForEntrySetTag,
+            windowCoder,
+            keyCoder,
+            ismCoder,
+            true /* unique keys */));
+    doFnTester.setSideOutputTags(TupleTagList.of(
+        ImmutableList.<TupleTag<?>>of(outputForSizeTag, outputForEntrySetTag)));
+
+    IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10));
+
+    Iterable<KV<Integer, Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>>>
+        inputElements = ImmutableList.of(
+            KV.of(1, (Iterable<KV<KV<Long, IntervalWindow>, WindowedValue<Long>>>) ImmutableList.of(
+                KV.of(KV.of(1L, windowA),
+                    WindowedValue.of(110L, new Instant(1), windowA, PaneInfo.NO_FIRING)),
+                // same window and same key as the previous element
+                KV.of(KV.of(1L, windowA),
+                    WindowedValue.of(111L, new Instant(2), windowA, PaneInfo.NO_FIRING)))));
+
+    try {
+      doFnTester.processBatch(inputElements);
+      fail("Expected UserCodeException");
+    } catch (UserCodeException e) {
+      assertTrue(e.getCause() instanceof IllegalStateException);
+      IllegalStateException rootCause = (IllegalStateException) e.getCause();
+      assertThat(rootCause.getMessage(), containsString("Unique keys are expected but found key"));
+    }
+  }
+
+  @Test
+  public void testToIsmMetadataRecordForSizeDoFn() throws Exception {
+    TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForSizeTag = new TupleTag<>();
+    TupleTag<KV<Integer, KV<IntervalWindow, Long>>>
+        outputForEntrySetTag = new TupleTag<>();
+
+    Coder<Long> keyCoder = VarLongCoder.of();
+    Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
+
+    IsmRecordCoder<WindowedValue<Long>> ismCoder = IsmRecordCoder.of(
+        1,
+        2,
+        ImmutableList.<Coder<?>>of(
+            MetadataKeyCoder.of(keyCoder),
+            IntervalWindow.getCoder(),
+            BigEndianLongCoder.of()),
+        FullWindowedValueCoder.of(VarLongCoder.of(), windowCoder));
+
+    DoFnTester<KV<Integer, Iterable<KV<IntervalWindow, Long>>>,
+               IsmRecord<WindowedValue<Long>>> doFnTester = DoFnTester.of(
+        new BatchViewAsMultimap.ToIsmMetadataRecordForSizeDoFn<Long, Long, IntervalWindow>(
+            windowCoder));
+    doFnTester.setSideOutputTags(TupleTagList.of(
+        ImmutableList.<TupleTag<?>>of(outputForSizeTag, outputForEntrySetTag)));
+
+    IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10));
+    IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20));
+    IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30));
+
+    Iterable<KV<Integer, Iterable<KV<IntervalWindow, Long>>>> inputElements =
+        ImmutableList.of(
+            KV.of(1,
+                (Iterable<KV<IntervalWindow, Long>>) ImmutableList.of(
+                    KV.of(windowA, 2L),
+                    KV.of(windowA, 3L),
+                    KV.of(windowB, 7L))),
+            KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)),
+                (Iterable<KV<IntervalWindow, Long>>) ImmutableList.of(
+                    KV.of(windowC, 9L))));
+
+    // The order of the output elements is important relative to processing order.
+    assertThat(doFnTester.processBatch(inputElements), contains(
+        IsmRecord.<WindowedValue<Long>>meta(
+            ImmutableList.of(IsmFormat.getMetadataKey(), windowA, 0L),
+            CoderUtils.encodeToByteArray(VarLongCoder.of(), 5L)),
+        IsmRecord.<WindowedValue<Long>>meta(
+            ImmutableList.of(IsmFormat.getMetadataKey(), windowB, 0L),
+            CoderUtils.encodeToByteArray(VarLongCoder.of(), 7L)),
+        IsmRecord.<WindowedValue<Long>>meta(
+            ImmutableList.of(IsmFormat.getMetadataKey(), windowC, 0L),
+            CoderUtils.encodeToByteArray(VarLongCoder.of(), 9L))
+        ));
+  }
+
+  @Test
+  public void testToIsmMetadataRecordForKeyDoFn() throws Exception {
+    TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForSizeTag = new TupleTag<>();
+    TupleTag<KV<Integer, KV<IntervalWindow, Long>>> outputForEntrySetTag = new TupleTag<>();
+
+    Coder<Long> keyCoder = VarLongCoder.of();
+    Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
+
+    IsmRecordCoder<WindowedValue<Long>> ismCoder = IsmRecordCoder.of(
+        1,
+        2,
+        ImmutableList.<Coder<?>>of(
+            MetadataKeyCoder.of(keyCoder),
+            IntervalWindow.getCoder(),
+            BigEndianLongCoder.of()),
+        FullWindowedValueCoder.of(VarLongCoder.of(), windowCoder));
+
+    DoFnTester<KV<Integer, Iterable<KV<IntervalWindow, Long>>>,
+               IsmRecord<WindowedValue<Long>>> doFnTester = DoFnTester.of(
+        new BatchViewAsMultimap.ToIsmMetadataRecordForKeyDoFn<Long, Long, IntervalWindow>(
+            keyCoder, windowCoder));
+    doFnTester.setSideOutputTags(TupleTagList.of(
+        ImmutableList.<TupleTag<?>>of(outputForSizeTag, outputForEntrySetTag)));
+
+    IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10));
+    IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20));
+    IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30));
+
+    Iterable<KV<Integer, Iterable<KV<IntervalWindow, Long>>>> inputElements =
+        ImmutableList.of(
+            KV.of(1,
+                (Iterable<KV<IntervalWindow, Long>>) ImmutableList.of(
+                    KV.of(windowA, 2L),
+                    // same window as the previous element
+                    KV.of(windowA, 3L),
+                    // different window from the previous element
+                    KV.of(windowB, 3L))),
+            KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowB)),
+                (Iterable<KV<IntervalWindow, Long>>) ImmutableList.of(
+                    KV.of(windowC, 3L))));
+
+    // The order of the output elements is important relative to processing order.
+    assertThat(doFnTester.processBatch(inputElements), contains(
+        IsmRecord.<WindowedValue<Long>>meta(
+            ImmutableList.of(IsmFormat.getMetadataKey(), windowA, 1L),
+            CoderUtils.encodeToByteArray(VarLongCoder.of(), 2L)),
+        IsmRecord.<WindowedValue<Long>>meta(
+            ImmutableList.of(IsmFormat.getMetadataKey(), windowA, 2L),
+            CoderUtils.encodeToByteArray(VarLongCoder.of(), 3L)),
+        IsmRecord.<WindowedValue<Long>>meta(
+            ImmutableList.of(IsmFormat.getMetadataKey(), windowB, 1L),
+            CoderUtils.encodeToByteArray(VarLongCoder.of(), 3L)),
+        IsmRecord.<WindowedValue<Long>>meta(
+            ImmutableList.of(IsmFormat.getMetadataKey(), windowC, 1L),
+            CoderUtils.encodeToByteArray(VarLongCoder.of(), 3L))
+        ));
+  }
+
+  @Test
+  public void testToMapDoFn() throws Exception {
+    Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
+
+    DoFnTester<KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>>,
+               IsmRecord<WindowedValue<TransformedMap<Long,
+                                                      WindowedValue<Long>,
+                                                      Long>>>> doFnTester =
+        DoFnTester.of(new BatchViewAsMap.ToMapDoFn<Long, Long, IntervalWindow>(windowCoder));
+
+    IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10));
+    IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20));
+    IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30));
+
+    Iterable<KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>>>
+        inputElements = ImmutableList.of(
+            KV.of(1,
+                (Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>) ImmutableList.of(
+                    KV.of(windowA, WindowedValue.of(
+                        KV.of(1L, 11L), new Instant(3), windowA, PaneInfo.NO_FIRING)),
+                    KV.of(windowA, WindowedValue.of(
+                        KV.of(2L, 21L), new Instant(7), windowA, PaneInfo.NO_FIRING)),
+                    KV.of(windowB, WindowedValue.of(
+                        KV.of(2L, 21L), new Instant(13), windowB, PaneInfo.NO_FIRING)),
+                    KV.of(windowB, WindowedValue.of(
+                        KV.of(3L, 31L), new Instant(15), windowB, PaneInfo.NO_FIRING)))),
+            KV.of(2,
+                (Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>) ImmutableList.of(
+                    KV.of(windowC, WindowedValue.of(
+                        KV.of(4L, 41L), new Instant(25), windowC, PaneInfo.NO_FIRING)))));
+
+    // The order of the output elements is important relative to processing order.
+    List<IsmRecord<WindowedValue<TransformedMap<Long,
+                                                WindowedValue<Long>,
+                                                Long>>>> output =
+        doFnTester.processBatch(inputElements);
+    assertEquals(3, output.size());
+    Map<Long, Long> outputMap;
+
+    outputMap = output.get(0).getValue().getValue();
+    assertEquals(2, outputMap.size());
+    assertEquals(ImmutableMap.of(1L, 11L, 2L, 21L), outputMap);
+
+    outputMap = output.get(1).getValue().getValue();
+    assertEquals(2, outputMap.size());
+    assertEquals(ImmutableMap.of(2L, 21L, 3L, 31L), outputMap);
+
+    outputMap = output.get(2).getValue().getValue();
+    assertEquals(1, outputMap.size());
+    assertEquals(ImmutableMap.of(4L, 41L), outputMap);
+  }
+
+  @Test
+  public void testToMultimapDoFn() throws Exception {
+    Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
+
+    DoFnTester<KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>>,
+               IsmRecord<WindowedValue<TransformedMap<Long,
+                                                      Iterable<WindowedValue<Long>>,
+                                                      Iterable<Long>>>>> doFnTester =
+        DoFnTester.of(
+            new BatchViewAsMultimap.ToMultimapDoFn<Long, Long, IntervalWindow>(windowCoder));
+
+    IntervalWindow windowA = new IntervalWindow(new Instant(0), new Instant(10));
+    IntervalWindow windowB = new IntervalWindow(new Instant(10), new Instant(20));
+    IntervalWindow windowC = new IntervalWindow(new Instant(20), new Instant(30));
+
+    Iterable<KV<Integer, Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>>>
+        inputElements = ImmutableList.of(
+            KV.of(1,
+                (Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>) ImmutableList.of(
+                    KV.of(windowA, WindowedValue.of(
+                        KV.of(1L, 11L), new Instant(3), windowA, PaneInfo.NO_FIRING)),
+                    KV.of(windowA, WindowedValue.of(
+                        KV.of(1L, 12L), new Instant(5), windowA, PaneInfo.NO_FIRING)),
+                    KV.of(windowA, WindowedValue.of(
+                        KV.of(2L, 21L), new Instant(7), windowA, PaneInfo.NO_FIRING)),
+                    KV.of(windowB, WindowedValue.of(
+                        KV.of(2L, 21L), new Instant(13), windowB, PaneInfo.NO_FIRING)),
+                    KV.of(windowB, WindowedValue.of(
+                        KV.of(3L, 31L), new Instant(15), windowB, PaneInfo.NO_FIRING)))),
+            KV.of(2,
+                (Iterable<KV<IntervalWindow, WindowedValue<KV<Long, Long>>>>) ImmutableList.of(
+                    KV.of(windowC, WindowedValue.of(
+                        KV.of(4L, 41L), new Instant(25), windowC, PaneInfo.NO_FIRING)))));
+
+    // The order of the output elements is important relative to processing order.
+    List<IsmRecord<WindowedValue<TransformedMap<Long,
+                                                Iterable<WindowedValue<Long>>,
+                                                Iterable<Long>>>>> output =
+        doFnTester.processBatch(inputElements);
+    assertEquals(3, output.size());
+    Map<Long, Iterable<Long>> outputMap;
+
+    outputMap = output.get(0).getValue().getValue();
+    assertEquals(2, outputMap.size());
+    assertThat(outputMap.get(1L), containsInAnyOrder(11L, 12L));
+    assertThat(outputMap.get(2L), containsInAnyOrder(21L));
+
+    outputMap = output.get(1).getValue().getValue();
+    assertEquals(2, outputMap.size());
+    assertThat(outputMap.get(2L), containsInAnyOrder(21L));
+    assertThat(outputMap.get(3L), containsInAnyOrder(31L));
+
+    outputMap = output.get(2).getValue().getValue();
+    assertEquals(1, outputMap.size());
+    assertThat(outputMap.get(4L), containsInAnyOrder(41L));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTest.java
new file mode 100644
index 0000000..ebf0c9f
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.util.NoopPathValidator;
+import com.google.cloud.dataflow.sdk.util.TestCredential;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link DataflowPipeline}. */
+@RunWith(JUnit4.class)
+public class DataflowPipelineTest {
+  @Test
+  public void testToString() {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setJobName("TestJobName");
+    options.setProject("project-id");
+    options.setTempLocation("gs://test/temp/location");
+    options.setGcpCredential(new TestCredential());
+    options.setPathValidatorClass(NoopPathValidator.class);
+    assertEquals("DataflowPipeline#TestJobName",
+        DataflowPipeline.create(options).toString());
+  }
+}