Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 22999200D56 for ; Tue, 28 Nov 2017 02:59:49 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 20E62160C14; Tue, 28 Nov 2017 01:59:49 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 408E4160C13 for ; Tue, 28 Nov 2017 02:59:48 +0100 (CET) Received: (qmail 26422 invoked by uid 500); 28 Nov 2017 01:59:47 -0000 Mailing-List: contact dev-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list dev@beam.apache.org Received: (qmail 26412 invoked by uid 99); 28 Nov 2017 01:59:47 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 28 Nov 2017 01:59:47 +0000 From: GitBox To: dev@beam.apache.org Subject: [GitHub] asfgit closed pull request #4074: [BEAM-3130] View.asMap() causes a ClassCastException in Apex runner Message-ID: <151183438675.11217.10330025049566485714.gitbox@gitbox.apache.org> archived-at: Tue, 28 Nov 2017 01:59:49 -0000 asfgit closed pull request #4074: [BEAM-3130] View.asMap() causes a ClassCastException in Apex runner URL: https://github.com/apache/beam/pull/4074 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java index 57d259301b1..0483d761c26 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java @@ -100,7 +100,7 @@ public static ApexRunner fromOptions(PipelineOptions options) { } @SuppressWarnings({"rawtypes"}) - private List getOverrides() { + protected List getOverrides() { return ImmutableList.builder() .add( PTransformOverride.of( @@ -110,6 +110,18 @@ public static ApexRunner fromOptions(PipelineOptions options) { PTransformOverride.of( PTransformMatchers.createViewWithViewFn(PCollectionViews.IterableViewFn.class), new StreamingViewAsIterable.Factory())) + .add( + PTransformOverride.of( + PTransformMatchers.createViewWithViewFn(PCollectionViews.ListViewFn.class), + new StreamingViewAsIterable.Factory())) + .add( + PTransformOverride.of( + PTransformMatchers.createViewWithViewFn(PCollectionViews.MapViewFn.class), + new StreamingViewAsIterable.Factory())) + .add( + PTransformOverride.of( + PTransformMatchers.createViewWithViewFn(PCollectionViews.MultimapViewFn.class), + new StreamingViewAsIterable.Factory())) .add( PTransformOverride.of( PTransformMatchers.createViewWithViewFn(PCollectionViews.SingletonViewFn.class), diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/CoderAdapterStreamCodec.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/CoderAdapterStreamCodec.java index f6ce1d075b2..196f73e4cb9 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/CoderAdapterStreamCodec.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/CoderAdapterStreamCodec.java @@ -19,6 +19,7 @@ import com.datatorrent.api.StreamCodec; import com.datatorrent.netlet.util.Slice; +import com.google.common.annotations.VisibleForTesting; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -37,6 +38,11 @@ public CoderAdapterStreamCodec(Coder coder) { this.coder = coder; } + @VisibleForTesting + public Coder getCoder() { + return this.coder; + } + @Override public Object fromByteArray(Slice fragment) { ByteArrayInputStream bis = new ByteArrayInputStream(fragment.buffer, fragment.offset, diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/SideInputTranslationTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/SideInputTranslationTest.java new file mode 100644 index 00000000000..8e050804f3c --- /dev/null +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/SideInputTranslationTest.java @@ -0,0 +1,155 @@ +package org.apache.beam.runners.apex.translation; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.OperatorMeta; +import com.datatorrent.stram.engine.PortContext; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import org.apache.beam.runners.apex.ApexPipelineOptions; +import org.apache.beam.runners.apex.TestApexRunner; +import org.apache.beam.runners.apex.translation.utils.CoderAdapterStreamCodec; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.StructuredCoder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; +import org.junit.Test; + +/** + * Test that view overrides are applied by checking the corresponding + * side input coders. Unlike runner validation these don't run the pipeline, + * they only check translation. + */ +public class SideInputTranslationTest implements java.io.Serializable { + + //@Test + public void testMapAsEntrySetSideInput() { + ApexPipelineOptions options = + PipelineOptionsFactory.as(ApexPipelineOptions.class); + options.setApplicationName("GroupByKey"); + options.setRunner(TestApexRunner.class); + Pipeline pipeline = Pipeline.create(options); + + + final PCollectionView> view = + pipeline.apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("b", 3))) + .apply(View.asMap()); + + org.apache.beam.sdk.values.PCollection> output = + pipeline.apply("CreateMainInput", Create.of(2 /* size */)) + .apply( + "OutputSideInputs", + ParDo.of(new DoFn>() { + @ProcessElement + public void processElement(ProcessContext c) { + assertEquals((int) c.element(), c.sideInput(view).size()); + assertEquals((int) c.element(), c.sideInput(view).entrySet().size()); + for (Entry entry : c.sideInput(view).entrySet()) { + c.output(KV.of(entry.getKey(), entry.getValue())); + } + } + }).withSideInputs(view)); + + PAssert.that(output).containsInAnyOrder( + KV.of("a", 1), KV.of("b", 3)); + + pipeline.run(); + } + + private final transient ApexPipelineOptions options = PipelineOptionsFactory.create() + .as(ApexPipelineOptions.class); + + @Test + public void testListSideInputTranslation() throws Exception { + + Pipeline p = Pipeline.create(); + final PCollectionView> view = + p.apply("CreateSideInput", Create.of(11, 13, 17, 23)).apply(View.asList()); + ListCoder expectedCoder = ListCoder.of(KvCoder.of(VoidCoder.of(), VarIntCoder.of())); + assertSideInputTranslation(view, expectedCoder); + + } + + @Test + public void testMapSideInputTranslation() throws Exception { + + Pipeline p = Pipeline.create(); + final PCollectionView> view = + p.apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("b", 3))) + .apply(View.asMap()); + ListCoder expectedCoder = ListCoder.of(KvCoder.of(VoidCoder.of(), + KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); + assertSideInputTranslation(view, expectedCoder); + } + + @Test + public void testMultimapSideInputTranslation() throws Exception { + + Pipeline p = Pipeline.create(); + final PCollectionView>> view = + p.apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("a", 2), KV.of("b", 3))) + .apply(View.asMultimap()); + ListCoder expectedCoder = ListCoder.of(KvCoder.of(VoidCoder.of(), + KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); + assertSideInputTranslation(view, expectedCoder); + } + + private void assertSideInputTranslation(PCollectionView view, Coder expectedSideInputCoder) + throws Exception { + + Pipeline p = view.getPipeline(); + p.apply("CreateMainInput", Create.of(1)) + .apply("OutputSideInputs", + ParDo.of(new DoFn>() { + @ProcessElement + public void processElement(ProcessContext c) { + } + }).withSideInputs(view)); + + DAG dag = TestApexRunner.translate(p, options); + + OperatorMeta om = dag.getOperatorMeta("OutputSideInputs/ParMultiDo(Anonymous)"); + assertNotNull(om); + assertEquals(2, om.getInputStreams().size()); + + String fieldName = "field=sideInput1"; + Map.Entry sideInput = null; + for (Map.Entry input : om.getInputStreams().entrySet()) { + CoderAdapterStreamCodec sc = (CoderAdapterStreamCodec) input.getKey().getAttributes() + .get(PortContext.STREAM_CODEC); + if (input.toString().contains(fieldName)) { + sideInput = input; + } + System.out.println(sc.getCoder()); + } + assertNotNull("could not find stream for: " + fieldName, sideInput); + + CoderAdapterStreamCodec sc = (CoderAdapterStreamCodec) sideInput.getKey().getAttributes() + .get(PortContext.STREAM_CODEC); + @SuppressWarnings("rawtypes") + StructuredCoder coder = (StructuredCoder) sc.getCoder(); + assertTrue(coder.getComponents().get(0) instanceof FullWindowedValueCoder); + @SuppressWarnings("rawtypes") + FullWindowedValueCoder fwvc = (FullWindowedValueCoder) coder.getComponents().get(0); + assertEquals(expectedSideInputCoder, fwvc.getValueCoder()); + + } + +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services