beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] incubator-beam git commit: Remove StateSampler from the SDK
Date Thu, 21 Apr 2016 22:07:55 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 0e420f16c -> c3567f1e7


Remove StateSampler from the SDK


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6c7ed126
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6c7ed126
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6c7ed126

Branch: refs/heads/master
Commit: 6c7ed12631c7c4aba4e44fc0bd1b7f82f10b46ae
Parents: 0e420f1
Author: Scott Wegner <swegner@google.com>
Authored: Fri Mar 4 10:01:51 2016 -0800
Committer: Scott Wegner <swegner@google.com>
Committed: Thu Apr 21 10:05:50 2016 -0700

----------------------------------------------------------------------
 .../inprocess/InProcessExecutionContext.java    |   4 +-
 .../inprocess/ParDoInProcessEvaluator.java      |   2 +-
 .../apache/beam/sdk/transforms/DoFnTester.java  |  21 +-
 .../org/apache/beam/sdk/transforms/ParDo.java   |   2 +-
 .../beam/sdk/util/BaseExecutionContext.java     |  31 +-
 .../sdk/util/DirectModeExecutionContext.java    |   4 +-
 .../apache/beam/sdk/util/ExecutionContext.java  |   4 +-
 .../sdk/util/common/worker/StateSampler.java    | 367 -------------------
 .../InProcessEvaluationContextTest.java         |  16 +-
 .../sdk/util/GroupAlsoByWindowsProperties.java  |   2 +-
 10 files changed, 46 insertions(+), 407 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6c7ed126/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessExecutionContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessExecutionContext.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessExecutionContext.java
index 1430c98..e6441cf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessExecutionContext.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessExecutionContext.java
@@ -22,7 +22,6 @@ import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.TransformW
 import org.apache.beam.sdk.util.BaseExecutionContext;
 import org.apache.beam.sdk.util.ExecutionContext;
 import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.common.worker.StateSampler;
 import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
 
 /**
@@ -47,8 +46,7 @@ class InProcessExecutionContext
   }
 
   @Override
-  protected InProcessStepContext createStepContext(
-      String stepName, String transformName, StateSampler stateSampler) {
+  protected InProcessStepContext createStepContext(String stepName, String transformName)
{
     return new InProcessStepContext(this, stepName, transformName);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6c7ed126/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java
index a2f080c..9840e06 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java
@@ -54,7 +54,7 @@ class ParDoInProcessEvaluator<T> implements TransformEvaluator<T>
{
         evaluationContext.getExecutionContext(application, inputBundle.getKey());
     String stepName = evaluationContext.getStepName(application);
     InProcessStepContext stepContext =
-        executionContext.getOrCreateStepContext(stepName, stepName, null);
+        executionContext.getOrCreateStepContext(stepName, stepName);
 
     CounterSet counters = evaluationContext.createCounterSet();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6c7ed126/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 1c60259..16dc731 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -483,15 +483,16 @@ public class DoFnTester<InputT, OutputT> {
       runnerSideInputs = runnerSideInputs.and(entry.getKey().getTagInternal(), entry.getValue());
     }
     outputManager = new DoFnRunnerBase.ListOutputManager();
-    fnRunner = DoFnRunners.createDefault(
-        options,
-        fn,
-        DirectSideInputReader.of(runnerSideInputs),
-        outputManager,
-        mainOutputTag,
-        sideOutputTags,
-        DirectModeExecutionContext.create().getOrCreateStepContext(STEP_NAME, TRANSFORM_NAME,
null),
-        counterSet.getAddCounterMutator(),
-        WindowingStrategy.globalDefault());
+    fnRunner =
+        DoFnRunners.createDefault(
+            options,
+            fn,
+            DirectSideInputReader.of(runnerSideInputs),
+            outputManager,
+            mainOutputTag,
+            sideOutputTags,
+            DirectModeExecutionContext.create().getOrCreateStepContext(STEP_NAME, TRANSFORM_NAME),
+            counterSet.getAddCounterMutator(),
+            WindowingStrategy.globalDefault());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6c7ed126/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index d266155..02464ac 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -1200,7 +1200,7 @@ public class ParDo {
             outputManager,
             mainOutputTag,
             sideOutputTags,
-            executionContext.getOrCreateStepContext(stepName, stepName, null),
+            executionContext.getOrCreateStepContext(stepName, stepName),
             context.getAddCounterMutator(),
             input.getWindowingStrategy());
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6c7ed126/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java
index 33df089..630417b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java
@@ -19,9 +19,9 @@ package org.apache.beam.sdk.util;
 
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.common.worker.StateSampler;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.TupleTag;
+import com.google.common.base.Supplier;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -37,7 +37,7 @@ import java.util.Map;
  * be cached for the lifetime of this {@link ExecutionContext}.
  *
  * <p>BaseExecutionContext is generic to allow implementing subclasses to return a
concrete subclass
- * of {@link StepContext} from {@link #getOrCreateStepContext(String, String, StateSampler)}
and
+ * of {@link StepContext} from {@link #getOrCreateStepContext(String, String)} and
  * {@link #getAllStepContexts()} without forcing each subclass to override the method, e.g.
  * <pre>
  * @Override
@@ -47,8 +47,8 @@ import java.util.Map;
  * </pre>
  *
  * <p>When a subclass of {@code BaseExecutionContext} has been downcast, the return
types of
- * {@link #createStepContext(String, String, StateSampler)},
- * {@link #getOrCreateStepContext(String, String, StateSampler}, and {@link #getAllStepContexts()}
+ * {@link #createStepContext(String, String)},
+ * {@link #getOrCreateStepContext(String, String)}, and {@link #getAllStepContexts()}
  * will be appropriately specialized.
  */
 public abstract class BaseExecutionContext<T extends ExecutionContext.StepContext>
@@ -60,21 +60,32 @@ public abstract class BaseExecutionContext<T extends ExecutionContext.StepContex
    * Implementations should override this to create the specific type
    * of {@link StepContext} they need.
    */
-  protected abstract T createStepContext(
-      String stepName, String transformName, StateSampler stateSampler);
-
+  protected abstract T createStepContext(String stepName, String transformName);
 
   /**
    * Returns the {@link StepContext} associated with the given step.
    */
   @Override
-  public T getOrCreateStepContext(
-      String stepName, String transformName, StateSampler stateSampler) {
+  public T getOrCreateStepContext(String stepName, String transformName) {
+    final String finalStepName = stepName;
+    final String finalTransformName = transformName;
+    return getOrCreateStepContext(
+        stepName,
+        new Supplier<T>() {
+          @Override
+          public T get() {
+            return createStepContext(finalStepName, finalTransformName);
+          }
+        });
+  }
+
+  protected final T getOrCreateStepContext(String stepName, Supplier<T> createContextFunc)
{
     T context = cachedStepContexts.get(stepName);
     if (context == null) {
-      context = createStepContext(stepName, transformName, stateSampler);
+      context = createContextFunc.get();
       cachedStepContexts.put(stepName, context);
     }
+
     return context;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6c7ed126/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectModeExecutionContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectModeExecutionContext.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectModeExecutionContext.java
index c3da3d7..85e36dd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectModeExecutionContext.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectModeExecutionContext.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.util;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import org.apache.beam.sdk.runners.DirectPipelineRunner.ValueWithMetadata;
-import org.apache.beam.sdk.util.common.worker.StateSampler;
 import org.apache.beam.sdk.util.state.InMemoryStateInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.TupleTag;
@@ -48,8 +47,7 @@ public class DirectModeExecutionContext
   }
 
   @Override
-  protected StepContext createStepContext(
-      String stepName, String transformName, StateSampler stateSampler) {
+  protected StepContext createStepContext(String stepName, String transformName) {
     return new StepContext(this, stepName, transformName);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6c7ed126/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java
index 577aa66..01bde82 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.util;
 
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.common.worker.StateSampler;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.TupleTag;
 
@@ -34,8 +33,7 @@ public interface ExecutionContext {
   /**
    * Returns the {@link StepContext} associated with the given step.
    */
-  StepContext getOrCreateStepContext(
-      String stepName, String transformName, StateSampler stateSampler);
+  StepContext getOrCreateStepContext(String stepName, String transformName);
 
   /**
    * Returns a collection view of all of the {@link StepContext}s.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6c7ed126/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/worker/StateSampler.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/worker/StateSampler.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/worker/StateSampler.java
deleted file mode 100644
index ee95260..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/worker/StateSampler.java
+++ /dev/null
@@ -1,367 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.common.worker;
-
-import org.apache.beam.sdk.util.common.Counter;
-import org.apache.beam.sdk.util.common.CounterSet;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.concurrent.ThreadSafe;
-
-/**
- * A StateSampler object may be used to obtain an approximate
- * breakdown of the time spent by an execution context in various
- * states, as a fraction of the total time.  The sampling is taken at
- * regular intervals, with adjustment for scheduling delay.
- */
-@ThreadSafe
-public class StateSampler implements AutoCloseable {
-
-  /** Different kinds of states. */
-  public enum StateKind {
-    /** IO, user code, etc. */
-    USER,
-    /** Reading/writing from/to shuffle service, etc. */
-    FRAMEWORK
-  }
-
-  public static final long DEFAULT_SAMPLING_PERIOD_MS = 200;
-
-  private final String prefix;
-  private final CounterSet.AddCounterMutator counterSetMutator;
-
-  /** Array of counters indexed by their state. */
-  private ArrayList<Counter<Long>> countersByState = new ArrayList<>();
-
-  /** Map of state name to state. */
-  private Map<String, Integer> statesByName = new HashMap<>();
-
-  /** Map of state id to kind. */
-  private Map<Integer, StateKind> kindsByState = new HashMap<>();
-
-  /** The current state. */
-  private volatile int currentState;
-
-  /** Special value of {@code currentState} that means we do not sample. */
-  public static final int DO_NOT_SAMPLE = -1;
-
-  /**
-   * A counter that increments with each state transition. May be used
-   * to detect a context being stuck in a state for some amount of
-   * time.
-   */
-  private volatile long stateTransitionCount;
-
-  /**
-   * The timestamp (in nanoseconds) corresponding to the last time the
-   * state was sampled (and recorded).
-   */
-  private long stateTimestampNs = 0;
-
-  /** Using a fixed number of timers for all StateSampler objects. */
-  private static final int NUM_EXECUTOR_THREADS = 16;
-
-  private static final ScheduledExecutorService executorService =
-      Executors.newScheduledThreadPool(NUM_EXECUTOR_THREADS,
-          new ThreadFactoryBuilder().setDaemon(true).build());
-
-  private Random rand = new Random();
-
-  private List<SamplingCallback> callbacks = new ArrayList<>();
-
-  private ScheduledFuture<?> invocationTriggerFuture = null;
-
-  private ScheduledFuture<?> invocationFuture = null;
-
-  /**
-   * Constructs a new {@link StateSampler} that can be used to obtain
-   * an approximate breakdown of the time spent by an execution
-   * context in various states, as a fraction of the total time.
-   *
-   * @param prefix the prefix of the counter names for the states
-   * @param counterSetMutator the {@link CounterSet.AddCounterMutator}
-   * used to create a counter for each distinct state
-   * @param samplingPeriodMs the sampling period in milliseconds
-   */
-  public StateSampler(String prefix,
-                      CounterSet.AddCounterMutator counterSetMutator,
-                      final long samplingPeriodMs) {
-    this.prefix = prefix;
-    this.counterSetMutator = counterSetMutator;
-    currentState = DO_NOT_SAMPLE;
-    scheduleSampling(samplingPeriodMs);
-  }
-
-  /**
-   * Constructs a new {@link StateSampler} that can be used to obtain
-   * an approximate breakdown of the time spent by an execution
-   * context in various states, as a fraction of the total time.
-   *
-   * @param prefix the prefix of the counter names for the states
-   * @param counterSetMutator the {@link CounterSet.AddCounterMutator}
-   * used to create a counter for each distinct state
-   */
-  public StateSampler(String prefix,
-                      CounterSet.AddCounterMutator counterSetMutator) {
-    this(prefix, counterSetMutator, DEFAULT_SAMPLING_PERIOD_MS);
-  }
-
-  /**
-   * Called by the constructor to schedule sampling at the given period.
-   *
-   * <p>Should not be overridden by sub-classes unless they want to change
-   * or disable the automatic sampling of state.
-   */
-  protected void scheduleSampling(final long samplingPeriodMs) {
-    // Here "stratified sampling" is used, which makes sure that there's 1 uniformly chosen
sampled
-    // point in every bucket of samplingPeriodMs, to prevent pathological behavior in case
some
-    // states happen to occur at a similar period.
-    // The current implementation uses a fixed-rate timer with a period samplingPeriodMs
as a
-    // trampoline to a one-shot random timer which fires with a random delay within
-    // samplingPeriodMs.
-    stateTimestampNs = System.nanoTime();
-    invocationTriggerFuture =
-        executorService.scheduleAtFixedRate(
-            new Runnable() {
-              @Override
-              public void run() {
-                long delay = rand.nextInt((int) samplingPeriodMs);
-                synchronized (StateSampler.this) {
-                  if (invocationFuture != null) {
-                    invocationFuture.cancel(false);
-                  }
-                  invocationFuture =
-                      executorService.schedule(
-                          new Runnable() {
-                            @Override
-                            public void run() {
-                              StateSampler.this.run();
-                            }
-                          },
-                          delay,
-                          TimeUnit.MILLISECONDS);
-                }
-              }
-            },
-            0,
-            samplingPeriodMs,
-            TimeUnit.MILLISECONDS);
-  }
-
-  public synchronized void run() {
-    long startTimestampNs = System.nanoTime();
-    int state = currentState;
-    if (state != DO_NOT_SAMPLE) {
-      StateKind kind = null;
-      long elapsedMs = TimeUnit.NANOSECONDS.toMillis(startTimestampNs - stateTimestampNs);
-      kind = kindsByState.get(state);
-      countersByState.get(state).addValue(elapsedMs);
-      // Invoke all callbacks.
-      for (SamplingCallback c : callbacks) {
-        c.run(state, kind, elapsedMs);
-      }
-    }
-    stateTimestampNs = startTimestampNs;
-  }
-
-  @Override
-  public synchronized void close() {
-    currentState = DO_NOT_SAMPLE;
-    if (invocationTriggerFuture != null) {
-      invocationTriggerFuture.cancel(false);
-    }
-    if (invocationFuture != null) {
-      invocationFuture.cancel(false);
-    }
-  }
-
-  /**
-   * Returns the state associated with a name; creating a new state if
-   * necessary. Using states instead of state names during state
-   * transitions is done for efficiency.
-   *
-   * @name the name for the state
-   * @kind kind of the state, see {#code StateKind}
-   * @return the state associated with the state name
-   */
-  public int stateForName(String name, StateKind kind) {
-    if (name.isEmpty()) {
-      return DO_NOT_SAMPLE;
-    }
-
-    synchronized (this) {
-      Integer state = statesByName.get(name);
-      if (state == null) {
-        String counterName = prefix + name + "-msecs";
-        Counter<Long> counter = counterSetMutator.addCounter(
-            Counter.longs(counterName, Counter.AggregationKind.SUM));
-        state = countersByState.size();
-        statesByName.put(name, state);
-        countersByState.add(counter);
-        kindsByState.put(state, kind);
-      }
-      StateKind originalKind = kindsByState.get(state);
-      if (originalKind != kind) {
-        throw new IllegalArgumentException(
-            "for state named " + name
-            + ", requested kind " + kind + " different from the original kind " + originalKind);
-      }
-      return state;
-    }
-  }
-
-  /**
-   * An internal class for representing StateSampler information
-   * typically used for debugging.
-   */
-  public static class StateSamplerInfo {
-    public final String state;
-    public final Long transitionCount;
-    public final Long stateDurationMillis;
-
-    public StateSamplerInfo(String state, Long transitionCount,
-                            Long stateDurationMillis) {
-      this.state = state;
-      this.transitionCount = transitionCount;
-      this.stateDurationMillis = stateDurationMillis;
-    }
-  }
-
-  /**
-   * Returns information about the current state of this state sampler
-   * into a {@link StateSamplerInfo} object, or null if sampling is
-   * not turned on.
-   *
-   * @return information about this state sampler or null if sampling is off
-   */
-  public synchronized StateSamplerInfo getInfo() {
-    return currentState == DO_NOT_SAMPLE ? null
-        : new StateSamplerInfo(countersByState.get(currentState).getFlatName(),
-            stateTransitionCount, null);
-  }
-
-  /**
-   * Returns the current state of this state sampler.
-   */
-  public int getCurrentState() {
-    return currentState;
-  }
-
-  /**
-   * Sets the current thread state.
-   *
-   * @param state the new state to transition to
-   * @return the previous state
-   */
-  public int setState(int state) {
-    // Updates to stateTransitionCount are always done by the same
-    // thread, making the non-atomic volatile update below safe. The
-    // count is updated first to avoid incorrectly attributing
-    // stuckness occuring in an old state to the new state.
-    long previousStateTransitionCount = this.stateTransitionCount;
-    this.stateTransitionCount = previousStateTransitionCount + 1;
-    int previousState = currentState;
-    currentState = state;
-    return previousState;
-  }
-
-  /**
-   * Sets the current thread state.
-   *
-   * @param name the name of the new state to transition to
-   * @param kind kind of the new state
-   * @return the previous state
-   */
-  public int setState(String name, StateKind kind) {
-    return setState(stateForName(name, kind));
-  }
-
-  /**
-   * Returns an AutoCloseable {@link ScopedState} that will perform a
-   * state transition to the given state, and will automatically reset
-   * the state to the prior state upon closing.
-   *
-   * @param state the new state to transition to
-   * @return a {@link ScopedState} that automatically resets the state
-   * to the prior state
-   */
-  public ScopedState scopedState(int state) {
-    return new ScopedState(this, setState(state));
-  }
-
-  /**
-   * Add a callback to the sampler.
-   * The callbacks will be executed sequentially upon {@link StateSampler#run}.
-   */
-  public synchronized void addSamplingCallback(SamplingCallback callback) {
-    callbacks.add(callback);
-  }
-
-  /** Get the counter prefix associated with this sampler. */
-  public String getPrefix() {
-    return prefix;
-  }
-
-  /**
-   * A nested class that is used to account for states and state
-   * transitions based on lexical scopes.
-   *
-   * <p>Thread-safe.
-   */
-  public class ScopedState implements AutoCloseable {
-    private StateSampler sampler;
-    private int previousState;
-
-    private ScopedState(StateSampler sampler, int previousState) {
-      this.sampler = sampler;
-      this.previousState = previousState;
-    }
-
-    @Override
-    public void close() {
-      sampler.setState(previousState);
-    }
-  }
-
-  /**
-   * Callbacks which supposed to be called sequentially upon {@link StateSampler#run}.
-   * They should be registered via {@link #addSamplingCallback}.
-   */
-  public static interface SamplingCallback {
-    /**
-     * The entrance method of the callback, it is called in {@link StateSampler#run},
-     * once per sample. This method should be thread safe.
-     *
-     * @param state The state of the StateSampler at the time of sample.
-     * @param kind The kind associated with the state, see {@link StateKind}.
-     * @param elapsedMs Milliseconds since last sample.
-     */
-    public void run(int state, StateKind kind, long elapsedMs);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6c7ed126/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java
index 50b83fd..ee56954 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java
@@ -160,7 +160,7 @@ public class InProcessEvaluationContextTest {
 
     StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
 
-    InProcessStepContext stepContext = fooContext.getOrCreateStepContext("s1", "s1", null);
+    InProcessStepContext stepContext = fooContext.getOrCreateStepContext("s1", "s1");
     stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1);
 
     context.handleResult(
@@ -176,7 +176,7 @@ public class InProcessEvaluationContextTest {
         context.getExecutionContext(created.getProducingTransformInternal(), "foo");
     assertThat(
         secondFooContext
-            .getOrCreateStepContext("s1", "s1", null)
+            .getOrCreateStepContext("s1", "s1")
             .stateInternals()
             .state(StateNamespaces.global(), intBag)
             .read(),
@@ -192,7 +192,7 @@ public class InProcessEvaluationContextTest {
     StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
 
     fooContext
-        .getOrCreateStepContext("s1", "s1", null)
+        .getOrCreateStepContext("s1", "s1")
         .stateInternals()
         .state(StateNamespaces.global(), intBag)
         .add(1);
@@ -202,7 +202,7 @@ public class InProcessEvaluationContextTest {
     assertThat(barContext, not(equalTo(fooContext)));
     assertThat(
         barContext
-            .getOrCreateStepContext("s1", "s1", null)
+            .getOrCreateStepContext("s1", "s1")
             .stateInternals()
             .state(StateNamespaces.global(), intBag)
             .read(),
@@ -218,7 +218,7 @@ public class InProcessEvaluationContextTest {
     StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
 
     fooContext
-        .getOrCreateStepContext("s1", "s1", null)
+        .getOrCreateStepContext("s1", "s1")
         .stateInternals()
         .state(StateNamespaces.global(), intBag)
         .add(1);
@@ -227,7 +227,7 @@ public class InProcessEvaluationContextTest {
         context.getExecutionContext(downstream.getProducingTransformInternal(), myKey);
     assertThat(
         barContext
-            .getOrCreateStepContext("s1", "s1", null)
+            .getOrCreateStepContext("s1", "s1")
             .stateInternals()
             .state(StateNamespaces.global(), intBag)
             .read(),
@@ -273,7 +273,7 @@ public class InProcessEvaluationContextTest {
     StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
 
     CopyOnAccessInMemoryStateInternals<Object> state =
-        fooContext.getOrCreateStepContext("s1", "s1", null).stateInternals();
+        fooContext.getOrCreateStepContext("s1", "s1").stateInternals();
     BagState<Integer> bag = state.state(StateNamespaces.global(), intBag);
     bag.add(1);
     bag.add(2);
@@ -293,7 +293,7 @@ public class InProcessEvaluationContextTest {
         context.getExecutionContext(downstream.getProducingTransformInternal(), myKey);
 
     CopyOnAccessInMemoryStateInternals<Object> afterResultState =
-        afterResultContext.getOrCreateStepContext("s1", "s1", null).stateInternals();
+        afterResultContext.getOrCreateStepContext("s1", "s1").stateInternals();
     assertThat(afterResultState.state(StateNamespaces.global(), intBag).read(), contains(1,
2, 4));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6c7ed126/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
index d21edd1..d5aa0da 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
@@ -708,7 +708,7 @@ public class GroupAlsoByWindowsProperties {
         outputManager,
         outputTag,
         new ArrayList<TupleTag<?>>(),
-        executionContext.getOrCreateStepContext("GABWStep", "GABWTransform", null),
+        executionContext.getOrCreateStepContext("GABWStep", "GABWTransform"),
         counters.getAddCounterMutator(),
         windowingStrategy);
   }


Mime
View raw message