Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E234E200BA7 for ; Fri, 7 Oct 2016 00:31:31 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id E0A20160AE0; Thu, 6 Oct 2016 22:31:31 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 38002160ADB for ; Fri, 7 Oct 2016 00:31:30 +0200 (CEST) Received: (qmail 12923 invoked by uid 500); 6 Oct 2016 22:31:29 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 12910 invoked by uid 99); 6 Oct 2016 22:31:29 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 06 Oct 2016 22:31:29 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id EAB50180538 for ; Thu, 6 Oct 2016 22:31:28 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id fT-4B-tCOkeJ for ; Thu, 6 Oct 2016 22:31:20 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id C2DC25FAFA for ; Thu, 6 Oct 2016 22:31:18 +0000 (UTC) Received: (qmail 12575 invoked by uid 99); 6 Oct 2016 22:31:17 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 06 Oct 2016 22:31:17 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A08CCE08AF; Thu, 6 Oct 2016 22:31:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: lcwik@apache.org To: commits@beam.incubator.apache.org Date: Thu, 06 Oct 2016 22:31:19 -0000 Message-Id: <8385f9662d7a45c482d18ca9ea7409f5@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/4] incubator-beam git commit: Remove KeyedResourcePool archived-at: Thu, 06 Oct 2016 22:31:32 -0000 Remove KeyedResourcePool This interface is no longer used. Instead, the runner ensures that bundles will be provided containing the appropriate input to the TestStreamEvaluatorFactory. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/41fb16f0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/41fb16f0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/41fb16f0 Branch: refs/heads/master Commit: 41fb16f014a79d2b9c149c5b369db12b61c4c774 Parents: 7306e16 Author: Thomas Groh Authored: Wed Oct 5 13:12:48 2016 -0700 Committer: Luke Cwik Committed: Thu Oct 6 15:14:38 2016 -0700 ---------------------------------------------------------------------- .../direct/BoundedReadEvaluatorFactory.java | 40 +++-- .../beam/runners/direct/DirectRunner.java | 2 + .../beam/runners/direct/EmptyInputProvider.java | 49 ++++++ .../direct/ExecutorServiceParallelExecutor.java | 27 ++- .../runners/direct/FlattenEvaluatorFactory.java | 18 +- .../beam/runners/direct/KeyedResourcePool.java | 47 ------ .../runners/direct/LockedKeyedResourcePool.java | 95 ----------- .../beam/runners/direct/RootInputProvider.java | 41 +++++ .../runners/direct/RootProviderRegistry.java | 65 ++++++++ .../direct/RootTransformEvaluatorFactory.java | 42 ----- .../direct/TestStreamEvaluatorFactory.java | 39 +++-- .../direct/TransformEvaluatorRegistry.java | 17 +- .../direct/UnboundedReadEvaluatorFactory.java | 56 ++++--- .../direct/BoundedReadEvaluatorFactoryTest.java | 3 +- .../direct/FlattenEvaluatorFactoryTest.java | 3 +- .../direct/LockedKeyedResourcePoolTest.java | 163 ------------------- .../direct/TestStreamEvaluatorFactoryTest.java | 3 +- .../UnboundedReadEvaluatorFactoryTest.java | 8 +- 18 files changed, 269 insertions(+), 449 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java index 4936ad9..326a535 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java @@ -39,28 +39,13 @@ import org.apache.beam.sdk.values.PCollection; * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators} * for the {@link Bounded Read.Bounded} primitive {@link PTransform}. */ -final class BoundedReadEvaluatorFactory implements RootTransformEvaluatorFactory { +final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory { private final EvaluationContext evaluationContext; BoundedReadEvaluatorFactory(EvaluationContext evaluationContext) { this.evaluationContext = evaluationContext; } - @Override - public Collection> getInitialInputs(AppliedPTransform transform) { - return createInitialSplits((AppliedPTransform) transform); - } - - private Collection> createInitialSplits( - AppliedPTransform> transform) { - BoundedSource source = transform.getTransform().getSource(); - return Collections.>singleton( - evaluationContext - .>createRootBundle() - .add(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(source))) - .commit(BoundedWindow.TIMESTAMP_MAX_VALUE)); - } - @SuppressWarnings({"unchecked", "rawtypes"}) @Override @Nullable @@ -132,4 +117,27 @@ final class BoundedReadEvaluatorFactory implements RootTransformEvaluatorFactory abstract BoundedSource getSource(); } + + static class InputProvider implements RootInputProvider { + private final EvaluationContext evaluationContext; + + InputProvider(EvaluationContext evaluationContext) { + this.evaluationContext = evaluationContext; + } + + @Override + public Collection> getInitialInputs(AppliedPTransform transform) { + return createInitialSplits((AppliedPTransform) transform); + } + + private Collection> createInitialSplits( + AppliedPTransform> transform) { + BoundedSource source = transform.getTransform().getSource(); + return Collections.>singleton( + evaluationContext + .>createRootBundle() + .add(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(source))) + .commit(BoundedWindow.TIMESTAMP_MAX_VALUE)); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 2ec4f08..67ec3e6 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -248,12 +248,14 @@ public class DirectRunner // independent executor service for each run ExecutorService executorService = executorServiceSupplier.get(); + RootInputProvider rootInputProvider = RootProviderRegistry.defaultRegistry(context); TransformEvaluatorRegistry registry = TransformEvaluatorRegistry.defaultRegistry(context); PipelineExecutor executor = ExecutorServiceParallelExecutor.create( executorService, consumerTrackingVisitor.getValueToConsumers(), keyedPValueVisitor.getKeyedPValues(), + rootInputProvider, registry, defaultModelEnforcements(options), context); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java new file mode 100644 index 0000000..10d63e9 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import java.util.Collection; +import java.util.Collections; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; + +/** + * A {@link RootInputProvider} that provides a singleton empty bundle. + */ +class EmptyInputProvider implements RootInputProvider { + private final EvaluationContext evaluationContext; + + EmptyInputProvider(EvaluationContext evaluationContext) { + this.evaluationContext = evaluationContext; + } + + /** + * {@inheritDoc}. + * + *

Returns a single empty bundle. This bundle ensures that any {@link PTransform PTransforms} + * that consume from the output of the provided {@link AppliedPTransform} have watermarks updated + * as appropriate. + */ + @Override + public Collection> getInitialInputs(AppliedPTransform transform) { + return Collections.>singleton( + evaluationContext.createRootBundle().commit(BoundedWindow.TIMESTAMP_MAX_VALUE)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index bb89699..52c45c3 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.direct; +import static com.google.common.base.Preconditions.checkState; + import com.google.auto.value.AutoValue; import com.google.common.base.MoreObjects; import com.google.common.base.Optional; @@ -67,6 +69,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { private final Map>> valueToConsumers; private final Set keyedPValues; + private final RootInputProvider rootInputProvider; private final TransformEvaluatorRegistry registry; @SuppressWarnings("rawtypes") private final Map, Collection> @@ -101,18 +104,27 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { ExecutorService executorService, Map>> valueToConsumers, Set keyedPValues, + RootInputProvider rootInputProvider, TransformEvaluatorRegistry registry, @SuppressWarnings("rawtypes") - Map, Collection> transformEnforcements, + Map, Collection> + transformEnforcements, EvaluationContext context) { return new ExecutorServiceParallelExecutor( - executorService, valueToConsumers, keyedPValues, registry, transformEnforcements, context); + executorService, + valueToConsumers, + keyedPValues, + rootInputProvider, + registry, + transformEnforcements, + context); } private ExecutorServiceParallelExecutor( ExecutorService executorService, Map>> valueToConsumers, Set keyedPValues, + RootInputProvider rootInputProvider, TransformEvaluatorRegistry registry, @SuppressWarnings("rawtypes") Map, Collection> transformEnforcements, @@ -120,6 +132,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { this.executorService = executorService; this.valueToConsumers = valueToConsumers; this.keyedPValues = keyedPValues; + this.rootInputProvider = rootInputProvider; this.registry = registry; this.transformEnforcements = transformEnforcements; this.evaluationContext = context; @@ -153,7 +166,12 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { public void start(Collection> roots) { for (AppliedPTransform root : roots) { ConcurrentLinkedQueue> pending = new ConcurrentLinkedQueue<>(); - pending.addAll(registry.getInitialInputs(root)); + Collection> initialInputs = rootInputProvider.getInitialInputs(root); + checkState( + !initialInputs.isEmpty(), + "All root transforms must have initial inputs. Got 0 for %s", + root.getFullName()); + pending.addAll(initialInputs); pendingRootBundles.put(root, pending); } evaluationContext.initialize(pendingRootBundles); @@ -385,7 +403,8 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { LOG.debug("Executor Update: {}", update); if (update.getBundle().isPresent()) { if (ExecutorState.ACTIVE == startingState - || (ExecutorState.PROCESSING == startingState && noWorkOutstanding)) { + || (ExecutorState.PROCESSING == startingState + && noWorkOutstanding)) { scheduleConsumers(update); } else { allUpdates.offer(update); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java index 90db040..57d5628 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java @@ -17,15 +17,12 @@ */ package org.apache.beam.runners.direct; -import java.util.Collection; -import java.util.Collections; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; @@ -34,26 +31,13 @@ import org.apache.beam.sdk.values.PCollectionList; * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the {@link Flatten} * {@link PTransform}. */ -class FlattenEvaluatorFactory implements RootTransformEvaluatorFactory { +class FlattenEvaluatorFactory implements TransformEvaluatorFactory { private final EvaluationContext evaluationContext; FlattenEvaluatorFactory(EvaluationContext evaluationContext) { this.evaluationContext = evaluationContext; } - /** - * {@inheritDoc}. - * - *

Returns a single empty bundle. {@link Flatten} on no inputs produces no outputs. This bundle - * ensures that any {@link PTransform PTransforms} that consume from the output of the provided - * {@link AppliedPTransform} have watermarks updated as appropriate. - */ - @Override - public Collection> getInitialInputs(AppliedPTransform transform) { - return Collections.>singleton( - evaluationContext.createRootBundle().commit(BoundedWindow.TIMESTAMP_MAX_VALUE)); - } - @Override public TransformEvaluator forApplication( AppliedPTransform application, CommittedBundle inputBundle) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java deleted file mode 100644 index b976b69..0000000 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedResourcePool.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.direct; - -import com.google.common.base.Optional; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; - -/** - * A pool of resources associated with specific keys. Implementations enforce specific use patterns, - * such as limiting the the number of outstanding elements available per key. - */ -interface KeyedResourcePool { - /** - * Tries to acquire a value for the provided key, loading it via the provided loader if necessary. - * - *

If the returned {@link Optional} contains a value, the caller obtains ownership of that - * value. The value should be released back to this {@link KeyedResourcePool} after the - * caller no longer has use of it using {@link #release(Object, Object)}. - * - *

The provided {@link Callable} must not return null; it may either return a non-null - * value or throw an exception. - */ - Optional tryAcquire(K key, Callable loader) throws ExecutionException; - - /** - * Release the provided value, relinquishing ownership of it. Future calls to - * {@link #tryAcquire(Object, Callable)} may return the released value. - */ - void release(K key, V value); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java deleted file mode 100644 index 8b1e0b1..0000000 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/LockedKeyedResourcePool.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.direct; - -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - -import com.google.common.base.Optional; -import com.google.common.util.concurrent.ExecutionError; -import com.google.common.util.concurrent.UncheckedExecutionException; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; - -/** - * A {@link KeyedResourcePool} which is limited to at most one outstanding instance at a time for - * each key. - */ -class LockedKeyedResourcePool implements KeyedResourcePool { - /** - * A map from each key to an {@link Optional} of the associated value. At most one value is stored - * per key, and it is obtained by at most one thread at a time. - * - *

For each key in this map: - * - *

    - *
  • If there is no associated value, then no value has been stored yet. - *
  • If the value is {@code Optional.absent()} then the value is currently in use. - *
  • If the value is {@code Optional.present()} then the contained value is available for use. - *
- */ - public static LockedKeyedResourcePool create() { - return new LockedKeyedResourcePool<>(); - } - - private final ConcurrentMap> cache; - - private LockedKeyedResourcePool() { - cache = new ConcurrentHashMap<>(); - } - - @Override - public Optional tryAcquire(K key, Callable loader) throws ExecutionException { - Optional value = cache.replace(key, Optional.absent()); - if (value == null) { - // No value already existed, so populate the cache with the value returned by the loader - cache.putIfAbsent(key, Optional.of(load(loader))); - // Some other thread may obtain the result after the putIfAbsent, so retry acquisition - value = cache.replace(key, Optional.absent()); - } - return value; - } - - private V load(Callable loader) throws ExecutionException { - try { - return loader.call(); - } catch (Error t) { - throw new ExecutionError(t); - } catch (RuntimeException e) { - throw new UncheckedExecutionException(e); - } catch (Exception e) { - throw new ExecutionException(e); - } - } - - @Override - public void release(K key, V value) { - Optional replaced = cache.replace(key, Optional.of(value)); - checkNotNull(replaced, "Tried to release before a value was acquired"); - checkState( - !replaced.isPresent(), - "Released a value to a %s where there is already a value present for key %s (%s). " - + "At most one value may be present at a time.", - LockedKeyedResourcePool.class.getSimpleName(), - key, - replaced); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java new file mode 100644 index 0000000..40c7301 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootInputProvider.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.direct; + +import java.util.Collection; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; + +/** + * Provides {@link CommittedBundle bundles} that will be provided to the + * {@link PTransform PTransforms} that are at the root of a {@link Pipeline}. + */ +interface RootInputProvider { + /** + * Get the initial inputs for the {@link AppliedPTransform}. The {@link AppliedPTransform} will be + * provided with these {@link CommittedBundle bundles} as input when the {@link Pipeline} runs. + * + *

For source transforms, these should be sufficient that, when provided to the evaluators + * produced by {@link TransformEvaluatorFactory#forApplication(AppliedPTransform, + * CommittedBundle)}, all of the elements contained in the source are eventually produced. + */ + Collection> getInitialInputs(AppliedPTransform transform); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java new file mode 100644 index 0000000..f6335fd --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.collect.ImmutableMap; +import java.util.Collection; +import java.util.Map; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList; +import org.apache.beam.sdk.transforms.PTransform; + +/** + * A {@link RootInputProvider} that delegates to primitive {@link RootInputProvider} implementations + * based on the type of {@link PTransform} of the application. + */ +class RootProviderRegistry implements RootInputProvider { + public static RootProviderRegistry defaultRegistry(EvaluationContext context) { + ImmutableMap.Builder, RootInputProvider> defaultProviders = + ImmutableMap.builder(); + defaultProviders + .put(Read.Bounded.class, new BoundedReadEvaluatorFactory.InputProvider(context)) + .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory.InputProvider(context)) + .put(TestStream.class, new TestStreamEvaluatorFactory.InputProvider(context)) + .put(FlattenPCollectionList.class, new EmptyInputProvider(context)); + return new RootProviderRegistry(defaultProviders.build()); + } + + private final Map, RootInputProvider> providers; + + private RootProviderRegistry(Map, RootInputProvider> providers) { + this.providers = providers; + } + + @Override + public Collection> getInitialInputs(AppliedPTransform transform) { + Class transformClass = transform.getTransform().getClass(); + RootInputProvider provider = + checkNotNull( + providers.get(transformClass), + "Tried to get a %s for a Transform of type %s, but there is no such provider", + RootInputProvider.class.getSimpleName(), + transformClass.getSimpleName()); + return provider.getInitialInputs(transform); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootTransformEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootTransformEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootTransformEvaluatorFactory.java deleted file mode 100644 index 5785dea..0000000 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootTransformEvaluatorFactory.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.direct; - -import java.util.Collection; -import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; - -/** - * A {@link TransformEvaluatorFactory} for {@link PTransform PTransforms} that are at the root of a - * {@link Pipeline}. Provides a way to get initial inputs, which will cause the {@link PTransform} - * to produce all of the appropriate output. - */ -interface RootTransformEvaluatorFactory extends TransformEvaluatorFactory { - /** - * Get the initial inputs for the {@link AppliedPTransform}. The {@link AppliedPTransform} will be - * provided with these {@link CommittedBundle bundles} as input when the {@link Pipeline} runs. - * - *

For source transforms, these should be sufficient that, when provided to the evaluators - * produced by {@link #forApplication(AppliedPTransform, CommittedBundle)}, all of the elements - * contained in the source are eventually produced. - */ - Collection> getInitialInputs(AppliedPTransform transform); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java index 8e634c8..ffb4fb5 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java @@ -52,28 +52,13 @@ import org.joda.time.Duration; import org.joda.time.Instant; /** The {@link TransformEvaluatorFactory} for the {@link TestStream} primitive. */ -class TestStreamEvaluatorFactory implements RootTransformEvaluatorFactory { +class TestStreamEvaluatorFactory implements TransformEvaluatorFactory { private final EvaluationContext evaluationContext; TestStreamEvaluatorFactory(EvaluationContext evaluationContext) { this.evaluationContext = evaluationContext; } - @Override - public Collection> getInitialInputs(AppliedPTransform transform) { - return createInputBundle((AppliedPTransform) transform); - } - - private Collection> createInputBundle( - AppliedPTransform> transform) { - CommittedBundle> initialBundle = - evaluationContext - .>createRootBundle() - .add(WindowedValue.valueInGlobalWindow(TestStreamIndex.of(transform.getTransform()))) - .commit(BoundedWindow.TIMESTAMP_MAX_VALUE); - return Collections.>singleton(initialBundle); - } - @Nullable @Override public TransformEvaluator forApplication( @@ -206,6 +191,28 @@ class TestStreamEvaluatorFactory implements RootTransformEvaluatorFactory { } } + static class InputProvider implements RootInputProvider { + private final EvaluationContext evaluationContext; + + InputProvider(EvaluationContext evaluationContext) { + this.evaluationContext = evaluationContext; + } + + @Override + public Collection> getInitialInputs(AppliedPTransform transform) { + return createInputBundle((AppliedPTransform) transform); + } + + private Collection> createInputBundle( + AppliedPTransform> transform) { + CommittedBundle> initialBundle = + evaluationContext + .>createRootBundle() + .add(WindowedValue.valueInGlobalWindow(TestStreamIndex.of(transform.getTransform()))) + .commit(BoundedWindow.TIMESTAMP_MAX_VALUE); + return Collections.>singleton(initialBundle); + } + } @AutoValue abstract static class TestStreamIndex { static TestStreamIndex of(TestStream stream) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java index 3332c2a..4b495e6 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java @@ -17,7 +17,6 @@ */ package org.apache.beam.runners.direct; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import com.google.common.collect.ImmutableMap; @@ -42,7 +41,7 @@ import org.slf4j.LoggerFactory; * A {@link TransformEvaluatorFactory} that delegates to primitive {@link TransformEvaluatorFactory} * implementations based on the type of {@link PTransform} of the application. */ -class TransformEvaluatorRegistry implements RootTransformEvaluatorFactory { +class TransformEvaluatorRegistry implements TransformEvaluatorFactory { private static final Logger LOG = LoggerFactory.getLogger(TransformEvaluatorRegistry.class); public static TransformEvaluatorRegistry defaultRegistry(EvaluationContext ctxt) { @SuppressWarnings("rawtypes") @@ -77,20 +76,6 @@ class TransformEvaluatorRegistry implements RootTransformEvaluatorFactory { } @Override - public Collection> getInitialInputs(AppliedPTransform transform) { - checkState( - !finished.get(), "Tried to get initial inputs for a finished TransformEvaluatorRegistry"); - TransformEvaluatorFactory factory = getFactory(transform); - checkArgument( - factory instanceof RootTransformEvaluatorFactory, - "Tried to get Initial Inputs for Transform %s. %s does not have an associated %s", - transform.getFullName(), - transform.getTransform().getClass().getSimpleName(), - RootTransformEvaluatorFactory.class.getSimpleName()); - return ((RootTransformEvaluatorFactory) factory).getInitialInputs(transform); - } - - @Override public TransformEvaluator forApplication( AppliedPTransform application, CommittedBundle inputBundle) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java index 1a89695..08dc286 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java @@ -22,8 +22,6 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.Collection; import java.util.Collections; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; @@ -46,12 +44,11 @@ import org.joda.time.Instant; * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators} * for the {@link Unbounded Read.Unbounded} primitive {@link PTransform}. */ -class UnboundedReadEvaluatorFactory implements RootTransformEvaluatorFactory { +class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { // Occasionally close an existing reader and resume from checkpoint, to exercise close-and-resume @VisibleForTesting static final double DEFAULT_READER_REUSE_CHANCE = 0.95; private final EvaluationContext evaluationContext; - private final ConcurrentMap, UnboundedReadDeduplicator> deduplicators; private final double readerReuseChance; UnboundedReadEvaluatorFactory(EvaluationContext evaluationContext) { @@ -61,31 +58,9 @@ class UnboundedReadEvaluatorFactory implements RootTransformEvaluatorFactory { @VisibleForTesting UnboundedReadEvaluatorFactory(EvaluationContext evaluationContext, double readerReuseChance) { this.evaluationContext = evaluationContext; - deduplicators = new ConcurrentHashMap<>(); this.readerReuseChance = readerReuseChance; } - @Override - public Collection> getInitialInputs(AppliedPTransform transform) { - return createInitialSplits((AppliedPTransform) transform); - } - - private Collection> createInitialSplits( - AppliedPTransform> transform) { - UnboundedSource source = transform.getTransform().getSource(); - UnboundedReadDeduplicator deduplicator = - source.requiresDeduping() - ? UnboundedReadDeduplicator.CachedIdDeduplicator.create() - : NeverDeduplicator.create(); - - UnboundedSourceShard shard = UnboundedSourceShard.unstarted(source, deduplicator); - return Collections.>singleton( - evaluationContext - .>createRootBundle() - .add(WindowedValue.>valueInGlobalWindow(shard)) - .commit(BoundedWindow.TIMESTAMP_MAX_VALUE)); - } - @SuppressWarnings({"unchecked", "rawtypes"}) @Override @Nullable @@ -269,4 +244,33 @@ class UnboundedReadEvaluatorFactory implements RootTransformEvaluatorFactory { return of(getSource(), getDeduplicator(), getExistingReader(), newCheckpoint); } } + + static class InputProvider implements RootInputProvider { + private final EvaluationContext evaluationContext; + + InputProvider(EvaluationContext evaluationContext) { + this.evaluationContext = evaluationContext; + } + + @Override + public Collection> getInitialInputs(AppliedPTransform transform) { + return createInitialSplits((AppliedPTransform) transform); + } + + private Collection> createInitialSplits( + AppliedPTransform> transform) { + UnboundedSource source = transform.getTransform().getSource(); + UnboundedReadDeduplicator deduplicator = + source.requiresDeduping() + ? UnboundedReadDeduplicator.CachedIdDeduplicator.create() + : NeverDeduplicator.create(); + + UnboundedSourceShard shard = UnboundedSourceShard.unstarted(source, deduplicator); + return Collections.>singleton( + evaluationContext + .>createRootBundle() + .add(WindowedValue.>valueInGlobalWindow(shard)) + .commit(BoundedWindow.TIMESTAMP_MAX_VALUE)); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java index 8544128..ee17eae 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java @@ -86,7 +86,8 @@ public class BoundedReadEvaluatorFactoryTest { when(context.createBundle(longs)).thenReturn(outputBundle); Collection> initialInputs = - factory.getInitialInputs(longs.getProducingTransformInternal()); + new BoundedReadEvaluatorFactory.InputProvider(context) + .getInitialInputs(longs.getProducingTransformInternal()); List> outputs = new ArrayList<>(); for (CommittedBundle shardBundle : initialInputs) { TransformEvaluator evaluator = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java index 86d98e9..aa7b178 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java @@ -128,7 +128,8 @@ public class FlattenEvaluatorFactoryTest { FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory(evaluationContext); Collection> initialInputs = - factory.getInitialInputs(flattened.getProducingTransformInternal()); + new EmptyInputProvider(evaluationContext) + .getInitialInputs(flattened.getProducingTransformInternal()); TransformEvaluator emptyEvaluator = factory.forApplication( flattened.getProducingTransformInternal(), Iterables.getOnlyElement(initialInputs)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java deleted file mode 100644 index e1e24a3..0000000 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/LockedKeyedResourcePoolTest.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.direct; - -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; - -import com.google.common.base.Optional; -import com.google.common.util.concurrent.ExecutionError; -import com.google.common.util.concurrent.UncheckedExecutionException; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link LockedKeyedResourcePool}. - */ -@RunWith(JUnit4.class) -public class LockedKeyedResourcePoolTest { - @Rule public ExpectedException thrown = ExpectedException.none(); - private LockedKeyedResourcePool cache = - LockedKeyedResourcePool.create(); - - @Test - public void acquireReleaseAcquireLastLoadedOrReleased() throws ExecutionException { - Optional returned = cache.tryAcquire("foo", new Callable() { - @Override - public Integer call() throws Exception { - return 3; - } - }); - assertThat(returned.get(), equalTo(3)); - - cache.release("foo", 4); - Optional reacquired = cache.tryAcquire("foo", new Callable() { - @Override - public Integer call() throws Exception { - return 5; - } - }); - assertThat(reacquired.get(), equalTo(4)); - } - - @Test - public void acquireReleaseReleaseThrows() throws ExecutionException { - Optional returned = cache.tryAcquire("foo", new Callable() { - @Override - public Integer call() throws Exception { - return 3; - } - }); - assertThat(returned.get(), equalTo(3)); - - cache.release("foo", 4); - thrown.expect(IllegalStateException.class); - thrown.expectMessage("already a value present"); - thrown.expectMessage("At most one"); - cache.release("foo", 4); - } - - @Test - public void releaseBeforeAcquireThrows() { - thrown.expect(NullPointerException.class); - thrown.expectMessage("before a value was acquired"); - cache.release("bar", 3); - } - - @Test - public void multipleAcquireWithoutReleaseAbsent() throws ExecutionException { - Optional returned = cache.tryAcquire("foo", new Callable() { - @Override - public Integer call() throws Exception { - return 3; - } - }); - Optional secondReturned = cache.tryAcquire("foo", new Callable() { - @Override - public Integer call() throws Exception { - return 3; - } - }); - assertThat(secondReturned.isPresent(), is(false)); - } - - @Test - public void acquireMultipleKeysSucceeds() throws ExecutionException { - Optional returned = cache.tryAcquire("foo", new Callable() { - @Override - public Integer call() throws Exception { - return 3; - } - }); - Optional secondReturned = cache.tryAcquire("bar", new Callable() { - @Override - public Integer call() throws Exception { - return 4; - } - }); - - assertThat(returned.get(), equalTo(3)); - assertThat(secondReturned.get(), equalTo(4)); - } - - @Test - public void acquireThrowsExceptionWrapped() throws ExecutionException { - final Exception cause = new Exception("checkedException"); - thrown.expect(ExecutionException.class); - thrown.expectCause(equalTo(cause)); - cache.tryAcquire("foo", new Callable() { - @Override - public Integer call() throws Exception { - throw cause; - } - }); - } - - @Test - public void acquireThrowsRuntimeExceptionWrapped() throws ExecutionException { - final RuntimeException cause = new RuntimeException("UncheckedException"); - thrown.expect(UncheckedExecutionException.class); - thrown.expectCause(equalTo(cause)); - cache.tryAcquire("foo", new Callable() { - @Override - public Integer call() throws Exception { - throw cause; - } - }); - } - - @Test - public void acquireThrowsErrorWrapped() throws ExecutionException { - final Error cause = new Error("Error"); - thrown.expect(ExecutionError.class); - thrown.expectCause(equalTo(cause)); - cache.tryAcquire("foo", new Callable() { - @Override - public Integer call() throws Exception { - throw cause; - } - }); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java index 1790b2d..60b9c79 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java @@ -81,7 +81,8 @@ public class TestStreamEvaluatorFactoryTest { .thenReturn(bundleFactory.createBundle(streamVals), bundleFactory.createBundle(streamVals)); Collection> initialInputs = - factory.getInitialInputs(streamVals.getProducingTransformInternal()); + new TestStreamEvaluatorFactory.InputProvider(context) + .getInitialInputs(streamVals.getProducingTransformInternal()); @SuppressWarnings("unchecked") CommittedBundle> initialBundle = (CommittedBundle>) Iterables.getOnlyElement(initialInputs); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java index 25642dd..b78fbeb 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java @@ -97,7 +97,8 @@ public class UnboundedReadEvaluatorFactoryTest { when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle()); Collection> initialInputs = - factory.getInitialInputs(longs.getProducingTransformInternal()); + new UnboundedReadEvaluatorFactory.InputProvider(context) + .getInitialInputs(longs.getProducingTransformInternal()); CommittedBundle inputShards = Iterables.getOnlyElement(initialInputs); UnboundedSourceShard inputShard = @@ -141,7 +142,8 @@ public class UnboundedReadEvaluatorFactoryTest { AppliedPTransform sourceTransform = pcollection.getProducingTransformInternal(); when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle()); - Collection> initialInputs = factory.getInitialInputs(sourceTransform); + Collection> initialInputs = + new UnboundedReadEvaluatorFactory.InputProvider(context).getInitialInputs(sourceTransform); UncommittedBundle output = bundleFactory.createBundle(pcollection); when(context.createBundle(pcollection)).thenReturn(output); @@ -196,7 +198,6 @@ public class UnboundedReadEvaluatorFactoryTest { .commit(Instant.now()); UnboundedReadEvaluatorFactory factory = new UnboundedReadEvaluatorFactory(context, 1.0 /* Always reuse */); - factory.getInitialInputs(pcollection.getProducingTransformInternal()); TransformEvaluator> evaluator = factory.forApplication(sourceTransform, inputBundle); evaluator.processElement(shard); @@ -239,7 +240,6 @@ public class UnboundedReadEvaluatorFactoryTest { .commit(Instant.now()); UnboundedReadEvaluatorFactory factory = new UnboundedReadEvaluatorFactory(context, 0.0 /* never reuse */); - factory.getInitialInputs(pcollection.getProducingTransformInternal()); TransformEvaluator> evaluator = factory.forApplication(sourceTransform, inputBundle); evaluator.processElement(shard);