Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A8467200B35 for ; Tue, 21 Jun 2016 00:16:20 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id A6EA5160A69; Mon, 20 Jun 2016 22:16:20 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 6B8E3160A65 for ; Tue, 21 Jun 2016 00:16:18 +0200 (CEST) Received: (qmail 12421 invoked by uid 500); 20 Jun 2016 22:16:17 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 12329 invoked by uid 99); 20 Jun 2016 22:16:17 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 20 Jun 2016 22:16:17 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 165141A0528 for ; Mon, 20 Jun 2016 22:16:17 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.152 X-Spam-Level: X-Spam-Status: No, score=-4.152 tagged_above=-999 required=6.31 tests=[FUZZY_VPILL=0.494, KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx2-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id aCN9bwlMv8NK for ; Mon, 20 Jun 2016 22:16:00 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx2-lw-eu.apache.org (ASF Mail Server at mx2-lw-eu.apache.org) with SMTP id 0B41360CDC for ; Mon, 20 Jun 2016 22:15:55 +0000 (UTC) Received: (qmail 8224 invoked by uid 99); 20 Jun 2016 22:15:55 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 20 Jun 2016 22:15:55 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DCE87ED30E; Mon, 20 Jun 2016 22:15:54 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davor@apache.org To: commits@beam.incubator.apache.org Date: Mon, 20 Jun 2016 22:16:05 -0000 Message-Id: In-Reply-To: <483b066d27bf4a4988cffaf4189af9f6@git.apache.org> References: <483b066d27bf4a4988cffaf4189af9f6@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [12/50] [abbrv] incubator-beam git commit: Remove InProcess Prefixes archived-at: Mon, 20 Jun 2016 22:16:20 -0000 http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java deleted file mode 100644 index b1cbeb1..0000000 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java +++ /dev/null @@ -1,545 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.direct; - -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.emptyIterable; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; -import static org.junit.Assert.assertThat; - -import org.apache.beam.runners.direct.InMemoryWatermarkManager.FiredTimers; -import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate; -import org.apache.beam.runners.direct.InProcessExecutionContext.InProcessStepContext; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; -import org.apache.beam.sdk.coders.ByteArrayCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.io.CountingInput; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.transforms.WithKeys; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; -import org.apache.beam.sdk.util.SideInputReader; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals.TimerData; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.common.Counter; -import org.apache.beam.sdk.util.common.Counter.AggregationKind; -import org.apache.beam.sdk.util.common.CounterSet; -import org.apache.beam.sdk.util.state.BagState; -import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; -import org.apache.beam.sdk.util.state.StateNamespaces; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTags; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollection.IsBounded; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PValue; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; - -import org.hamcrest.Matchers; -import org.joda.time.Instant; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -/** - * Tests for {@link InProcessEvaluationContext}. - */ -@RunWith(JUnit4.class) -public class InProcessEvaluationContextTest { - private TestPipeline p; - private InProcessEvaluationContext context; - - private PCollection created; - private PCollection> downstream; - private PCollectionView> view; - private PCollection unbounded; - private Collection> rootTransforms; - private Map>> valueToConsumers; - - private BundleFactory bundleFactory; - - @Before - public void setup() { - DirectRunner runner = - DirectRunner.fromOptions(PipelineOptionsFactory.create()); - - p = TestPipeline.create(); - - created = p.apply(Create.of(1, 2, 3)); - downstream = created.apply(WithKeys.of("foo")); - view = created.apply(View.asIterable()); - unbounded = p.apply(CountingInput.unbounded()); - - ConsumerTrackingPipelineVisitor cVis = new ConsumerTrackingPipelineVisitor(); - p.traverseTopologically(cVis); - rootTransforms = cVis.getRootTransforms(); - valueToConsumers = cVis.getValueToConsumers(); - - bundleFactory = InProcessBundleFactory.create(); - - context = - InProcessEvaluationContext.create( - runner.getPipelineOptions(), - InProcessBundleFactory.create(), - rootTransforms, - valueToConsumers, - cVis.getStepNames(), - cVis.getViews()); - } - - @Test - public void writeToViewWriterThenReadReads() { - PCollectionViewWriter> viewWriter = - context.createPCollectionViewWriter( - PCollection.>createPrimitiveOutputInternal( - p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED), - view); - BoundedWindow window = new TestBoundedWindow(new Instant(1024L)); - BoundedWindow second = new TestBoundedWindow(new Instant(899999L)); - WindowedValue firstValue = - WindowedValue.of(1, new Instant(1222), window, PaneInfo.ON_TIME_AND_ONLY_FIRING); - WindowedValue secondValue = - WindowedValue.of( - 2, new Instant(8766L), second, PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0)); - Iterable> values = ImmutableList.of(firstValue, secondValue); - viewWriter.add(values); - - SideInputReader reader = - context.createSideInputReader(ImmutableList.>of(view)); - assertThat(reader.get(view, window), containsInAnyOrder(1)); - assertThat(reader.get(view, second), containsInAnyOrder(2)); - - WindowedValue overrittenSecondValue = - WindowedValue.of( - 4444, new Instant(8677L), second, PaneInfo.createPane(false, true, Timing.LATE, 1, 1)); - viewWriter.add(Collections.singleton(overrittenSecondValue)); - assertThat(reader.get(view, second), containsInAnyOrder(2)); - // The cached value is served in the earlier reader - reader = context.createSideInputReader(ImmutableList.>of(view)); - assertThat(reader.get(view, second), containsInAnyOrder(4444)); - } - - @Test - public void getExecutionContextSameStepSameKeyState() { - InProcessExecutionContext fooContext = - context.getExecutionContext(created.getProducingTransformInternal(), - StructuralKey.of("foo", StringUtf8Coder.of())); - - StateTag> intBag = StateTags.bag("myBag", VarIntCoder.of()); - - InProcessStepContext stepContext = fooContext.getOrCreateStepContext("s1", "s1"); - stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1); - - context.handleResult(InProcessBundleFactory.create() - .createKeyedBundle(null, StructuralKey.of("foo", StringUtf8Coder.of()), created) - .commit(Instant.now()), - ImmutableList.of(), - StepTransformResult.withoutHold(created.getProducingTransformInternal()) - .withState(stepContext.commitState()) - .build()); - - InProcessExecutionContext secondFooContext = - context.getExecutionContext(created.getProducingTransformInternal(), - StructuralKey.of("foo", StringUtf8Coder.of())); - assertThat( - secondFooContext - .getOrCreateStepContext("s1", "s1") - .stateInternals() - .state(StateNamespaces.global(), intBag) - .read(), - contains(1)); - } - - - @Test - public void getExecutionContextDifferentKeysIndependentState() { - InProcessExecutionContext fooContext = - context.getExecutionContext(created.getProducingTransformInternal(), - StructuralKey.of("foo", StringUtf8Coder.of())); - - StateTag> intBag = StateTags.bag("myBag", VarIntCoder.of()); - - fooContext - .getOrCreateStepContext("s1", "s1") - .stateInternals() - .state(StateNamespaces.global(), intBag) - .add(1); - - InProcessExecutionContext barContext = - context.getExecutionContext(created.getProducingTransformInternal(), - StructuralKey.of("bar", StringUtf8Coder.of())); - assertThat(barContext, not(equalTo(fooContext))); - assertThat( - barContext - .getOrCreateStepContext("s1", "s1") - .stateInternals() - .state(StateNamespaces.global(), intBag) - .read(), - emptyIterable()); - } - - @Test - public void getExecutionContextDifferentStepsIndependentState() { - StructuralKey myKey = StructuralKey.of("foo", StringUtf8Coder.of()); - InProcessExecutionContext fooContext = - context.getExecutionContext(created.getProducingTransformInternal(), myKey); - - StateTag> intBag = StateTags.bag("myBag", VarIntCoder.of()); - - fooContext - .getOrCreateStepContext("s1", "s1") - .stateInternals() - .state(StateNamespaces.global(), intBag) - .add(1); - - InProcessExecutionContext barContext = - context.getExecutionContext(downstream.getProducingTransformInternal(), myKey); - assertThat( - barContext - .getOrCreateStepContext("s1", "s1") - .stateInternals() - .state(StateNamespaces.global(), intBag) - .read(), - emptyIterable()); - } - - @Test - public void handleResultMergesCounters() { - CounterSet counters = context.createCounterSet(); - Counter myCounter = Counter.longs("foo", AggregationKind.SUM); - counters.addCounter(myCounter); - - myCounter.addValue(4L); - InProcessTransformResult result = - StepTransformResult.withoutHold(created.getProducingTransformInternal()) - .withCounters(counters) - .build(); - context.handleResult(null, ImmutableList.of(), result); - assertThat((Long) context.getCounters().getExistingCounter("foo").getAggregate(), equalTo(4L)); - - CounterSet againCounters = context.createCounterSet(); - Counter myLongCounterAgain = Counter.longs("foo", AggregationKind.SUM); - againCounters.add(myLongCounterAgain); - myLongCounterAgain.addValue(8L); - - InProcessTransformResult secondResult = - StepTransformResult.withoutHold(downstream.getProducingTransformInternal()) - .withCounters(againCounters) - .build(); - context.handleResult( - context.createRootBundle(created).commit(Instant.now()), - ImmutableList.of(), - secondResult); - assertThat((Long) context.getCounters().getExistingCounter("foo").getAggregate(), equalTo(12L)); - } - - @Test - public void handleResultStoresState() { - StructuralKey myKey = StructuralKey.of("foo".getBytes(), ByteArrayCoder.of()); - InProcessExecutionContext fooContext = - context.getExecutionContext(downstream.getProducingTransformInternal(), myKey); - - StateTag> intBag = StateTags.bag("myBag", VarIntCoder.of()); - - CopyOnAccessInMemoryStateInternals state = - fooContext.getOrCreateStepContext("s1", "s1").stateInternals(); - BagState bag = state.state(StateNamespaces.global(), intBag); - bag.add(1); - bag.add(2); - bag.add(4); - - InProcessTransformResult stateResult = - StepTransformResult.withoutHold(downstream.getProducingTransformInternal()) - .withState(state) - .build(); - - context.handleResult( - context.createKeyedBundle(null, myKey, created).commit(Instant.now()), - ImmutableList.of(), - stateResult); - - InProcessExecutionContext afterResultContext = - context.getExecutionContext(downstream.getProducingTransformInternal(), myKey); - - CopyOnAccessInMemoryStateInternals afterResultState = - afterResultContext.getOrCreateStepContext("s1", "s1").stateInternals(); - assertThat(afterResultState.state(StateNamespaces.global(), intBag).read(), contains(1, 2, 4)); - } - - @Test - public void callAfterOutputMustHaveBeenProducedAfterEndOfWatermarkCallsback() throws Exception { - final CountDownLatch callLatch = new CountDownLatch(1); - Runnable callback = - new Runnable() { - @Override - public void run() { - callLatch.countDown(); - } - }; - - // Should call back after the end of the global window - context.scheduleAfterOutputWouldBeProduced( - downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), callback); - - InProcessTransformResult result = - StepTransformResult.withHold(created.getProducingTransformInternal(), new Instant(0)) - .build(); - - context.handleResult(null, ImmutableList.of(), result); - // Difficult to demonstrate that we took no action in a multithreaded world; poll for a bit - // will likely be flaky if this logic is broken - assertThat(callLatch.await(500L, TimeUnit.MILLISECONDS), is(false)); - - InProcessTransformResult finishedResult = - StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); - context.handleResult(null, ImmutableList.of(), finishedResult); - context.forceRefresh(); - // Obtain the value via blocking call - assertThat(callLatch.await(1, TimeUnit.SECONDS), is(true)); - } - - @Test - public void callAfterOutputMustHaveBeenProducedAlreadyAfterCallsImmediately() throws Exception { - InProcessTransformResult finishedResult = - StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); - context.handleResult(null, ImmutableList.of(), finishedResult); - - final CountDownLatch callLatch = new CountDownLatch(1); - context.extractFiredTimers(); - Runnable callback = - new Runnable() { - @Override - public void run() { - callLatch.countDown(); - } - }; - context.scheduleAfterOutputWouldBeProduced( - downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), callback); - assertThat(callLatch.await(1, TimeUnit.SECONDS), is(true)); - } - - @Test - public void extractFiredTimersExtractsTimers() { - InProcessTransformResult holdResult = - StepTransformResult.withHold(created.getProducingTransformInternal(), new Instant(0)) - .build(); - context.handleResult(null, ImmutableList.of(), holdResult); - - StructuralKey key = StructuralKey.of("foo".length(), VarIntCoder.of()); - TimerData toFire = - TimerData.of(StateNamespaces.global(), new Instant(100L), TimeDomain.EVENT_TIME); - InProcessTransformResult timerResult = - StepTransformResult.withoutHold(downstream.getProducingTransformInternal()) - .withState(CopyOnAccessInMemoryStateInternals.withUnderlying(key, null)) - .withTimerUpdate(TimerUpdate.builder(key).setTimer(toFire).build()) - .build(); - - // haven't added any timers, must be empty - assertThat(context.extractFiredTimers().entrySet(), emptyIterable()); - context.handleResult( - context.createKeyedBundle(null, key, created).commit(Instant.now()), - ImmutableList.of(), - timerResult); - - // timer hasn't fired - assertThat(context.extractFiredTimers().entrySet(), emptyIterable()); - - InProcessTransformResult advanceResult = - StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); - // Should cause the downstream timer to fire - context.handleResult(null, ImmutableList.of(), advanceResult); - - Map, Map, FiredTimers>> fired = - context.extractFiredTimers(); - assertThat( - fired, - Matchers.>hasKey(downstream.getProducingTransformInternal())); - Map, FiredTimers> downstreamFired = - fired.get(downstream.getProducingTransformInternal()); - assertThat(downstreamFired, Matchers.hasKey(key)); - - FiredTimers firedForKey = downstreamFired.get(key); - assertThat(firedForKey.getTimers(TimeDomain.PROCESSING_TIME), emptyIterable()); - assertThat(firedForKey.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME), emptyIterable()); - assertThat(firedForKey.getTimers(TimeDomain.EVENT_TIME), contains(toFire)); - - // Don't reextract timers - assertThat(context.extractFiredTimers().entrySet(), emptyIterable()); - } - - @Test - public void createBundleKeyedResultPropagatesKey() { - StructuralKey key = StructuralKey.of("foo", StringUtf8Coder.of()); - CommittedBundle> newBundle = - context - .createBundle( - bundleFactory.createKeyedBundle( - null, key, - created).commit(Instant.now()), - downstream).commit(Instant.now()); - assertThat(newBundle.getKey(), Matchers.>equalTo(key)); - } - - @Test - public void createKeyedBundleKeyed() { - StructuralKey key = StructuralKey.of("foo", StringUtf8Coder.of()); - CommittedBundle> keyedBundle = - context.createKeyedBundle( - bundleFactory.createRootBundle(created).commit(Instant.now()), - key, - downstream).commit(Instant.now()); - assertThat(keyedBundle.getKey(), - Matchers.>equalTo(key)); - } - - @Test - public void isDoneWithUnboundedPCollectionAndShutdown() { - context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true); - assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false)); - - context.handleResult( - null, - ImmutableList.of(), - StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build()); - context.extractFiredTimers(); - assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(true)); - } - - @Test - public void isDoneWithUnboundedPCollectionAndNotShutdown() { - context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false); - assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false)); - - context.handleResult( - null, - ImmutableList.of(), - StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build()); - assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false)); - } - - @Test - public void isDoneWithOnlyBoundedPCollections() { - context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false); - assertThat(context.isDone(created.getProducingTransformInternal()), is(false)); - - context.handleResult( - null, - ImmutableList.of(), - StepTransformResult.withoutHold(created.getProducingTransformInternal()).build()); - context.extractFiredTimers(); - assertThat(context.isDone(created.getProducingTransformInternal()), is(true)); - } - - @Test - public void isDoneWithPartiallyDone() { - context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true); - assertThat(context.isDone(), is(false)); - - UncommittedBundle rootBundle = context.createRootBundle(created); - rootBundle.add(WindowedValue.valueInGlobalWindow(1)); - CommittedResult handleResult = - context.handleResult( - null, - ImmutableList.of(), - StepTransformResult.withoutHold(created.getProducingTransformInternal()) - .addOutput(rootBundle) - .build()); - @SuppressWarnings("unchecked") - CommittedBundle committedBundle = - (CommittedBundle) Iterables.getOnlyElement(handleResult.getOutputs()); - context.handleResult( - null, - ImmutableList.of(), - StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build()); - assertThat(context.isDone(), is(false)); - - for (AppliedPTransform consumers : valueToConsumers.get(created)) { - context.handleResult( - committedBundle, - ImmutableList.of(), - StepTransformResult.withoutHold(consumers).build()); - } - context.extractFiredTimers(); - assertThat(context.isDone(), is(true)); - } - - @Test - public void isDoneWithUnboundedAndNotShutdown() { - context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false); - assertThat(context.isDone(), is(false)); - - context.handleResult( - null, - ImmutableList.of(), - StepTransformResult.withoutHold(created.getProducingTransformInternal()).build()); - context.handleResult( - null, - ImmutableList.of(), - StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build()); - context.handleResult( - context.createRootBundle(created).commit(Instant.now()), - ImmutableList.of(), - StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build()); - context.extractFiredTimers(); - assertThat(context.isDone(), is(false)); - - context.handleResult( - context.createRootBundle(created).commit(Instant.now()), - ImmutableList.of(), - StepTransformResult.withoutHold(view.getProducingTransformInternal()).build()); - context.extractFiredTimers(); - assertThat(context.isDone(), is(false)); - } - - private static class TestBoundedWindow extends BoundedWindow { - private final Instant ts; - - public TestBoundedWindow(Instant ts) { - this.ts = ts; - } - - @Override - public Instant maxTimestamp() { - return ts; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java deleted file mode 100644 index e8d4711..0000000 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.direct; - -import static org.hamcrest.Matchers.contains; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows; -import org.apache.beam.sdk.util.KeyedWorkItem; -import org.apache.beam.sdk.util.KeyedWorkItems; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; - -import com.google.common.collect.HashMultiset; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Multiset; - -import org.hamcrest.BaseMatcher; -import org.hamcrest.Description; -import org.joda.time.Instant; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link InProcessGroupByKeyOnlyEvaluatorFactory}. - */ -@RunWith(JUnit4.class) -public class InProcessGroupByKeyOnlyEvaluatorFactoryTest { - private BundleFactory bundleFactory = InProcessBundleFactory.create(); - - @Test - public void testInMemoryEvaluator() throws Exception { - TestPipeline p = TestPipeline.create(); - KV firstFoo = KV.of("foo", -1); - KV secondFoo = KV.of("foo", 1); - KV thirdFoo = KV.of("foo", 3); - KV firstBar = KV.of("bar", 22); - KV secondBar = KV.of("bar", 12); - KV firstBaz = KV.of("baz", Integer.MAX_VALUE); - PCollection> values = - p.apply(Create.of(firstFoo, firstBar, secondFoo, firstBaz, secondBar, thirdFoo)); - PCollection>> kvs = - values.apply(new ReifyTimestampsAndWindows()); - PCollection> groupedKvs = - kvs.apply(new InProcessGroupByKey.InProcessGroupByKeyOnly()); - - CommittedBundle>> inputBundle = - bundleFactory.createRootBundle(kvs).commit(Instant.now()); - InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); - - StructuralKey fooKey = StructuralKey.of("foo", StringUtf8Coder.of()); - UncommittedBundle> fooBundle = bundleFactory.createKeyedBundle( - null, fooKey, - groupedKvs); - StructuralKey barKey = StructuralKey.of("bar", StringUtf8Coder.of()); - UncommittedBundle> barBundle = bundleFactory.createKeyedBundle( - null, barKey, - groupedKvs); - StructuralKey bazKey = StructuralKey.of("baz", StringUtf8Coder.of()); - UncommittedBundle> bazBundle = bundleFactory.createKeyedBundle( - null, bazKey, - groupedKvs); - - when(evaluationContext.createKeyedBundle(inputBundle, - fooKey, - groupedKvs)).thenReturn(fooBundle); - when(evaluationContext.createKeyedBundle(inputBundle, - barKey, - groupedKvs)).thenReturn(barBundle); - when(evaluationContext.createKeyedBundle(inputBundle, - bazKey, - groupedKvs)).thenReturn(bazBundle); - - // The input to a GroupByKey is assumed to be a KvCoder - @SuppressWarnings("unchecked") - Coder keyCoder = - ((KvCoder>) kvs.getCoder()).getKeyCoder(); - TransformEvaluator>> evaluator = - new InProcessGroupByKeyOnlyEvaluatorFactory() - .forApplication( - groupedKvs.getProducingTransformInternal(), inputBundle, evaluationContext); - - evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstFoo))); - evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondFoo))); - evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(thirdFoo))); - evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstBar))); - evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondBar))); - evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstBaz))); - - evaluator.finishBundle(); - - assertThat( - fooBundle.commit(Instant.now()).getElements(), - contains( - new KeyedWorkItemMatcher( - KeyedWorkItems.elementsWorkItem( - "foo", - ImmutableSet.of( - WindowedValue.valueInGlobalWindow(-1), - WindowedValue.valueInGlobalWindow(1), - WindowedValue.valueInGlobalWindow(3))), - keyCoder))); - assertThat( - barBundle.commit(Instant.now()).getElements(), - contains( - new KeyedWorkItemMatcher( - KeyedWorkItems.elementsWorkItem( - "bar", - ImmutableSet.of( - WindowedValue.valueInGlobalWindow(12), - WindowedValue.valueInGlobalWindow(22))), - keyCoder))); - assertThat( - bazBundle.commit(Instant.now()).getElements(), - contains( - new KeyedWorkItemMatcher( - KeyedWorkItems.elementsWorkItem( - "baz", - ImmutableSet.of(WindowedValue.valueInGlobalWindow(Integer.MAX_VALUE))), - keyCoder))); - } - - private KV> gwValue(KV kv) { - return KV.of(kv.getKey(), WindowedValue.valueInGlobalWindow(kv.getValue())); - } - - private static class KeyedWorkItemMatcher - extends BaseMatcher>> { - private final KeyedWorkItem myWorkItem; - private final Coder keyCoder; - - public KeyedWorkItemMatcher(KeyedWorkItem myWorkItem, Coder keyCoder) { - this.myWorkItem = myWorkItem; - this.keyCoder = keyCoder; - } - - @Override - public boolean matches(Object item) { - if (item == null || !(item instanceof WindowedValue)) { - return false; - } - WindowedValue> that = (WindowedValue>) item; - Multiset> myValues = HashMultiset.create(); - Multiset> thatValues = HashMultiset.create(); - for (WindowedValue value : myWorkItem.elementsIterable()) { - myValues.add(value); - } - for (WindowedValue value : that.getValue().elementsIterable()) { - thatValues.add(value); - } - try { - return myValues.equals(thatValues) - && keyCoder - .structuralValue(myWorkItem.key()) - .equals(keyCoder.structuralValue(that.getValue().key())); - } catch (Exception e) { - return false; - } - } - - @Override - public void describeTo(Description description) { - description - .appendText("KeyedWorkItem containing key ") - .appendValue(myWorkItem.key()) - .appendText(" and values ") - .appendValueList("[", ", ", "]", myWorkItem.elementsIterable()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java deleted file mode 100644 index 746c0f8..0000000 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessSideInputContainerTest.java +++ /dev/null @@ -1,520 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.direct; - -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasEntry; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.doAnswer; - -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.Mean; -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.transforms.WithKeys; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; -import org.apache.beam.sdk.util.PCollectionViews; -import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; -import org.apache.beam.sdk.util.SideInputReader; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; - -import com.google.common.collect.ImmutableList; - -import org.joda.time.Instant; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -/** - * Tests for {@link InProcessSideInputContainer}. - */ -@RunWith(JUnit4.class) -public class InProcessSideInputContainerTest { - private static final BoundedWindow FIRST_WINDOW = - new BoundedWindow() { - @Override - public Instant maxTimestamp() { - return new Instant(789541L); - } - - @Override - public String toString() { - return "firstWindow"; - } - }; - - private static final BoundedWindow SECOND_WINDOW = - new BoundedWindow() { - @Override - public Instant maxTimestamp() { - return new Instant(14564786L); - } - - @Override - public String toString() { - return "secondWindow"; - } - }; - - @Rule - public ExpectedException thrown = ExpectedException.none(); - - @Mock - private InProcessEvaluationContext context; - - private TestPipeline pipeline; - - private InProcessSideInputContainer container; - - private PCollectionView> mapView; - private PCollectionView singletonView; - - // Not present in container. - private PCollectionView> iterableView; - - @Before - public void setup() { - MockitoAnnotations.initMocks(this); - pipeline = TestPipeline.create(); - - PCollection create = - pipeline.apply("forBaseCollection", Create.of(1, 2, 3, 4)); - - mapView = - create.apply("forKeyTypes", WithKeys.of("foo")) - .apply("asMapView", View.asMap()); - - singletonView = create.apply("forCombinedTypes", Mean.globally().asSingletonView()); - iterableView = create.apply("asIterableView", View.asIterable()); - - container = InProcessSideInputContainer.create( - context, ImmutableList.of(iterableView, mapView, singletonView)); - } - - @Test - public void getAfterWriteReturnsPaneInWindow() throws Exception { - WindowedValue> one = - WindowedValue.of( - KV.of("one", 1), new Instant(1L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING); - WindowedValue> two = - WindowedValue.of( - KV.of("two", 2), new Instant(20L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING); - container.write(mapView, ImmutableList.>of(one, two)); - - Map viewContents = - container - .createReaderForViews(ImmutableList.>of(mapView)) - .get(mapView, FIRST_WINDOW); - assertThat(viewContents, hasEntry("one", 1)); - assertThat(viewContents, hasEntry("two", 2)); - assertThat(viewContents.size(), is(2)); - } - - @Test - public void getReturnsLatestPaneInWindow() throws Exception { - WindowedValue> one = - WindowedValue.of( - KV.of("one", 1), - new Instant(1L), - SECOND_WINDOW, - PaneInfo.createPane(true, false, Timing.EARLY)); - WindowedValue> two = - WindowedValue.of( - KV.of("two", 2), - new Instant(20L), - SECOND_WINDOW, - PaneInfo.createPane(true, false, Timing.EARLY)); - container.write(mapView, ImmutableList.>of(one, two)); - - Map viewContents = - container - .createReaderForViews(ImmutableList.>of(mapView)) - .get(mapView, SECOND_WINDOW); - assertThat(viewContents, hasEntry("one", 1)); - assertThat(viewContents, hasEntry("two", 2)); - assertThat(viewContents.size(), is(2)); - - WindowedValue> three = - WindowedValue.of( - KV.of("three", 3), - new Instant(300L), - SECOND_WINDOW, - PaneInfo.createPane(false, false, Timing.EARLY, 1, -1)); - container.write(mapView, ImmutableList.>of(three)); - - Map overwrittenViewContents = - container - .createReaderForViews(ImmutableList.>of(mapView)) - .get(mapView, SECOND_WINDOW); - assertThat(overwrittenViewContents, hasEntry("three", 3)); - assertThat(overwrittenViewContents.size(), is(1)); - } - - /** - * Demonstrates that calling get() on a window that currently has no data does not return until - * there is data in the pane. - */ - @Test - public void getNotReadyThrows() throws Exception { - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("not ready"); - - container.createReaderForViews(ImmutableList.>of(mapView)) - .get(mapView, GlobalWindow.INSTANCE); - } - - @Test - public void withPCollectionViewsErrorsForContainsNotInViews() { - PCollectionView>> newView = - PCollectionViews.multimapView( - pipeline, - WindowingStrategy.globalDefault(), - KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("with unknown views " + ImmutableList.of(newView).toString()); - - container.createReaderForViews(ImmutableList.>of(newView)); - } - - @Test - public void withViewsForViewNotInContainerFails() { - PCollectionView>> newView = - PCollectionViews.multimapView( - pipeline, - WindowingStrategy.globalDefault(), - KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("unknown views"); - thrown.expectMessage(newView.toString()); - - container.createReaderForViews(ImmutableList.>of(newView)); - } - - @Test - public void getOnReaderForViewNotInReaderFails() { - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("unknown view: " + iterableView.toString()); - - container.createReaderForViews(ImmutableList.>of(mapView)) - .get(iterableView, GlobalWindow.INSTANCE); - } - - @Test - public void writeForMultipleElementsInDifferentWindowsSucceeds() throws Exception { - WindowedValue firstWindowedValue = - WindowedValue.of( - 2.875, - FIRST_WINDOW.maxTimestamp().minus(200L), - FIRST_WINDOW, - PaneInfo.ON_TIME_AND_ONLY_FIRING); - WindowedValue secondWindowedValue = - WindowedValue.of( - 4.125, - SECOND_WINDOW.maxTimestamp().minus(2_000_000L), - SECOND_WINDOW, - PaneInfo.ON_TIME_AND_ONLY_FIRING); - container.write(singletonView, ImmutableList.of(firstWindowedValue, secondWindowedValue)); - assertThat( - container - .createReaderForViews(ImmutableList.>of(singletonView)) - .get(singletonView, FIRST_WINDOW), - equalTo(2.875)); - assertThat( - container - .createReaderForViews(ImmutableList.>of(singletonView)) - .get(singletonView, SECOND_WINDOW), - equalTo(4.125)); - } - - @Test - public void writeForMultipleIdenticalElementsInSameWindowSucceeds() throws Exception { - WindowedValue firstValue = - WindowedValue.of( - 44, - FIRST_WINDOW.maxTimestamp().minus(200L), - FIRST_WINDOW, - PaneInfo.ON_TIME_AND_ONLY_FIRING); - WindowedValue secondValue = - WindowedValue.of( - 44, - FIRST_WINDOW.maxTimestamp().minus(200L), - FIRST_WINDOW, - PaneInfo.ON_TIME_AND_ONLY_FIRING); - - container.write(iterableView, ImmutableList.of(firstValue, secondValue)); - - assertThat( - container - .createReaderForViews(ImmutableList.>of(iterableView)) - .get(iterableView, FIRST_WINDOW), - contains(44, 44)); - } - - @Test - public void writeForElementInMultipleWindowsSucceeds() throws Exception { - WindowedValue multiWindowedValue = - WindowedValue.of( - 2.875, - FIRST_WINDOW.maxTimestamp().minus(200L), - ImmutableList.of(FIRST_WINDOW, SECOND_WINDOW), - PaneInfo.ON_TIME_AND_ONLY_FIRING); - container.write(singletonView, ImmutableList.of(multiWindowedValue)); - assertThat( - container - .createReaderForViews(ImmutableList.>of(singletonView)) - .get(singletonView, FIRST_WINDOW), - equalTo(2.875)); - assertThat( - container - .createReaderForViews(ImmutableList.>of(singletonView)) - .get(singletonView, SECOND_WINDOW), - equalTo(2.875)); - } - - @Test - public void finishDoesNotOverwriteWrittenElements() throws Exception { - WindowedValue> one = - WindowedValue.of( - KV.of("one", 1), - new Instant(1L), - SECOND_WINDOW, - PaneInfo.createPane(true, false, Timing.EARLY)); - WindowedValue> two = - WindowedValue.of( - KV.of("two", 2), - new Instant(20L), - SECOND_WINDOW, - PaneInfo.createPane(true, false, Timing.EARLY)); - container.write(mapView, ImmutableList.>of(one, two)); - - immediatelyInvokeCallback(mapView, SECOND_WINDOW); - - Map viewContents = - container - .createReaderForViews(ImmutableList.>of(mapView)) - .get(mapView, SECOND_WINDOW); - - assertThat(viewContents, hasEntry("one", 1)); - assertThat(viewContents, hasEntry("two", 2)); - assertThat(viewContents.size(), is(2)); - } - - @Test - public void finishOnPendingViewsSetsEmptyElements() throws Exception { - immediatelyInvokeCallback(mapView, SECOND_WINDOW); - Future> mapFuture = - getFutureOfView( - container.createReaderForViews(ImmutableList.>of(mapView)), - mapView, - SECOND_WINDOW); - - assertThat(mapFuture.get().isEmpty(), is(true)); - } - - /** - * Demonstrates that calling isReady on an empty container throws an - * {@link IllegalArgumentException}. - */ - @Test - public void isReadyInEmptyReaderThrows() { - ReadyCheckingSideInputReader reader = - container.createReaderForViews(ImmutableList.>of()); - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("does not contain"); - thrown.expectMessage(ImmutableList.of().toString()); - reader.isReady(mapView, GlobalWindow.INSTANCE); - } - - /** - * Demonstrates that calling isReady returns false until elements are written to the - * {@link PCollectionView}, {@link BoundedWindow} pair, at which point it returns true. - */ - @Test - public void isReadyForSomeNotReadyViewsFalseUntilElements() { - container.write( - mapView, - ImmutableList.of( - WindowedValue.of( - KV.of("one", 1), - SECOND_WINDOW.maxTimestamp().minus(100L), - SECOND_WINDOW, - PaneInfo.ON_TIME_AND_ONLY_FIRING))); - - ReadyCheckingSideInputReader reader = - container.createReaderForViews(ImmutableList.of(mapView, singletonView)); - assertThat(reader.isReady(mapView, FIRST_WINDOW), is(false)); - assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true)); - - assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(false)); - - container.write( - mapView, - ImmutableList.of( - WindowedValue.of( - KV.of("too", 2), - FIRST_WINDOW.maxTimestamp().minus(100L), - FIRST_WINDOW, - PaneInfo.ON_TIME_AND_ONLY_FIRING))); - // Cached value is false - assertThat(reader.isReady(mapView, FIRST_WINDOW), is(false)); - - container.write( - singletonView, - ImmutableList.of( - WindowedValue.of( - 1.25, - SECOND_WINDOW.maxTimestamp().minus(100L), - SECOND_WINDOW, - PaneInfo.ON_TIME_AND_ONLY_FIRING))); - assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true)); - assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(false)); - - assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(false)); - assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false)); - - reader = container.createReaderForViews(ImmutableList.of(mapView, singletonView)); - assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true)); - assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(true)); - assertThat(reader.isReady(mapView, FIRST_WINDOW), is(true)); - } - - @Test - public void isReadyForEmptyWindowTrue() throws Exception { - CountDownLatch onComplete = new CountDownLatch(1); - immediatelyInvokeCallback(mapView, GlobalWindow.INSTANCE); - CountDownLatch latch = invokeLatchedCallback(singletonView, GlobalWindow.INSTANCE, onComplete); - - ReadyCheckingSideInputReader reader = - container.createReaderForViews(ImmutableList.of(mapView, singletonView)); - assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(true)); - assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false)); - - latch.countDown(); - if (!onComplete.await(1500L, TimeUnit.MILLISECONDS)) { - fail("Callback to set empty values did not complete!"); - } - // The cached value was false, so it continues to be true - assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false)); - - // A new reader for the same container gets a fresh look - reader = container.createReaderForViews(ImmutableList.of(mapView, singletonView)); - assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(true)); - } - - /** - * When a callAfterWindowCloses with the specified view's producing transform, window, and - * windowing strategy is invoked, immediately execute the callback. - */ - private void immediatelyInvokeCallback(PCollectionView view, BoundedWindow window) { - doAnswer( - new Answer() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - Object callback = invocation.getArguments()[3]; - Runnable callbackRunnable = (Runnable) callback; - callbackRunnable.run(); - return null; - } - }) - .when(context) - .scheduleAfterOutputWouldBeProduced( - Mockito.eq(view), - Mockito.eq(window), - Mockito.eq(view.getWindowingStrategyInternal()), - Mockito.any(Runnable.class)); - } - - /** - * When a callAfterWindowCloses with the specified view's producing transform, window, and - * windowing strategy is invoked, start a thread that will invoke the callback after the returned - * {@link CountDownLatch} is counted down once. - */ - private CountDownLatch invokeLatchedCallback( - PCollectionView view, BoundedWindow window, final CountDownLatch onComplete) { - final CountDownLatch runLatch = new CountDownLatch(1); - doAnswer( - new Answer() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - Object callback = invocation.getArguments()[3]; - final Runnable callbackRunnable = (Runnable) callback; - Executors.newSingleThreadExecutor().submit(new Runnable() { - public void run() { - try { - if (!runLatch.await(1500L, TimeUnit.MILLISECONDS)) { - fail("Run latch didn't count down within timeout"); - } - callbackRunnable.run(); - onComplete.countDown(); - } catch (InterruptedException e) { - fail("Unexpectedly interrupted while waiting for latch to be counted down"); - } - } - }); - return null; - } - }) - .when(context) - .scheduleAfterOutputWouldBeProduced( - Mockito.eq(view), - Mockito.eq(window), - Mockito.eq(view.getWindowingStrategyInternal()), - Mockito.any(Runnable.class)); - return runLatch; - } - - private Future getFutureOfView(final SideInputReader myReader, - final PCollectionView view, final BoundedWindow window) { - Callable callable = new Callable() { - @Override - public ValueT call() throws Exception { - return myReader.get(view, window); - } - }; - return Executors.newSingleThreadExecutor().submit(callable); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessTimerInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessTimerInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessTimerInternalsTest.java deleted file mode 100644 index 3e01f44..0000000 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessTimerInternalsTest.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.direct; - -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.when; - -import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate; -import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate.TimerUpdateBuilder; -import org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals.TimerData; -import org.apache.beam.sdk.util.state.StateNamespaces; - -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -/** - * Tests for {@link InProcessTimerInternals}. - */ -@RunWith(JUnit4.class) -public class InProcessTimerInternalsTest { - private MockClock clock; - @Mock private TransformWatermarks watermarks; - - private TimerUpdateBuilder timerUpdateBuilder; - - private InProcessTimerInternals internals; - - @Before - public void setup() { - MockitoAnnotations.initMocks(this); - clock = MockClock.fromInstant(new Instant(0)); - - timerUpdateBuilder = TimerUpdate.builder(StructuralKey.of(1234, VarIntCoder.of())); - - internals = InProcessTimerInternals.create(clock, watermarks, timerUpdateBuilder); - } - - @Test - public void setTimerAddsToBuilder() { - TimerData eventTimer = - TimerData.of(StateNamespaces.global(), new Instant(20145L), TimeDomain.EVENT_TIME); - TimerData processingTimer = - TimerData.of(StateNamespaces.global(), new Instant(125555555L), TimeDomain.PROCESSING_TIME); - TimerData synchronizedProcessingTimer = - TimerData.of( - StateNamespaces.global(), - new Instant(98745632189L), - TimeDomain.SYNCHRONIZED_PROCESSING_TIME); - internals.setTimer(eventTimer); - internals.setTimer(processingTimer); - internals.setTimer(synchronizedProcessingTimer); - - assertThat( - internals.getTimerUpdate().getSetTimers(), - containsInAnyOrder(eventTimer, synchronizedProcessingTimer, processingTimer)); - } - - @Test - public void deleteTimerDeletesOnBuilder() { - TimerData eventTimer = - TimerData.of(StateNamespaces.global(), new Instant(20145L), TimeDomain.EVENT_TIME); - TimerData processingTimer = - TimerData.of(StateNamespaces.global(), new Instant(125555555L), TimeDomain.PROCESSING_TIME); - TimerData synchronizedProcessingTimer = - TimerData.of( - StateNamespaces.global(), - new Instant(98745632189L), - TimeDomain.SYNCHRONIZED_PROCESSING_TIME); - internals.deleteTimer(eventTimer); - internals.deleteTimer(processingTimer); - internals.deleteTimer(synchronizedProcessingTimer); - - assertThat( - internals.getTimerUpdate().getDeletedTimers(), - containsInAnyOrder(eventTimer, synchronizedProcessingTimer, processingTimer)); - } - - @Test - public void getProcessingTimeIsClockNow() { - assertThat(internals.currentProcessingTime(), equalTo(clock.now())); - Instant oldProcessingTime = internals.currentProcessingTime(); - - clock.advance(Duration.standardHours(12)); - - assertThat(internals.currentProcessingTime(), equalTo(clock.now())); - assertThat( - internals.currentProcessingTime(), - equalTo(oldProcessingTime.plus(Duration.standardHours(12)))); - } - - @Test - public void getSynchronizedProcessingTimeIsWatermarkSynchronizedInputTime() { - when(watermarks.getSynchronizedProcessingInputTime()).thenReturn(new Instant(12345L)); - assertThat(internals.currentSynchronizedProcessingTime(), equalTo(new Instant(12345L))); - } - - @Test - public void getInputWatermarkTimeUsesWatermarkTime() { - when(watermarks.getInputWatermark()).thenReturn(new Instant(8765L)); - assertThat(internals.currentInputWatermarkTime(), equalTo(new Instant(8765L))); - } - - @Test - public void getOutputWatermarkTimeUsesWatermarkTime() { - when(watermarks.getOutputWatermark()).thenReturn(new Instant(25525L)); - assertThat(internals.currentOutputWatermarkTime(), equalTo(new Instant(25525L))); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java new file mode 100644 index 0000000..3c9c9ee --- /dev/null +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; +import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.IdentitySideInputWindowFn; +import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.common.CounterSet; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; + +import org.hamcrest.Matchers; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import javax.annotation.Nullable; + +/** + * Tests for {@link ParDoEvaluator}. + */ +@RunWith(JUnit4.class) +public class ParDoEvaluatorTest { + @Mock private EvaluationContext evaluationContext; + private PCollection inputPc; + private TupleTag mainOutputTag; + private List> sideOutputTags; + private BundleFactory bundleFactory; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + TestPipeline p = TestPipeline.create(); + inputPc = p.apply(Create.of(1, 2, 3)); + mainOutputTag = new TupleTag() {}; + sideOutputTags = TupleTagList.empty().getAll(); + + bundleFactory = ImmutableListBundleFactory.create(); + } + + @Test + public void sideInputsNotReadyResultHasUnprocessedElements() { + PCollectionView singletonView = + inputPc + .apply(Window.into(new IdentitySideInputWindowFn())) + .apply(View.asSingleton().withDefaultValue(0)); + RecorderFn fn = new RecorderFn(singletonView); + PCollection output = inputPc.apply(ParDo.of(fn).withSideInputs(singletonView)); + + CommittedBundle inputBundle = + bundleFactory.createRootBundle(inputPc).commit(Instant.now()); + UncommittedBundle outputBundle = bundleFactory.createBundle(inputBundle, output); + when(evaluationContext.createBundle(inputBundle, output)) + .thenReturn(outputBundle); + + ParDoEvaluator evaluator = + createEvaluator(singletonView, fn, inputBundle, output); + + IntervalWindow nonGlobalWindow = new IntervalWindow(new Instant(0), new Instant(10_000L)); + WindowedValue first = WindowedValue.valueInGlobalWindow(3); + WindowedValue second = + WindowedValue.of(2, new Instant(1234L), nonGlobalWindow, PaneInfo.NO_FIRING); + WindowedValue third = + WindowedValue.of( + 1, + new Instant(2468L), + ImmutableList.of(nonGlobalWindow, GlobalWindow.INSTANCE), + PaneInfo.NO_FIRING); + + evaluator.processElement(first); + evaluator.processElement(second); + evaluator.processElement(third); + TransformResult result = evaluator.finishBundle(); + + assertThat( + result.getUnprocessedElements(), + Matchers.>containsInAnyOrder( + second, WindowedValue.of(1, new Instant(2468L), nonGlobalWindow, PaneInfo.NO_FIRING))); + assertThat(result.getOutputBundles(), Matchers.>contains(outputBundle)); + assertThat(fn.processed, containsInAnyOrder(1, 3)); + assertThat( + Iterables.getOnlyElement(result.getOutputBundles()).commit(Instant.now()).getElements(), + Matchers.>containsInAnyOrder( + first.withValue(8), + WindowedValue.timestampedValueInGlobalWindow(6, new Instant(2468L)))); + } + + private ParDoEvaluator createEvaluator( + PCollectionView singletonView, + RecorderFn fn, + DirectRunner.CommittedBundle inputBundle, + PCollection output) { + when( + evaluationContext.createSideInputReader( + ImmutableList.>of(singletonView))) + .thenReturn(new ReadyInGlobalWindowReader()); + DirectExecutionContext executionContext = mock(DirectExecutionContext.class); + DirectStepContext stepContext = mock(DirectStepContext.class); + when( + executionContext.getOrCreateStepContext( + Mockito.any(String.class), Mockito.any(String.class))) + .thenReturn(stepContext); + when(stepContext.getTimerUpdate()).thenReturn(TimerUpdate.empty()); + when( + evaluationContext.getExecutionContext( + Mockito.any(AppliedPTransform.class), Mockito.any(StructuralKey.class))) + .thenReturn(executionContext); + when(evaluationContext.createCounterSet()).thenReturn(new CounterSet()); + + return ParDoEvaluator.create( + evaluationContext, + inputBundle, + (AppliedPTransform, ?, ?>) output.getProducingTransformInternal(), + fn, + ImmutableList.>of(singletonView), + mainOutputTag, + sideOutputTags, + ImmutableMap., PCollection>of(mainOutputTag, output)); + } + + private static class RecorderFn extends DoFn { + private Collection processed; + private final PCollectionView view; + + public RecorderFn(PCollectionView view) { + processed = new ArrayList<>(); + this.view = view; + } + + @Override + public void processElement(DoFn.ProcessContext c) throws Exception { + processed.add(c.element()); + c.output(c.element() + c.sideInput(view)); + } + } + + private static class ReadyInGlobalWindowReader implements ReadyCheckingSideInputReader { + @Override + @Nullable + public T get(PCollectionView view, BoundedWindow window) { + if (window.equals(GlobalWindow.INSTANCE)) { + return (T) (Integer) 5; + } + fail("Should only call get in the Global Window, others are not ready"); + throw new AssertionError("Unreachable"); + } + + @Override + public boolean contains(PCollectionView view) { + return true; + } + + @Override + public boolean isEmpty() { + return false; + } + + @Override + public boolean isReady(PCollectionView view, BoundedWindow window) { + return window.equals(GlobalWindow.INSTANCE); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java deleted file mode 100644 index b78eb40..0000000 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.direct; - -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate; -import org.apache.beam.runners.direct.InProcessExecutionContext.InProcessStepContext; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.util.IdentitySideInputWindowFn; -import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.common.CounterSet; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.TupleTagList; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; - -import org.hamcrest.Matchers; -import org.joda.time.Instant; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import javax.annotation.Nullable; - -/** - * Tests for {@link ParDoInProcessEvaluator}. - */ -@RunWith(JUnit4.class) -public class ParDoInProcessEvaluatorTest { - @Mock private InProcessEvaluationContext evaluationContext; - private PCollection inputPc; - private TupleTag mainOutputTag; - private List> sideOutputTags; - private BundleFactory bundleFactory; - - @Before - public void setup() { - MockitoAnnotations.initMocks(this); - TestPipeline p = TestPipeline.create(); - inputPc = p.apply(Create.of(1, 2, 3)); - mainOutputTag = new TupleTag() {}; - sideOutputTags = TupleTagList.empty().getAll(); - - bundleFactory = InProcessBundleFactory.create(); - } - - @Test - public void sideInputsNotReadyResultHasUnprocessedElements() { - PCollectionView singletonView = - inputPc - .apply(Window.into(new IdentitySideInputWindowFn())) - .apply(View.asSingleton().withDefaultValue(0)); - RecorderFn fn = new RecorderFn(singletonView); - PCollection output = inputPc.apply(ParDo.of(fn).withSideInputs(singletonView)); - - CommittedBundle inputBundle = - bundleFactory.createRootBundle(inputPc).commit(Instant.now()); - UncommittedBundle outputBundle = bundleFactory.createBundle(inputBundle, output); - when(evaluationContext.createBundle(inputBundle, output)) - .thenReturn(outputBundle); - - ParDoInProcessEvaluator evaluator = - createEvaluator(singletonView, fn, inputBundle, output); - - IntervalWindow nonGlobalWindow = new IntervalWindow(new Instant(0), new Instant(10_000L)); - WindowedValue first = WindowedValue.valueInGlobalWindow(3); - WindowedValue second = - WindowedValue.of(2, new Instant(1234L), nonGlobalWindow, PaneInfo.NO_FIRING); - WindowedValue third = - WindowedValue.of( - 1, - new Instant(2468L), - ImmutableList.of(nonGlobalWindow, GlobalWindow.INSTANCE), - PaneInfo.NO_FIRING); - - evaluator.processElement(first); - evaluator.processElement(second); - evaluator.processElement(third); - InProcessTransformResult result = evaluator.finishBundle(); - - assertThat( - result.getUnprocessedElements(), - Matchers.>containsInAnyOrder( - second, WindowedValue.of(1, new Instant(2468L), nonGlobalWindow, PaneInfo.NO_FIRING))); - assertThat(result.getOutputBundles(), Matchers.>contains(outputBundle)); - assertThat(fn.processed, containsInAnyOrder(1, 3)); - assertThat( - Iterables.getOnlyElement(result.getOutputBundles()).commit(Instant.now()).getElements(), - Matchers.>containsInAnyOrder( - first.withValue(8), - WindowedValue.timestampedValueInGlobalWindow(6, new Instant(2468L)))); - } - - private ParDoInProcessEvaluator createEvaluator( - PCollectionView singletonView, - RecorderFn fn, - DirectRunner.CommittedBundle inputBundle, - PCollection output) { - when( - evaluationContext.createSideInputReader( - ImmutableList.>of(singletonView))) - .thenReturn(new ReadyInGlobalWindowReader()); - InProcessExecutionContext executionContext = mock(InProcessExecutionContext.class); - InProcessStepContext stepContext = mock(InProcessStepContext.class); - when( - executionContext.getOrCreateStepContext( - Mockito.any(String.class), Mockito.any(String.class))) - .thenReturn(stepContext); - when(stepContext.getTimerUpdate()).thenReturn(TimerUpdate.empty()); - when( - evaluationContext.getExecutionContext( - Mockito.any(AppliedPTransform.class), Mockito.any(StructuralKey.class))) - .thenReturn(executionContext); - when(evaluationContext.createCounterSet()).thenReturn(new CounterSet()); - - return ParDoInProcessEvaluator.create( - evaluationContext, - inputBundle, - (AppliedPTransform, ?, ?>) output.getProducingTransformInternal(), - fn, - ImmutableList.>of(singletonView), - mainOutputTag, - sideOutputTags, - ImmutableMap., PCollection>of(mainOutputTag, output)); - } - - private static class RecorderFn extends DoFn { - private Collection processed; - private final PCollectionView view; - - public RecorderFn(PCollectionView view) { - processed = new ArrayList<>(); - this.view = view; - } - - @Override - public void processElement(DoFn.ProcessContext c) throws Exception { - processed.add(c.element()); - c.output(c.element() + c.sideInput(view)); - } - } - - private static class ReadyInGlobalWindowReader implements ReadyCheckingSideInputReader { - @Override - @Nullable - public T get(PCollectionView view, BoundedWindow window) { - if (window.equals(GlobalWindow.INSTANCE)) { - return (T) (Integer) 5; - } - fail("Should only call get in the Global Window, others are not ready"); - throw new AssertionError("Unreachable"); - } - - @Override - public boolean contains(PCollectionView view) { - return true; - } - - @Override - public boolean isEmpty() { - return false; - } - - @Override - public boolean isReady(PCollectionView view, BoundedWindow window) { - return window.equals(GlobalWindow.INSTANCE); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java index e61881e..6206c22 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java @@ -25,9 +25,9 @@ import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; +import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -69,7 +69,7 @@ import java.io.Serializable; */ @RunWith(JUnit4.class) public class ParDoMultiEvaluatorFactoryTest implements Serializable { - private transient BundleFactory bundleFactory = InProcessBundleFactory.create(); + private transient BundleFactory bundleFactory = ImmutableListBundleFactory.create(); @Test public void testParDoMultiInMemoryTransformEvaluator() throws Exception { @@ -101,7 +101,7 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { PCollection elementOutput = outputTuple.get(elementTag); PCollection lengthOutput = outputTuple.get(lengthTag); - InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); + EvaluationContext evaluationContext = mock(EvaluationContext.class); UncommittedBundle> mainOutputBundle = bundleFactory.createRootBundle(mainOutput); UncommittedBundle elementOutputBundle = bundleFactory.createRootBundle(elementOutput); @@ -112,8 +112,8 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { .thenReturn(elementOutputBundle); when(evaluationContext.createBundle(inputBundle, lengthOutput)).thenReturn(lengthOutputBundle); - InProcessExecutionContext executionContext = - new InProcessExecutionContext(null, null, null, null); + DirectExecutionContext executionContext = + new DirectExecutionContext(null, null, null, null); when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), inputBundle.getKey())).thenReturn(executionContext); CounterSet counters = new CounterSet(); @@ -130,7 +130,7 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { evaluator.processElement( WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); - InProcessTransformResult result = evaluator.finishBundle(); + TransformResult result = evaluator.finishBundle(); assertThat( result.getOutputBundles(), Matchers.>containsInAnyOrder( @@ -188,7 +188,7 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { PCollection> mainOutput = outputTuple.get(mainOutputTag); PCollection elementOutput = outputTuple.get(elementTag); - InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); + EvaluationContext evaluationContext = mock(EvaluationContext.class); UncommittedBundle> mainOutputBundle = bundleFactory.createRootBundle(mainOutput); UncommittedBundle elementOutputBundle = bundleFactory.createRootBundle(elementOutput); @@ -197,8 +197,8 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { when(evaluationContext.createBundle(inputBundle, elementOutput)) .thenReturn(elementOutputBundle); - InProcessExecutionContext executionContext = - new InProcessExecutionContext(null, null, null, null); + DirectExecutionContext executionContext = + new DirectExecutionContext(null, null, null, null); when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), inputBundle.getKey())).thenReturn(executionContext); CounterSet counters = new CounterSet(); @@ -215,7 +215,7 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { evaluator.processElement( WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); - InProcessTransformResult result = evaluator.finishBundle(); + TransformResult result = evaluator.finishBundle(); assertThat( result.getOutputBundles(), Matchers.>containsInAnyOrder(mainOutputBundle, elementOutputBundle)); @@ -278,7 +278,7 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { PCollection> mainOutput = outputTuple.get(mainOutputTag); PCollection elementOutput = outputTuple.get(elementTag); - InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); + EvaluationContext evaluationContext = mock(EvaluationContext.class); UncommittedBundle> mainOutputBundle = bundleFactory.createRootBundle(mainOutput); UncommittedBundle elementOutputBundle = bundleFactory.createRootBundle(elementOutput); @@ -287,7 +287,7 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { when(evaluationContext.createBundle(inputBundle, elementOutput)) .thenReturn(elementOutputBundle); - InProcessExecutionContext executionContext = new InProcessExecutionContext(null, + DirectExecutionContext executionContext = new DirectExecutionContext(null, StructuralKey.of("myKey", StringUtf8Coder.of()), null, null); @@ -307,7 +307,7 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { evaluator.processElement( WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); - InProcessTransformResult result = evaluator.finishBundle(); + TransformResult result = evaluator.finishBundle(); assertThat( result.getOutputBundles(), Matchers.>containsInAnyOrder(mainOutputBundle, elementOutputBundle)); @@ -390,7 +390,7 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { PCollection> mainOutput = outputTuple.get(mainOutputTag); PCollection elementOutput = outputTuple.get(elementTag); - InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); + EvaluationContext evaluationContext = mock(EvaluationContext.class); UncommittedBundle> mainOutputBundle = bundleFactory.createRootBundle(mainOutput); UncommittedBundle elementOutputBundle = bundleFactory.createRootBundle(elementOutput); @@ -399,7 +399,7 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { when(evaluationContext.createBundle(inputBundle, elementOutput)) .thenReturn(elementOutputBundle); - InProcessExecutionContext executionContext = new InProcessExecutionContext(null, + DirectExecutionContext executionContext = new DirectExecutionContext(null, StructuralKey.of("myKey", StringUtf8Coder.of()), null, null); when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), @@ -418,7 +418,7 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { evaluator.processElement( WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); - InProcessTransformResult result = evaluator.finishBundle(); + TransformResult result = evaluator.finishBundle(); assertThat( result.getTimerUpdate(), equalTo(