From commits-return-100114-archive-asf-public=cust-asf.ponee.io@beam.apache.org Fri Dec 21 20:03:15 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 2B692180627 for ; Fri, 21 Dec 2018 20:03:13 +0100 (CET) Received: (qmail 16280 invoked by uid 500); 21 Dec 2018 19:03:13 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 16271 invoked by uid 99); 21 Dec 2018 19:03:13 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Dec 2018 19:03:13 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id A9691875E9; Fri, 21 Dec 2018 19:03:12 +0000 (UTC) Date: Fri, 21 Dec 2018 19:03:11 +0000 To: "commits@beam.apache.org" Subject: [beam] branch master updated: [BEAM-6283] Convert PortableStateExecutionTest and PortableExecutionTest to using PAssert MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <154541899121.28909.3503677018769026523@gitbox.apache.org> From: mxm@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: beam X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 7302aef873ecfcfc415a78ed4113a5a4995d831b X-Git-Newrev: 845de99bb3db353fc219333f736bd02bae65a5ad X-Git-Rev: 845de99bb3db353fc219333f736bd02bae65a5ad X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. mxm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 845de99 [BEAM-6283] Convert PortableStateExecutionTest and PortableExecutionTest to using PAssert 845de99 is described below commit 845de99bb3db353fc219333f736bd02bae65a5ad Author: Maximilian Michels AuthorDate: Fri Dec 21 20:03:03 2018 +0100 [BEAM-6283] Convert PortableStateExecutionTest and PortableExecutionTest to using PAssert This fixes a race condition in the tests when writing results to a static map. --- .../beam/runners/flink/PortableExecutionTest.java | 94 +++++----- .../runners/flink/PortableStateExecutionTest.java | 193 ++++++++++----------- .../runners/flink/PortableTimersExecutionTest.java | 2 +- 3 files changed, 136 insertions(+), 153 deletions(-) diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java index 34985d7..9542bdd 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java @@ -17,19 +17,16 @@ */ package org.apache.beam.runners.flink; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; - +import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import java.io.Serializable; -import java.util.ArrayList; import java.util.Collections; import java.util.concurrent.Executors; import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.Environments; +import org.apache.beam.runners.core.construction.JavaReadViaImpulse; import org.apache.beam.runners.core.construction.PipelineTranslation; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.BigEndianLongCoder; @@ -39,12 +36,14 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PortablePipelineOptions; import org.apache.beam.sdk.testing.CrashingRunner; +import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.Impulse; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -61,7 +60,7 @@ import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) public class PortableExecutionTest implements Serializable { - @Parameters + @Parameters(name = "streaming: {0}") public static Object[] data() { return new Object[] {true, false}; } @@ -80,9 +79,7 @@ public class PortableExecutionTest implements Serializable { flinkJobExecutor.shutdown(); } - private static ArrayList>> outputValues = new ArrayList<>(); - - @Test + @Test(timeout = 120_000) public void testExecution() throws Exception { PipelineOptions options = PipelineOptionsFactory.create(); options.setRunner(CrashingRunner.class); @@ -92,45 +89,42 @@ public class PortableExecutionTest implements Serializable { .as(PortablePipelineOptions.class) .setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED); Pipeline p = Pipeline.create(options); - p.apply("impulse", Impulse.create()) - .apply( - "create", - ParDo.of( - new DoFn() { - @ProcessElement - public void process(ProcessContext ctxt) { - ctxt.output("zero"); - ctxt.output("one"); - ctxt.output("two"); - } - })) - .apply( - "len", - ParDo.of( - new DoFn() { - @ProcessElement - public void process(ProcessContext ctxt) { - ctxt.output((long) ctxt.element().length()); - } - })) - .apply("addKeys", WithKeys.of("foo")) - // Use some unknown coders - .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianLongCoder.of())) - // Force the output to be materialized - .apply("gbk", GroupByKey.create()) - .apply( - "collect", - ParDo.of( - new DoFn>, Void>() { - @ProcessElement - public void process(ProcessContext ctx) { - outputValues.add(ctx.element()); - } - })); + PCollection>> result = + p.apply("impulse", Impulse.create()) + .apply( + "create", + ParDo.of( + new DoFn() { + @ProcessElement + public void process(ProcessContext ctxt) { + ctxt.output("zero"); + ctxt.output("one"); + ctxt.output("two"); + } + })) + .apply( + "len", + ParDo.of( + new DoFn() { + @ProcessElement + public void process(ProcessContext ctxt) { + ctxt.output((long) ctxt.element().length()); + } + })) + .apply("addKeys", WithKeys.of("foo")) + // Use some unknown coders + .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianLongCoder.of())) + // Force the output to be materialized + .apply("gbk", GroupByKey.create()); + + PAssert.that(result).containsInAnyOrder(KV.of("foo", ImmutableList.of(4L, 3L, 3L))); + + // This is line below required to convert the PAssert's read to an impulse, which is expected + // by the GreedyPipelineFuser. + p.replaceAll(Collections.singletonList(JavaReadViaImpulse.boundedOverride())); RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p); - outputValues.clear(); // execute the pipeline FlinkJobInvocation jobInvocation = FlinkJobInvocation.create( @@ -140,16 +134,10 @@ public class PortableExecutionTest implements Serializable { pipelineProto, options.as(FlinkPipelineOptions.class), null, - Collections.EMPTY_LIST); + Collections.emptyList()); jobInvocation.start(); - long timeout = System.currentTimeMillis() + 60 * 1000; - while (jobInvocation.getState() != Enum.DONE && System.currentTimeMillis() < timeout) { + while (jobInvocation.getState() != Enum.DONE) { Thread.sleep(1000); } - assertEquals("job state", Enum.DONE, jobInvocation.getState()); - - assertEquals(1, outputValues.size()); - assertEquals("foo", outputValues.get(0).getKey()); - assertThat(outputValues.get(0).getValue(), containsInAnyOrder(4L, 3L, 3L)); } } diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java index 05194d1..a658a1c 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java @@ -17,20 +17,15 @@ */ package org.apache.beam.runners.flink; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertThat; - import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import java.io.Serializable; import java.util.Collections; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.Executors; import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.construction.Environments; +import org.apache.beam.runners.core.construction.JavaReadViaImpulse; import org.apache.beam.runners.core.construction.PipelineTranslation; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.VarIntCoder; @@ -41,10 +36,12 @@ import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.state.StateSpecs; import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.testing.CrashingRunner; +import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Impulse; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -60,7 +57,7 @@ import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) public class PortableStateExecutionTest implements Serializable { - @Parameters + @Parameters(name = "streaming: {0}") public static Object[] data() { return new Object[] {true, false}; } @@ -79,21 +76,11 @@ public class PortableStateExecutionTest implements Serializable { flinkJobExecutor.shutdown(); } - // State -> Key -> Value - private static final Map> stateValuesMap = new HashMap<>(); - - @Before - public void before() { - stateValuesMap.clear(); - stateValuesMap.put("valueState", new HashMap<>()); - stateValuesMap.put("valueState2", new HashMap<>()); - } - // Special values which clear / write out state private static final int CLEAR_STATE = -1; - private static final int WRITE_STATE_TO_MAP = -2; + private static final int WRITE_STATE = -2; - @Test + @Test(timeout = 120_000) public void testExecution() throws Exception { PipelineOptions options = PipelineOptionsFactory.create(); options.setRunner(CrashingRunner.class); @@ -103,74 +90,93 @@ public class PortableStateExecutionTest implements Serializable { .as(PortablePipelineOptions.class) .setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED); Pipeline p = Pipeline.create(options); - p.apply(Impulse.create()) - .apply( - ParDo.of( - new DoFn>() { - @ProcessElement - public void process(ProcessContext ctx) { - // Values == -1 will clear the state - ctx.output(KV.of("clearedState", 1)); - ctx.output(KV.of("clearedState", CLEAR_STATE)); - // values >= 1 will be added on top of each other - ctx.output(KV.of("bla1", 42)); - ctx.output(KV.of("bla", 23)); - ctx.output(KV.of("bla2", 64)); - ctx.output(KV.of("bla", 1)); - ctx.output(KV.of("bla", 1)); - // values == -2 will write the state to a map - ctx.output(KV.of("bla", WRITE_STATE_TO_MAP)); - ctx.output(KV.of("bla1", WRITE_STATE_TO_MAP)); - ctx.output(KV.of("bla2", WRITE_STATE_TO_MAP)); - ctx.output(KV.of("clearedState", -2)); - } - })) - .apply( - "statefulDoFn", - ParDo.of( - new DoFn, String>() { - @StateId("valueState") - private final StateSpec> valueStateSpec = - StateSpecs.value(VarIntCoder.of()); - - @StateId("valueState2") - private final StateSpec> valueStateSpec2 = - StateSpecs.value(VarIntCoder.of()); - - @ProcessElement - public void process( - ProcessContext ctx, - @StateId("valueState") ValueState valueState, - @StateId("valueState2") ValueState valueState2) { - performStateUpdates("valueState", ctx, valueState); - performStateUpdates("valueState2", ctx, valueState2); - } - - private void performStateUpdates( - String stateId, ProcessContext ctx, ValueState valueState) { - Map stateValues = stateValuesMap.get(stateId); - Integer value = ctx.element().getValue(); - if (value == null) { - throw new IllegalStateException(); - } - switch (value) { - case CLEAR_STATE: - valueState.clear(); - break; - case WRITE_STATE_TO_MAP: - stateValues.put(ctx.element().getKey(), valueState.read()); - break; - default: - Integer currentState = valueState.read(); - if (currentState == null) { - currentState = value; - } else { - currentState += value; + PCollection> output = + p.apply(Impulse.create()) + .apply( + ParDo.of( + new DoFn>() { + @ProcessElement + public void process(ProcessContext ctx) { + // Values == -1 will clear the state + ctx.output(KV.of("clearedState", 1)); + ctx.output(KV.of("clearedState", CLEAR_STATE)); + // values >= 1 will be added on top of each other + ctx.output(KV.of("bla1", 42)); + ctx.output(KV.of("bla", 23)); + ctx.output(KV.of("bla2", 64)); + ctx.output(KV.of("bla", 1)); + ctx.output(KV.of("bla", 1)); + // values == -2 will write the current state to the output + ctx.output(KV.of("bla", WRITE_STATE)); + ctx.output(KV.of("bla1", WRITE_STATE)); + ctx.output(KV.of("bla2", WRITE_STATE)); + ctx.output(KV.of("clearedState", WRITE_STATE)); + } + })) + .apply( + "statefulDoFn", + ParDo.of( + new DoFn, KV>() { + @StateId("valueState") + private final StateSpec> valueStateSpec = + StateSpecs.value(VarIntCoder.of()); + + @StateId("valueState2") + private final StateSpec> valueStateSpec2 = + StateSpecs.value(VarIntCoder.of()); + + @ProcessElement + public void process( + ProcessContext ctx, + @StateId("valueState") ValueState valueState, + @StateId("valueState2") ValueState valueState2) { + performStateUpdates(ctx, valueState); + performStateUpdates(ctx, valueState2); + } + + private void performStateUpdates( + ProcessContext ctx, ValueState valueState) { + Integer value = ctx.element().getValue(); + if (value == null) { + throw new IllegalStateException(); } - valueState.write(currentState); - } - } - })); + switch (value) { + case CLEAR_STATE: + valueState.clear(); + break; + case WRITE_STATE: + Integer read = valueState.read(); + ctx.output( + KV.of( + ctx.element().getKey(), + read == null ? "null" : read.toString())); + break; + default: + Integer currentState = valueState.read(); + if (currentState == null) { + currentState = value; + } else { + currentState += value; + } + valueState.write(currentState); + } + } + })); + + PAssert.that(output) + .containsInAnyOrder( + KV.of("bla", "25"), + KV.of("bla1", "42"), + KV.of("bla2", "64"), + KV.of("clearedState", "null"), + KV.of("bla", "25"), + KV.of("bla1", "42"), + KV.of("bla2", "64"), + KV.of("clearedState", "null")); + + // This is line below required to convert the PAssert's read to an impulse, which is expected + // by the GreedyPipelineFuser. + p.replaceAll(Collections.singletonList(JavaReadViaImpulse.boundedOverride())); RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p); @@ -185,20 +191,9 @@ public class PortableStateExecutionTest implements Serializable { Collections.emptyList()); jobInvocation.start(); - long timeout = System.currentTimeMillis() + 60 * 1000; - while (jobInvocation.getState() != Enum.DONE && System.currentTimeMillis() < timeout) { - Thread.sleep(1000); - } - assertThat(jobInvocation.getState(), is(Enum.DONE)); - Map expected = new HashMap<>(); - expected.put("bla", 25); - expected.put("bla1", 42); - expected.put("bla2", 64); - expected.put("clearedState", null); - - for (Map statesValues : stateValuesMap.values()) { - assertThat(statesValues, equalTo(expected)); + while (jobInvocation.getState() != Enum.DONE) { + Thread.sleep(1000); } } } diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java index 58db1ba..d9639b8 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java @@ -67,7 +67,7 @@ import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) public class PortableTimersExecutionTest implements Serializable { - @Parameters + @Parameters(name = "streaming: {0}") public static Object[] testModes() { return new Object[] {true, false}; }