Return-Path: X-Original-To: apmail-beam-commits-archive@minotaur.apache.org Delivered-To: apmail-beam-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1150119322 for ; Fri, 29 Apr 2016 21:56:22 +0000 (UTC) Received: (qmail 25259 invoked by uid 500); 29 Apr 2016 21:56:21 -0000 Delivered-To: apmail-beam-commits-archive@beam.apache.org Received: (qmail 25210 invoked by uid 500); 29 Apr 2016 21:56:21 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 25201 invoked by uid 99); 29 Apr 2016 21:56:21 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 29 Apr 2016 21:56:21 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 2E4BA180572 for ; Fri, 29 Apr 2016 21:56:21 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.211 X-Spam-Level: X-Spam-Status: No, score=-3.211 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001, T_FILL_THIS_FORM_SHORT=0.01] autolearn=disabled Received: from mx2-lw-us.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id wx2f8CXgX1RJ for ; Fri, 29 Apr 2016 21:56:08 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx2-lw-us.apache.org (ASF Mail Server at mx2-lw-us.apache.org) with SMTP id 9213B60DB3 for ; Fri, 29 Apr 2016 21:56:01 +0000 (UTC) Received: (qmail 23331 invoked 
by uid 99); 29 Apr 2016 21:55:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 29 Apr 2016 21:55:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D2E0BDFFB9; Fri, 29 Apr 2016 21:55:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kenn@apache.org To: commits@beam.incubator.apache.org Date: Fri, 29 Apr 2016 21:56:12 -0000 Message-Id: <5285359025374de78cfe9243d4803e40@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [14/17] incubator-beam git commit: Move InProcessRunner to its own module http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleOutputManager.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleOutputManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleOutputManager.java new file mode 100644 index 0000000..f374f99 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleOutputManager.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.sdk.util.DoFnRunners.OutputManager; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TupleTag; + +import java.util.Map; + +/** + * An {@link OutputManager} that outputs to {@link CommittedBundle Bundles} used by the + * {@link InProcessPipelineRunner}. + */ +public class InProcessBundleOutputManager implements OutputManager { + private final Map, UncommittedBundle> bundles; + + public static InProcessBundleOutputManager create( + Map, UncommittedBundle> outputBundles) { + return new InProcessBundleOutputManager(outputBundles); + } + + public InProcessBundleOutputManager(Map, UncommittedBundle> bundles) { + this.bundles = bundles; + } + + @SuppressWarnings("unchecked") + @Override + public void output(TupleTag tag, WindowedValue output) { + @SuppressWarnings("rawtypes") + UncommittedBundle bundle = bundles.get(tag); + bundle.add(output); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java new file 
mode 100644 index 0000000..d9a7ff0 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java @@ -0,0 +1,425 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.beam.runners.direct.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly; +import org.apache.beam.runners.direct.InMemoryWatermarkManager.FiredTimers; +import org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks; +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.runners.direct.InProcessPipelineRunner.PCollectionViewWriter; +import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.apache.beam.sdk.util.ExecutionContext; +import org.apache.beam.sdk.util.SideInputReader; +import 
org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.common.CounterSet; +import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PValue; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import javax.annotation.Nullable; + +/** + * The evaluation context for a specific pipeline being executed by the + * {@link InProcessPipelineRunner}. Contains state shared within the execution across all + * transforms. + * + *

{@link InProcessEvaluationContext} contains shared state for an execution of the + * {@link InProcessPipelineRunner} that can be used while evaluating a {@link PTransform}. This + * consists of views into underlying state and watermark implementations, access to read and write + * {@link PCollectionView PCollectionViews}, and constructing {@link CounterSet CounterSets} and + * {@link ExecutionContext ExecutionContexts}. This includes executing callbacks asynchronously when + * state changes to the appropriate point (e.g. when a {@link PCollectionView} is requested and + * known to be empty). + * + *

{@link InProcessEvaluationContext} also handles results by committing finalizing bundles based + * on the current global state and updating the global state appropriately. This includes updating + * the per-{@link StepAndKey} state, updating global watermarks, and executing any callbacks that + * can be executed. + */ +class InProcessEvaluationContext { + /** The step name for each {@link AppliedPTransform} in the {@link Pipeline}. */ + private final Map, String> stepNames; + + /** The options that were used to create this {@link Pipeline}. */ + private final InProcessPipelineOptions options; + + private final BundleFactory bundleFactory; + /** The current processing time and event time watermarks and timers. */ + private final InMemoryWatermarkManager watermarkManager; + + /** Executes callbacks based on the progression of the watermark. */ + private final WatermarkCallbackExecutor callbackExecutor; + + /** The stateInternals of the world, by applied PTransform and key. */ + private final ConcurrentMap> + applicationStateInternals; + + private final InProcessSideInputContainer sideInputContainer; + + private final CounterSet mergedCounters; + + public static InProcessEvaluationContext create( + InProcessPipelineOptions options, + BundleFactory bundleFactory, + Collection> rootTransforms, + Map>> valueToConsumers, + Map, String> stepNames, + Collection> views) { + return new InProcessEvaluationContext( + options, bundleFactory, rootTransforms, valueToConsumers, stepNames, views); + } + + private InProcessEvaluationContext( + InProcessPipelineOptions options, + BundleFactory bundleFactory, + Collection> rootTransforms, + Map>> valueToConsumers, + Map, String> stepNames, + Collection> views) { + this.options = checkNotNull(options); + this.bundleFactory = checkNotNull(bundleFactory); + checkNotNull(rootTransforms); + checkNotNull(valueToConsumers); + checkNotNull(stepNames); + checkNotNull(views); + this.stepNames = stepNames; + + this.watermarkManager = + 
InMemoryWatermarkManager.create( + NanosOffsetClock.create(), rootTransforms, valueToConsumers); + this.sideInputContainer = InProcessSideInputContainer.create(this, views); + + this.applicationStateInternals = new ConcurrentHashMap<>(); + this.mergedCounters = new CounterSet(); + + this.callbackExecutor = WatermarkCallbackExecutor.create(); + } + + /** + * Handle the provided {@link InProcessTransformResult}, produced after evaluating the provided + * {@link CommittedBundle} (potentially null, if the result of a root {@link PTransform}). + * + *

The result is the output of running the transform contained in the + * {@link InProcessTransformResult} on the contents of the provided bundle. + * + * @param completedBundle the bundle that was processed to produce the result. Potentially + * {@code null} if the transform that produced the result is a root + * transform + * @param completedTimers the timers that were delivered to produce the {@code completedBundle}, + * or an empty iterable if no timers were delivered + * @param result the result of evaluating the input bundle + * @return the committed bundles contained within the handled {@code result} + */ + public synchronized CommittedResult handleResult( + @Nullable CommittedBundle completedBundle, + Iterable completedTimers, + InProcessTransformResult result) { + Iterable> committedBundles = + commitBundles(result.getOutputBundles()); + // Update watermarks and timers + watermarkManager.updateWatermarks( + completedBundle, + result.getTransform(), + result.getTimerUpdate().withCompletedTimers(completedTimers), + committedBundles, + result.getWatermarkHold()); + fireAllAvailableCallbacks(); + // Update counters + if (result.getCounters() != null) { + mergedCounters.merge(result.getCounters()); + } + // Update state internals + CopyOnAccessInMemoryStateInternals theirState = result.getState(); + if (theirState != null) { + CopyOnAccessInMemoryStateInternals committedState = theirState.commit(); + StepAndKey stepAndKey = + StepAndKey.of( + result.getTransform(), completedBundle == null ? 
null : completedBundle.getKey()); + if (!committedState.isEmpty()) { + applicationStateInternals.put(stepAndKey, committedState); + } else { + applicationStateInternals.remove(stepAndKey); + } + } + return CommittedResult.create(result, committedBundles); + } + + private Iterable> commitBundles( + Iterable> bundles) { + ImmutableList.Builder> completed = ImmutableList.builder(); + for (UncommittedBundle inProgress : bundles) { + AppliedPTransform producing = + inProgress.getPCollection().getProducingTransformInternal(); + TransformWatermarks watermarks = watermarkManager.getWatermarks(producing); + CommittedBundle committed = + inProgress.commit(watermarks.getSynchronizedProcessingOutputTime()); + // Empty bundles don't impact watermarks and shouldn't trigger downstream execution, so + // filter them out + if (!Iterables.isEmpty(committed.getElements())) { + completed.add(committed); + } + } + return completed.build(); + } + + private void fireAllAvailableCallbacks() { + for (AppliedPTransform transform : stepNames.keySet()) { + fireAvailableCallbacks(transform); + } + } + + private void fireAvailableCallbacks(AppliedPTransform producingTransform) { + TransformWatermarks watermarks = watermarkManager.getWatermarks(producingTransform); + callbackExecutor.fireForWatermark(producingTransform, watermarks.getOutputWatermark()); + } + + /** + * Create a {@link UncommittedBundle} for use by a source. + */ + public UncommittedBundle createRootBundle(PCollection output) { + return bundleFactory.createRootBundle(output); + } + + /** + * Create a {@link UncommittedBundle} whose elements belong to the specified {@link + * PCollection}. + */ + public UncommittedBundle createBundle(CommittedBundle input, PCollection output) { + return bundleFactory.createBundle(input, output); + } + + /** + * Create a {@link UncommittedBundle} with the specified keys at the specified step. For use by + * {@link InProcessGroupByKeyOnly} {@link PTransform PTransforms}. 
+ */ + public UncommittedBundle createKeyedBundle( + CommittedBundle input, Object key, PCollection output) { + return bundleFactory.createKeyedBundle(input, key, output); + } + + /** + * Create a {@link PCollectionViewWriter}, whose elements will be used in the provided + * {@link PCollectionView}. + */ + public PCollectionViewWriter createPCollectionViewWriter( + PCollection> input, final PCollectionView output) { + return new PCollectionViewWriter() { + @Override + public void add(Iterable> values) { + sideInputContainer.write(output, values); + } + }; + } + + /** + * Schedule a callback to be executed after output would be produced for the given window + * if there had been input. + * + *

Output would be produced when the watermark for a {@link PValue} passes the point at + * which the trigger for the specified window (with the specified windowing strategy) must have + * fired from the perspective of that {@link PValue}, as specified by the value of + * {@link Trigger#getWatermarkThatGuaranteesFiring(BoundedWindow)} for the trigger of the + * {@link WindowingStrategy}. When the callback has fired, either values will have been produced + * for a key in that window, the window is empty, or all elements in the window are late. The + * callback will be executed regardless of whether values have been produced. + */ + public void scheduleAfterOutputWouldBeProduced( + PValue value, + BoundedWindow window, + WindowingStrategy windowingStrategy, + Runnable runnable) { + AppliedPTransform producing = getProducing(value); + callbackExecutor.callOnGuaranteedFiring(producing, window, windowingStrategy, runnable); + + fireAvailableCallbacks(lookupProducing(value)); + } + + private AppliedPTransform getProducing(PValue value) { + if (value.getProducingTransformInternal() != null) { + return value.getProducingTransformInternal(); + } + return lookupProducing(value); + } + + private AppliedPTransform lookupProducing(PValue value) { + for (AppliedPTransform transform : stepNames.keySet()) { + if (transform.getOutput().equals(value) || transform.getOutput().expand().contains(value)) { + return transform; + } + } + return null; + } + + /** + * Get the options used by this {@link Pipeline}. + */ + public InProcessPipelineOptions getPipelineOptions() { + return options; + } + + /** + * Get an {@link ExecutionContext} for the provided {@link AppliedPTransform} and key. 
+ */ + public InProcessExecutionContext getExecutionContext( + AppliedPTransform application, Object key) { + StepAndKey stepAndKey = StepAndKey.of(application, key); + return new InProcessExecutionContext( + options.getClock(), + key, + (CopyOnAccessInMemoryStateInternals) applicationStateInternals.get(stepAndKey), + watermarkManager.getWatermarks(application)); + } + + /** + * Get all of the steps used in this {@link Pipeline}. + */ + public Collection> getSteps() { + return stepNames.keySet(); + } + + /** + * Get the Step Name for the provided application. + */ + public String getStepName(AppliedPTransform application) { + return stepNames.get(application); + } + + /** + * Returns a {@link ReadyCheckingSideInputReader} capable of reading the provided + * {@link PCollectionView PCollectionViews}. + * + * @param sideInputs the {@link PCollectionView PCollectionViews} the result should be able to + * read + * @return a {@link SideInputReader} that can read all of the provided {@link PCollectionView + * PCollectionViews} + */ + public ReadyCheckingSideInputReader createSideInputReader( + final List> sideInputs) { + return sideInputContainer.createReaderForViews(sideInputs); + } + + /** + * A {@link SideInputReader} that allows callers to check to see if a {@link PCollectionView} has + * had its contents set in a window. + */ + static interface ReadyCheckingSideInputReader extends SideInputReader { + /** + * Returns true if the {@link PCollectionView} is ready in the provided {@link BoundedWindow}. + */ + boolean isReady(PCollectionView view, BoundedWindow window); + } + + /** + * Create a {@link CounterSet} for this {@link Pipeline}. The {@link CounterSet} is independent + * of all other {@link CounterSet CounterSets} created by this call. + * + * The {@link InProcessEvaluationContext} is responsible for unifying the counters present in + * all created {@link CounterSet CounterSets} when the transforms that call this method + * complete. 
+ */ + public CounterSet createCounterSet() { + return new CounterSet(); + } + + /** + * Returns all of the counters that have been merged into this context via calls to + * {@link CounterSet#merge(CounterSet)}. + */ + public CounterSet getCounters() { + return mergedCounters; + } + + /** + * Extracts all timers that have been fired and have not already been extracted. + * + *

This is a destructive operation. Timers will only appear in the result of this method once + * for each time they are set. + */ + public Map, Map> extractFiredTimers() { + Map, Map> fired = + watermarkManager.extractFiredTimers(); + return fired; + } + + /** + * Returns true if the step will not produce additional output. + * + *

If the provided transform produces only {@link IsBounded#BOUNDED} + * {@link PCollection PCollections}, returns true if the watermark is at + * {@link BoundedWindow#TIMESTAMP_MAX_VALUE positive infinity}. + * + *

If the provided transform produces any {@link IsBounded#UNBOUNDED} + * {@link PCollection PCollections}, returns the value of + * {@link InProcessPipelineOptions#isShutdownUnboundedProducersWithMaxWatermark()}. + */ + public boolean isDone(AppliedPTransform transform) { + // if the PTransform's watermark isn't at the max value, it isn't done + if (watermarkManager + .getWatermarks(transform) + .getOutputWatermark() + .isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) { + return false; + } + // If the PTransform has any unbounded outputs, and unbounded producers should not be shut down, + // the PTransform may produce additional output. It is not done. + for (PValue output : transform.getOutput().expand()) { + if (output instanceof PCollection) { + IsBounded bounded = ((PCollection) output).isBounded(); + if (bounded.equals(IsBounded.UNBOUNDED) + && !options.isShutdownUnboundedProducersWithMaxWatermark()) { + return false; + } + } + } + // The PTransform's watermark was at positive infinity and all of its outputs are known to be + // done. It is done. + return true; + } + + /** + * Returns true if all steps are done. 
+ */ + public boolean isDone() { + for (AppliedPTransform transform : stepNames.keySet()) { + if (!isDone(transform)) { + return false; + } + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java new file mode 100644 index 0000000..44d8bd9 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate; +import org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks; +import org.apache.beam.sdk.util.BaseExecutionContext; +import org.apache.beam.sdk.util.ExecutionContext; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; + +/** + * Execution Context for the {@link InProcessPipelineRunner}. + * + * This implementation is not thread safe. A new {@link InProcessExecutionContext} must be created + * for each thread that requires it. + */ +class InProcessExecutionContext + extends BaseExecutionContext { + private final Clock clock; + private final Object key; + private final CopyOnAccessInMemoryStateInternals existingState; + private final TransformWatermarks watermarks; + + public InProcessExecutionContext(Clock clock, Object key, + CopyOnAccessInMemoryStateInternals existingState, TransformWatermarks watermarks) { + this.clock = clock; + this.key = key; + this.existingState = existingState; + this.watermarks = watermarks; + } + + @Override + protected InProcessStepContext createStepContext(String stepName, String transformName) { + return new InProcessStepContext(this, stepName, transformName); + } + + /** + * Step Context for the {@link InProcessPipelineRunner}. 
+ */ + public class InProcessStepContext + extends org.apache.beam.sdk.util.BaseExecutionContext.StepContext { + private CopyOnAccessInMemoryStateInternals stateInternals; + private InProcessTimerInternals timerInternals; + + public InProcessStepContext( + ExecutionContext executionContext, String stepName, String transformName) { + super(executionContext, stepName, transformName); + } + + @Override + public CopyOnAccessInMemoryStateInternals stateInternals() { + if (stateInternals == null) { + stateInternals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, existingState); + } + return stateInternals; + } + + @Override + public InProcessTimerInternals timerInternals() { + if (timerInternals == null) { + timerInternals = + InProcessTimerInternals.create(clock, watermarks, TimerUpdate.builder(key)); + } + return timerInternals; + } + + /** + * Commits the state of this step, and returns the committed state. If the step has not + * accessed any state, return null. + */ + public CopyOnAccessInMemoryStateInternals commitState() { + if (stateInternals != null) { + return stateInternals.commit(); + } + return null; + } + + /** + * Gets the timer update of the {@link TimerInternals} of this {@link InProcessStepContext}, + * which is empty if the {@link TimerInternals} were never accessed. 
+ */ + public TimerUpdate getTimerUpdate() { + if (timerInternals == null) { + return TimerUpdate.empty(); + } + return timerInternals.getTimerUpdate(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutor.java new file mode 100644 index 0000000..d811e1b --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutor.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; + +import java.util.Collection; + +/** + * An executor that schedules and executes {@link AppliedPTransform AppliedPTransforms} for both + * source and intermediate {@link PTransform PTransforms}. 
+ */ +interface InProcessExecutor { + /** + * Starts this executor. The provided collection is the collection of root transforms to + * initially schedule. + * + * @param rootTransforms + */ + void start(Collection> rootTransforms); + + /** + * Blocks until the job being executed enters a terminal state. A job is completed after all + * root {@link AppliedPTransform AppliedPTransforms} have completed, and all + * {@link CommittedBundle Bundles} have been consumed. Jobs may also terminate abnormally. + * + * @throws Throwable whenever an executor thread throws anything, transfers the throwable to the + * waiting thread and rethrows it + */ + void awaitCompletion() throws Throwable; +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineOptions.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineOptions.java new file mode 100644 index 0000000..512b3bd --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineOptions.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.ApplicationNameOptions; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.Hidden; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.Validation.Required; +import org.apache.beam.sdk.transforms.PTransform; + +import com.fasterxml.jackson.annotation.JsonIgnore; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Options that can be used to configure the {@link InProcessPipelineRunner}. + */ +public interface InProcessPipelineOptions extends PipelineOptions, ApplicationNameOptions { + /** + * Gets the {@link ExecutorServiceFactory} to use to create instances of {@link ExecutorService} + * to execute {@link PTransform PTransforms}. + * + *

Note that {@link ExecutorService ExecutorServices} returned by the factory must ensure that + * it cannot enter a state in which it will not schedule additional pending work unless currently + * scheduled work completes, as this may cause the {@link Pipeline} to cease processing. + * + *

Defaults to a {@link CachedThreadPoolExecutorServiceFactory}, which produces instances of + * {@link Executors#newCachedThreadPool()}. + */ + @JsonIgnore + @Required + @Hidden + @Default.InstanceFactory(CachedThreadPoolExecutorServiceFactory.class) + ExecutorServiceFactory getExecutorServiceFactory(); + + void setExecutorServiceFactory(ExecutorServiceFactory executorService); + + /** + * Gets the {@link Clock} used by this pipeline. The clock is used in place of accessing the + * system time when time values are required by the evaluator. + */ + @Default.InstanceFactory(NanosOffsetClock.Factory.class) + @JsonIgnore + @Required + @Hidden + @Description( + "The processing time source used by the pipeline. When the current time is " + + "needed by the evaluator, the result of clock#now() is used.") + Clock getClock(); + + void setClock(Clock clock); + + @Default.Boolean(false) + @Description( + "If the pipeline should shut down producers which have reached the maximum " + + "representable watermark. If this is set to true, a pipeline in which all PTransforms " + + "have reached the maximum watermark will be shut down, even if there are unbounded " + + "sources that could produce additional (late) data. By default, if the pipeline " + + "contains any unbounded PCollections, it will run until explicitly shut down.") + boolean isShutdownUnboundedProducersWithMaxWatermark(); + + void setShutdownUnboundedProducersWithMaxWatermark(boolean shutdown); + + @Default.Boolean(true) + @Description( + "If the pipeline should block awaiting completion of the pipeline. If set to true, " + + "a call to Pipeline#run() will block until all PTransforms are complete. Otherwise, " + + "the Pipeline will execute asynchronously. 
If set to false, the completion of the " + + "pipeline can be awaited on by use of InProcessPipelineResult#awaitCompletion().") + boolean isBlockOnRun(); + + void setBlockOnRun(boolean b); + + @Default.Boolean(true) + @Description( + "Controls whether the runner should ensure that all of the elements of every " + + "PCollection are not mutated. PTransforms are not permitted to mutate input elements " + + "at any point, or output elements after they are output.") + boolean isTestImmutability(); + + void setTestImmutability(boolean test); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java new file mode 100644 index 0000000..bb8c0de --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly; +import org.apache.beam.runners.direct.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOverrideFactory; +import org.apache.beam.runners.direct.ViewEvaluatorFactory.InProcessViewOverrideFactory; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.Pipeline.PipelineExecutionException; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.io.AvroIO; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.runners.AggregatorPipelineExtractor; +import org.apache.beam.sdk.runners.AggregatorRetrievalException; +import org.apache.beam.sdk.runners.AggregatorValues; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View.CreatePCollectionView; +import org.apache.beam.sdk.util.MapAggregatorValues; +import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.UserCodeException; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.common.Counter; +import org.apache.beam.sdk.util.common.CounterSet; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.PValue; + +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import 
com.google.common.collect.ImmutableSet; + +import org.joda.time.Instant; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; + +import javax.annotation.Nullable; + +/** + * An In-Memory implementation of the Dataflow Programming Model. Supports Unbounded + * {@link PCollection PCollections}. + */ +@Experimental +public class InProcessPipelineRunner + extends PipelineRunner { + /** + * The default set of transform overrides to use in the {@link InProcessPipelineRunner}. + * + *

A transform override must have a single-argument constructor that takes an instance of the + * type of transform it is overriding. + */ + @SuppressWarnings("rawtypes") + private static Map, PTransformOverrideFactory> + defaultTransformOverrides = + ImmutableMap., PTransformOverrideFactory>builder() + .put(GroupByKey.class, new InProcessGroupByKeyOverrideFactory()) + .put(CreatePCollectionView.class, new InProcessViewOverrideFactory()) + .put(AvroIO.Write.Bound.class, new AvroIOShardedWriteFactory()) + .put(TextIO.Write.Bound.class, new TextIOShardedWriteFactory()) + .build(); + + /** + * Part of a {@link PCollection}. Elements are output to a bundle, which will cause them to be + * executed by {@link PTransform PTransforms} that consume the {@link PCollection} this bundle is + * a part of at a later point. This is an uncommitted bundle and can have elements added to it. + * + * @param the type of elements that can be added to this bundle + */ + public static interface UncommittedBundle { + /** + * Returns the PCollection that the elements of this {@link UncommittedBundle} belong to. + */ + PCollection getPCollection(); + + /** + * Outputs an element to this bundle. + * + * @param element the element to add to this bundle + * @return this bundle + */ + UncommittedBundle add(WindowedValue element); + + /** + * Commits this {@link UncommittedBundle}, returning an immutable {@link CommittedBundle} + * containing all of the elements that were added to it. The {@link #add(WindowedValue)} method + * will throw an {@link IllegalStateException} if called after a call to commit. + * @param synchronizedProcessingTime the synchronized processing time at which this bundle was + * committed + */ + CommittedBundle commit(Instant synchronizedProcessingTime); + } + + /** + * Part of a {@link PCollection}. Elements are output to an {@link UncommittedBundle}, which will + * eventually be committed. 
Committed elements are executed by the {@link PTransform PTransforms} + * that consume the {@link PCollection} this bundle is + * a part of at a later point. + * @param the type of elements contained within this bundle + */ + public static interface CommittedBundle { + /** + * Returns the PCollection that the elements of this bundle belong to. + */ + PCollection getPCollection(); + + /** + * Returns the (possibly null) key that was output in the most recent {@link GroupByKey} in the + * execution of this bundle. + */ + @Nullable + Object getKey(); + + /** + * Returns an {@link Iterable} containing all of the elements that have been added to this + * {@link CommittedBundle}. + */ + Iterable> getElements(); + + /** + * Returns the processing time output watermark at the time the producing {@link PTransform} + * committed this bundle. Downstream synchronized processing time watermarks cannot progress + * past this point before consuming this bundle. + * + *

This value is no greater than the earliest incomplete processing time or synchronized + * processing time {@link TimerData timer} at the time this bundle was committed, including any + * timers that fired to produce this bundle. + */ + Instant getSynchronizedProcessingOutputWatermark(); + + /** + * Return a new {@link CommittedBundle} that is like this one, except calls to + * {@link #getElements()} will return the provided elements. This bundle is unchanged. + * + *

+ * The value of the {@link #getSynchronizedProcessingOutputWatermark() synchronized processing + * output watermark} of the returned {@link CommittedBundle} is equal to the value returned from + * the current bundle. This is used to ensure a {@link PTransform} that could not complete + * processing on input elements properly holds the synchronized processing time to the + * appropriate value. + */ + CommittedBundle withElements(Iterable> elements); + } + + /** + * A {@link PCollectionViewWriter} is responsible for writing contents of a {@link PCollection} to + * a storage mechanism that can be read from while constructing a {@link PCollectionView}. + * @param the type of elements the input {@link PCollection} contains. + * @param the type of the PCollectionView this writer writes to. + */ + public static interface PCollectionViewWriter { + void add(Iterable> values); + } + + //////////////////////////////////////////////////////////////////////////////////////////////// + private final InProcessPipelineOptions options; + + public static InProcessPipelineRunner fromOptions(PipelineOptions options) { + return new InProcessPipelineRunner(options.as(InProcessPipelineOptions.class)); + } + + private InProcessPipelineRunner(InProcessPipelineOptions options) { + this.options = options; + } + + /** + * Returns the {@link PipelineOptions} used to create this {@link InProcessPipelineRunner}. 
+ */ + public InProcessPipelineOptions getPipelineOptions() { + return options; + } + + @Override + public OutputT apply( + PTransform transform, InputT input) { + PTransformOverrideFactory overrideFactory = defaultTransformOverrides.get(transform.getClass()); + if (overrideFactory != null) { + PTransform customTransform = overrideFactory.override(transform); + + return super.apply(customTransform, input); + } + // If there is no override, or we should not apply the override, apply the original transform + return super.apply(transform, input); + } + + @Override + public InProcessPipelineResult run(Pipeline pipeline) { + ConsumerTrackingPipelineVisitor consumerTrackingVisitor = new ConsumerTrackingPipelineVisitor(); + pipeline.traverseTopologically(consumerTrackingVisitor); + for (PValue unfinalized : consumerTrackingVisitor.getUnfinalizedPValues()) { + unfinalized.finishSpecifying(); + } + @SuppressWarnings("rawtypes") + KeyedPValueTrackingVisitor keyedPValueVisitor = + KeyedPValueTrackingVisitor.create( + ImmutableSet.>of( + GroupByKey.class, InProcessGroupByKeyOnly.class)); + pipeline.traverseTopologically(keyedPValueVisitor); + + InProcessEvaluationContext context = + InProcessEvaluationContext.create( + getPipelineOptions(), + createBundleFactory(getPipelineOptions()), + consumerTrackingVisitor.getRootTransforms(), + consumerTrackingVisitor.getValueToConsumers(), + consumerTrackingVisitor.getStepNames(), + consumerTrackingVisitor.getViews()); + + // independent executor service for each run + ExecutorService executorService = + context.getPipelineOptions().getExecutorServiceFactory().create(); + InProcessExecutor executor = + ExecutorServiceParallelExecutor.create( + executorService, + consumerTrackingVisitor.getValueToConsumers(), + keyedPValueVisitor.getKeyedPValues(), + TransformEvaluatorRegistry.defaultRegistry(), + defaultModelEnforcements(options), + context); + executor.start(consumerTrackingVisitor.getRootTransforms()); + + Map, Collection>> 
aggregatorSteps = + new AggregatorPipelineExtractor(pipeline).getAggregatorSteps(); + InProcessPipelineResult result = + new InProcessPipelineResult(executor, context, aggregatorSteps); + if (options.isBlockOnRun()) { + try { + result.awaitCompletion(); + } catch (UserCodeException userException) { + throw new PipelineExecutionException(userException.getCause()); + } catch (Throwable t) { + Throwables.propagate(t); + } + } + return result; + } + + private Map, Collection> + defaultModelEnforcements(InProcessPipelineOptions options) { + ImmutableMap.Builder, Collection> + enforcements = ImmutableMap.builder(); + Collection parDoEnforcements = createParDoEnforcements(options); + enforcements.put(ParDo.Bound.class, parDoEnforcements); + enforcements.put(ParDo.BoundMulti.class, parDoEnforcements); + return enforcements.build(); + } + + private Collection createParDoEnforcements( + InProcessPipelineOptions options) { + ImmutableList.Builder enforcements = ImmutableList.builder(); + if (options.isTestImmutability()) { + enforcements.add(ImmutabilityEnforcementFactory.create()); + } + return enforcements.build(); + } + + private BundleFactory createBundleFactory(InProcessPipelineOptions pipelineOptions) { + BundleFactory bundleFactory = InProcessBundleFactory.create(); + if (pipelineOptions.isTestImmutability()) { + bundleFactory = ImmutabilityCheckingBundleFactory.create(bundleFactory); + } + return bundleFactory; + } + + /** + * The result of running a {@link Pipeline} with the {@link InProcessPipelineRunner}. + * + * Exposes the current {@link State} of the pipeline, its aggregator values, and a blocking + * {@link #awaitCompletion()} call. 
+ */ + public static class InProcessPipelineResult implements PipelineResult { + private final InProcessExecutor executor; + private final InProcessEvaluationContext evaluationContext; + private final Map, Collection>> aggregatorSteps; + private State state; + + private InProcessPipelineResult( + InProcessExecutor executor, + InProcessEvaluationContext evaluationContext, + Map, Collection>> aggregatorSteps) { + this.executor = executor; + this.evaluationContext = evaluationContext; + this.aggregatorSteps = aggregatorSteps; + // Only ever constructed after the executor has started. + this.state = State.RUNNING; + } + + @Override + public State getState() { + return state; + } + + @Override + public AggregatorValues getAggregatorValues(Aggregator aggregator) + throws AggregatorRetrievalException { + CounterSet counters = evaluationContext.getCounters(); + Collection> steps = aggregatorSteps.get(aggregator); + Map stepValues = new HashMap<>(); + for (AppliedPTransform transform : evaluationContext.getSteps()) { + if (steps.contains(transform.getTransform())) { + String stepName = + String.format( + "user-%s-%s", evaluationContext.getStepName(transform), aggregator.getName()); + Counter counter = (Counter) counters.getExistingCounter(stepName); + if (counter != null) { + stepValues.put(transform.getFullName(), counter.getAggregate()); + } + } + } + return new MapAggregatorValues<>(stepValues); + } + + /** + * Blocks until the {@link Pipeline} execution represented by this + * {@link InProcessPipelineResult} is complete, returning the terminal state. + * + *

If the pipeline terminates abnormally by throwing an exception, this will rethrow the + * exception. Future calls to {@link #getState()} will return + * {@link org.apache.beam.sdk.PipelineResult.State#FAILED}. + * + *

NOTE: if the {@link Pipeline} contains an {@link IsBounded#UNBOUNDED unbounded} + * {@link PCollection}, and the {@link PipelineRunner} was created with + * {@link InProcessPipelineOptions#isShutdownUnboundedProducersWithMaxWatermark()} set to false, + * this method will never return. + * + * See also {@link InProcessExecutor#awaitCompletion()}. + */ + public State awaitCompletion() throws Throwable { + if (!state.isTerminal()) { + try { + executor.awaitCompletion(); + state = State.DONE; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw e; + } catch (Throwable t) { + state = State.FAILED; + throw t; + } + } + return state; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessRegistrar.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessRegistrar.java new file mode 100644 index 0000000..4a09de7 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessRegistrar.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsRegistrar; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; + +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableList; + +/** + * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the + * {@link InProcessPipelineRunner}. + */ +public class InProcessRegistrar { + private InProcessRegistrar() {} + /** + * Registers the {@link InProcessPipelineRunner}. + */ + @AutoService(PipelineRunnerRegistrar.class) + public static class InProcessRunner implements PipelineRunnerRegistrar { + @Override + public Iterable>> getPipelineRunners() { + return ImmutableList.>>of(InProcessPipelineRunner.class); + } + } + + /** + * Registers the {@link InProcessPipelineOptions}. 
+ */ + @AutoService(PipelineOptionsRegistrar.class) + public static class InProcessOptions implements PipelineOptionsRegistrar { + @Override + public Iterable> getPipelineOptions() { + return ImmutableList.>of(InProcessPipelineOptions.class); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java new file mode 100644 index 0000000..f4980ef --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.direct; + +import static com.google.common.base.Preconditions.checkArgument; + +import org.apache.beam.runners.direct.InProcessEvaluationContext.ReadyCheckingSideInputReader; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.PCollectionViewWindow; +import org.apache.beam.sdk.util.SideInputReader; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.PCollectionView; + +import com.google.common.base.MoreObjects; +import com.google.common.base.Throwables; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.SettableFuture; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import javax.annotation.Nullable; + +/** + * An in-process container for {@link PCollectionView PCollectionViews}, which provides methods for + * constructing {@link SideInputReader SideInputReaders} which block until a side input is + * available and writing to a {@link PCollectionView}. + */ +class InProcessSideInputContainer { + private final InProcessEvaluationContext evaluationContext; + private final Collection> containedViews; + private final LoadingCache, + SettableFuture>>> viewByWindows; + + /** + * Create a new {@link InProcessSideInputContainer} with the provided views and the provided + * context. 
+ */ + public static InProcessSideInputContainer create( + InProcessEvaluationContext context, Collection> containedViews) { + CacheLoader, SettableFuture>>> + loader = new CacheLoader, + SettableFuture>>>() { + @Override + public SettableFuture>> load( + PCollectionViewWindow view) { + return SettableFuture.create(); + } + }; + LoadingCache, SettableFuture>>> + viewByWindows = CacheBuilder.newBuilder().build(loader); + return new InProcessSideInputContainer(context, containedViews, viewByWindows); + } + + private InProcessSideInputContainer(InProcessEvaluationContext context, + Collection> containedViews, + LoadingCache, SettableFuture>>> + viewByWindows) { + this.evaluationContext = context; + this.containedViews = ImmutableSet.copyOf(containedViews); + this.viewByWindows = viewByWindows; + } + + /** + * Return a view of this {@link InProcessSideInputContainer} that contains only the views in the + * provided argument. The returned {@link InProcessSideInputContainer} is unmodifiable without + * casting, but will change as this {@link InProcessSideInputContainer} is modified. + */ + public ReadyCheckingSideInputReader createReaderForViews( + Collection> newContainedViews) { + if (!containedViews.containsAll(newContainedViews)) { + Set> currentlyContained = ImmutableSet.copyOf(containedViews); + Set> newRequested = ImmutableSet.copyOf(newContainedViews); + throw new IllegalArgumentException("Can't create a SideInputReader with unknown views " + + Sets.difference(newRequested, currentlyContained)); + } + return new SideInputContainerSideInputReader(newContainedViews); + } + + /** + * Write the provided values to the provided view. + * + *

The windowed values are first exploded, then for each window the pane is determined. For + * each window, if the pane is later than the current pane stored within this container, write + * all of the values to the container as the new values of the {@link PCollectionView}. + * + *

The provided iterable is expected to contain only a single window and pane. + */ + public void write(PCollectionView view, Iterable> values) { + Map>> valuesPerWindow = + indexValuesByWindow(values); + for (Map.Entry>> windowValues : + valuesPerWindow.entrySet()) { + updatePCollectionViewWindowValues(view, windowValues.getKey(), windowValues.getValue()); + } + } + + /** + * Index the provided values by all {@link BoundedWindow windows} in which they appear. + */ + private Map>> indexValuesByWindow( + Iterable> values) { + Map>> valuesPerWindow = new HashMap<>(); + for (WindowedValue value : values) { + for (BoundedWindow window : value.getWindows()) { + Collection> windowValues = valuesPerWindow.get(window); + if (windowValues == null) { + windowValues = new ArrayList<>(); + valuesPerWindow.put(window, windowValues); + } + windowValues.add(value); + } + } + return valuesPerWindow; + } + + /** + * Set the value of the {@link PCollectionView} in the {@link BoundedWindow} to be based on the + * specified values, if the values are part of a later pane than currently exist within the + * {@link PCollectionViewWindow}. + */ + private void updatePCollectionViewWindowValues( + PCollectionView view, BoundedWindow window, Collection> windowValues) { + PCollectionViewWindow windowedView = PCollectionViewWindow.of(view, window); + SettableFuture>> future = null; + try { + future = viewByWindows.get(windowedView); + if (future.isDone()) { + Iterator> existingValues = future.get().iterator(); + PaneInfo newPane = windowValues.iterator().next().getPane(); + // The current value may have no elements, if no elements were produced for the window, + // but we are receiving late data. 
+ if (!existingValues.hasNext() + || newPane.getIndex() > existingValues.next().getPane().getIndex()) { + viewByWindows.invalidate(windowedView); + viewByWindows.get(windowedView).set(windowValues); + } + } else { + future.set(windowValues); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + if (future != null && !future.isDone()) { + future.set(Collections.>emptyList()); + } + } catch (ExecutionException e) { + Throwables.propagate(e.getCause()); + } + } + + private final class SideInputContainerSideInputReader implements ReadyCheckingSideInputReader { + private final Collection> readerViews; + + private SideInputContainerSideInputReader(Collection> readerViews) { + this.readerViews = ImmutableSet.copyOf(readerViews); + } + + @Override + public boolean isReady(final PCollectionView view, final BoundedWindow window) { + checkArgument( + readerViews.contains(view), + "Tried to check if view %s was ready in a SideInputReader that does not contain it. " + + "Contained views; %s", + view, + readerViews); + return getViewFuture(view, window).isDone(); + } + + @Override + @Nullable + public T get(final PCollectionView view, final BoundedWindow window) { + checkArgument( + readerViews.contains(view), "calling get(PCollectionView) with unknown view: " + view); + try { + final Future>> future = getViewFuture(view, window); + // Safe covariant cast + @SuppressWarnings("unchecked") + Iterable> values = (Iterable>) future.get(); + return view.fromIterableInternal(values); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + /** + * Gets the future containing the contents of the provided {@link PCollectionView} in the + * provided {@link BoundedWindow}, setting up a callback to populate the future with empty + * contents if necessary. 
+ */ + private Future>> getViewFuture( + final PCollectionView view, final BoundedWindow window) { + PCollectionViewWindow windowedView = PCollectionViewWindow.of(view, window); + final SettableFuture>> future = + viewByWindows.getUnchecked(windowedView); + + WindowingStrategy windowingStrategy = view.getWindowingStrategyInternal(); + evaluationContext.scheduleAfterOutputWouldBeProduced( + view, window, windowingStrategy, new WriteEmptyViewContents(view, window, future)); + return future; + } + + @Override + public boolean contains(PCollectionView view) { + return readerViews.contains(view); + } + + @Override + public boolean isEmpty() { + return readerViews.isEmpty(); + } + } + + private static class WriteEmptyViewContents implements Runnable { + private final PCollectionView view; + private final BoundedWindow window; + private final SettableFuture>> future; + + private WriteEmptyViewContents(PCollectionView view, BoundedWindow window, + SettableFuture>> future) { + this.future = future; + this.view = view; + this.window = window; + } + + @Override + public void run() { + // The requested window has closed without producing elements, so reflect that in + // the PCollectionView. If set has already been called, will do nothing. 
+ future.set(Collections.>emptyList()); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("view", view) + .add("window", window) + .toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTimerInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTimerInternals.java new file mode 100644 index 0000000..cd54f59 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTimerInternals.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate; +import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate.TimerUpdateBuilder; +import org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks; +import org.apache.beam.sdk.util.TimerInternals; + +import org.joda.time.Instant; + +import javax.annotation.Nullable; + +/** + * An implementation of {@link TimerInternals} where all relevant data exists in memory. + */ +public class InProcessTimerInternals implements TimerInternals { + private final Clock processingTimeClock; + private final TransformWatermarks watermarks; + private final TimerUpdateBuilder timerUpdateBuilder; + + public static InProcessTimerInternals create( + Clock clock, TransformWatermarks watermarks, TimerUpdateBuilder timerUpdateBuilder) { + return new InProcessTimerInternals(clock, watermarks, timerUpdateBuilder); + } + + private InProcessTimerInternals( + Clock clock, TransformWatermarks watermarks, TimerUpdateBuilder timerUpdateBuilder) { + this.processingTimeClock = clock; + this.watermarks = watermarks; + this.timerUpdateBuilder = timerUpdateBuilder; + } + + @Override + public void setTimer(TimerData timerKey) { + timerUpdateBuilder.setTimer(timerKey); + } + + @Override + public void deleteTimer(TimerData timerKey) { + timerUpdateBuilder.deletedTimer(timerKey); + } + + public TimerUpdate getTimerUpdate() { + return timerUpdateBuilder.build(); + } + + @Override + public Instant currentProcessingTime() { + return processingTimeClock.now(); + } + + @Override + @Nullable + public Instant currentSynchronizedProcessingTime() { + return watermarks.getSynchronizedProcessingInputTime(); + } + + @Override + public Instant currentInputWatermarkTime() { + return watermarks.getInputWatermark(); + } + + @Override + @Nullable + public Instant currentOutputWatermarkTime() { + return watermarks.getOutputWatermark(); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java new file mode 100644 index 0000000..a132c33 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate; +import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.common.CounterSet; +import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; + +import org.joda.time.Instant; + +import javax.annotation.Nullable; + +/** + * The result of evaluating an {@link AppliedPTransform} with a {@link TransformEvaluator}. + */ +public interface InProcessTransformResult { + /** + * Returns the {@link AppliedPTransform} that produced this result. + */ + AppliedPTransform getTransform(); + + /** + * Returns the {@link UncommittedBundle (uncommitted) Bundles} output by this transform. These + * will be committed by the evaluation context as part of completing this result. + */ + Iterable> getOutputBundles(); + + /** + * Returns the {@link CounterSet} used by this {@link PTransform}, or null if this transform did + * not use a {@link CounterSet}. + */ + @Nullable CounterSet getCounters(); + + /** + * Returns the Watermark Hold for the transform at the time this result was produced. + * + * If the transform does not set any watermark hold, returns + * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}. + */ + Instant getWatermarkHold(); + + /** + * Returns the State used by the transform. + * + * If this evaluation did not access state, this may return null. + */ + @Nullable + CopyOnAccessInMemoryStateInternals getState(); + + /** + * Returns the {@link TimerUpdate} that was produced as a result of this evaluation. If the + * evaluation was triggered due to the delivery of one or more timers, those timers must be added + * to the builder before the update is built. + * + *

If this evaluation did not add or remove any timers, returns an empty TimerUpdate. + */ + TimerUpdate getTimerUpdate(); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java new file mode 100644 index 0000000..b7c755e --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.direct; + +import static com.google.common.base.Preconditions.checkState; + +import org.apache.beam.sdk.Pipeline.PipelineVisitor; +import org.apache.beam.sdk.runners.TransformTreeNode; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PValue; + +import java.util.HashSet; +import java.util.Set; + +/** + * A pipeline visitor that tracks all keyed {@link PValue PValues}. A {@link PValue} is keyed if it + * is the result of a {@link PTransform} that produces keyed outputs. A {@link PTransform} that + * produces keyed outputs is assumed to colocate output elements that share a key. + * + *

All {@link GroupByKey} transforms, or their runner-specific implementation primitive, produce + * keyed output. + */ +// TODO: Handle Key-preserving transforms when appropriate and more aggressively make PTransforms +// unkeyed +class KeyedPValueTrackingVisitor implements PipelineVisitor { + @SuppressWarnings("rawtypes") + private final Set> producesKeyedOutputs; + private final Set keyedValues; + private boolean finalized; + + public static KeyedPValueTrackingVisitor create( + @SuppressWarnings("rawtypes") Set> producesKeyedOutputs) { + return new KeyedPValueTrackingVisitor(producesKeyedOutputs); + } + + private KeyedPValueTrackingVisitor( + @SuppressWarnings("rawtypes") Set> producesKeyedOutputs) { + this.producesKeyedOutputs = producesKeyedOutputs; + this.keyedValues = new HashSet<>(); + } + + @Override + public void enterCompositeTransform(TransformTreeNode node) { + checkState( + !finalized, + "Attempted to use a %s that has already been finalized on a pipeline (visiting node %s)", + KeyedPValueTrackingVisitor.class.getSimpleName(), + node); + } + + @Override + public void leaveCompositeTransform(TransformTreeNode node) { + checkState( + !finalized, + "Attempted to use a %s that has already been finalized on a pipeline (visiting node %s)", + KeyedPValueTrackingVisitor.class.getSimpleName(), + node); + if (node.isRootNode()) { + finalized = true; + } else if (producesKeyedOutputs.contains(node.getTransform().getClass())) { + keyedValues.addAll(node.getExpandedOutputs()); + } + } + + @Override + public void visitTransform(TransformTreeNode node) {} + + @Override + public void visitValue(PValue value, TransformTreeNode producer) { + if (producesKeyedOutputs.contains(producer.getTransform().getClass())) { + keyedValues.addAll(value.expand()); + } + } + + public Set getKeyedPValues() { + checkState( + finalized, "can't call getKeyedPValues before a Pipeline has been completely traversed"); + return keyedValues; + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java new file mode 100644 index 0000000..cc9b6da --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; + +/** + * Enforcement tools that verify that executing code conforms to the model. + * + *

ModelEnforcement is performed on a per-element and per-bundle basis. The + * {@link ModelEnforcement} is provided with the input bundle as part of + * {@link ModelEnforcementFactory#forBundle(CommittedBundle, AppliedPTransform)}, each element + * before and after that element is provided to an underlying {@link TransformEvaluator}, and the + * output {@link InProcessTransformResult} and committed output bundles after the + * {@link TransformEvaluator} has completed. + * + *

Typically, {@link ModelEnforcement} will obtain required metadata (such as the {@link Coder} + * of the input {@link PCollection}) on construction, and then enforce per-element behavior + * (such as the immutability of input elements). When the element is output or the bundle is + * completed, the required conditions can be enforced across all elements. + */ +public interface ModelEnforcement { + /** + * Called before a call to {@link TransformEvaluator#processElement(WindowedValue)} on the + * provided {@link WindowedValue}. + */ + void beforeElement(WindowedValue element); + + /** + * Called after a call to {@link TransformEvaluator#processElement(WindowedValue)} on the + * provided {@link WindowedValue}. + */ + void afterElement(WindowedValue element); + + /** + * Called after a bundle has been completed and {@link TransformEvaluator#finishBundle()} has been + * called, producing the provided {@link InProcessTransformResult} and + * {@link CommittedBundle output bundles}. + */ + void afterFinish( + CommittedBundle input, + InProcessTransformResult result, + Iterable> outputs); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java new file mode 100644 index 0000000..6162ba0 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.sdk.transforms.AppliedPTransform; + +/** + * Creates {@link ModelEnforcement} instances for an {@link AppliedPTransform} on an input + * {@link CommittedBundle bundle}. {@link ModelEnforcement} instances are created before the + * {@link TransformEvaluator} is created. + */ +public interface ModelEnforcementFactory { + ModelEnforcement forBundle(CommittedBundle input, AppliedPTransform consumer); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java new file mode 100644 index 0000000..ffdee9d --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.sdk.options.DefaultValueFactory; +import org.apache.beam.sdk.options.PipelineOptions; + +import org.joda.time.Instant; + +import java.util.concurrent.TimeUnit; + +/** + * A {@link Clock} that uses {@link System#nanoTime()} to track the progress of time. + */ +public class NanosOffsetClock implements Clock { + private final long baseMillis; + private final long nanosAtBaseMillis; + + public static NanosOffsetClock create() { + return new NanosOffsetClock(); + } + + private NanosOffsetClock() { + baseMillis = System.currentTimeMillis(); + nanosAtBaseMillis = System.nanoTime(); + } + + @Override + public Instant now() { + return new Instant( + baseMillis + (TimeUnit.MILLISECONDS.convert( + System.nanoTime() - nanosAtBaseMillis, TimeUnit.NANOSECONDS))); + } + + /** + * Creates instances of {@link NanosOffsetClock}. 
+ */ + public static class Factory implements DefaultValueFactory { + @Override + public Clock create(PipelineOptions options) { + return new NanosOffsetClock(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PTransformOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PTransformOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PTransformOverrideFactory.java new file mode 100644 index 0000000..81e4863 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PTransformOverrideFactory.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; + +interface PTransformOverrideFactory { + /** + * Create a {@link PTransform} override for the provided {@link PTransform} if applicable. + * Otherwise, return the input {@link PTransform}. + * + *

The returned PTransform must be semantically equivalent to the input {@link PTransform}. + */ + PTransform override( + PTransform transform); +}