beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [30/67] [partial] incubator-beam git commit: Directory reorganization
Date Thu, 24 Mar 2016 02:47:54 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessExecutionContext.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessExecutionContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessExecutionContext.java
deleted file mode 100644
index 43cd9eb..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessExecutionContext.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TransformWatermarks;
-import com.google.cloud.dataflow.sdk.util.BaseExecutionContext;
-import com.google.cloud.dataflow.sdk.util.ExecutionContext;
-import com.google.cloud.dataflow.sdk.util.TimerInternals;
-import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler;
-import com.google.cloud.dataflow.sdk.util.state.CopyOnAccessInMemoryStateInternals;
-
-/**
- * Execution Context for the {@link InProcessPipelineRunner}.
- *
- * This implementation is not thread safe. A new {@link InProcessExecutionContext} must be created
- * for each thread that requires it.
- */
-class InProcessExecutionContext
-    extends BaseExecutionContext<InProcessExecutionContext.InProcessStepContext> {
-  private final Clock clock;
-  private final Object key;
-  private final CopyOnAccessInMemoryStateInternals<Object> existingState;
-  private final TransformWatermarks watermarks;
-
-  public InProcessExecutionContext(Clock clock, Object key,
-      CopyOnAccessInMemoryStateInternals<Object> existingState, TransformWatermarks watermarks) {
-    this.clock = clock;
-    this.key = key;
-    this.existingState = existingState;
-    this.watermarks = watermarks;
-  }
-
-  @Override
-  protected InProcessStepContext createStepContext(
-      String stepName, String transformName, StateSampler stateSampler) {
-    return new InProcessStepContext(this, stepName, transformName);
-  }
-
-  /**
-   * Step Context for the {@link InProcessPipelineRunner}.
-   */
-  public class InProcessStepContext
-      extends com.google.cloud.dataflow.sdk.util.BaseExecutionContext.StepContext {
-    private CopyOnAccessInMemoryStateInternals<Object> stateInternals;
-    private InProcessTimerInternals timerInternals;
-
-    public InProcessStepContext(
-        ExecutionContext executionContext, String stepName, String transformName) {
-      super(executionContext, stepName, transformName);
-    }
-
-    @Override
-    public CopyOnAccessInMemoryStateInternals<Object> stateInternals() {
-      if (stateInternals == null) {
-        stateInternals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, existingState);
-      }
-      return stateInternals;
-    }
-
-    @Override
-    public InProcessTimerInternals timerInternals() {
-      if (timerInternals == null) {
-        timerInternals =
-            InProcessTimerInternals.create(clock, watermarks, TimerUpdate.builder(key));
-      }
-      return timerInternals;
-    }
-
-    /**
-     * Commits the state of this step, and returns the committed state. If the step has not
-     * accessed any state, return null.
-     */
-    public CopyOnAccessInMemoryStateInternals<?> commitState() {
-      if (stateInternals != null) {
-        return stateInternals.commit();
-      }
-      return null;
-    }
-
-    /**
-     * Gets the timer update of the {@link TimerInternals} of this {@link InProcessStepContext},
-     * which is empty if the {@link TimerInternals} were never accessed.
-     */
-    public TimerUpdate getTimerUpdate() {
-      if (timerInternals == null) {
-        return TimerUpdate.empty();
-      }
-      return timerInternals.getTimerUpdate();
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessExecutor.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessExecutor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessExecutor.java
deleted file mode 100644
index 7b60bca..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessExecutor.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-
-import java.util.Collection;
-
-/**
- * An executor that schedules and executes {@link AppliedPTransform AppliedPTransforms} for both
- * source and intermediate {@link PTransform PTransforms}.
- */
-interface InProcessExecutor {
-  /**
-   * Starts this executor. The provided collection is the collection of root transforms to
-   * initially schedule.
-   *
-   * @param rootTransforms the root transforms to initially schedule when this executor starts
-   */
-  void start(Collection<AppliedPTransform<?, ?, ?>> rootTransforms);
-
-  /**
-   * Blocks until the job being executed enters a terminal state. A job is completed after all
-   * root {@link AppliedPTransform AppliedPTransforms} have completed, and all
-   * {@link CommittedBundle Bundles} have been consumed. Jobs may also terminate abnormally.
-   *
-   * @throws Throwable if an executor thread throws anything; the throwable is transferred to the
-   *                   waiting thread and rethrown
-   */
-  void awaitCompletion() throws Throwable;
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java
deleted file mode 100644
index 5ee0e88..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.options.ApplicationNameOptions;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.Hidden;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.Validation.Required;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- * Options that can be used to configure the {@link InProcessPipelineRunner}.
- */
-public interface InProcessPipelineOptions extends PipelineOptions, ApplicationNameOptions {
-  /**
-   * Gets the {@link ExecutorServiceFactory} to use to create instances of {@link ExecutorService}
-   * to execute {@link PTransform PTransforms}.
-   *
-   * <p>Note that {@link ExecutorService ExecutorServices} returned by the factory must ensure that
-   * they cannot enter a state in which they will not schedule additional pending work unless
-   * currently scheduled work completes, as this may cause the {@link Pipeline} to cease processing.
-   *
-   * <p>Defaults to a {@link CachedThreadPoolExecutorServiceFactory}, which produces instances of
-   * {@link Executors#newCachedThreadPool()}.
-   */
-  @JsonIgnore
-  @Required
-  @Hidden
-  @Default.InstanceFactory(CachedThreadPoolExecutorServiceFactory.class)
-  ExecutorServiceFactory getExecutorServiceFactory();
-
-  void setExecutorServiceFactory(ExecutorServiceFactory executorService);
-
-  /**
-   * Gets the {@link Clock} used by this pipeline. The clock is used in place of accessing the
-   * system time when time values are required by the evaluator.
-   */
-  @Default.InstanceFactory(NanosOffsetClock.Factory.class)
-  @JsonIgnore
-  @Required
-  @Hidden
-  @Description(
-      "The processing time source used by the pipeline. When the current time is "
-          + "needed by the evaluator, the result of clock#now() is used.")
-  Clock getClock();
-
-  void setClock(Clock clock);
-
-  @Default.Boolean(false)
-  @Description(
-      "If the pipeline should shut down producers which have reached the maximum "
-          + "representable watermark. If this is set to true, a pipeline in which all PTransforms "
-          + "have reached the maximum watermark will be shut down, even if there are unbounded "
-          + "sources that could produce additional (late) data. By default, if the pipeline "
-          + "contains any unbounded PCollections, it will run until explicitly shut down.")
-  boolean isShutdownUnboundedProducersWithMaxWatermark();
-
-  void setShutdownUnboundedProducersWithMaxWatermark(boolean shutdown);
-
-  @Default.Boolean(true)
-  @Description(
-      "If the pipeline should block awaiting completion of the pipeline. If set to true, "
-          + "a call to Pipeline#run() will block until all PTransforms are complete. Otherwise, "
-          + "the Pipeline will execute asynchronously. If set to false, the completion of the "
-          + "pipeline can be awaited on by use of InProcessPipelineResult#awaitCompletion().")
-  boolean isBlockOnRun();
-
-  void setBlockOnRun(boolean b);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
deleted file mode 100644
index a1c8756..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.Pipeline.PipelineExecutionException;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.AggregatorPipelineExtractor;
-import com.google.cloud.dataflow.sdk.runners.AggregatorRetrievalException;
-import com.google.cloud.dataflow.sdk.runners.AggregatorValues;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKey;
-import com.google.cloud.dataflow.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView;
-import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
-import com.google.cloud.dataflow.sdk.util.MapAggregatorValues;
-import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
-import com.google.cloud.dataflow.sdk.util.UserCodeException;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.common.Counter;
-import com.google.cloud.dataflow.sdk.util.common.CounterSet;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.cloud.dataflow.sdk.values.POutput;
-import com.google.cloud.dataflow.sdk.values.PValue;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
-import org.joda.time.Instant;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-
-import javax.annotation.Nullable;
-
-/**
- * An In-Memory implementation of the Dataflow Programming Model. Supports Unbounded
- * {@link PCollection PCollections}.
- */
-@Experimental
-public class InProcessPipelineRunner
-    extends PipelineRunner<InProcessPipelineRunner.InProcessPipelineResult> {
-  /**
-   * The default set of transform overrides to use in the {@link InProcessPipelineRunner}.
-   *
-   * <p>A transform override must have a single-argument constructor that takes an instance of the
-   * type of transform it is overriding.
-   */
-  @SuppressWarnings("rawtypes")
-  private static Map<Class<? extends PTransform>, Class<? extends PTransform>>
-      defaultTransformOverrides =
-          ImmutableMap.<Class<? extends PTransform>, Class<? extends PTransform>>builder()
-              .put(Create.Values.class, InProcessCreate.class)
-              .put(GroupByKey.class, InProcessGroupByKey.class)
-              .put(
-                  CreatePCollectionView.class,
-                  ViewEvaluatorFactory.InProcessCreatePCollectionView.class)
-              .build();
-
-  /**
-   * Part of a {@link PCollection}. Elements are output to a bundle, which will cause them to be
-   * executed by {@link PTransform PTransforms} that consume the {@link PCollection} this bundle is
-   * a part of at a later point. This is an uncommitted bundle and can have elements added to it.
-   *
-   * @param <T> the type of elements that can be added to this bundle
-   */
-  public static interface UncommittedBundle<T> {
-    /**
-     * Returns the PCollection that the elements of this {@link UncommittedBundle} belong to.
-     */
-    PCollection<T> getPCollection();
-
-    /**
-     * Outputs an element to this bundle.
-     *
-     * @param element the element to add to this bundle
-     * @return this bundle
-     */
-    UncommittedBundle<T> add(WindowedValue<T> element);
-
-    /**
-     * Commits this {@link UncommittedBundle}, returning an immutable {@link CommittedBundle}
-     * containing all of the elements that were added to it. The {@link #add(WindowedValue)} method
-     * will throw an {@link IllegalStateException} if called after a call to commit.
-     * @param synchronizedProcessingTime the synchronized processing time at which this bundle was
-     *                                   committed
-     */
-    CommittedBundle<T> commit(Instant synchronizedProcessingTime);
-  }
-
-  /**
-   * Part of a {@link PCollection}. Elements are output to an {@link UncommittedBundle}, which will
-   * eventually be committed. Committed elements are executed by the
-   * {@link PTransform PTransforms} that consume the {@link PCollection} this bundle is
-   * a part of at a later point.
-   * @param <T> the type of elements contained within this bundle
-   */
-  public static interface CommittedBundle<T> {
-    /**
-     * Returns the PCollection that the elements of this bundle belong to.
-     */
-    PCollection<T> getPCollection();
-
-    /**
-     * Returns whether this bundle is keyed. A bundle that is part of a {@link PCollection} that
-     * occurs after a {@link GroupByKey} is keyed by the result of the last {@link GroupByKey}.
-     */
-    boolean isKeyed();
-
-    /**
-     * Returns the (possibly null) key that was output in the most recent {@link GroupByKey} in the
-     * execution of this bundle.
-     */
-    @Nullable
-    Object getKey();
-
-    /**
-     * Returns an {@link Iterable} containing all of the elements that have been added to this
-     * {@link CommittedBundle}.
-     */
-    Iterable<WindowedValue<T>> getElements();
-
-    /**
-     * Returns the processing time output watermark at the time the producing {@link PTransform}
-     * committed this bundle. Downstream synchronized processing time watermarks cannot progress
-     * past this point before consuming this bundle.
-     *
-     * <p>This value is no greater than the earliest incomplete processing time or synchronized
-     * processing time {@link TimerData timer} at the time this bundle was committed, including any
-     * timers that fired to produce this bundle.
-     */
-    Instant getSynchronizedProcessingOutputWatermark();
-  }
-
-  /**
-   * A {@link PCollectionViewWriter} is responsible for writing contents of a {@link PCollection} to
-   * a storage mechanism that can be read from while constructing a {@link PCollectionView}.
-   * @param <ElemT> the type of elements the input {@link PCollection} contains.
-   * @param <ViewT> the type of the PCollectionView this writer writes to.
-   */
-  public static interface PCollectionViewWriter<ElemT, ViewT> {
-    void add(Iterable<WindowedValue<ElemT>> values);
-  }
-
-  ////////////////////////////////////////////////////////////////////////////////////////////////
-  private final InProcessPipelineOptions options;
-
-  public static InProcessPipelineRunner fromOptions(PipelineOptions options) {
-    return new InProcessPipelineRunner(options.as(InProcessPipelineOptions.class));
-  }
-
-  private InProcessPipelineRunner(InProcessPipelineOptions options) {
-    this.options = options;
-  }
-
-  /**
-   * Returns the {@link PipelineOptions} used to create this {@link InProcessPipelineRunner}.
-   */
-  public InProcessPipelineOptions getPipelineOptions() {
-    return options;
-  }
-
-  @Override
-  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
-      PTransform<InputT, OutputT> transform, InputT input) {
-    Class<?> overrideClass = defaultTransformOverrides.get(transform.getClass());
-    if (overrideClass != null) {
-      // It is the responsibility of whoever constructs overrides to ensure this is type safe.
-      @SuppressWarnings("unchecked")
-      Class<PTransform<InputT, OutputT>> transformClass =
-          (Class<PTransform<InputT, OutputT>>) transform.getClass();
-
-      @SuppressWarnings("unchecked")
-      Class<PTransform<InputT, OutputT>> customTransformClass =
-          (Class<PTransform<InputT, OutputT>>) overrideClass;
-
-      PTransform<InputT, OutputT> customTransform =
-          InstanceBuilder.ofType(customTransformClass)
-          .withArg(transformClass, transform)
-          .build();
-
-      // This overrides the contents of the apply method without changing the TransformTreeNode that
-      // is generated by the PCollection application.
-      return super.apply(customTransform, input);
-    } else {
-      return super.apply(transform, input);
-    }
-  }
-
-  @Override
-  public InProcessPipelineResult run(Pipeline pipeline) {
-    ConsumerTrackingPipelineVisitor consumerTrackingVisitor = new ConsumerTrackingPipelineVisitor();
-    pipeline.traverseTopologically(consumerTrackingVisitor);
-    for (PValue unfinalized : consumerTrackingVisitor.getUnfinalizedPValues()) {
-      unfinalized.finishSpecifying();
-    }
-    @SuppressWarnings("rawtypes")
-    KeyedPValueTrackingVisitor keyedPValueVisitor =
-        KeyedPValueTrackingVisitor.create(
-            ImmutableSet.<Class<? extends PTransform>>of(
-                GroupByKey.class, InProcessGroupByKeyOnly.class));
-    pipeline.traverseTopologically(keyedPValueVisitor);
-
-    InProcessEvaluationContext context =
-        InProcessEvaluationContext.create(
-            getPipelineOptions(),
-            consumerTrackingVisitor.getRootTransforms(),
-            consumerTrackingVisitor.getValueToConsumers(),
-            consumerTrackingVisitor.getStepNames(),
-            consumerTrackingVisitor.getViews());
-
-    // independent executor service for each run
-    ExecutorService executorService =
-        context.getPipelineOptions().getExecutorServiceFactory().create();
-    InProcessExecutor executor =
-        ExecutorServiceParallelExecutor.create(
-            executorService,
-            consumerTrackingVisitor.getValueToConsumers(),
-            keyedPValueVisitor.getKeyedPValues(),
-            TransformEvaluatorRegistry.defaultRegistry(),
-            context);
-    executor.start(consumerTrackingVisitor.getRootTransforms());
-
-    Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps =
-        new AggregatorPipelineExtractor(pipeline).getAggregatorSteps();
-    InProcessPipelineResult result =
-        new InProcessPipelineResult(executor, context, aggregatorSteps);
-    if (options.isBlockOnRun()) {
-      try {
-        result.awaitCompletion();
-      } catch (UserCodeException userException) {
-        throw new PipelineExecutionException(userException.getCause());
-      } catch (Throwable t) {
-        Throwables.propagate(t);
-      }
-    }
-    return result;
-  }
-
-  /**
-   * The result of running a {@link Pipeline} with the {@link InProcessPipelineRunner}.
-   *
-   * Reports the pipeline state and aggregator values, and can block until the pipeline completes.
-   */
-  public static class InProcessPipelineResult implements PipelineResult {
-    private final InProcessExecutor executor;
-    private final InProcessEvaluationContext evaluationContext;
-    private final Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps;
-    private State state;
-
-    private InProcessPipelineResult(
-        InProcessExecutor executor,
-        InProcessEvaluationContext evaluationContext,
-        Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps) {
-      this.executor = executor;
-      this.evaluationContext = evaluationContext;
-      this.aggregatorSteps = aggregatorSteps;
-      // Only ever constructed after the executor has started.
-      this.state = State.RUNNING;
-    }
-
-    @Override
-    public State getState() {
-      return state;
-    }
-
-    @Override
-    public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
-        throws AggregatorRetrievalException {
-      CounterSet counters = evaluationContext.getCounters();
-      Collection<PTransform<?, ?>> steps = aggregatorSteps.get(aggregator);
-      Map<String, T> stepValues = new HashMap<>();
-      for (AppliedPTransform<?, ?, ?> transform : evaluationContext.getSteps()) {
-        if (steps.contains(transform.getTransform())) {
-          String stepName =
-              String.format(
-                  "user-%s-%s", evaluationContext.getStepName(transform), aggregator.getName());
-          Counter<T> counter = (Counter<T>) counters.getExistingCounter(stepName);
-          if (counter != null) {
-            stepValues.put(transform.getFullName(), counter.getAggregate());
-          }
-        }
-      }
-      return new MapAggregatorValues<>(stepValues);
-    }
-
-    /**
-     * Blocks until the {@link Pipeline} execution represented by this
-     * {@link InProcessPipelineResult} is complete, returning the terminal state.
-     *
-     * <p>If the pipeline terminates abnormally by throwing an exception, this will rethrow the
-     * exception. Future calls to {@link #getState()} will return
-     * {@link com.google.cloud.dataflow.sdk.PipelineResult.State#FAILED}.
-     *
-     * <p>NOTE: if the {@link Pipeline} contains an {@link IsBounded#UNBOUNDED unbounded}
-     * {@link PCollection}, and the {@link PipelineRunner} was created with
-     * {@link InProcessPipelineOptions#isShutdownUnboundedProducersWithMaxWatermark()} set to false,
-     * this method will never return.
-     *
-     * See also {@link InProcessExecutor#awaitCompletion()}.
-     */
-    public State awaitCompletion() throws Throwable {
-      if (!state.isTerminal()) {
-        try {
-          executor.awaitCompletion();
-          state = State.DONE;
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw e;
-        } catch (Throwable t) {
-          state = State.FAILED;
-          throw t;
-        }
-      }
-      return state;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java
deleted file mode 100644
index 37c9fcf..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
-import com.google.cloud.dataflow.sdk.util.PCollectionViewWindow;
-import com.google.cloud.dataflow.sdk.util.SideInputReader;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Throwables;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.SettableFuture;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-
-import javax.annotation.Nullable;
-
-/**
- * An in-process container for {@link PCollectionView PCollectionViews}, which provides methods for
- * constructing {@link SideInputReader SideInputReaders} which block until a side input is
- * available and writing to a {@link PCollectionView}.
- */
-class InProcessSideInputContainer {
-  private final InProcessEvaluationContext evaluationContext;
-  private final Collection<PCollectionView<?>> containedViews;
-  private final LoadingCache<PCollectionViewWindow<?>,
-      SettableFuture<Iterable<? extends WindowedValue<?>>>> viewByWindows;
-
-  /**
-   * Create a new {@link InProcessSideInputContainer} with the provided views and the provided
-   * context.
-   */
-  public static InProcessSideInputContainer create(
-      InProcessEvaluationContext context, Collection<PCollectionView<?>> containedViews) {
-    CacheLoader<PCollectionViewWindow<?>, SettableFuture<Iterable<? extends WindowedValue<?>>>>
-        loader = new CacheLoader<PCollectionViewWindow<?>,
-            SettableFuture<Iterable<? extends WindowedValue<?>>>>() {
-          @Override
-          public SettableFuture<Iterable<? extends WindowedValue<?>>> load(
-              PCollectionViewWindow<?> view) {
-            return SettableFuture.create();
-          }
-        };
-    LoadingCache<PCollectionViewWindow<?>, SettableFuture<Iterable<? extends WindowedValue<?>>>>
-        viewByWindows = CacheBuilder.newBuilder().build(loader);
-    return new InProcessSideInputContainer(context, containedViews, viewByWindows);
-  }
-
-  private InProcessSideInputContainer(InProcessEvaluationContext context,
-      Collection<PCollectionView<?>> containedViews,
-      LoadingCache<PCollectionViewWindow<?>, SettableFuture<Iterable<? extends WindowedValue<?>>>>
-      viewByWindows) {
-    this.evaluationContext = context;
-    this.containedViews = ImmutableSet.copyOf(containedViews);
-    this.viewByWindows = viewByWindows;
-  }
-
-  /**
-   * Return a view of this {@link InProcessSideInputContainer} that contains only the views in
-   * the provided argument. The returned {@link InProcessSideInputContainer} is unmodifiable without
-   * casting, but will change as this {@link InProcessSideInputContainer} is modified.
-   */
-  public SideInputReader createReaderForViews(Collection<PCollectionView<?>> newContainedViews) {
-    if (!containedViews.containsAll(newContainedViews)) {
-      Set<PCollectionView<?>> currentlyContained = ImmutableSet.copyOf(containedViews);
-      Set<PCollectionView<?>> newRequested = ImmutableSet.copyOf(newContainedViews);
-      throw new IllegalArgumentException("Can't create a SideInputReader with unknown views "
-          + Sets.difference(newRequested, currentlyContained));
-    }
-    return new SideInputContainerSideInputReader(newContainedViews);
-  }
-
-  /**
-   * Write the provided values to the provided view.
-   *
-   * <p>The windowed values are first exploded, then for each window the pane is determined. For
-   * each window, if the pane is later than the current pane stored within this container, write
-   * all of the values to the container as the new values of the {@link PCollectionView}.
-   *
-   * <p>The provided iterable is expected to contain only a single window and pane.
-   */
-  public void write(PCollectionView<?> view, Iterable<? extends WindowedValue<?>> values) {
-    Map<BoundedWindow, Collection<WindowedValue<?>>> valuesPerWindow =
-        indexValuesByWindow(values);
-    for (Map.Entry<BoundedWindow, Collection<WindowedValue<?>>> windowValues :
-        valuesPerWindow.entrySet()) {
-      updatePCollectionViewWindowValues(view, windowValues.getKey(), windowValues.getValue());
-    }
-  }
-
-  /**
-   * Index the provided values by all {@link BoundedWindow windows} in which they appear.
-   */
-  private Map<BoundedWindow, Collection<WindowedValue<?>>> indexValuesByWindow(
-      Iterable<? extends WindowedValue<?>> values) {
-    Map<BoundedWindow, Collection<WindowedValue<?>>> valuesPerWindow = new HashMap<>();
-    for (WindowedValue<?> value : values) {
-      for (BoundedWindow window : value.getWindows()) {
-        Collection<WindowedValue<?>> windowValues = valuesPerWindow.get(window);
-        if (windowValues == null) {
-          windowValues = new ArrayList<>();
-          valuesPerWindow.put(window, windowValues);
-        }
-        windowValues.add(value);
-      }
-    }
-    return valuesPerWindow;
-  }
-
-  /**
-   * Set the value of the {@link PCollectionView} in the {@link BoundedWindow} to be based on the
-   * specified values, if the values are part of a later pane than currently exist within the
-   * {@link PCollectionViewWindow}.
-   */
-  private void updatePCollectionViewWindowValues(
-      PCollectionView<?> view, BoundedWindow window, Collection<WindowedValue<?>> windowValues) {
-    PCollectionViewWindow<?> windowedView = PCollectionViewWindow.of(view, window);
-    SettableFuture<Iterable<? extends WindowedValue<?>>> future = null;
-    try {
-      future = viewByWindows.get(windowedView);
-      if (future.isDone()) {
-        Iterator<? extends WindowedValue<?>> existingValues = future.get().iterator();
-        PaneInfo newPane = windowValues.iterator().next().getPane();
-        // The current value may have no elements, if no elements were produced for the window,
-        // but we are recieving late data.
-        if (!existingValues.hasNext()
-            || newPane.getIndex() > existingValues.next().getPane().getIndex()) {
-          viewByWindows.invalidate(windowedView);
-          viewByWindows.get(windowedView).set(windowValues);
-        }
-      } else {
-        future.set(windowValues);
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      if (future != null && !future.isDone()) {
-        future.set(Collections.<WindowedValue<?>>emptyList());
-      }
-    } catch (ExecutionException e) {
-      Throwables.propagate(e.getCause());
-    }
-  }
-
-  private final class SideInputContainerSideInputReader implements SideInputReader {
-    private final Collection<PCollectionView<?>> readerViews;
-
-    private SideInputContainerSideInputReader(Collection<PCollectionView<?>> readerViews) {
-      this.readerViews = ImmutableSet.copyOf(readerViews);
-    }
-
-    @Override
-    @Nullable
-    public <T> T get(final PCollectionView<T> view, final BoundedWindow window) {
-      checkArgument(
-          readerViews.contains(view), "calling get(PCollectionView) with unknown view: " + view);
-      PCollectionViewWindow<T> windowedView = PCollectionViewWindow.of(view, window);
-      try {
-        final SettableFuture<Iterable<? extends WindowedValue<?>>> future =
-            viewByWindows.get(windowedView);
-
-        WindowingStrategy<?, ?> windowingStrategy = view.getWindowingStrategyInternal();
-        evaluationContext.scheduleAfterOutputWouldBeProduced(
-            view, window, windowingStrategy, new Runnable() {
-              @Override
-              public void run() {
-                // The requested window has closed without producing elements, so reflect that in
-                // the PCollectionView. If set has already been called, will do nothing.
-                future.set(Collections.<WindowedValue<?>>emptyList());
-          }
-
-          @Override
-          public String toString() {
-            return MoreObjects.toStringHelper("InProcessSideInputContainerEmptyCallback")
-                .add("view", view)
-                .add("window", window)
-                .toString();
-          }
-        });
-        // Safe covariant cast
-        @SuppressWarnings("unchecked")
-        Iterable<WindowedValue<?>> values = (Iterable<WindowedValue<?>>) future.get();
-        return view.fromIterableInternal(values);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        return null;
-      } catch (ExecutionException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    @Override
-    public <T> boolean contains(PCollectionView<T> view) {
-      return readerViews.contains(view);
-    }
-
-    @Override
-    public boolean isEmpty() {
-      return readerViews.isEmpty();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java
deleted file mode 100644
index 06ba7b8..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTimerInternals.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate.TimerUpdateBuilder;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TransformWatermarks;
-import com.google.cloud.dataflow.sdk.util.TimerInternals;
-
-import org.joda.time.Instant;
-
-import javax.annotation.Nullable;
-
-/**
- * An implementation of {@link TimerInternals} where all relevant data exists in memory.
- *
- * <p>Processing time is read from a {@link Clock}, watermark times from a
- * {@link TransformWatermarks}, and timer changes are recorded in a {@link TimerUpdateBuilder}.
- */
-public class InProcessTimerInternals implements TimerInternals {
-  private final Clock processingTimeClock;
-  private final TransformWatermarks watermarks;
-  private final TimerUpdateBuilder timerUpdateBuilder;
-
-  private InProcessTimerInternals(
-      Clock processingTimeClock,
-      TransformWatermarks transformWatermarks,
-      TimerUpdateBuilder updateBuilder) {
-    this.processingTimeClock = processingTimeClock;
-    this.watermarks = transformWatermarks;
-    this.timerUpdateBuilder = updateBuilder;
-  }
-
-  /**
-   * Creates an {@link InProcessTimerInternals} that uses the provided clock, watermarks and
-   * timer update builder.
-   */
-  public static InProcessTimerInternals create(
-      Clock clock, TransformWatermarks watermarks, TimerUpdateBuilder timerUpdateBuilder) {
-    return new InProcessTimerInternals(clock, watermarks, timerUpdateBuilder);
-  }
-
-  /** Records the provided timer as set in the underlying {@link TimerUpdateBuilder}. */
-  @Override
-  public void setTimer(TimerData timerKey) {
-    timerUpdateBuilder.setTimer(timerKey);
-  }
-
-  /** Records the provided timer as deleted in the underlying {@link TimerUpdateBuilder}. */
-  @Override
-  public void deleteTimer(TimerData timerKey) {
-    timerUpdateBuilder.deletedTimer(timerKey);
-  }
-
-  /** Builds a {@link TimerUpdate} from the underlying {@link TimerUpdateBuilder}. */
-  public TimerUpdate getTimerUpdate() {
-    return timerUpdateBuilder.build();
-  }
-
-  /** Returns the current time according to the processing time {@link Clock}. */
-  @Override
-  public Instant currentProcessingTime() {
-    return processingTimeClock.now();
-  }
-
-  /** Returns the synchronized processing input time reported by the watermarks. */
-  @Override
-  @Nullable
-  public Instant currentSynchronizedProcessingTime() {
-    return watermarks.getSynchronizedProcessingInputTime();
-  }
-
-  /** Returns the input watermark reported by the watermarks. */
-  @Override
-  @Nullable
-  public Instant currentInputWatermarkTime() {
-    return watermarks.getInputWatermark();
-  }
-
-  /** Returns the output watermark reported by the watermarks. */
-  @Override
-  @Nullable
-  public Instant currentOutputWatermarkTime() {
-    return watermarks.getOutputWatermark();
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTransformResult.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTransformResult.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTransformResult.java
deleted file mode 100644
index 3f9e94a..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessTransformResult.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.common.CounterSet;
-import com.google.cloud.dataflow.sdk.util.state.CopyOnAccessInMemoryStateInternals;
-
-import org.joda.time.Instant;
-
-import javax.annotation.Nullable;
-
-/**
- * The result of evaluating an {@link AppliedPTransform} with a {@link TransformEvaluator}.
- */
-public interface InProcessTransformResult {
-  /**
-   * Returns the {@link AppliedPTransform} that produced this result.
-   */
-  AppliedPTransform<?, ?, ?> getTransform();
-
-  /**
-   * Returns the {@link UncommittedBundle (uncommitted) Bundles} output by this transform. These
-   * will be committed by the evaluation context as part of completing this result.
-   */
-  Iterable<? extends UncommittedBundle<?>> getOutputBundles();
-
-  /**
-   * Returns the {@link CounterSet} used by this {@link PTransform}, or null if this transform did
-   * not use a {@link CounterSet}.
-   */
-  @Nullable CounterSet getCounters();
-
-  /**
-   * Returns the Watermark Hold for the transform at the time this result was produced.
-   *
-   * <p>If the transform does not set any watermark hold, returns
-   * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
-   */
-  Instant getWatermarkHold();
-
-  /**
-   * Returns the State used by the transform.
-   *
-   * <p>If this evaluation did not access state, this may return null.
-   */
-  @Nullable CopyOnAccessInMemoryStateInternals<?> getState();
-
-  /**
-   * Returns the {@link TimerUpdate} that was produced as a result of this evaluation. If the
-   * evaluation was triggered due to the delivery of one or more timers, those timers must have
-   * been added to the builder of the update before it was built.
-   *
-   * <p>If this evaluation did not add or remove any timers, returns an empty TimerUpdate.
-   */
-  TimerUpdate getTimerUpdate();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/KeyedPValueTrackingVisitor.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/KeyedPValueTrackingVisitor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/KeyedPValueTrackingVisitor.java
deleted file mode 100644
index 23a8c0f..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/KeyedPValueTrackingVisitor.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.cloud.dataflow.sdk.Pipeline.PipelineVisitor;
-import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.values.PValue;
-
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * A pipeline visitor that collects every keyed {@link PValue}. A {@link PValue} counts as keyed
- * when it is produced by a {@link PTransform} whose class is in the set of key-producing
- * transform classes supplied at creation. Such a {@link PTransform} is assumed to colocate
- * output elements that share a key.
- *
- * <p>All {@link GroupByKey} transforms, or their runner-specific implementation primitive, produce
- * keyed output.
- *
- * <p>The visitor becomes finalized once the root node has been left; the collected values can
- * only be read after that, and a finalized visitor rejects further composite transform visits.
- */
-// TODO: Handle Key-preserving transforms when appropriate and more aggressively make PTransforms
-// unkeyed
-class KeyedPValueTrackingVisitor implements PipelineVisitor {
-  @SuppressWarnings("rawtypes")
-  private final Set<Class<? extends PTransform>> producesKeyedOutputs;
-  private final Set<PValue> keyedValues = new HashSet<>();
-  private boolean finalized;
-
-  /**
-   * Creates a visitor that treats the outputs of the provided transform classes as keyed.
-   */
-  public static KeyedPValueTrackingVisitor create(
-      @SuppressWarnings("rawtypes") Set<Class<? extends PTransform>> producesKeyedOutputs) {
-    return new KeyedPValueTrackingVisitor(producesKeyedOutputs);
-  }
-
-  private KeyedPValueTrackingVisitor(
-      @SuppressWarnings("rawtypes") Set<Class<? extends PTransform>> keyedProducerClasses) {
-    this.producesKeyedOutputs = keyedProducerClasses;
-  }
-
-  @Override
-  public void enterCompositeTransform(TransformTreeNode node) {
-    checkNotFinalized(node);
-  }
-
-  @Override
-  public void leaveCompositeTransform(TransformTreeNode node) {
-    checkNotFinalized(node);
-    if (node.isRootNode()) {
-      // Leaving the root means the whole pipeline has been traversed.
-      finalized = true;
-      return;
-    }
-    if (isKeyedProducer(node)) {
-      keyedValues.addAll(node.getExpandedOutputs());
-    }
-  }
-
-  @Override
-  public void visitTransform(TransformTreeNode node) {}
-
-  @Override
-  public void visitValue(PValue value, TransformTreeNode producer) {
-    if (isKeyedProducer(producer)) {
-      keyedValues.addAll(value.expand());
-    }
-  }
-
-  /**
-   * Returns the keyed values collected during traversal.
-   *
-   * @throws IllegalStateException if the root node has not been left yet
-   */
-  public Set<PValue> getKeyedPValues() {
-    checkState(
-        finalized, "can't call getKeyedPValues before a Pipeline has been completely traversed");
-    return keyedValues;
-  }
-
-  /** Fails with an {@link IllegalStateException} if this visitor has already been finalized. */
-  private void checkNotFinalized(TransformTreeNode node) {
-    checkState(
-        !finalized,
-        "Attempted to use a %s that has already been finalized on a pipeline (visiting node %s)",
-        KeyedPValueTrackingVisitor.class.getSimpleName(),
-        node);
-  }
-
-  /** Returns whether the class of the node's transform is one of the key-producing classes. */
-  private boolean isKeyedProducer(TransformTreeNode node) {
-    return producesKeyedOutputs.contains(node.getTransform().getClass());
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/NanosOffsetClock.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/NanosOffsetClock.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/NanosOffsetClock.java
deleted file mode 100644
index 958e26d..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/NanosOffsetClock.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-
-import org.joda.time.Instant;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * A {@link Clock} that uses {@link System#nanoTime()} to track the progress of time.
- */
-public class NanosOffsetClock implements Clock {
-  private final long baseMillis;
-  private final long nanosAtBaseMillis;
-
-  public static NanosOffsetClock create() {
-    return new NanosOffsetClock();
-  }
-
-  private NanosOffsetClock() {
-    baseMillis = System.currentTimeMillis();
-    nanosAtBaseMillis = System.nanoTime();
-  }
-
-  @Override
-  public Instant now() {
-    return new Instant(
-        baseMillis + (TimeUnit.MILLISECONDS.convert(
-            System.nanoTime() - nanosAtBaseMillis, TimeUnit.NANOSECONDS)));
-  }
-
-  /**
-   * Creates instances of {@link NanosOffsetClock}.
-   */
-  public static class Factory implements DefaultValueFactory<Clock> {
-    @Override
-    public Clock create(PipelineOptions options) {
-      return new NanosOffsetClock();
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java
deleted file mode 100644
index 2a21e8c..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.util.DoFnRunner;
-import com.google.cloud.dataflow.sdk.util.DoFnRunners.OutputManager;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.common.CounterSet;
-import com.google.cloud.dataflow.sdk.util.state.CopyOnAccessInMemoryStateInternals;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A {@link TransformEvaluator} that passes each element to a {@link DoFnRunner} and, when the
- * bundle is finished, assembles the output bundles, committed state, timer update and counters
- * into an {@link InProcessTransformResult}.
- */
-class ParDoInProcessEvaluator<T> implements TransformEvaluator<T> {
-  private final DoFnRunner<T, ?> fnRunner;
-  private final AppliedPTransform<PCollection<T>, ?, ?> transform;
-  private final CounterSet counters;
-  private final Collection<UncommittedBundle<?>> outputBundles;
-  private final InProcessStepContext stepContext;
-
-  public ParDoInProcessEvaluator(
-      DoFnRunner<T, ?> fnRunner,
-      AppliedPTransform<PCollection<T>, ?, ?> transform,
-      CounterSet counters,
-      Collection<UncommittedBundle<?>> outputBundles,
-      InProcessStepContext stepContext) {
-    this.stepContext = stepContext;
-    this.outputBundles = outputBundles;
-    this.counters = counters;
-    this.transform = transform;
-    this.fnRunner = fnRunner;
-  }
-
-  @Override
-  public void processElement(WindowedValue<T> element) {
-    fnRunner.processElement(element);
-  }
-
-  /**
-   * Finishes the bundle on the runner, commits the step's state, and builds the result. The
-   * result carries the earliest watermark hold of the committed state when there is state, and
-   * no hold otherwise.
-   */
-  @Override
-  public InProcessTransformResult finishBundle() {
-    fnRunner.finishBundle();
-    CopyOnAccessInMemoryStateInternals<?> committedState = stepContext.commitState();
-    StepTransformResult.Builder result;
-    if (committedState == null) {
-      result = StepTransformResult.withoutHold(transform);
-    } else {
-      result =
-          StepTransformResult.withHold(transform, committedState.getEarliestWatermarkHold())
-              .withState(committedState);
-    }
-    return result
-        .addOutput(outputBundles)
-        .withTimerUpdate(stepContext.getTimerUpdate())
-        .withCounters(counters)
-        .build();
-  }
-
-  /**
-   * An {@link OutputManager} that adds each output to the {@link UncommittedBundle} registered
-   * for its tag. Outputs for tags that have no registered bundle are collected in a per-tag list
-   * held by this manager.
-   */
-  static class BundleOutputManager implements OutputManager {
-    private final Map<TupleTag<?>, UncommittedBundle<?>> bundles;
-    private final Map<TupleTag<?>, List<?>> undeclaredOutputs;
-
-    public static BundleOutputManager create(Map<TupleTag<?>, UncommittedBundle<?>> outputBundles) {
-      return new BundleOutputManager(outputBundles);
-    }
-
-    private BundleOutputManager(Map<TupleTag<?>, UncommittedBundle<?>> bundlesByTag) {
-      this.bundles = bundlesByTag;
-      this.undeclaredOutputs = new HashMap<>();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-      @SuppressWarnings("rawtypes")
-      UncommittedBundle declaredBundle = bundles.get(tag);
-      if (declaredBundle != null) {
-        declaredBundle.add(output);
-        return;
-      }
-      // No bundle was registered for this tag: keep the output in the undeclared list instead.
-      List undeclared = undeclaredOutputs.get(tag);
-      if (undeclared == null) {
-        undeclared = new ArrayList<T>();
-        undeclaredOutputs.put(tag, undeclared);
-      }
-      undeclared.add(output);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java
deleted file mode 100644
index 659bdd2..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.ParDoInProcessEvaluator.BundleOutputManager;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo.BoundMulti;
-import com.google.cloud.dataflow.sdk.util.DoFnRunner;
-import com.google.cloud.dataflow.sdk.util.DoFnRunners;
-import com.google.cloud.dataflow.sdk.util.common.CounterSet;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionTuple;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
- * {@link BoundMulti} primitive {@link PTransform}.
- *
- * <p>Each evaluator gets one {@link UncommittedBundle} per output tag of the transform and a
- * {@link DoFnRunner} whose bundle has already been started.
- */
-class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
-  @Override
-  public <T> TransformEvaluator<T> forApplication(
-      AppliedPTransform<?, ?, ?> application,
-      CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext) {
-    // The raw cast hands the wildcard-typed application to the typed helper below.
-    @SuppressWarnings({"cast", "unchecked", "rawtypes"})
-    TransformEvaluator<T> multiEvaluator = (TransformEvaluator<T>) createMultiEvaluator(
-            (AppliedPTransform) application, inputBundle, evaluationContext);
-    return multiEvaluator;
-  }
-
-  /**
-   * Creates the output bundles, step context, counters and {@link DoFnRunner} for the
-   * application, starts the runner's bundle, and wraps everything in a
-   * {@link ParDoInProcessEvaluator}.
-   */
-  private static <InT, OuT> ParDoInProcessEvaluator<InT> createMultiEvaluator(
-      AppliedPTransform<PCollection<InT>, PCollectionTuple, BoundMulti<InT, OuT>> application,
-      CommittedBundle<InT> inputBundle,
-      InProcessEvaluationContext evaluationContext) {
-    // One uncommitted bundle for every tagged output of the transform.
-    Map<TupleTag<?>, UncommittedBundle<?>> bundlesByTag = new HashMap<>();
-    for (Map.Entry<TupleTag<?>, PCollection<?>> taggedOutput :
-        application.getOutput().getAll().entrySet()) {
-      bundlesByTag.put(
-          taggedOutput.getKey(),
-          evaluationContext.createBundle(inputBundle, taggedOutput.getValue()));
-    }
-
-    String stepName = evaluationContext.getStepName(
-        application);
-    InProcessStepContext stepContext =
-        evaluationContext
-            .getExecutionContext(application, inputBundle.getKey())
-            .getOrCreateStepContext(stepName, stepName, null);
-
-    CounterSet counters = evaluationContext.createCounterSet();
-
-    BoundMulti<InT, OuT> parDo = application.getTransform();
-    DoFn<InT, OuT> fn = parDo.getFn();
-    DoFnRunner<InT, OuT> runner =
-        DoFnRunners.createDefault(
-            evaluationContext.getPipelineOptions(),
-            fn,
-            evaluationContext.createSideInputReader(parDo.getSideInputs()),
-            BundleOutputManager.create(bundlesByTag),
-            parDo.getMainOutputTag(),
-            parDo.getSideOutputTags().getAll(),
-            stepContext,
-            counters.getAddCounterMutator(),
-            application.getInput().getWindowingStrategy());
-
-    // The evaluator expects a runner whose bundle is already open.
-    runner.startBundle();
-
-    return new ParDoInProcessEvaluator<>(
-        runner, application, counters, bundlesByTag.values(), stepContext);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java
deleted file mode 100644
index e9bc1f7..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.ParDoInProcessEvaluator.BundleOutputManager;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo.Bound;
-import com.google.cloud.dataflow.sdk.util.DoFnRunner;
-import com.google.cloud.dataflow.sdk.util.DoFnRunners;
-import com.google.cloud.dataflow.sdk.util.common.CounterSet;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.TupleTag;
-
-import java.util.Collections;
-
-/**
- * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
- * {@link Bound ParDo.Bound} primitive {@link PTransform}.
- */
-class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
-  @Override
-  public <T> TransformEvaluator<T> forApplication(
-      final AppliedPTransform<?, ?, ?> application,
-      CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext) {
-    @SuppressWarnings({"cast", "unchecked", "rawtypes"})
-    TransformEvaluator<T> evaluator = (TransformEvaluator<T>) createSingleEvaluator(
-            (AppliedPTransform) application, inputBundle, evaluationContext);
-    return evaluator;
-  }
-
-  private static <InputT, OutputT> ParDoInProcessEvaluator<InputT> createSingleEvaluator(
-      @SuppressWarnings("rawtypes") AppliedPTransform<PCollection<InputT>, PCollection<OutputT>,
-          Bound<InputT, OutputT>> application,
-      CommittedBundle<InputT> inputBundle, InProcessEvaluationContext evaluationContext) {
-    TupleTag<OutputT> mainOutputTag = new TupleTag<>("out");
-    UncommittedBundle<OutputT> outputBundle =
-        evaluationContext.createBundle(inputBundle, application.getOutput());
-
-    InProcessExecutionContext executionContext =
-        evaluationContext.getExecutionContext(application, inputBundle.getKey());
-    String stepName = evaluationContext.getStepName(application);
-    InProcessStepContext stepContext =
-        executionContext.getOrCreateStepContext(stepName, stepName, null);
-
-    CounterSet counters = evaluationContext.createCounterSet();
-
-    DoFnRunner<InputT, OutputT> runner =
-        DoFnRunners.createDefault(
-            evaluationContext.getPipelineOptions(),
-            application.getTransform().getFn(),
-            evaluationContext.createSideInputReader(application.getTransform().getSideInputs()),
-            BundleOutputManager.create(
-                Collections.<TupleTag<?>, UncommittedBundle<?>>singletonMap(
-                    mainOutputTag, outputBundle)),
-            mainOutputTag,
-            Collections.<TupleTag<?>>emptyList(),
-            stepContext,
-            counters.getAddCounterMutator(),
-            application.getInput().getWindowingStrategy());
-
-    runner.startBundle();
-    return new ParDoInProcessEvaluator<InputT>(
-        runner,
-        application,
-        counters,
-        Collections.<UncommittedBundle<?>>singleton(outputBundle),
-        stepContext);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/StepAndKey.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/StepAndKey.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/StepAndKey.java
deleted file mode 100644
index 1595572..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/StepAndKey.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.common.base.MoreObjects;
-
-import java.util.Objects;
-
-/**
- * A (Step, Key) pair. This is useful as a map key or cache key for things that are available
- * per-step in a keyed manner (e.g. State).
- */
-final class StepAndKey {
-  private final AppliedPTransform<?, ?, ?> step;
-  private final Object key;
-
-  /**
-   * Create a new {@link StepAndKey} with the provided step and key.
-   */
-  public static StepAndKey of(AppliedPTransform<?, ?, ?> step, Object key) {
-    return new StepAndKey(step, key);
-  }
-
-  private StepAndKey(AppliedPTransform<?, ?, ?> step, Object key) {
-    this.step = step;
-    this.key = key;
-  }
-
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(StepAndKey.class)
-        .add("step", step.getFullName())
-        .add("key", key)
-        .toString();
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(step, key);
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (other == this) {
-      return true;
-    } else if (!(other instanceof StepAndKey)) {
-      return false;
-    } else {
-      StepAndKey that = (StepAndKey) other;
-      return Objects.equals(this.step, that.step)
-          && Objects.equals(this.key, that.key);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/StepTransformResult.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/StepTransformResult.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/StepTransformResult.java
deleted file mode 100644
index 3c4ee29..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/StepTransformResult.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.common.CounterSet;
-import com.google.cloud.dataflow.sdk.util.state.CopyOnAccessInMemoryStateInternals;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
-
-import org.joda.time.Instant;
-
-import java.util.Collection;
-
-/**
- * An immutable {@link InProcessTransformResult}.
- */
-public class StepTransformResult implements InProcessTransformResult {
-  private final AppliedPTransform<?, ?, ?> transform;
-  private final Iterable<? extends UncommittedBundle<?>> bundles;
-  private final CopyOnAccessInMemoryStateInternals<?> state;
-  private final TimerUpdate timerUpdate;
-  private final CounterSet counters;
-  private final Instant watermarkHold;
-
-  private StepTransformResult(
-      AppliedPTransform<?, ?, ?> transform,
-      Iterable<? extends UncommittedBundle<?>> outputBundles,
-      CopyOnAccessInMemoryStateInternals<?> state,
-      TimerUpdate timerUpdate,
-      CounterSet counters,
-      Instant watermarkHold) {
-    this.transform = transform;
-    this.bundles = outputBundles;
-    this.state = state;
-    this.timerUpdate = timerUpdate;
-    this.counters = counters;
-    this.watermarkHold = watermarkHold;
-  }
-
-  @Override
-  public Iterable<? extends UncommittedBundle<?>> getOutputBundles() {
-    return bundles;
-  }
-
-  @Override
-  public CounterSet getCounters() {
-    return counters;
-  }
-
-  @Override
-  public AppliedPTransform<?, ?, ?> getTransform() {
-    return transform;
-  }
-
-  @Override
-  public Instant getWatermarkHold() {
-    return watermarkHold;
-  }
-
-  @Override
-  public CopyOnAccessInMemoryStateInternals<?> getState() {
-    return state;
-  }
-
-  @Override
-  public TimerUpdate getTimerUpdate() {
-    return timerUpdate;
-  }
-
-  public static Builder withHold(AppliedPTransform<?, ?, ?> transform, Instant watermarkHold) {
-    return new Builder(transform, watermarkHold);
-  }
-
-  public static Builder withoutHold(AppliedPTransform<?, ?, ?> transform) {
-    return new Builder(transform, BoundedWindow.TIMESTAMP_MAX_VALUE);
-  }
-
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(StepTransformResult.class)
-        .add("transform", transform)
-        .toString();
-  }
-
-  /**
-   * A builder for creating instances of {@link StepTransformResult}.
-   */
-  public static class Builder {
-    private final AppliedPTransform<?, ?, ?> transform;
-    private final ImmutableList.Builder<UncommittedBundle<?>> bundlesBuilder;
-    private CopyOnAccessInMemoryStateInternals<?> state;
-    private TimerUpdate timerUpdate;
-    private CounterSet counters;
-    private final Instant watermarkHold;
-
-    private Builder(AppliedPTransform<?, ?, ?> transform, Instant watermarkHold) {
-      this.transform = transform;
-      this.watermarkHold = watermarkHold;
-      this.bundlesBuilder = ImmutableList.builder();
-      this.timerUpdate = TimerUpdate.builder(null).build();
-    }
-
-    public StepTransformResult build() {
-      return new StepTransformResult(
-          transform,
-          bundlesBuilder.build(),
-          state,
-          timerUpdate,
-          counters,
-          watermarkHold);
-    }
-
-    public Builder withCounters(CounterSet counters) {
-      this.counters = counters;
-      return this;
-    }
-
-    public Builder withState(CopyOnAccessInMemoryStateInternals<?> state) {
-      this.state = state;
-      return this;
-    }
-
-    public Builder withTimerUpdate(TimerUpdate timerUpdate) {
-      this.timerUpdate = timerUpdate;
-      return this;
-    }
-
-    public Builder addOutput(
-        UncommittedBundle<?> outputBundle, UncommittedBundle<?>... outputBundles) {
-      bundlesBuilder.add(outputBundle);
-      bundlesBuilder.add(outputBundles);
-      return this;
-    }
-
-    public Builder addOutput(Collection<UncommittedBundle<?>> outputBundles) {
-      bundlesBuilder.addAll(outputBundles);
-      return this;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluator.java
deleted file mode 100644
index 270557d..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluator.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-
-/**
- * An evaluator of a specific application of a transform. Will be used for at least one
- * {@link CommittedBundle}.
- *
- * @param <InputT> the type of elements that will be passed to {@link #processElement}
- */
-public interface TransformEvaluator<InputT> {
-  /**
-   * Process an element in the input {@link CommittedBundle}.
-   *
-   * @param element the element to process
-   */
-  void processElement(WindowedValue<InputT> element) throws Exception;
-
-  /**
-   * Finish processing the bundle of this {@link TransformEvaluator}.
-   *
-   * After {@link #finishBundle()} is called, the {@link TransformEvaluator} will not be reused,
-   * and no more elements will be processed.
-   *
-   * @return an {@link InProcessTransformResult} containing the results of this bundle evaluation.
-   */
-  InProcessTransformResult finishBundle() throws Exception;
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorFactory.java
deleted file mode 100644
index 860ddfe..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorFactory.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-
-import javax.annotation.Nullable;
-
-/**
- * A factory for creating instances of {@link TransformEvaluator} for the application of a
- * {@link PTransform}.
- */
-public interface TransformEvaluatorFactory {
-  /**
-   * Create a new {@link TransformEvaluator} for the application of the {@link PTransform}.
-   *
-   * Any work that must be done before input elements are processed (such as calling
-   * {@link DoFn#startBundle(DoFn.Context)}) must be done before the {@link TransformEvaluator} is
-   * made available to the caller.
-   *
-   * @throws Exception whenever constructing the underlying evaluator throws an exception
-   */
-  <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application, @Nullable CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorRegistry.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorRegistry.java
deleted file mode 100644
index 0c8cb7e..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorRegistry.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.io.Read;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.Flatten.FlattenPCollectionList;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.common.collect.ImmutableMap;
-
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link TransformEvaluatorFactory} that delegates to primitive {@link TransformEvaluatorFactory}
- * implementations based on the type of {@link PTransform} of the application.
- */
-class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
-  public static TransformEvaluatorRegistry defaultRegistry() {
-    @SuppressWarnings("rawtypes")
-    ImmutableMap<Class<? extends PTransform>, TransformEvaluatorFactory> primitives =
-        ImmutableMap.<Class<? extends PTransform>, TransformEvaluatorFactory>builder()
-            .put(Read.Bounded.class, new BoundedReadEvaluatorFactory())
-            .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory())
-            .put(ParDo.Bound.class, new ParDoSingleEvaluatorFactory())
-            .put(ParDo.BoundMulti.class, new ParDoMultiEvaluatorFactory())
-            .put(
-                GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly.class,
-                new GroupByKeyEvaluatorFactory())
-            .put(FlattenPCollectionList.class, new FlattenEvaluatorFactory())
-            .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory())
-            .build();
-    return new TransformEvaluatorRegistry(primitives);
-  }
-
-  // the TransformEvaluatorFactories can construct instances of all generic types of transform,
-  // so all instances of a primitive can be handled with the same evaluator factory.
-  @SuppressWarnings("rawtypes")
-  private final Map<Class<? extends PTransform>, TransformEvaluatorFactory> factories;
-
-  private TransformEvaluatorRegistry(
-      @SuppressWarnings("rawtypes")
-      Map<Class<? extends PTransform>, TransformEvaluatorFactory> factories) {
-    this.factories = factories;
-  }
-
-  @Override
-  public <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application,
-      @Nullable CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext)
-      throws Exception {
-    TransformEvaluatorFactory factory = factories.get(application.getTransform().getClass());
-    return factory.forApplication(application, inputBundle, evaluationContext);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutor.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutor.java
deleted file mode 100644
index d630749..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutor.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.common.base.Throwables;
-
-import java.util.concurrent.Callable;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link Callable} responsible for constructing a {@link TransformEvaluator} from a
- * {@link TransformEvaluatorFactory} and evaluating it on some bundle of input, and registering
- * the result using a registered {@link CompletionCallback}.
- *
- * <p>A {@link TransformExecutor} that is currently executing also provides access to the thread
- * that it is being executed on.
- */
-class TransformExecutor<T> implements Callable<InProcessTransformResult> {
-  public static <T> TransformExecutor<T> create(
-      TransformEvaluatorFactory factory,
-      InProcessEvaluationContext evaluationContext,
-      CommittedBundle<T> inputBundle,
-      AppliedPTransform<?, ?, ?> transform,
-      CompletionCallback completionCallback,
-      TransformExecutorService transformEvaluationState) {
-    return new TransformExecutor<>(
-        factory,
-        evaluationContext,
-        inputBundle,
-        transform,
-        completionCallback,
-        transformEvaluationState);
-  }
-
-  private final TransformEvaluatorFactory evaluatorFactory;
-  private final InProcessEvaluationContext evaluationContext;
-
-  /** The transform that will be evaluated. */
-  private final AppliedPTransform<?, ?, ?> transform;
-  /** The inputs this {@link TransformExecutor} will deliver to the transform. */
-  private final CommittedBundle<T> inputBundle;
-
-  private final CompletionCallback onComplete;
-  private final TransformExecutorService transformEvaluationState;
-
-  private Thread thread;
-
-  private TransformExecutor(
-      TransformEvaluatorFactory factory,
-      InProcessEvaluationContext evaluationContext,
-      CommittedBundle<T> inputBundle,
-      AppliedPTransform<?, ?, ?> transform,
-      CompletionCallback completionCallback,
-      TransformExecutorService transformEvaluationState) {
-    this.evaluatorFactory = factory;
-    this.evaluationContext = evaluationContext;
-
-    this.inputBundle = inputBundle;
-    this.transform = transform;
-
-    this.onComplete = completionCallback;
-
-    this.transformEvaluationState = transformEvaluationState;
-  }
-
-  @Override
-  public InProcessTransformResult call() {
-    this.thread = Thread.currentThread();
-    try {
-      TransformEvaluator<T> evaluator =
-          evaluatorFactory.forApplication(transform, inputBundle, evaluationContext);
-      if (inputBundle != null) {
-        for (WindowedValue<T> value : inputBundle.getElements()) {
-          evaluator.processElement(value);
-        }
-      }
-      InProcessTransformResult result = evaluator.finishBundle();
-      onComplete.handleResult(inputBundle, result);
-      return result;
-    } catch (Throwable t) {
-      onComplete.handleThrowable(inputBundle, t);
-      throw Throwables.propagate(t);
-    } finally {
-      this.thread = null;
-      transformEvaluationState.complete(this);
-    }
-  }
-
-  /**
-   * If this {@link TransformExecutor} is currently executing, return the thread it is executing in.
-   * Otherwise, return null.
-   */
-  @Nullable
-  public Thread getThread() {
-    return this.thread;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorService.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorService.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorService.java
deleted file mode 100644
index 3f00da6..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorService.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-/**
- * Schedules and completes {@link TransformExecutor TransformExecutors}, controlling concurrency as
- * appropriate for the {@link StepAndKey} the executor exists for.
- */
-interface TransformExecutorService {
-  /**
-   * Schedule the provided work to be eventually executed.
-   */
-  void schedule(TransformExecutor<?> work);
-
-  /**
-   * Finish executing the provided work. This may cause additional
-   * {@link TransformExecutor TransformExecutors} to be evaluated.
-   */
-  void complete(TransformExecutor<?> completed);
-}
-



Mime
View raw message