Return-Path: X-Original-To: apmail-beam-commits-archive@minotaur.apache.org Delivered-To: apmail-beam-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 203B31931F for ; Fri, 29 Apr 2016 21:56:17 +0000 (UTC) Received: (qmail 24940 invoked by uid 500); 29 Apr 2016 21:56:17 -0000 Delivered-To: apmail-beam-commits-archive@beam.apache.org Received: (qmail 24888 invoked by uid 500); 29 Apr 2016 21:56:17 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 24870 invoked by uid 99); 29 Apr 2016 21:56:16 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 29 Apr 2016 21:56:16 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id E011D1A4767 for ; Fri, 29 Apr 2016 21:56:15 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.221 X-Spam-Level: X-Spam-Status: No, score=-3.221 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001] autolearn=disabled Received: from mx2-lw-us.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id Ed8mrhTI6XFp for ; Fri, 29 Apr 2016 21:56:06 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx2-lw-us.apache.org (ASF Mail Server at mx2-lw-us.apache.org) with SMTP id 5657560D62 for ; Fri, 29 Apr 2016 21:56:01 +0000 (UTC) Received: (qmail 23329 invoked by uid 99); 29 Apr 2016 21:55:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 29 Apr 2016 21:55:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D0D5BE0019; Fri, 29 Apr 2016 21:55:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kenn@apache.org To: commits@beam.incubator.apache.org Date: Fri, 29 Apr 2016 21:56:11 -0000 Message-Id: <3da18d0645714949adbedea81dc0d2be@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [13/17] incubator-beam git commit: Move InProcessRunner to its own module http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java new file mode 100644 index 0000000..1c51738 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InProcessExecutionContext.InProcessStepContext; +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.util.DoFnRunner; +import org.apache.beam.sdk.util.DoFnRunners; +import org.apache.beam.sdk.util.DoFnRunners.OutputManager; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.util.UserCodeException; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.common.CounterSet; +import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +class ParDoInProcessEvaluator implements TransformEvaluator { + public static ParDoInProcessEvaluator create( + InProcessEvaluationContext evaluationContext, + CommittedBundle inputBundle, + AppliedPTransform, ?, ?> application, + DoFn fn, + List> sideInputs, + TupleTag mainOutputTag, + List> sideOutputTags, + Map, PCollection> outputs) { + InProcessExecutionContext executionContext = + evaluationContext.getExecutionContext(application, inputBundle.getKey()); + String stepName = evaluationContext.getStepName(application); + InProcessStepContext stepContext = + executionContext.getOrCreateStepContext(stepName, stepName); + + CounterSet counters = evaluationContext.createCounterSet(); + + Map, UncommittedBundle> outputBundles = new HashMap<>(); + for (Map.Entry, PCollection> outputEntry : outputs.entrySet()) { + outputBundles.put( + outputEntry.getKey(), + evaluationContext.createBundle(inputBundle, outputEntry.getValue())); + } + + DoFnRunner runner = + DoFnRunners.createDefault( + evaluationContext.getPipelineOptions(), + SerializableUtils.clone(fn), + evaluationContext.createSideInputReader(sideInputs), + BundleOutputManager.create(outputBundles), + mainOutputTag, + sideOutputTags, + stepContext, + counters.getAddCounterMutator(), + application.getInput().getWindowingStrategy()); + + try { + runner.startBundle(); + } catch (Exception e) { + throw UserCodeException.wrap(e); + } + + return new ParDoInProcessEvaluator<>( + runner, application, counters, outputBundles.values(), stepContext); + } + + //////////////////////////////////////////////////////////////////////////////////////////////// + + private final DoFnRunner fnRunner; + private final AppliedPTransform, ?, ?> transform; + private final CounterSet counters; + private final Collection> outputBundles; + private final InProcessStepContext stepContext; + + private ParDoInProcessEvaluator( + DoFnRunner fnRunner, + AppliedPTransform, ?, ?> transform, + CounterSet counters, + Collection> outputBundles, + InProcessStepContext stepContext) { + this.fnRunner = fnRunner; + this.transform = transform; + this.counters = counters; + this.outputBundles = outputBundles; + this.stepContext = stepContext; + } + + @Override + public void processElement(WindowedValue element) { + try { + fnRunner.processElement(element); + } catch (Exception e) { + throw UserCodeException.wrap(e); + } + } + + @Override + public InProcessTransformResult finishBundle() { + try { + fnRunner.finishBundle(); + } catch (Exception e) { + throw UserCodeException.wrap(e); + } + StepTransformResult.Builder resultBuilder; + CopyOnAccessInMemoryStateInternals state = stepContext.commitState(); + if (state != null) { + resultBuilder = + StepTransformResult.withHold(transform, state.getEarliestWatermarkHold()) + .withState(state); + } else { + resultBuilder = StepTransformResult.withoutHold(transform); + } + return resultBuilder + .addOutput(outputBundles) + .withTimerUpdate(stepContext.getTimerUpdate()) + .withCounters(counters) + .build(); + } + + static class BundleOutputManager implements OutputManager { + private final Map, UncommittedBundle> bundles; + private final Map, List> undeclaredOutputs; + + public static BundleOutputManager create(Map, UncommittedBundle> outputBundles) { + return new BundleOutputManager(outputBundles); + } + + private BundleOutputManager(Map, UncommittedBundle> bundles) { + this.bundles = bundles; + undeclaredOutputs = new HashMap<>(); + } + + @SuppressWarnings("unchecked") + @Override + public void output(TupleTag tag, WindowedValue output) { + @SuppressWarnings("rawtypes") + UncommittedBundle bundle = bundles.get(tag); + if (bundle == null) { + List undeclaredContents = undeclaredOutputs.get(tag); + if (undeclaredContents == null) { + undeclaredContents = new ArrayList(); + undeclaredOutputs.put(tag, undeclaredContents); + } + undeclaredContents.add(output); + } else { + bundle.add(output); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java new file mode 100644 index 0000000..ae8ac6f --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo.BoundMulti; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; + +import java.util.Map; + +/** + * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the + * {@link BoundMulti} primitive {@link PTransform}. + */ +class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory { + @Override + public TransformEvaluator forApplication( + AppliedPTransform application, + CommittedBundle inputBundle, + InProcessEvaluationContext evaluationContext) { + @SuppressWarnings({"unchecked", "rawtypes"}) + TransformEvaluator evaluator = + createMultiEvaluator((AppliedPTransform) application, inputBundle, evaluationContext); + return evaluator; + } + + private static ParDoInProcessEvaluator createMultiEvaluator( + AppliedPTransform, PCollectionTuple, BoundMulti> application, + CommittedBundle inputBundle, + InProcessEvaluationContext evaluationContext) { + Map, PCollection> outputs = application.getOutput().getAll(); + DoFn fn = application.getTransform().getFn(); + + return ParDoInProcessEvaluator.create( + evaluationContext, + inputBundle, + application, + fn, + application.getTransform().getSideInputs(), + application.getTransform().getMainOutputTag(), + application.getTransform().getSideOutputTags().getAll(), + outputs); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java new file mode 100644 index 0000000..989ae51 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo.Bound; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TupleTag; + +import com.google.common.collect.ImmutableMap; + +import java.util.Collections; + +/** + * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the + * {@link Bound ParDo.Bound} primitive {@link PTransform}. + */ +class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory { + @Override + public TransformEvaluator forApplication( + final AppliedPTransform application, + CommittedBundle inputBundle, + InProcessEvaluationContext evaluationContext) { + @SuppressWarnings({"unchecked", "rawtypes"}) + TransformEvaluator evaluator = + createSingleEvaluator((AppliedPTransform) application, inputBundle, evaluationContext); + return evaluator; + } + + private static ParDoInProcessEvaluator createSingleEvaluator( + @SuppressWarnings("rawtypes") AppliedPTransform, PCollection, + Bound> application, + CommittedBundle inputBundle, InProcessEvaluationContext evaluationContext) { + TupleTag mainOutputTag = new TupleTag<>("out"); + + return ParDoInProcessEvaluator.create( + evaluationContext, + inputBundle, + application, + application.getTransform().getFn(), + application.getTransform().getSideInputs(), + mainOutputTag, + Collections.>emptyList(), + ImmutableMap., PCollection>of(mainOutputTag, application.getOutput())); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java new file mode 100644 index 0000000..aef62b2 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.util.WindowedValue; + +class PassthroughTransformEvaluator implements TransformEvaluator { + public static PassthroughTransformEvaluator create( + AppliedPTransform transform, UncommittedBundle output) { + return new PassthroughTransformEvaluator<>(transform, output); + } + + private final AppliedPTransform transform; + private final UncommittedBundle output; + + private PassthroughTransformEvaluator( + AppliedPTransform transform, UncommittedBundle output) { + this.transform = transform; + this.output = output; + } + + @Override + public void processElement(WindowedValue element) throws Exception { + output.add(element); + } + + @Override + public InProcessTransformResult finishBundle() throws Exception { + return StepTransformResult.withoutHold(transform).addOutput(output).build(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ShardControlledWrite.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ShardControlledWrite.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ShardControlledWrite.java new file mode 100644 index 0000000..4687f85 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ShardControlledWrite.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import static com.google.common.base.Preconditions.checkArgument; + +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.Partition; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PDone; + +import java.util.concurrent.ThreadLocalRandom; + +/** + * A write that explicitly controls its number of output shards. + */ +abstract class ShardControlledWrite + extends ForwardingPTransform, PDone> { + @Override + public PDone apply(PCollection input) { + int numShards = getNumShards(); + checkArgument( + numShards >= 1, + "%s should only be applied if the output has a controlled number of shards (> 1); got %s", + getClass().getSimpleName(), + getNumShards()); + PCollectionList shards = + input.apply( + "PartitionInto" + numShards + "Shards", + Partition.of(getNumShards(), new RandomSeedPartitionFn())); + for (int i = 0; i < shards.size(); i++) { + PCollection shard = shards.get(i); + PTransform, PDone> writeShard = getSingleShardTransform(i); + shard.apply(String.format("%s(Shard:%s)", writeShard.getName(), i), writeShard); + } + return PDone.in(input.getPipeline()); + } + + /** + * Returns the number of shards this {@link PTransform} should write to. + */ + abstract int getNumShards(); + + /** + * Returns a {@link PTransform} that performs a write to the shard with the specified shard + * number. + * + *

This method will be called n times, where n is the value of {@link #getNumShards()}, for + * shard numbers {@code [0...n)}. + */ + abstract PTransform, PDone> getSingleShardTransform(int shardNum); + + private static class RandomSeedPartitionFn implements Partition.PartitionFn { + int nextPartition = -1; + @Override + public int partitionFor(T elem, int numPartitions) { + if (nextPartition < 0) { + nextPartition = ThreadLocalRandom.current().nextInt(numPartitions); + } + nextPartition++; + nextPartition %= numPartitions; + return nextPartition; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java new file mode 100644 index 0000000..1c7cf6c --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.sdk.transforms.AppliedPTransform; + +import com.google.common.base.MoreObjects; + +import java.util.Objects; + +/** + * A (Step, Key) pair. This is useful as a map key or cache key for things that are available + * per-step in a keyed manner (e.g. State). + */ +final class StepAndKey { + private final AppliedPTransform step; + private final Object key; + + /** + * Create a new {@link StepAndKey} with the provided step and key. + */ + public static StepAndKey of(AppliedPTransform step, Object key) { + return new StepAndKey(step, key); + } + + private StepAndKey(AppliedPTransform step, Object key) { + this.step = step; + this.key = key; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(StepAndKey.class) + .add("step", step.getFullName()) + .add("key", key) + .toString(); + } + + @Override + public int hashCode() { + return Objects.hash(step, key); + } + + @Override + public boolean equals(Object other) { + if (other == this) { + return true; + } else if (!(other instanceof StepAndKey)) { + return false; + } else { + StepAndKey that = (StepAndKey) other; + return Objects.equals(this.step, that.step) + && Objects.equals(this.key, that.key); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java new file mode 100644 index 0000000..46e7d04 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate; +import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.common.CounterSet; +import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; + +import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableList; + +import org.joda.time.Instant; + +import java.util.Collection; + +import javax.annotation.Nullable; + +/** + * An immutable {@link InProcessTransformResult}. + */ +public class StepTransformResult implements InProcessTransformResult { + private final AppliedPTransform transform; + private final Iterable> bundles; + @Nullable private final CopyOnAccessInMemoryStateInternals state; + private final TimerUpdate timerUpdate; + @Nullable private final CounterSet counters; + private final Instant watermarkHold; + + private StepTransformResult( + AppliedPTransform transform, + Iterable> outputBundles, + CopyOnAccessInMemoryStateInternals state, + TimerUpdate timerUpdate, + CounterSet counters, + Instant watermarkHold) { + this.transform = checkNotNull(transform); + this.bundles = checkNotNull(outputBundles); + this.state = state; + this.timerUpdate = checkNotNull(timerUpdate); + this.counters = counters; + this.watermarkHold = checkNotNull(watermarkHold); + } + + @Override + public Iterable> getOutputBundles() { + return bundles; + } + + @Override + public CounterSet getCounters() { + return counters; + } + + @Override + public AppliedPTransform getTransform() { + return transform; + } + + @Override + public Instant getWatermarkHold() { + return watermarkHold; + } + + @Nullable + @Override + public CopyOnAccessInMemoryStateInternals getState() { + return state; + } + + @Override + public TimerUpdate getTimerUpdate() { + return timerUpdate; + } + + public static Builder withHold(AppliedPTransform transform, Instant watermarkHold) { + return new Builder(transform, watermarkHold); + } + + public static Builder withoutHold(AppliedPTransform transform) { + return new Builder(transform, BoundedWindow.TIMESTAMP_MAX_VALUE); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(StepTransformResult.class) + .add("transform", transform) + .toString(); + } + + /** + * A builder for creating instances of {@link StepTransformResult}. + */ + public static class Builder { + private final AppliedPTransform transform; + private final ImmutableList.Builder> bundlesBuilder; + private CopyOnAccessInMemoryStateInternals state; + private TimerUpdate timerUpdate; + private CounterSet counters; + private final Instant watermarkHold; + + private Builder(AppliedPTransform transform, Instant watermarkHold) { + this.transform = transform; + this.watermarkHold = watermarkHold; + this.bundlesBuilder = ImmutableList.builder(); + this.timerUpdate = TimerUpdate.builder(null).build(); + } + + public StepTransformResult build() { + return new StepTransformResult( + transform, + bundlesBuilder.build(), + state, + timerUpdate, + counters, + watermarkHold); + } + + public Builder withCounters(CounterSet counters) { + this.counters = counters; + return this; + } + + public Builder withState(CopyOnAccessInMemoryStateInternals state) { + this.state = state; + return this; + } + + public Builder withTimerUpdate(TimerUpdate timerUpdate) { + this.timerUpdate = timerUpdate; + return this; + } + + public Builder addOutput( + UncommittedBundle outputBundle, UncommittedBundle... outputBundles) { + bundlesBuilder.add(outputBundle); + bundlesBuilder.add(outputBundles); + return this; + } + + public Builder addOutput(Collection> outputBundles) { + bundlesBuilder.addAll(outputBundles); + return this; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TextIOShardedWriteFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TextIOShardedWriteFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TextIOShardedWriteFactory.java new file mode 100644 index 0000000..be1bf18 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TextIOShardedWriteFactory.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.TextIO.Write.Bound; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; + +class TextIOShardedWriteFactory implements PTransformOverrideFactory { + + @Override + public PTransform override( + PTransform transform) { + if (transform instanceof TextIO.Write.Bound) { + @SuppressWarnings("unchecked") + TextIO.Write.Bound originalWrite = (TextIO.Write.Bound) transform; + if (originalWrite.getNumShards() > 1 + || (originalWrite.getNumShards() == 1 + && !"".equals(originalWrite.getShardNameTemplate()))) { + @SuppressWarnings("unchecked") + PTransform override = + (PTransform) new TextIOShardedWrite(originalWrite); + return override; + } + } + return transform; + } + + private static class TextIOShardedWrite extends ShardControlledWrite { + private final TextIO.Write.Bound initial; + + private TextIOShardedWrite(Bound initial) { + this.initial = initial; + } + + @Override + int getNumShards() { + return initial.getNumShards(); + } + + @Override + PTransform, PDone> getSingleShardTransform(int shardNum) { + String shardName = + IOChannelUtils.constructName( + initial.getFilenamePrefix(), + initial.getShardTemplate(), + initial.getFilenameSuffix(), + shardNum, + getNumShards()); + return TextIO.Write.withCoder(initial.getCoder()).to(shardName).withoutSharding(); + } + + @Override + protected PTransform, PDone> delegate() { + return initial; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java new file mode 100644 index 0000000..ba9815b --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluator.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.sdk.util.WindowedValue; + +/** + * An evaluator of a specific application of a transform. Will be used for at least one + * {@link CommittedBundle}. + * + * @param the type of elements that will be passed to {@link #processElement} + */ +public interface TransformEvaluator { + /** + * Process an element in the input {@link CommittedBundle}. + * + * @param element the element to process + */ + void processElement(WindowedValue element) throws Exception; + + /** + * Finish processing the bundle of this {@link TransformEvaluator}. + * + * After {@link #finishBundle()} is called, the {@link TransformEvaluator} will not be reused, + * and no more elements will be processed. + * + * @return an {@link InProcessTransformResult} containing the results of this bundle evaluation. + */ + InProcessTransformResult finishBundle() throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java new file mode 100644 index 0000000..8f8d84c --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; + +import javax.annotation.Nullable; + +/** + * A factory for creating instances of {@link TransformEvaluator} for the application of a + * {@link PTransform}. + */ +public interface TransformEvaluatorFactory { + /** + * Create a new {@link TransformEvaluator} for the application of the {@link PTransform}. + * + * Any work that must be done before input elements are processed (such as calling + * {@link DoFn#startBundle(DoFn.Context)}) must be done before the {@link TransformEvaluator} is + * made available to the caller. + * + * @throws Exception whenever constructing the underlying evaluator throws an exception + */ + TransformEvaluator forApplication( + AppliedPTransform application, @Nullable CommittedBundle inputBundle, + InProcessEvaluationContext evaluationContext) throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java new file mode 100644 index 0000000..f449731 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.Window; + +import com.google.common.collect.ImmutableMap; + +import java.util.Map; + +import javax.annotation.Nullable; + +/** + * A {@link TransformEvaluatorFactory} that delegates to primitive {@link TransformEvaluatorFactory} + * implementations based on the type of {@link PTransform} of the application. + */ +class TransformEvaluatorRegistry implements TransformEvaluatorFactory { + public static TransformEvaluatorRegistry defaultRegistry() { + @SuppressWarnings("rawtypes") + ImmutableMap, TransformEvaluatorFactory> primitives = + ImmutableMap., TransformEvaluatorFactory>builder() + .put(Read.Bounded.class, new BoundedReadEvaluatorFactory()) + .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory()) + .put(ParDo.Bound.class, new ParDoSingleEvaluatorFactory()) + .put(ParDo.BoundMulti.class, new ParDoMultiEvaluatorFactory()) + .put( + GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly.class, + new GroupByKeyEvaluatorFactory()) + .put(FlattenPCollectionList.class, new FlattenEvaluatorFactory()) + .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory()) + .put(Window.Bound.class, new WindowEvaluatorFactory()) + .build(); + return new TransformEvaluatorRegistry(primitives); + } + + // the TransformEvaluatorFactories can construct instances of all generic types of transform, + // so all instances of a primitive can be handled with the same evaluator factory. + @SuppressWarnings("rawtypes") + private final Map, TransformEvaluatorFactory> factories; + + private TransformEvaluatorRegistry( + @SuppressWarnings("rawtypes") + Map, TransformEvaluatorFactory> factories) { + this.factories = factories; + } + + @Override + public TransformEvaluator forApplication( + AppliedPTransform application, + @Nullable CommittedBundle inputBundle, + InProcessEvaluationContext evaluationContext) + throws Exception { + TransformEvaluatorFactory factory = factories.get(application.getTransform().getClass()); + return factory.forApplication(application, inputBundle, evaluationContext); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java new file mode 100644 index 0000000..8346e89 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import static com.google.common.base.Preconditions.checkState; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.util.WindowedValue; + +import com.google.common.base.Throwables; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicReference; + +import javax.annotation.Nullable; + +/** + * A {@link Callable} responsible for constructing a {@link TransformEvaluator} from a + * {@link TransformEvaluatorFactory} and evaluating it on some bundle of input, and registering + * the result using a registered {@link CompletionCallback}. + * + *

A {@link TransformExecutor} that is currently executing also provides access to the thread + * that it is being executed on. + */ +class TransformExecutor implements Callable { + public static TransformExecutor create( + TransformEvaluatorFactory factory, + Iterable modelEnforcements, + InProcessEvaluationContext evaluationContext, + CommittedBundle inputBundle, + AppliedPTransform transform, + CompletionCallback completionCallback, + TransformExecutorService transformEvaluationState) { + return new TransformExecutor<>( + factory, + modelEnforcements, + evaluationContext, + inputBundle, + transform, + completionCallback, + transformEvaluationState); + } + + private final TransformEvaluatorFactory evaluatorFactory; + private final Iterable modelEnforcements; + + private final InProcessEvaluationContext evaluationContext; + + /** The transform that will be evaluated. */ + private final AppliedPTransform transform; + /** The inputs this {@link TransformExecutor} will deliver to the transform. */ + private final CommittedBundle inputBundle; + + private final CompletionCallback onComplete; + private final TransformExecutorService transformEvaluationState; + + private final AtomicReference thread; + + private TransformExecutor( + TransformEvaluatorFactory factory, + Iterable modelEnforcements, + InProcessEvaluationContext evaluationContext, + CommittedBundle inputBundle, + AppliedPTransform transform, + CompletionCallback completionCallback, + TransformExecutorService transformEvaluationState) { + this.evaluatorFactory = factory; + this.modelEnforcements = modelEnforcements; + this.evaluationContext = evaluationContext; + + this.inputBundle = inputBundle; + this.transform = transform; + + this.onComplete = completionCallback; + + this.transformEvaluationState = transformEvaluationState; + this.thread = new AtomicReference<>(); + } + + @Override + public InProcessTransformResult call() { + checkState( + thread.compareAndSet(null, Thread.currentThread()), + "Tried to execute %s for %s on thread %s, but is already executing on thread %s", + TransformExecutor.class.getSimpleName(), + transform.getFullName(), + Thread.currentThread(), + thread.get()); + try { + Collection> enforcements = new ArrayList<>(); + for (ModelEnforcementFactory enforcementFactory : modelEnforcements) { + ModelEnforcement enforcement = enforcementFactory.forBundle(inputBundle, transform); + enforcements.add(enforcement); + } + TransformEvaluator evaluator = + evaluatorFactory.forApplication(transform, inputBundle, evaluationContext); + + processElements(evaluator, enforcements); + + InProcessTransformResult result = finishBundle(evaluator, enforcements); + return result; + } catch (Throwable t) { + onComplete.handleThrowable(inputBundle, t); + throw Throwables.propagate(t); + } finally { + transformEvaluationState.complete(this); + } + } + + /** + * Processes all the elements in the input bundle using the transform evaluator, applying any + * necessary {@link ModelEnforcement ModelEnforcements}. + */ + private void processElements( + TransformEvaluator evaluator, Collection> enforcements) + throws Exception { + if (inputBundle != null) { + for (WindowedValue value : inputBundle.getElements()) { + for (ModelEnforcement enforcement : enforcements) { + enforcement.beforeElement(value); + } + + evaluator.processElement(value); + + for (ModelEnforcement enforcement : enforcements) { + enforcement.afterElement(value); + } + } + } + } + + /** + * Finishes processing the input bundle and commit the result using the + * {@link CompletionCallback}, applying any {@link ModelEnforcement} if necessary. + * + * @return the {@link InProcessTransformResult} produced by + * {@link TransformEvaluator#finishBundle()} + */ + private InProcessTransformResult finishBundle( + TransformEvaluator evaluator, Collection> enforcements) + throws Exception { + InProcessTransformResult result = evaluator.finishBundle(); + CommittedResult outputs = onComplete.handleResult(inputBundle, result); + for (ModelEnforcement enforcement : enforcements) { + enforcement.afterFinish(inputBundle, result, outputs.getOutputs()); + } + return result; + } + + /** + * If this {@link TransformExecutor} is currently executing, return the thread it is executing in. + * Otherwise, return null. + */ + @Nullable + public Thread getThread() { + return thread.get(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java new file mode 100644 index 0000000..837b858 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +/** + * Schedules and completes {@link TransformExecutor TransformExecutors}, controlling concurrency as + * appropriate for the {@link StepAndKey} the executor exists for. + */ +interface TransformExecutorService { + /** + * Schedule the provided work to be eventually executed. + */ + void schedule(TransformExecutor work); + + /** + * Finish executing the provided work. This may cause additional + * {@link TransformExecutor TransformExecutors} to be evaluated. + */ + void complete(TransformExecutor completed); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java new file mode 100644 index 0000000..087b7c2 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import com.google.common.base.MoreObjects; + +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Static factory methods for constructing instances of {@link TransformExecutorService}. + */ +final class TransformExecutorServices { + private TransformExecutorServices() { + // Do not instantiate + } + + /** + * Returns an EvaluationState that evaluates {@link TransformExecutor TransformExecutors} in + * parallel. + */ + public static TransformExecutorService parallel( + ExecutorService executor, Map, Boolean> scheduled) { + return new ParallelEvaluationState(executor, scheduled); + } + + /** + * Returns an EvaluationState that evaluates {@link TransformExecutor TransformExecutors} in + * serial. + */ + public static TransformExecutorService serial( + ExecutorService executor, Map, Boolean> scheduled) { + return new SerialEvaluationState(executor, scheduled); + } + + /** + * A {@link TransformExecutorService} with unlimited parallelism. Any {@link TransformExecutor} + * scheduled will be immediately submitted to the {@link ExecutorService}. + * + *

A principal use of this is for the evaluation of an unkeyed Step. Unkeyed computations are + * processed in parallel. + */ + private static class ParallelEvaluationState implements TransformExecutorService { + private final ExecutorService executor; + private final Map, Boolean> scheduled; + + private ParallelEvaluationState( + ExecutorService executor, Map, Boolean> scheduled) { + this.executor = executor; + this.scheduled = scheduled; + } + + @Override + public void schedule(TransformExecutor work) { + executor.submit(work); + scheduled.put(work, true); + } + + @Override + public void complete(TransformExecutor completed) { + scheduled.remove(completed); + } + } + + /** + * A {@link TransformExecutorService} with a single work queue. Any {@link TransformExecutor} + * scheduled will be placed on the work queue. Only one item of work will be submitted to the + * {@link ExecutorService} at any time. + * + *

A principal use of this is for the serial evaluation of a (Step, Key) pair. + * Keyed computations are processed serially per step. + */ + private static class SerialEvaluationState implements TransformExecutorService { + private final ExecutorService executor; + private final Map, Boolean> scheduled; + + private AtomicReference> currentlyEvaluating; + private final Queue> workQueue; + + private SerialEvaluationState( + ExecutorService executor, Map, Boolean> scheduled) { + this.scheduled = scheduled; + this.executor = executor; + this.currentlyEvaluating = new AtomicReference<>(); + this.workQueue = new ConcurrentLinkedQueue<>(); + } + + /** + * Schedules the work, adding it to the work queue if there is a bundle currently being + * evaluated and scheduling it immediately otherwise. + */ + @Override + public void schedule(TransformExecutor work) { + workQueue.offer(work); + updateCurrentlyEvaluating(); + } + + @Override + public void complete(TransformExecutor completed) { + if (!currentlyEvaluating.compareAndSet(completed, null)) { + throw new IllegalStateException( + "Finished work " + + completed + + " but could not complete due to unexpected currently executing " + + currentlyEvaluating.get()); + } + scheduled.remove(completed); + updateCurrentlyEvaluating(); + } + + private void updateCurrentlyEvaluating() { + if (currentlyEvaluating.get() == null) { + // Only synchronize if we need to update what's currently evaluating + synchronized (this) { + TransformExecutor newWork = workQueue.poll(); + if (newWork != null) { + if (currentlyEvaluating.compareAndSet(null, newWork)) { + scheduled.put(newWork, true); + executor.submit(newWork); + } else { + workQueue.offer(newWork); + } + } + } + } + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(SerialEvaluationState.class) + .add("currentlyEvaluating", currentlyEvaluating) + .add("workQueue", workQueue) + .toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java new file mode 100644 index 0000000..7a95c9f --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.sdk.io.Read.Unbounded; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; +import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; + +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; + +import javax.annotation.Nullable; + +/** + * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators} + * for the {@link Unbounded Read.Unbounded} primitive {@link PTransform}. + */ +class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { + /* + * An evaluator for a Source is stateful, to ensure the CheckpointMark is properly persisted. + * Evaluators are cached here to ensure that the checkpoint mark is appropriately reused + * and any splits are honored. + */ + private final ConcurrentMap>> + sourceEvaluators = new ConcurrentHashMap<>(); + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Override + public TransformEvaluator forApplication(AppliedPTransform application, + @Nullable CommittedBundle inputBundle, InProcessEvaluationContext evaluationContext) { + return getTransformEvaluator((AppliedPTransform) application, evaluationContext); + } + + private TransformEvaluator getTransformEvaluator( + final AppliedPTransform, Unbounded> transform, + final InProcessEvaluationContext evaluationContext) { + UnboundedReadEvaluator currentEvaluator = + getTransformEvaluatorQueue(transform, evaluationContext).poll(); + if (currentEvaluator == null) { + return EmptyTransformEvaluator.create(transform); + } + return currentEvaluator; + } + + /** + * Get the queue of {@link TransformEvaluator TransformEvaluators} that produce elements for the + * provided application of {@link Unbounded Read.Unbounded}, initializing it if required. + * + *

This method is thread-safe, and will only produce new evaluators if no other invocation has + * already done so. + */ + @SuppressWarnings("unchecked") + private Queue> getTransformEvaluatorQueue( + final AppliedPTransform, Unbounded> transform, + final InProcessEvaluationContext evaluationContext) { + // Key by the application and the context the evaluation is occurring in (which call to + // Pipeline#run). + EvaluatorKey key = new EvaluatorKey(transform, evaluationContext); + @SuppressWarnings("unchecked") + Queue> evaluatorQueue = + (Queue>) sourceEvaluators.get(key); + if (evaluatorQueue == null) { + evaluatorQueue = new ConcurrentLinkedQueue<>(); + if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) { + // If no queue existed in the evaluators, add an evaluator to initialize the evaluator + // factory for this transform + UnboundedSource source = transform.getTransform().getSource(); + UnboundedReadEvaluator evaluator = + new UnboundedReadEvaluator( + transform, evaluationContext, source, evaluatorQueue); + evaluatorQueue.offer(evaluator); + } else { + // otherwise return the existing Queue that arrived before us + evaluatorQueue = (Queue>) sourceEvaluators.get(key); + } + } + return evaluatorQueue; + } + + /** + * A {@link UnboundedReadEvaluator} produces elements from an underlying {@link UnboundedSource}, + * discarding all input elements. Within the call to {@link #finishBundle()}, the evaluator + * creates the {@link UnboundedReader} and consumes some currently available input. + * + *

Calls to {@link UnboundedReadEvaluator} are not internally thread-safe, and should only be + * used by a single thread at a time. Each {@link UnboundedReadEvaluator} maintains its own + * checkpoint, and constructs its reader from the current checkpoint in each call to + * {@link #finishBundle()}. + */ + private static class UnboundedReadEvaluator implements TransformEvaluator { + private static final int ARBITRARY_MAX_ELEMENTS = 10; + private final AppliedPTransform, Unbounded> transform; + private final InProcessEvaluationContext evaluationContext; + private final Queue> evaluatorQueue; + /** + * The source being read from by this {@link UnboundedReadEvaluator}. This may not be the same + * source as derived from {@link #transform} due to splitting. + */ + private final UnboundedSource source; + private CheckpointMark checkpointMark; + + public UnboundedReadEvaluator( + AppliedPTransform, Unbounded> transform, + InProcessEvaluationContext evaluationContext, + UnboundedSource source, + Queue> evaluatorQueue) { + this.transform = transform; + this.evaluationContext = evaluationContext; + this.evaluatorQueue = evaluatorQueue; + this.source = source; + this.checkpointMark = null; + } + + @Override + public void processElement(WindowedValue element) {} + + @Override + public InProcessTransformResult finishBundle() throws IOException { + UncommittedBundle output = evaluationContext.createRootBundle(transform.getOutput()); + try (UnboundedReader reader = + createReader(source, evaluationContext.getPipelineOptions());) { + int numElements = 0; + if (reader.start()) { + do { + output.add( + WindowedValue.timestampedValueInGlobalWindow( + reader.getCurrent(), reader.getCurrentTimestamp())); + numElements++; + } while (numElements < ARBITRARY_MAX_ELEMENTS && reader.advance()); + } + checkpointMark = reader.getCheckpointMark(); + checkpointMark.finalizeCheckpoint(); + // TODO: When exercising create initial splits, make this the minimum watermark across all + // existing readers + StepTransformResult result = + StepTransformResult.withHold(transform, reader.getWatermark()) + .addOutput(output) + .build(); + evaluatorQueue.offer(this); + return result; + } + } + + private UnboundedReader createReader( + UnboundedSource source, PipelineOptions options) { + @SuppressWarnings("unchecked") + CheckpointMarkT mark = (CheckpointMarkT) checkpointMark; + return source.createReader(options, mark); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java new file mode 100644 index 0000000..ffaf3fa --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.PCollectionViewWriter; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.transforms.View.CreatePCollectionView; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; + +import java.util.ArrayList; +import java.util.List; + +/** + * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the + * {@link CreatePCollectionView} primitive {@link PTransform}. + * + *

The {@link ViewEvaluatorFactory} produces {@link TransformEvaluator TransformEvaluators} for + * the {@link WriteView} {@link PTransform}, which is part of the + * {@link InProcessCreatePCollectionView} composite transform. This transform is an override for the + * {@link CreatePCollectionView} transform that applies windowing and triggers before the view is + * written. + */ +class ViewEvaluatorFactory implements TransformEvaluatorFactory { + @Override + public TransformEvaluator forApplication( + AppliedPTransform application, + InProcessPipelineRunner.CommittedBundle inputBundle, + InProcessEvaluationContext evaluationContext) { + @SuppressWarnings({"cast", "unchecked", "rawtypes"}) + TransformEvaluator evaluator = createEvaluator( + (AppliedPTransform) application, evaluationContext); + return evaluator; + } + + private TransformEvaluator> createEvaluator( + final AppliedPTransform>, PCollectionView, WriteView> + application, + InProcessEvaluationContext context) { + PCollection> input = application.getInput(); + final PCollectionViewWriter writer = + context.createPCollectionViewWriter(input, application.getOutput()); + return new TransformEvaluator>() { + private final List> elements = new ArrayList<>(); + + @Override + public void processElement(WindowedValue> element) { + for (InT input : element.getValue()) { + elements.add(element.withValue(input)); + } + } + + @Override + public InProcessTransformResult finishBundle() { + writer.add(elements); + return StepTransformResult.withoutHold(application).build(); + } + }; + } + + public static class InProcessViewOverrideFactory implements PTransformOverrideFactory { + @Override + public + PTransform override(PTransform transform) { + if (transform instanceof CreatePCollectionView) { + + } + @SuppressWarnings({"rawtypes", "unchecked"}) + PTransform createView = + (PTransform) + new InProcessCreatePCollectionView<>((CreatePCollectionView) transform); + return createView; + } + } + + /** + * An in-process override for {@link CreatePCollectionView}. + */ + private static class InProcessCreatePCollectionView + extends ForwardingPTransform, PCollectionView> { + private final CreatePCollectionView og; + + private InProcessCreatePCollectionView(CreatePCollectionView og) { + this.og = og; + } + + @Override + public PCollectionView apply(PCollection input) { + return input.apply(WithKeys.of((Void) null)) + .setCoder(KvCoder.of(VoidCoder.of(), input.getCoder())) + .apply(GroupByKey.create()) + .apply(Values.>create()) + .apply(new WriteView(og)); + } + + @Override + protected PTransform, PCollectionView> delegate() { + return og; + } + } + + /** + * An in-process implementation of the {@link CreatePCollectionView} primitive. + * + * This implementation requires the input {@link PCollection} to be an iterable, which is provided + * to {@link PCollectionView#fromIterableInternal(Iterable)}. + */ + public static final class WriteView + extends PTransform>, PCollectionView> { + private final CreatePCollectionView og; + + WriteView(CreatePCollectionView og) { + this.og = og; + } + + @Override + public PCollectionView apply(PCollection> input) { + return og.getView(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java new file mode 100644 index 0000000..4a3a517 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowingStrategy; + +import com.google.common.collect.ComparisonChain; +import com.google.common.collect.Ordering; + +import org.joda.time.Instant; + +import java.util.PriorityQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Executes callbacks that occur based on the progression of the watermark per-step. + * + *

Callbacks are registered by calls to + * {@link #callOnGuaranteedFiring(AppliedPTransform, BoundedWindow, WindowingStrategy, Runnable)}, + * and are executed after a call to {@link #fireForWatermark(AppliedPTransform, Instant)} with the + * same {@link AppliedPTransform} and a watermark sufficient to ensure that the trigger for the + * windowing strategy would have been produced. + * + *

NOTE: {@link WatermarkCallbackExecutor} does not track the latest observed watermark for any + * {@link AppliedPTransform} - any call to + * {@link #callOnGuaranteedFiring(AppliedPTransform, BoundedWindow, WindowingStrategy, Runnable)} + * that could have potentially already fired should be followed by a call to + * {@link #fireForWatermark(AppliedPTransform, Instant)} for the same transform with the current + * value of the watermark. + */ +class WatermarkCallbackExecutor { + /** + * Create a new {@link WatermarkCallbackExecutor}. + */ + public static WatermarkCallbackExecutor create() { + return new WatermarkCallbackExecutor(); + } + + private final ConcurrentMap, PriorityQueue> + callbacks; + private final ExecutorService executor; + + private WatermarkCallbackExecutor() { + this.callbacks = new ConcurrentHashMap<>(); + this.executor = Executors.newSingleThreadExecutor(); + } + + /** + * Execute the provided {@link Runnable} after the next call to + * {@link #fireForWatermark(AppliedPTransform, Instant)} where the window is guaranteed to have + * produced output. + */ + public void callOnGuaranteedFiring( + AppliedPTransform step, + BoundedWindow window, + WindowingStrategy windowingStrategy, + Runnable runnable) { + WatermarkCallback callback = + WatermarkCallback.onGuaranteedFiring(window, windowingStrategy, runnable); + + PriorityQueue callbackQueue = callbacks.get(step); + if (callbackQueue == null) { + callbackQueue = new PriorityQueue<>(11, new CallbackOrdering()); + if (callbacks.putIfAbsent(step, callbackQueue) != null) { + callbackQueue = callbacks.get(step); + } + } + + synchronized (callbackQueue) { + callbackQueue.offer(callback); + } + } + + /** + * Schedule all pending callbacks that must have produced output by the time of the provided + * watermark. + */ + public void fireForWatermark(AppliedPTransform step, Instant watermark) { + PriorityQueue callbackQueue = callbacks.get(step); + if (callbackQueue == null) { + return; + } + synchronized (callbackQueue) { + while (!callbackQueue.isEmpty() && callbackQueue.peek().shouldFire(watermark)) { + executor.submit(callbackQueue.poll().getCallback()); + } + } + } + + private static class WatermarkCallback { + public static WatermarkCallback onGuaranteedFiring( + BoundedWindow window, WindowingStrategy strategy, Runnable callback) { + @SuppressWarnings("unchecked") + Instant firingAfter = + strategy.getTrigger().getSpec().getWatermarkThatGuaranteesFiring((W) window); + return new WatermarkCallback(firingAfter, callback); + } + + private final Instant fireAfter; + private final Runnable callback; + + private WatermarkCallback(Instant fireAfter, Runnable callback) { + this.fireAfter = fireAfter; + this.callback = callback; + } + + public boolean shouldFire(Instant currentWatermark) { + return currentWatermark.isAfter(fireAfter) + || currentWatermark.equals(BoundedWindow.TIMESTAMP_MAX_VALUE); + } + + public Runnable getCallback() { + return callback; + } + } + + private static class CallbackOrdering extends Ordering { + @Override + public int compare(WatermarkCallback left, WatermarkCallback right) { + return ComparisonChain.start() + .compare(left.fireAfter, right.fireAfter) + .compare(left.callback, right.callback, Ordering.arbitrary()) + .result(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java new file mode 100644 index 0000000..628f94d --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.Window.Bound; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; + +import org.joda.time.Instant; + +import java.util.Collection; + +import javax.annotation.Nullable; + +/** + * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the + * {@link Bound Window.Bound} primitive {@link PTransform}. + */ +class WindowEvaluatorFactory implements TransformEvaluatorFactory { + + @Override + public TransformEvaluator forApplication( + AppliedPTransform application, + @Nullable CommittedBundle inputBundle, + InProcessEvaluationContext evaluationContext) + throws Exception { + return createTransformEvaluator( + (AppliedPTransform) application, inputBundle, evaluationContext); + } + + private TransformEvaluator createTransformEvaluator( + AppliedPTransform, PCollection, Window.Bound> transform, + CommittedBundle inputBundle, + InProcessEvaluationContext evaluationContext) { + WindowFn fn = transform.getTransform().getWindowFn(); + UncommittedBundle outputBundle = + evaluationContext.createBundle(inputBundle, transform.getOutput()); + if (fn == null) { + return PassthroughTransformEvaluator.create(transform, outputBundle); + } + return new WindowIntoEvaluator<>(transform, fn, outputBundle); + } + + private static class WindowIntoEvaluator implements TransformEvaluator { + private final AppliedPTransform, PCollection, Window.Bound> + transform; + private final WindowFn windowFn; + private final UncommittedBundle outputBundle; + + @SuppressWarnings("unchecked") + public WindowIntoEvaluator( + AppliedPTransform, PCollection, Window.Bound> transform, + WindowFn windowFn, + UncommittedBundle outputBundle) { + this.outputBundle = outputBundle; + this.transform = transform; + // Safe contravariant cast + this.windowFn = (WindowFn) windowFn; + } + + @Override + public void processElement(WindowedValue element) throws Exception { + Collection windows = assignWindows(windowFn, element); + outputBundle.add( + WindowedValue.of( + element.getValue(), element.getTimestamp(), windows, PaneInfo.NO_FIRING)); + } + + private Collection assignWindows( + WindowFn windowFn, WindowedValue element) throws Exception { + WindowFn.AssignContext assignContext = + new InProcessAssignContext<>(windowFn, element); + Collection windows = windowFn.assignWindows(assignContext); + return windows; + } + + @Override + public InProcessTransformResult finishBundle() throws Exception { + return StepTransformResult.withoutHold(transform).addOutput(outputBundle).build(); + } + } + + private static class InProcessAssignContext + extends WindowFn.AssignContext { + private final WindowedValue value; + + public InProcessAssignContext(WindowFn fn, WindowedValue value) { + fn.super(); + this.value = value; + } + + @Override + public InputT element() { + return value.getValue(); + } + + @Override + public Instant timestamp() { + return value.getTimestamp(); + } + + @Override + public Collection windows() { + return value.getWindows(); + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java new file mode 100644 index 0000000..d290a4b --- /dev/null +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/AvroIOShardedWriteFactoryTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.theInstance; +import static org.junit.Assert.assertThat; + +import org.apache.beam.sdk.io.AvroIO; +import org.apache.beam.sdk.io.AvroIOTest; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; + +import org.hamcrest.Matchers; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.File; + +/** + * Tests for {@link AvroIOShardedWriteFactory}. + */ +@RunWith(JUnit4.class) +public class AvroIOShardedWriteFactoryTest { + + @Rule public TemporaryFolder tmp = new TemporaryFolder(); + private AvroIOShardedWriteFactory factory; + + @Before + public void setup() { + factory = new AvroIOShardedWriteFactory(); + } + + @Test + public void originalWithoutShardingReturnsOriginal() throws Exception { + File file = tmp.newFile("foo"); + PTransform, PDone> original = + AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath()).withoutSharding(); + PTransform, PDone> overridden = factory.override(original); + + assertThat(overridden, theInstance(original)); + } + + @Test + public void originalShardingNotSpecifiedReturnsOriginal() throws Exception { + File file = tmp.newFile("foo"); + PTransform, PDone> original = + AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath()); + PTransform, PDone> overridden = factory.override(original); + + assertThat(overridden, theInstance(original)); + } + + @Test + public void originalShardedToOneReturnsExplicitlySharded() throws Exception { + File file = tmp.newFile("foo"); + AvroIO.Write.Bound original = + AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath()).withNumShards(1); + PTransform, PDone> overridden = factory.override(original); + + assertThat(overridden, not(Matchers., PDone>>equalTo(original))); + + TestPipeline p = TestPipeline.create(); + String[] elems = new String[] {"foo", "bar", "baz"}; + p.apply(Create.of(elems)).apply(overridden); + + file.delete(); + + p.run(); + AvroIOTest.assertTestOutputs(elems, 1, file.getAbsolutePath(), original.getShardNameTemplate()); + } + + @Test + public void originalShardedToManyReturnsExplicitlySharded() throws Exception { + File file = tmp.newFile("foo"); + AvroIO.Write.Bound original = + AvroIO.Write.withSchema(String.class).to(file.getAbsolutePath()).withNumShards(3); + PTransform, PDone> overridden = factory.override(original); + + assertThat(overridden, not(Matchers., PDone>>equalTo(original))); + + TestPipeline p = TestPipeline.create(); + String[] elems = new String[] {"foo", "bar", "baz", "spam", "ham", "eggs"}; + p.apply(Create.of(elems)).apply(overridden); + + file.delete(); + p.run(); + AvroIOTest.assertTestOutputs(elems, 3, file.getAbsolutePath(), original.getShardNameTemplate()); + } +}