Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 18BA7200CC6 for ; Tue, 18 Jul 2017 22:05:38 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 172DE167740; Tue, 18 Jul 2017 20:05:38 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B5072167725 for ; Tue, 18 Jul 2017 22:05:36 +0200 (CEST) Received: (qmail 70697 invoked by uid 500); 18 Jul 2017 20:05:34 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 70688 invoked by uid 99); 18 Jul 2017 20:05:34 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 18 Jul 2017 20:05:34 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 64925E0A38; Tue, 18 Jul 2017 20:05:34 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kenn@apache.org To: commits@beam.apache.org Date: Tue, 18 Jul 2017 20:05:34 -0000 Message-Id: <655af944fd6e418a944debb4f85c09a8@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/4] beam git commit: Fix split package in SDK harness archived-at: Tue, 18 Jul 2017 20:05:38 -0000 Repository: beam Updated Branches: refs/heads/master 5a0b74c9b -> 2c2d8a35f http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java deleted file mode 100644 index 64d9ea6..0000000 --- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.core; - -import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow; -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.verifyZeroInteractions; -import static org.mockito.Mockito.when; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Suppliers; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.google.common.collect.Multimap; -import com.google.protobuf.Any; -import com.google.protobuf.ByteString; -import com.google.protobuf.BytesValue; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.ServiceLoader; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.beam.fn.harness.data.BeamFnDataClient; -import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer; -import org.apache.beam.fn.harness.fn.ThrowingConsumer; -import org.apache.beam.fn.harness.fn.ThrowingRunnable; -import org.apache.beam.fn.v1.BeamFnApi; -import org.apache.beam.runners.core.PTransformRunnerFactory.Registrar; -import org.apache.beam.runners.dataflow.util.CloudObjects; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.common.runner.v1.RunnerApi; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; -import org.hamcrest.collection.IsMapContaining; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Matchers; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -/** Tests for {@link BeamFnDataWriteRunner}. */ -@RunWith(JUnit4.class) -public class BeamFnDataWriteRunnerTest { - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder() - .setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build(); - private static final RunnerApi.FunctionSpec FUNCTION_SPEC = RunnerApi.FunctionSpec.newBuilder() - .setParameter(Any.pack(PORT_SPEC)).build(); - private static final String CODER_ID = "string-coder-id"; - private static final Coder> CODER = - WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE); - private static final RunnerApi.Coder CODER_SPEC; - private static final String URN = "urn:org.apache.beam:sink:runner:0.1"; - - static { - try { - CODER_SPEC = RunnerApi.Coder.newBuilder().setSpec( - RunnerApi.SdkFunctionSpec.newBuilder().setSpec( - RunnerApi.FunctionSpec.newBuilder().setParameter( - Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom( - OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(CODER)))) - .build())) - .build()) - .build()) - .build(); - } catch (IOException e) { - throw new ExceptionInInitializerError(e); - } - } - private static final BeamFnApi.Target OUTPUT_TARGET = BeamFnApi.Target.newBuilder() - .setPrimitiveTransformReference("1") - .setName("out") - .build(); - - @Mock private BeamFnDataClient mockBeamFnDataClient; - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - } - - - @Test - public void testCreatingAndProcessingBeamFnDataWriteRunner() throws Exception { - String bundleId = "57L"; - String inputId = "100L"; - - Multimap>> consumers = HashMultimap.create(); - List startFunctions = new ArrayList<>(); - List finishFunctions = new ArrayList<>(); - - RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder() - .setUrn("urn:org.apache.beam:sink:runner:0.1") - .setParameter(Any.pack(PORT_SPEC)) - .build(); - - RunnerApi.PTransform pTransform = RunnerApi.PTransform.newBuilder() - .setSpec(functionSpec) - .putInputs(inputId, "inputPC") - .build(); - - new BeamFnDataWriteRunner.Factory().createRunnerForPTransform( - PipelineOptionsFactory.create(), - mockBeamFnDataClient, - "ptransformId", - pTransform, - Suppliers.ofInstance(bundleId)::get, - ImmutableMap.of("inputPC", - RunnerApi.PCollection.newBuilder().setCoderId(CODER_ID).build()), - ImmutableMap.of(CODER_ID, CODER_SPEC), - consumers, - startFunctions::add, - finishFunctions::add); - - verifyZeroInteractions(mockBeamFnDataClient); - - List> outputValues = new ArrayList<>(); - AtomicBoolean wasCloseCalled = new AtomicBoolean(); - CloseableThrowingConsumer> outputConsumer = - new CloseableThrowingConsumer>(){ - @Override - public void close() throws Exception { - wasCloseCalled.set(true); - } - - @Override - public void accept(WindowedValue t) throws Exception { - outputValues.add(t); - } - }; - - when(mockBeamFnDataClient.forOutboundConsumer( - any(), - any(), - Matchers.>>any())).thenReturn(outputConsumer); - Iterables.getOnlyElement(startFunctions).run(); - verify(mockBeamFnDataClient).forOutboundConsumer( - eq(PORT_SPEC.getApiServiceDescriptor()), - eq(KV.of(bundleId, BeamFnApi.Target.newBuilder() - .setPrimitiveTransformReference("ptransformId") - .setName(inputId) - .build())), - eq(CODER)); - - assertThat(consumers.keySet(), containsInAnyOrder("inputPC")); - Iterables.getOnlyElement(consumers.get("inputPC")).accept(valueInGlobalWindow("TestValue")); - assertThat(outputValues, contains(valueInGlobalWindow("TestValue"))); - outputValues.clear(); - - assertFalse(wasCloseCalled.get()); - Iterables.getOnlyElement(finishFunctions).run(); - assertTrue(wasCloseCalled.get()); - - verifyNoMoreInteractions(mockBeamFnDataClient); - } - - @Test - public void testReuseForMultipleBundles() throws Exception { - RecordingConsumer> valuesA = new RecordingConsumer<>(); - RecordingConsumer> valuesB = new RecordingConsumer<>(); - when(mockBeamFnDataClient.forOutboundConsumer( - any(), - any(), - Matchers.>>any())).thenReturn(valuesA).thenReturn(valuesB); - AtomicReference bundleId = new AtomicReference<>("0"); - BeamFnDataWriteRunner writeRunner = new BeamFnDataWriteRunner<>( - FUNCTION_SPEC, - bundleId::get, - OUTPUT_TARGET, - CODER_SPEC, - mockBeamFnDataClient); - - // Process for bundle id 0 - writeRunner.registerForOutput(); - - verify(mockBeamFnDataClient).forOutboundConsumer( - eq(PORT_SPEC.getApiServiceDescriptor()), - eq(KV.of(bundleId.get(), OUTPUT_TARGET)), - eq(CODER)); - - writeRunner.consume(valueInGlobalWindow("ABC")); - writeRunner.consume(valueInGlobalWindow("DEF")); - writeRunner.close(); - - assertTrue(valuesA.closed); - assertThat(valuesA, contains(valueInGlobalWindow("ABC"), valueInGlobalWindow("DEF"))); - - // Process for bundle id 1 - bundleId.set("1"); - valuesA.clear(); - valuesB.clear(); - writeRunner.registerForOutput(); - - verify(mockBeamFnDataClient).forOutboundConsumer( - eq(PORT_SPEC.getApiServiceDescriptor()), - eq(KV.of(bundleId.get(), OUTPUT_TARGET)), - eq(CODER)); - - writeRunner.consume(valueInGlobalWindow("GHI")); - writeRunner.consume(valueInGlobalWindow("JKL")); - writeRunner.close(); - - assertTrue(valuesB.closed); - assertThat(valuesB, contains(valueInGlobalWindow("GHI"), valueInGlobalWindow("JKL"))); - verifyNoMoreInteractions(mockBeamFnDataClient); - } - - private static class RecordingConsumer extends ArrayList - implements CloseableThrowingConsumer { - private boolean closed; - @Override - public void close() throws Exception { - closed = true; - } - - @Override - public void accept(T t) throws Exception { - if (closed) { - throw new IllegalStateException("Consumer is closed but attempting to consume " + t); - } - add(t); - } - } - - @Test - public void testRegistration() { - for (Registrar registrar : - ServiceLoader.load(Registrar.class)) { - if (registrar instanceof BeamFnDataWriteRunner.Registrar) { - assertThat(registrar.getPTransformRunnerFactories(), IsMapContaining.hasKey(URN)); - return; - } - } - fail("Expected registrar not found."); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java deleted file mode 100644 index 6c9a4cb..0000000 --- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.core; - -import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow; -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.collection.IsEmptyCollection.empty; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; - -import com.google.common.base.Suppliers; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.google.common.collect.Multimap; -import com.google.protobuf.Any; -import com.google.protobuf.ByteString; -import com.google.protobuf.BytesValue; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.ServiceLoader; -import org.apache.beam.fn.harness.fn.ThrowingConsumer; -import org.apache.beam.fn.harness.fn.ThrowingRunnable; -import org.apache.beam.runners.core.PTransformRunnerFactory.Registrar; -import org.apache.beam.sdk.common.runner.v1.RunnerApi; -import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.CountingSource; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.util.WindowedValue; -import org.hamcrest.Matchers; -import org.hamcrest.collection.IsMapContaining; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link BoundedSourceRunner}. */ -@RunWith(JUnit4.class) -public class BoundedSourceRunnerTest { - - public static final String URN = "urn:org.apache.beam:source:java:0.1"; - - @Test - public void testRunReadLoopWithMultipleSources() throws Exception { - List> out1Values = new ArrayList<>(); - List> out2Values = new ArrayList<>(); - Collection>> consumers = - ImmutableList.of(out1Values::add, out2Values::add); - - BoundedSourceRunner, Long> runner = new BoundedSourceRunner<>( - PipelineOptionsFactory.create(), - RunnerApi.FunctionSpec.getDefaultInstance(), - consumers); - - runner.runReadLoop(valueInGlobalWindow(CountingSource.upTo(2))); - runner.runReadLoop(valueInGlobalWindow(CountingSource.upTo(1))); - - assertThat(out1Values, - contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueInGlobalWindow(0L))); - assertThat(out2Values, - contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueInGlobalWindow(0L))); - } - - @Test - public void testRunReadLoopWithEmptySource() throws Exception { - List> outValues = new ArrayList<>(); - Collection>> consumers = - ImmutableList.of(outValues::add); - - BoundedSourceRunner, Long> runner = new BoundedSourceRunner<>( - PipelineOptionsFactory.create(), - RunnerApi.FunctionSpec.getDefaultInstance(), - consumers); - - runner.runReadLoop(valueInGlobalWindow(CountingSource.upTo(0))); - - assertThat(outValues, empty()); - } - - @Test - public void testStart() throws Exception { - List> outValues = new ArrayList<>(); - Collection>> consumers = - ImmutableList.of(outValues::add); - - ByteString encodedSource = - ByteString.copyFrom(SerializableUtils.serializeToByteArray(CountingSource.upTo(3))); - - BoundedSourceRunner, Long> runner = new BoundedSourceRunner<>( - PipelineOptionsFactory.create(), - RunnerApi.FunctionSpec.newBuilder().setParameter( - Any.pack(BytesValue.newBuilder().setValue(encodedSource).build())).build(), - consumers); - - runner.start(); - - assertThat(outValues, - contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueInGlobalWindow(2L))); - } - - @Test - public void testCreatingAndProcessingSourceFromFactory() throws Exception { - List> outputValues = new ArrayList<>(); - - Multimap>> consumers = HashMultimap.create(); - consumers.put("outputPC", - (ThrowingConsumer) (ThrowingConsumer>) outputValues::add); - List startFunctions = new ArrayList<>(); - List finishFunctions = new ArrayList<>(); - - RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder() - .setUrn("urn:org.apache.beam:source:java:0.1") - .setParameter(Any.pack(BytesValue.newBuilder() - .setValue(ByteString.copyFrom( - SerializableUtils.serializeToByteArray(CountingSource.upTo(3)))) - .build())) - .build(); - - RunnerApi.PTransform pTransform = RunnerApi.PTransform.newBuilder() - .setSpec(functionSpec) - .putInputs("input", "inputPC") - .putOutputs("output", "outputPC") - .build(); - - new BoundedSourceRunner.Factory<>().createRunnerForPTransform( - PipelineOptionsFactory.create(), - null /* beamFnDataClient */, - "pTransformId", - pTransform, - Suppliers.ofInstance("57L")::get, - ImmutableMap.of(), - ImmutableMap.of(), - consumers, - startFunctions::add, - finishFunctions::add); - - // This is testing a deprecated way of running sources and should be removed - // once all source definitions are instead propagated along the input edge. - Iterables.getOnlyElement(startFunctions).run(); - assertThat(outputValues, contains( - valueInGlobalWindow(0L), - valueInGlobalWindow(1L), - valueInGlobalWindow(2L))); - outputValues.clear(); - - // Check that when passing a source along as an input, the source is processed. - assertThat(consumers.keySet(), containsInAnyOrder("inputPC", "outputPC")); - Iterables.getOnlyElement(consumers.get("inputPC")).accept( - valueInGlobalWindow(CountingSource.upTo(2))); - assertThat(outputValues, contains( - valueInGlobalWindow(0L), - valueInGlobalWindow(1L))); - - assertThat(finishFunctions, Matchers.empty()); - } - - @Test - public void testRegistration() { - for (Registrar registrar : - ServiceLoader.load(Registrar.class)) { - if (registrar instanceof BoundedSourceRunner.Registrar) { - assertThat(registrar.getPTransformRunnerFactories(), IsMapContaining.hasKey(URN)); - return; - } - } - fail("Expected registrar not found."); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java deleted file mode 100644 index c4df77a..0000000 --- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.core; - -import static org.apache.beam.sdk.util.WindowedValue.timestampedValueInGlobalWindow; -import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow; -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Suppliers; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.google.common.collect.Multimap; -import com.google.protobuf.Any; -import com.google.protobuf.ByteString; -import com.google.protobuf.BytesValue; -import com.google.protobuf.Message; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.ServiceLoader; -import org.apache.beam.fn.harness.fn.ThrowingConsumer; -import org.apache.beam.fn.harness.fn.ThrowingRunnable; -import org.apache.beam.runners.core.PTransformRunnerFactory.Registrar; -import org.apache.beam.runners.core.construction.ParDoTranslation; -import org.apache.beam.runners.dataflow.util.CloudObjects; -import org.apache.beam.runners.dataflow.util.DoFnInfo; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.common.runner.v1.RunnerApi; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.WindowingStrategy; -import org.hamcrest.collection.IsMapContaining; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link FnApiDoFnRunner}. */ -@RunWith(JUnit4.class) -public class FnApiDoFnRunnerTest { - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final Coder> STRING_CODER = - WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE); - private static final String STRING_CODER_SPEC_ID = "999L"; - private static final RunnerApi.Coder STRING_CODER_SPEC; - - static { - try { - STRING_CODER_SPEC = RunnerApi.Coder.newBuilder() - .setSpec(RunnerApi.SdkFunctionSpec.newBuilder() - .setSpec(RunnerApi.FunctionSpec.newBuilder() - .setParameter(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom( - OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(STRING_CODER)))) - .build()))) - .build()) - .build(); - } catch (IOException e) { - throw new ExceptionInInitializerError(e); - } - } - - private static class TestDoFn extends DoFn { - private static final TupleTag mainOutput = new TupleTag<>("mainOutput"); - private static final TupleTag additionalOutput = new TupleTag<>("output"); - - private BoundedWindow window; - - @ProcessElement - public void processElement(ProcessContext context, BoundedWindow window) { - context.output("MainOutput" + context.element()); - context.output(additionalOutput, "AdditionalOutput" + context.element()); - this.window = window; - } - - @FinishBundle - public void finishBundle(FinishBundleContext context) { - if (window != null) { - context.output("FinishBundle", window.maxTimestamp(), window); - window = null; - } - } - } - - /** - * Create a DoFn that has 3 inputs (inputATarget1, inputATarget2, inputBTarget) and 2 outputs - * (mainOutput, output). Validate that inputs are fed to the {@link DoFn} and that outputs - * are directed to the correct consumers. - */ - @Test - public void testCreatingAndProcessingDoFn() throws Exception { - Map fnApiRegistry = ImmutableMap.of(STRING_CODER_SPEC_ID, STRING_CODER_SPEC); - String pTransformId = "pTransformId"; - String mainOutputId = "101"; - String additionalOutputId = "102"; - - DoFnInfo doFnInfo = DoFnInfo.forFn( - new TestDoFn(), - WindowingStrategy.globalDefault(), - ImmutableList.of(), - StringUtf8Coder.of(), - Long.parseLong(mainOutputId), - ImmutableMap.of( - Long.parseLong(mainOutputId), TestDoFn.mainOutput, - Long.parseLong(additionalOutputId), TestDoFn.additionalOutput)); - RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder() - .setUrn(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN) - .setParameter(Any.pack(BytesValue.newBuilder() - .setValue(ByteString.copyFrom(SerializableUtils.serializeToByteArray(doFnInfo))) - .build())) - .build(); - RunnerApi.PTransform pTransform = RunnerApi.PTransform.newBuilder() - .setSpec(functionSpec) - .putInputs("inputA", "inputATarget") - .putInputs("inputB", "inputBTarget") - .putOutputs(mainOutputId, "mainOutputTarget") - .putOutputs(additionalOutputId, "additionalOutputTarget") - .build(); - - List> mainOutputValues = new ArrayList<>(); - List> additionalOutputValues = new ArrayList<>(); - Multimap>> consumers = HashMultimap.create(); - consumers.put("mainOutputTarget", - (ThrowingConsumer) (ThrowingConsumer>) mainOutputValues::add); - consumers.put("additionalOutputTarget", - (ThrowingConsumer) (ThrowingConsumer>) additionalOutputValues::add); - List startFunctions = new ArrayList<>(); - List finishFunctions = new ArrayList<>(); - - new FnApiDoFnRunner.Factory<>().createRunnerForPTransform( - PipelineOptionsFactory.create(), - null /* beamFnDataClient */, - pTransformId, - pTransform, - Suppliers.ofInstance("57L")::get, - ImmutableMap.of(), - ImmutableMap.of(), - consumers, - startFunctions::add, - finishFunctions::add); - - Iterables.getOnlyElement(startFunctions).run(); - mainOutputValues.clear(); - - assertThat(consumers.keySet(), containsInAnyOrder( - "inputATarget", "inputBTarget", "mainOutputTarget", "additionalOutputTarget")); - - Iterables.getOnlyElement(consumers.get("inputATarget")).accept(valueInGlobalWindow("A1")); - Iterables.getOnlyElement(consumers.get("inputATarget")).accept(valueInGlobalWindow("A2")); - Iterables.getOnlyElement(consumers.get("inputATarget")).accept(valueInGlobalWindow("B")); - assertThat(mainOutputValues, contains( - valueInGlobalWindow("MainOutputA1"), - valueInGlobalWindow("MainOutputA2"), - valueInGlobalWindow("MainOutputB"))); - assertThat(additionalOutputValues, contains( - valueInGlobalWindow("AdditionalOutputA1"), - valueInGlobalWindow("AdditionalOutputA2"), - valueInGlobalWindow("AdditionalOutputB"))); - mainOutputValues.clear(); - additionalOutputValues.clear(); - - Iterables.getOnlyElement(finishFunctions).run(); - assertThat( - mainOutputValues, - contains( - timestampedValueInGlobalWindow("FinishBundle", GlobalWindow.INSTANCE.maxTimestamp()))); - mainOutputValues.clear(); - } - - @Test - public void testRegistration() { - for (Registrar registrar : - ServiceLoader.load(Registrar.class)) { - if (registrar instanceof FnApiDoFnRunner.Registrar) { - assertThat(registrar.getPTransformRunnerFactories(), - IsMapContaining.hasKey(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN)); - return; - } - } - fail("Expected registrar not found."); - } -}