Return-Path: X-Original-To: apmail-beam-commits-archive@minotaur.apache.org Delivered-To: apmail-beam-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 66416195DA for ; Thu, 24 Mar 2016 02:47:37 +0000 (UTC) Received: (qmail 4327 invoked by uid 500); 24 Mar 2016 02:47:37 -0000 Delivered-To: apmail-beam-commits-archive@beam.apache.org Received: (qmail 4273 invoked by uid 500); 24 Mar 2016 02:47:37 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 4264 invoked by uid 99); 24 Mar 2016 02:47:36 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 24 Mar 2016 02:47:36 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 5B1F4C1EDB for ; Thu, 24 Mar 2016 02:47:36 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.23 X-Spam-Level: X-Spam-Status: No, score=-3.23 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id 6JdYUZPudv0o for ; Thu, 24 Mar 2016 02:47:27 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id 038E15F33E for ; Thu, 24 Mar 2016 02:47:25 +0000 (UTC) Received: (qmail 3834 invoked by uid 99); 24 Mar 2016 02:47:25 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 24 Mar 2016 02:47:25 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3B5A2DFB79; Thu, 24 Mar 2016 02:47:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dhalperi@apache.org To: commits@beam.incubator.apache.org Date: Thu, 24 Mar 2016 02:47:29 -0000 Message-Id: <967fb7288c3a4997b440a0900428ed34@git.apache.org> In-Reply-To: <0c1667d8252646c1acedf8c54c1ba4ca@git.apache.org> References: <0c1667d8252646c1acedf8c54c1ba4ca@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [05/67] [partial] incubator-beam git commit: Directory reorganization http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/StateSampler.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/StateSampler.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/StateSampler.java deleted file mode 100644 index 00d3b3b..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/StateSampler.java +++ /dev/null @@ -1,365 +0,0 @@ -/******************************************************************************* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - ******************************************************************************/ - -package com.google.cloud.dataflow.sdk.util.common.worker; - -import com.google.cloud.dataflow.sdk.util.common.Counter; -import com.google.cloud.dataflow.sdk.util.common.CounterSet; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -import javax.annotation.concurrent.ThreadSafe; - -/** - * A StateSampler object may be used to obtain an approximate - * breakdown of the time spent by an execution context in various - * states, as a fraction of the total time. The sampling is taken at - * regular intervals, with adjustment for scheduling delay. - */ -@ThreadSafe -public class StateSampler implements AutoCloseable { - - /** Different kinds of states. */ - public enum StateKind { - /** IO, user code, etc. */ - USER, - /** Reading/writing from/to shuffle service, etc. */ - FRAMEWORK - } - - public static final long DEFAULT_SAMPLING_PERIOD_MS = 200; - - private final String prefix; - private final CounterSet.AddCounterMutator counterSetMutator; - - /** Array of counters indexed by their state. */ - private ArrayList> countersByState = new ArrayList<>(); - - /** Map of state name to state. */ - private Map statesByName = new HashMap<>(); - - /** Map of state id to kind. */ - private Map kindsByState = new HashMap<>(); - - /** The current state. */ - private volatile int currentState; - - /** Special value of {@code currentState} that means we do not sample. */ - public static final int DO_NOT_SAMPLE = -1; - - /** - * A counter that increments with each state transition. May be used - * to detect a context being stuck in a state for some amount of - * time. - */ - private volatile long stateTransitionCount; - - /** - * The timestamp (in nanoseconds) corresponding to the last time the - * state was sampled (and recorded). - */ - private long stateTimestampNs = 0; - - /** Using a fixed number of timers for all StateSampler objects. */ - private static final int NUM_EXECUTOR_THREADS = 16; - - private static final ScheduledExecutorService executorService = - Executors.newScheduledThreadPool(NUM_EXECUTOR_THREADS, - new ThreadFactoryBuilder().setDaemon(true).build()); - - private Random rand = new Random(); - - private List callbacks = new ArrayList<>(); - - private ScheduledFuture invocationTriggerFuture = null; - - private ScheduledFuture invocationFuture = null; - - /** - * Constructs a new {@link StateSampler} that can be used to obtain - * an approximate breakdown of the time spent by an execution - * context in various states, as a fraction of the total time. - * - * @param prefix the prefix of the counter names for the states - * @param counterSetMutator the {@link CounterSet.AddCounterMutator} - * used to create a counter for each distinct state - * @param samplingPeriodMs the sampling period in milliseconds - */ - public StateSampler(String prefix, - CounterSet.AddCounterMutator counterSetMutator, - final long samplingPeriodMs) { - this.prefix = prefix; - this.counterSetMutator = counterSetMutator; - currentState = DO_NOT_SAMPLE; - scheduleSampling(samplingPeriodMs); - } - - /** - * Constructs a new {@link StateSampler} that can be used to obtain - * an approximate breakdown of the time spent by an execution - * context in various states, as a fraction of the total time. - * - * @param prefix the prefix of the counter names for the states - * @param counterSetMutator the {@link CounterSet.AddCounterMutator} - * used to create a counter for each distinct state - */ - public StateSampler(String prefix, - CounterSet.AddCounterMutator counterSetMutator) { - this(prefix, counterSetMutator, DEFAULT_SAMPLING_PERIOD_MS); - } - - /** - * Called by the constructor to schedule sampling at the given period. - * - *

Should not be overridden by sub-classes unless they want to change - * or disable the automatic sampling of state. - */ - protected void scheduleSampling(final long samplingPeriodMs) { - // Here "stratified sampling" is used, which makes sure that there's 1 uniformly chosen sampled - // point in every bucket of samplingPeriodMs, to prevent pathological behavior in case some - // states happen to occur at a similar period. - // The current implementation uses a fixed-rate timer with a period samplingPeriodMs as a - // trampoline to a one-shot random timer which fires with a random delay within - // samplingPeriodMs. - stateTimestampNs = System.nanoTime(); - invocationTriggerFuture = - executorService.scheduleAtFixedRate( - new Runnable() { - @Override - public void run() { - long delay = rand.nextInt((int) samplingPeriodMs); - synchronized (StateSampler.this) { - if (invocationFuture != null) { - invocationFuture.cancel(false); - } - invocationFuture = - executorService.schedule( - new Runnable() { - @Override - public void run() { - StateSampler.this.run(); - } - }, - delay, - TimeUnit.MILLISECONDS); - } - } - }, - 0, - samplingPeriodMs, - TimeUnit.MILLISECONDS); - } - - public synchronized void run() { - long startTimestampNs = System.nanoTime(); - int state = currentState; - if (state != DO_NOT_SAMPLE) { - StateKind kind = null; - long elapsedMs = TimeUnit.NANOSECONDS.toMillis(startTimestampNs - stateTimestampNs); - kind = kindsByState.get(state); - countersByState.get(state).addValue(elapsedMs); - // Invoke all callbacks. - for (SamplingCallback c : callbacks) { - c.run(state, kind, elapsedMs); - } - } - stateTimestampNs = startTimestampNs; - } - - @Override - public synchronized void close() { - currentState = DO_NOT_SAMPLE; - if (invocationTriggerFuture != null) { - invocationTriggerFuture.cancel(false); - } - if (invocationFuture != null) { - invocationFuture.cancel(false); - } - } - - /** - * Returns the state associated with a name; creating a new state if - * necessary. Using states instead of state names during state - * transitions is done for efficiency. - * - * @name the name for the state - * @kind kind of the state, see {#code StateKind} - * @return the state associated with the state name - */ - public int stateForName(String name, StateKind kind) { - if (name.isEmpty()) { - return DO_NOT_SAMPLE; - } - - synchronized (this) { - Integer state = statesByName.get(name); - if (state == null) { - String counterName = prefix + name + "-msecs"; - Counter counter = counterSetMutator.addCounter( - Counter.longs(counterName, Counter.AggregationKind.SUM)); - state = countersByState.size(); - statesByName.put(name, state); - countersByState.add(counter); - kindsByState.put(state, kind); - } - StateKind originalKind = kindsByState.get(state); - if (originalKind != kind) { - throw new IllegalArgumentException( - "for state named " + name - + ", requested kind " + kind + " different from the original kind " + originalKind); - } - return state; - } - } - - /** - * An internal class for representing StateSampler information - * typically used for debugging. - */ - public static class StateSamplerInfo { - public final String state; - public final Long transitionCount; - public final Long stateDurationMillis; - - public StateSamplerInfo(String state, Long transitionCount, - Long stateDurationMillis) { - this.state = state; - this.transitionCount = transitionCount; - this.stateDurationMillis = stateDurationMillis; - } - } - - /** - * Returns information about the current state of this state sampler - * into a {@link StateSamplerInfo} object, or null if sampling is - * not turned on. - * - * @return information about this state sampler or null if sampling is off - */ - public synchronized StateSamplerInfo getInfo() { - return currentState == DO_NOT_SAMPLE ? null - : new StateSamplerInfo(countersByState.get(currentState).getName(), - stateTransitionCount, null); - } - - /** - * Returns the current state of this state sampler. - */ - public int getCurrentState() { - return currentState; - } - - /** - * Sets the current thread state. - * - * @param state the new state to transition to - * @return the previous state - */ - public int setState(int state) { - // Updates to stateTransitionCount are always done by the same - // thread, making the non-atomic volatile update below safe. The - // count is updated first to avoid incorrectly attributing - // stuckness occuring in an old state to the new state. - long previousStateTransitionCount = this.stateTransitionCount; - this.stateTransitionCount = previousStateTransitionCount + 1; - int previousState = currentState; - currentState = state; - return previousState; - } - - /** - * Sets the current thread state. - * - * @param name the name of the new state to transition to - * @param kind kind of the new state - * @return the previous state - */ - public int setState(String name, StateKind kind) { - return setState(stateForName(name, kind)); - } - - /** - * Returns an AutoCloseable {@link ScopedState} that will perform a - * state transition to the given state, and will automatically reset - * the state to the prior state upon closing. - * - * @param state the new state to transition to - * @return a {@link ScopedState} that automatically resets the state - * to the prior state - */ - public ScopedState scopedState(int state) { - return new ScopedState(this, setState(state)); - } - - /** - * Add a callback to the sampler. - * The callbacks will be executed sequentially upon {@link StateSampler#run}. - */ - public synchronized void addSamplingCallback(SamplingCallback callback) { - callbacks.add(callback); - } - - /** Get the counter prefix associated with this sampler. */ - public String getPrefix() { - return prefix; - } - - /** - * A nested class that is used to account for states and state - * transitions based on lexical scopes. - * - *

Thread-safe. - */ - public class ScopedState implements AutoCloseable { - private StateSampler sampler; - private int previousState; - - private ScopedState(StateSampler sampler, int previousState) { - this.sampler = sampler; - this.previousState = previousState; - } - - @Override - public void close() { - sampler.setState(previousState); - } - } - - /** - * Callbacks which supposed to be called sequentially upon {@link StateSampler#run}. - * They should be registered via {@link #addSamplingCallback}. - */ - public static interface SamplingCallback { - /** - * The entrance method of the callback, it is called in {@link StateSampler#run}, - * once per sample. This method should be thread safe. - * - * @param state The state of the StateSampler at the time of sample. - * @param kind The kind associated with the state, see {@link StateKind}. - * @param elapsedMs Milliseconds since last sample. - */ - public void run(int state, StateKind kind, long elapsedMs); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/package-info.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/package-info.java deleted file mode 100644 index c3da9ed..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/common/worker/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -/** Defines utilities used to implement the harness that runs user code. **/ -package com.google.cloud.dataflow.sdk.util.common.worker; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/gcsfs/GcsPath.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/gcsfs/GcsPath.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/gcsfs/GcsPath.java deleted file mode 100644 index f72ba4c..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/gcsfs/GcsPath.java +++ /dev/null @@ -1,619 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.util.gcsfs; - -import com.google.api.services.storage.model.StorageObject; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; - -import java.io.File; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.file.FileSystem; -import java.nio.file.LinkOption; -import java.nio.file.Path; -import java.nio.file.WatchEvent; -import java.nio.file.WatchKey; -import java.nio.file.WatchService; -import java.util.Iterator; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - -/** - * Implements the Java NIO {@link Path} API for Google Cloud Storage paths. - * - *

GcsPath uses a slash ('/') as a directory separator. Below is - * a summary of how slashes are treated: - *

    - *
  • A GCS bucket may not contain a slash. An object may contain zero or - * more slashes. - *
  • A trailing slash always indicates a directory, which is compliant - * with POSIX.1-2008. - *
  • Slashes separate components of a path. Empty components are allowed, - * these are represented as repeated slashes. An empty component always - * refers to a directory, and always ends in a slash. - *
  • {@link #getParent()}} always returns a path ending in a slash, as the - * parent of a GcsPath is always a directory. - *
  • Use {@link #resolve(String)} to append elements to a GcsPath -- this - * applies the rules consistently and is highly recommended over any - * custom string concatenation. - *
- * - *

GcsPath treats all GCS objects and buckets as belonging to the same - * filesystem, so the root of a GcsPath is the GcsPath bucket="", object="". - * - *

Relative paths are not associated with any bucket. This matches common - * treatment of Path in which relative paths can be constructed from one - * filesystem and appended to another filesystem. - * - * @see Java Tutorials: Path Operations - */ -public class GcsPath implements Path { - - public static final String SCHEME = "gs"; - - /** - * Creates a GcsPath from a URI. - * - *

The URI must be in the form {@code gs://[bucket]/[path]}, and may not - * contain a port, user info, a query, or a fragment. - */ - public static GcsPath fromUri(URI uri) { - Preconditions.checkArgument(uri.getScheme().equalsIgnoreCase(SCHEME), - "URI: %s is not a GCS URI", uri); - Preconditions.checkArgument(uri.getPort() == -1, - "GCS URI may not specify port: %s (%i)", uri, uri.getPort()); - Preconditions.checkArgument( - Strings.isNullOrEmpty(uri.getUserInfo()), - "GCS URI may not specify userInfo: %s (%s)", uri, uri.getUserInfo()); - Preconditions.checkArgument( - Strings.isNullOrEmpty(uri.getQuery()), - "GCS URI may not specify query: %s (%s)", uri, uri.getQuery()); - Preconditions.checkArgument( - Strings.isNullOrEmpty(uri.getFragment()), - "GCS URI may not specify fragment: %s (%s)", uri, uri.getFragment()); - - return fromUri(uri.toString()); - } - - /** - * Pattern that is used to parse a GCS URL. - * - *

This is used to separate the components. Verification is handled - * separately. - */ - public static final Pattern GCS_URI = - Pattern.compile("(?[^:]+)://(?[^/]+)(/(?.*))?"); - - /** - * Creates a GcsPath from a URI in string form. - * - *

This does not use URI parsing, which means it may accept patterns that - * the URI parser would not accept. - */ - public static GcsPath fromUri(String uri) { - Matcher m = GCS_URI.matcher(uri); - Preconditions.checkArgument(m.matches(), "Invalid GCS URI: %s", uri); - - Preconditions.checkArgument(m.group("SCHEME").equalsIgnoreCase(SCHEME), - "URI: %s is not a GCS URI", uri); - return new GcsPath(null, m.group("BUCKET"), m.group("OBJECT")); - } - - /** - * Pattern that is used to parse a GCS resource name. - */ - private static final Pattern GCS_RESOURCE_NAME = - Pattern.compile("storage.googleapis.com/(?[^/]+)(/(?.*))?"); - - /** - * Creates a GcsPath from a OnePlatform resource name in string form. - */ - public static GcsPath fromResourceName(String name) { - Matcher m = GCS_RESOURCE_NAME.matcher(name); - Preconditions.checkArgument(m.matches(), "Invalid GCS resource name: %s", name); - - return new GcsPath(null, m.group("BUCKET"), m.group("OBJECT")); - } - - /** - * Creates a GcsPath from a {@linkplain StorageObject}. - */ - public static GcsPath fromObject(StorageObject object) { - return new GcsPath(null, object.getBucket(), object.getName()); - } - - /** - * Creates a GcsPath from bucket and object components. - * - *

A GcsPath without a bucket name is treated as a relative path, which - * is a path component with no linkage to the root element. This is similar - * to a Unix path that does not begin with the root marker (a slash). - * GCS has different naming constraints and APIs for working with buckets and - * objects, so these two concepts are kept separate to avoid accidental - * attempts to treat objects as buckets, or vice versa, as much as possible. - * - *

A GcsPath without an object name is a bucket reference. - * A bucket is always a directory, which could be used to lookup or add - * files to a bucket, but could not be opened as a file. - * - *

A GcsPath containing neither bucket or object names is treated as - * the root of the GCS filesystem. A listing on the root element would return - * the buckets available to the user. - * - *

If {@code null} is passed as either parameter, it is converted to an - * empty string internally for consistency. There is no distinction between - * an empty string and a {@code null}, as neither are allowed by GCS. - * - * @param bucket a GCS bucket name, or none ({@code null} or an empty string) - * if the object is not associated with a bucket - * (e.g. relative paths or the root node). - * @param object a GCS object path, or none ({@code null} or an empty string) - * for no object. - */ - public static GcsPath fromComponents(@Nullable String bucket, - @Nullable String object) { - return new GcsPath(null, bucket, object); - } - - @Nullable - private FileSystem fs; - @Nonnull - private final String bucket; - @Nonnull - private final String object; - - /** - * Constructs a GcsPath. - * - * @param fs the associated FileSystem, if any - * @param bucket the associated bucket, or none ({@code null} or an empty - * string) for a relative path component - * @param object the object, which is a fully-qualified object name if bucket - * was also provided, or none ({@code null} or an empty string) - * for no object - * @throws java.lang.IllegalArgumentException if the bucket of object names - * are invalid. - */ - public GcsPath(@Nullable FileSystem fs, - @Nullable String bucket, - @Nullable String object) { - if (bucket == null) { - bucket = ""; - } - Preconditions.checkArgument(!bucket.contains("/"), - "GCS bucket may not contain a slash"); - Preconditions - .checkArgument(bucket.isEmpty() - || bucket.matches("[a-z0-9][-_a-z0-9.]+[a-z0-9]"), - "GCS bucket names must contain only lowercase letters, numbers, " - + "dashes (-), underscores (_), and dots (.). Bucket names " - + "must start and end with a number or letter. " - + "See https://developers.google.com/storage/docs/bucketnaming " - + "for more details. Bucket name: " + bucket); - - if (object == null) { - object = ""; - } - Preconditions.checkArgument( - object.indexOf('\n') < 0 && object.indexOf('\r') < 0, - "GCS object names must not contain Carriage Return or " - + "Line Feed characters."); - - this.fs = fs; - this.bucket = bucket; - this.object = object; - } - - /** - * Returns the bucket name associated with this GCS path, or an empty string - * if this is a relative path component. - */ - public String getBucket() { - return bucket; - } - - /** - * Returns the object name associated with this GCS path, or an empty string - * if no object is specified. - */ - public String getObject() { - return object; - } - - public void setFileSystem(FileSystem fs) { - this.fs = fs; - } - - @Override - public FileSystem getFileSystem() { - return fs; - } - - // Absolute paths are those that have a bucket and the root path. - @Override - public boolean isAbsolute() { - return !bucket.isEmpty() || object.isEmpty(); - } - - @Override - public GcsPath getRoot() { - return new GcsPath(fs, "", ""); - } - - @Override - public GcsPath getFileName() { - throw new UnsupportedOperationException(); - } - - /** - * Returns the parent path, or {@code null} if this path does not - * have a parent. - * - *

Returns a path that ends in '/', as the parent path always refers to - * a directory. - */ - @Override - public GcsPath getParent() { - if (bucket.isEmpty() && object.isEmpty()) { - // The root path has no parent, by definition. - return null; - } - - if (object.isEmpty()) { - // A GCS bucket. All buckets come from a common root. - return getRoot(); - } - - // Skip last character, in case it is a trailing slash. - int i = object.lastIndexOf('/', object.length() - 2); - if (i <= 0) { - if (bucket.isEmpty()) { - // Relative paths are not attached to the root node. - return null; - } - return new GcsPath(fs, bucket, ""); - } - - // Retain trailing slash. - return new GcsPath(fs, bucket, object.substring(0, i + 1)); - } - - @Override - public int getNameCount() { - int count = bucket.isEmpty() ? 0 : 1; - if (object.isEmpty()) { - return count; - } - - // Add another for each separator found. - int index = -1; - while ((index = object.indexOf('/', index + 1)) != -1) { - count++; - } - - return object.endsWith("/") ? count : count + 1; - } - - @Override - public GcsPath getName(int count) { - Preconditions.checkArgument(count >= 0); - - Iterator iterator = iterator(); - for (int i = 0; i < count; ++i) { - Preconditions.checkArgument(iterator.hasNext()); - iterator.next(); - } - - Preconditions.checkArgument(iterator.hasNext()); - return (GcsPath) iterator.next(); - } - - @Override - public GcsPath subpath(int beginIndex, int endIndex) { - Preconditions.checkArgument(beginIndex >= 0); - Preconditions.checkArgument(endIndex > beginIndex); - - Iterator iterator = iterator(); - for (int i = 0; i < beginIndex; ++i) { - Preconditions.checkArgument(iterator.hasNext()); - iterator.next(); - } - - GcsPath path = null; - while (beginIndex < endIndex) { - Preconditions.checkArgument(iterator.hasNext()); - if (path == null) { - path = (GcsPath) iterator.next(); - } else { - path = path.resolve(iterator.next()); - } - ++beginIndex; - } - - return path; - } - - @Override - public boolean startsWith(Path other) { - if (other instanceof GcsPath) { - GcsPath gcsPath = (GcsPath) other; - return startsWith(gcsPath.bucketAndObject()); - } else { - return startsWith(other.toString()); - } - } - - @Override - public boolean startsWith(String prefix) { - return bucketAndObject().startsWith(prefix); - } - - @Override - public boolean endsWith(Path other) { - if (other instanceof GcsPath) { - GcsPath gcsPath = (GcsPath) other; - return endsWith(gcsPath.bucketAndObject()); - } else { - return endsWith(other.toString()); - } - } - - @Override - public boolean endsWith(String suffix) { - return bucketAndObject().endsWith(suffix); - } - - // TODO: support "." and ".." path components? - @Override - public GcsPath normalize() { - return this; - } - - @Override - public GcsPath resolve(Path other) { - if (other instanceof GcsPath) { - GcsPath path = (GcsPath) other; - if (path.isAbsolute()) { - return path; - } else { - return resolve(path.getObject()); - } - } else { - return resolve(other.toString()); - } - } - - @Override - public GcsPath resolve(String other) { - if (bucket.isEmpty() && object.isEmpty()) { - // Resolve on a root path is equivalent to looking up a bucket and object. - other = SCHEME + "://" + other; - } - - if (other.startsWith(SCHEME + "://")) { - GcsPath path = GcsPath.fromUri(other); - path.setFileSystem(getFileSystem()); - return path; - } - - if (other.isEmpty()) { - // An empty component MUST refer to a directory. - other = "/"; - } - - if (object.isEmpty()) { - return new GcsPath(fs, bucket, other); - } else if (object.endsWith("/")) { - return new GcsPath(fs, bucket, object + other); - } else { - return new GcsPath(fs, bucket, object + "/" + other); - } - } - - @Override - public Path resolveSibling(Path other) { - throw new UnsupportedOperationException(); - } - - @Override - public Path resolveSibling(String other) { - throw new UnsupportedOperationException(); - } - - @Override - public Path relativize(Path other) { - throw new UnsupportedOperationException(); - } - - @Override - public GcsPath toAbsolutePath() { - return this; - } - - @Override - public GcsPath toRealPath(LinkOption... options) throws IOException { - return this; - } - - @Override - public File toFile() { - throw new UnsupportedOperationException(); - } - - @Override - public WatchKey register(WatchService watcher, WatchEvent.Kind[] events, - WatchEvent.Modifier... modifiers) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public WatchKey register(WatchService watcher, WatchEvent.Kind... events) - throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public Iterator iterator() { - return new NameIterator(fs, !bucket.isEmpty(), bucketAndObject()); - } - - private static class NameIterator implements Iterator { - private final FileSystem fs; - private boolean fullPath; - private String name; - - NameIterator(FileSystem fs, boolean fullPath, String name) { - this.fs = fs; - this.fullPath = fullPath; - this.name = name; - } - - @Override - public boolean hasNext() { - return !Strings.isNullOrEmpty(name); - } - - @Override - public GcsPath next() { - int i = name.indexOf('/'); - String component; - if (i >= 0) { - component = name.substring(0, i); - name = name.substring(i + 1); - } else { - component = name; - name = null; - } - if (fullPath) { - fullPath = false; - return new GcsPath(fs, component, ""); - } else { - // Relative paths have no bucket. - return new GcsPath(fs, "", component); - } - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - } - - @Override - public int compareTo(Path other) { - if (!(other instanceof GcsPath)) { - throw new ClassCastException(); - } - - GcsPath path = (GcsPath) other; - int b = bucket.compareTo(path.bucket); - if (b != 0) { - return b; - } - - // Compare a component at a time, so that the separator char doesn't - // get compared against component contents. Eg, "a/b" < "a-1/b". - Iterator left = iterator(); - Iterator right = path.iterator(); - - while (left.hasNext() && right.hasNext()) { - String leftStr = left.next().toString(); - String rightStr = right.next().toString(); - int c = leftStr.compareTo(rightStr); - if (c != 0) { - return c; - } - } - - if (!left.hasNext() && !right.hasNext()) { - return 0; - } else { - return left.hasNext() ? 1 : -1; - } - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - GcsPath paths = (GcsPath) o; - return bucket.equals(paths.bucket) && object.equals(paths.object); - } - - @Override - public int hashCode() { - int result = bucket.hashCode(); - result = 31 * result + object.hashCode(); - return result; - } - - @Override - public String toString() { - if (!isAbsolute()) { - return object; - } - StringBuilder sb = new StringBuilder(); - sb.append(SCHEME) - .append("://"); - if (!bucket.isEmpty()) { - sb.append(bucket) - .append('/'); - } - sb.append(object); - return sb.toString(); - } - - // TODO: Consider using resource names for all GCS paths used by the SDK. - public String toResourceName() { - StringBuilder sb = new StringBuilder(); - sb.append("storage.googleapis.com/"); - if (!bucket.isEmpty()) { - sb.append(bucket).append('/'); - } - sb.append(object); - return sb.toString(); - } - - @Override - public URI toUri() { - try { - return new URI(SCHEME, "//" + bucketAndObject(), null); - } catch (URISyntaxException e) { - throw new RuntimeException("Unable to create URI for GCS path " + this); - } - } - - private String bucketAndObject() { - if (bucket.isEmpty()) { - return object; - } else { - return bucket + "/" + object; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/gcsfs/package-info.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/gcsfs/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/gcsfs/package-info.java deleted file mode 100644 index 2f57938..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/gcsfs/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -/** Defines utilities used to interact with Google Cloud Storage. **/ -package com.google.cloud.dataflow.sdk.util.gcsfs; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/package-info.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/package-info.java deleted file mode 100644 index c92adab..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -/** Defines utilities used by the Dataflow SDK. **/ -package com.google.cloud.dataflow.sdk.util; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/AccumulatorCombiningState.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/AccumulatorCombiningState.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/AccumulatorCombiningState.java deleted file mode 100644 index 0d78b13..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/AccumulatorCombiningState.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package com.google.cloud.dataflow.sdk.util.state; - -import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn; - -/** - * State for a single value that is managed by a {@link CombineFn}. This is an internal extension - * to {@link CombiningState} that includes the {@code AccumT} type. - * - * @param the type of values added to the state - * @param the type of accumulator - * @param the type of value extracted from the state - */ -public interface AccumulatorCombiningState - extends CombiningState { - - /** - * Read the merged accumulator for this combining value. It is implied that reading the - * state involes reading the accumulator, so {@link #readLater} is sufficient to prefetch for - * this. - */ - AccumT getAccum(); - - /** - * Add an accumulator to this combining value. Depending on implementation this may immediately - * merge it with the previous accumulator, or may buffer this accumulator for a future merge. - */ - void addAccum(AccumT accum); - - /** - * Merge the given accumulators according to the underlying combiner. - */ - AccumT mergeAccumulators(Iterable accumulators); - - @Override - AccumulatorCombiningState readLater(); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/BagState.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/BagState.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/BagState.java deleted file mode 100644 index 363e480..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/BagState.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package com.google.cloud.dataflow.sdk.util.state; - -/** - * State containing a bag values. Items can be added to the bag and the contents read out. - * - * @param The type of elements in the bag. - */ -public interface BagState extends CombiningState> { - @Override - BagState readLater(); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/CombiningState.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/CombiningState.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/CombiningState.java deleted file mode 100644 index 673bebb..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/CombiningState.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package com.google.cloud.dataflow.sdk.util.state; - -import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn; - -/** - * State that combines multiple {@code InputT} values using a {@link CombineFn} to produce a single - * {@code OutputT} value. - * - * @param the type of values added to the state - * @param the type of value extracted from the state - */ -public interface CombiningState extends ReadableState, State { - /** - * Add a value to the buffer. - */ - void add(InputT value); - - /** - * Return true if this state is empty. - */ - ReadableState isEmpty(); - - @Override - CombiningState readLater(); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/CopyOnAccessInMemoryStateInternals.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/CopyOnAccessInMemoryStateInternals.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/CopyOnAccessInMemoryStateInternals.java deleted file mode 100644 index 3683b74..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/CopyOnAccessInMemoryStateInternals.java +++ /dev/null @@ -1,454 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package com.google.cloud.dataflow.sdk.util.state; - -import static com.google.common.base.Preconditions.checkState; - -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn; -import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn; -import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; -import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn; -import com.google.cloud.dataflow.sdk.util.CombineFnUtil; -import com.google.cloud.dataflow.sdk.util.state.InMemoryStateInternals.InMemoryState; -import com.google.cloud.dataflow.sdk.util.state.StateTag.StateBinder; -import com.google.common.base.Optional; -import com.google.common.collect.Iterables; - -import org.joda.time.Instant; - -import java.util.Collection; -import java.util.HashSet; -import java.util.Map; - -import javax.annotation.Nullable; - -/** - * {@link StateInternals} built on top of an underlying {@link StateTable} that contains instances - * of {@link InMemoryState}. Whenever state that exists in the underlying {@link StateTable} is - * accessed, an independent copy will be created within this table. - */ -public class CopyOnAccessInMemoryStateInternals implements StateInternals { - private final K key; - private final CopyOnAccessInMemoryStateTable table; - - /** - * Creates a new {@link CopyOnAccessInMemoryStateInternals} with the underlying (possibly null) - * StateInternals. - */ - public static CopyOnAccessInMemoryStateInternals withUnderlying( - K key, @Nullable CopyOnAccessInMemoryStateInternals underlying) { - return new CopyOnAccessInMemoryStateInternals(key, underlying); - } - - private CopyOnAccessInMemoryStateInternals( - K key, CopyOnAccessInMemoryStateInternals underlying) { - this.key = key; - table = - new CopyOnAccessInMemoryStateTable(key, underlying == null ? null : underlying.table); - } - - /** - * Ensures this {@link CopyOnAccessInMemoryStateInternals} is complete. Other copies of state for - * the same Step and Key may be discarded after invoking this method. - * - *

For each {@link StateNamespace}, for each {@link StateTag address} in that namespace that - * has not been bound in this {@link CopyOnAccessInMemoryStateInternals}, put a reference to that - * state within this {@link StateInternals}. - * - *

Additionally, stores the {@link WatermarkHoldState} with the earliest time bound in the - * state table after the commit is completed, enabling calls to - * {@link #getEarliestWatermarkHold()}. - * - * @return this table - */ - public CopyOnAccessInMemoryStateInternals commit() { - table.commit(); - return this; - } - - /** - * Gets the earliest Watermark Hold present in this table. - * - *

Must be called after this state has been committed. Will throw an - * {@link IllegalStateException} if the state has not been committed. - */ - public Instant getEarliestWatermarkHold() { - // After commit, the watermark hold is always present, but may be - // BoundedWindow#TIMESTAMP_MAX_VALUE if there is no hold set. - checkState( - table.earliestWatermarkHold.isPresent(), - "Can't get the earliest watermark hold in a %s before it is committed", - getClass().getSimpleName()); - return table.earliestWatermarkHold.get(); - } - - @Override - public T state(StateNamespace namespace, StateTag address) { - return state(namespace, address, StateContexts.nullContext()); - } - - @Override - public T state( - StateNamespace namespace, StateTag address, StateContext c) { - return table.get(namespace, address, c); - } - - @Override - public K getKey() { - return key; - } - - public boolean isEmpty() { - return Iterables.isEmpty(table.values()); - } - - /** - * A {@link StateTable} that, when a value is retrieved with - * {@link StateTable#get(StateNamespace, StateTag)}, first attempts to obtain a copy of existing - * {@link State} from an underlying {@link StateTable}. - */ - private static class CopyOnAccessInMemoryStateTable extends StateTable { - private final K key; - private Optional> underlying; - - /** - * The StateBinderFactory currently in use by this {@link CopyOnAccessInMemoryStateTable}. - * - *

There are three {@link StateBinderFactory} implementations used by the {@link - * CopyOnAccessInMemoryStateTable}. - *

    - *
  • The default {@link StateBinderFactory} is a {@link CopyOnBindBinderFactory}, allowing - * the table to copy any existing {@link State} values to this {@link StateTable} from the - * underlying table when accessed, at which point mutations will not be visible to the - * underlying table - effectively a "Copy by Value" binder.
  • - *
  • During the execution of the {@link #commit()} method, this is a - * {@link ReadThroughBinderFactory}, which copies the references to the existing - * {@link State} objects to this {@link StateTable}.
  • - *
  • After the execution of the {@link #commit()} method, this is an - * instance of {@link InMemoryStateBinderFactory}, which constructs new instances of state - * when a {@link StateTag} is bound.
  • - *
- */ - private StateBinderFactory binderFactory; - - /** - * The earliest watermark hold in this table. - */ - private Optional earliestWatermarkHold; - - public CopyOnAccessInMemoryStateTable(K key, StateTable underlying) { - this.key = key; - this.underlying = Optional.fromNullable(underlying); - binderFactory = new CopyOnBindBinderFactory<>(key, this.underlying); - earliestWatermarkHold = Optional.absent(); - } - - /** - * Copies all values in the underlying table to this table, then discards the underlying table. - * - *

If there is an underlying table, this replaces the existing - * {@link CopyOnBindBinderFactory} with a {@link ReadThroughBinderFactory}, then reads all of - * the values in the existing table, binding the state values to this table. The old StateTable - * should be discarded after the call to {@link #commit()}. - * - *

After copying all of the existing values, replace the binder factory with an instance of - * {@link InMemoryStateBinderFactory} to construct new values, since all existing values - * are bound in this {@link StateTable table} and this table represents the canonical state. - */ - private void commit() { - Instant earliestHold = getEarliestWatermarkHold(); - if (underlying.isPresent()) { - ReadThroughBinderFactory readThroughBinder = - new ReadThroughBinderFactory<>(underlying.get()); - binderFactory = readThroughBinder; - Instant earliestUnderlyingHold = readThroughBinder.readThroughAndGetEarliestHold(this); - if (earliestUnderlyingHold.isBefore(earliestHold)) { - earliestHold = earliestUnderlyingHold; - } - } - earliestWatermarkHold = Optional.of(earliestHold); - clearEmpty(); - binderFactory = new InMemoryStateBinderFactory<>(key); - underlying = Optional.absent(); - } - - /** - * Get the earliest watermark hold in this table. Ignores the contents of any underlying table. - */ - private Instant getEarliestWatermarkHold() { - Instant earliest = BoundedWindow.TIMESTAMP_MAX_VALUE; - for (State existingState : this.values()) { - if (existingState instanceof WatermarkHoldState) { - Instant hold = ((WatermarkHoldState) existingState).read(); - if (hold != null && hold.isBefore(earliest)) { - earliest = hold; - } - } - } - return earliest; - } - - /** - * Clear all empty {@link StateNamespace StateNamespaces} from this table. If all states are - * empty, clear the entire table. - * - *

Because {@link InMemoryState} is not removed from the {@link StateTable} after it is - * cleared, in case contents are modified after being cleared, the table must be explicitly - * checked to ensure that it contains state and removed if not (otherwise we may never use - * the table again). - */ - private void clearEmpty() { - Collection emptyNamespaces = new HashSet<>(this.getNamespacesInUse()); - for (StateNamespace namespace : this.getNamespacesInUse()) { - for (State existingState : this.getTagsInUse(namespace).values()) { - if (!((InMemoryState) existingState).isCleared()) { - emptyNamespaces.remove(namespace); - break; - } - } - } - for (StateNamespace empty : emptyNamespaces) { - this.clearNamespace(empty); - } - } - - @Override - protected StateBinder binderForNamespace(final StateNamespace namespace, StateContext c) { - return binderFactory.forNamespace(namespace, c); - } - - private static interface StateBinderFactory { - StateBinder forNamespace(StateNamespace namespace, StateContext c); - } - - /** - * {@link StateBinderFactory} that creates a copy of any existing state when the state is bound. - */ - private static class CopyOnBindBinderFactory implements StateBinderFactory { - private final K key; - private final Optional> underlying; - - public CopyOnBindBinderFactory(K key, Optional> underlying) { - this.key = key; - this.underlying = underlying; - } - - private boolean containedInUnderlying(StateNamespace namespace, StateTag tag) { - return underlying.isPresent() && underlying.get().isNamespaceInUse(namespace) - && underlying.get().getTagsInUse(namespace).containsKey(tag); - } - - @Override - public StateBinder forNamespace(final StateNamespace namespace, final StateContext c) { - return new StateBinder() { - @Override - public WatermarkHoldState bindWatermark( - StateTag> address, - OutputTimeFn outputTimeFn) { - if (containedInUnderlying(namespace, address)) { - @SuppressWarnings("unchecked") - InMemoryState> existingState = - (InMemoryStateInternals.InMemoryState>) - underlying.get().get(namespace, address, c); - return existingState.copy(); - } else { - return new InMemoryStateInternals.InMemoryWatermarkHold<>( - outputTimeFn); - } - } - - @Override - public ValueState bindValue( - StateTag> address, Coder coder) { - if (containedInUnderlying(namespace, address)) { - @SuppressWarnings("unchecked") - InMemoryState> existingState = - (InMemoryStateInternals.InMemoryState>) - underlying.get().get(namespace, address, c); - return existingState.copy(); - } else { - return new InMemoryStateInternals.InMemoryValue<>(); - } - } - - @Override - public AccumulatorCombiningState - bindCombiningValue( - StateTag> address, - Coder accumCoder, CombineFn combineFn) { - if (containedInUnderlying(namespace, address)) { - @SuppressWarnings("unchecked") - InMemoryState> - existingState = ( - InMemoryStateInternals - .InMemoryState>) underlying.get().get(namespace, address, c); - return existingState.copy(); - } else { - return new InMemoryStateInternals.InMemoryCombiningValue<>( - key, combineFn.asKeyedFn()); - } - } - - @Override - public BagState bindBag( - StateTag> address, Coder elemCoder) { - if (containedInUnderlying(namespace, address)) { - @SuppressWarnings("unchecked") - InMemoryState> existingState = - (InMemoryStateInternals.InMemoryState>) - underlying.get().get(namespace, address, c); - return existingState.copy(); - } else { - return new InMemoryStateInternals.InMemoryBag<>(); - } - } - - @Override - public AccumulatorCombiningState - bindKeyedCombiningValue( - StateTag> address, - Coder accumCoder, - KeyedCombineFn combineFn) { - if (containedInUnderlying(namespace, address)) { - @SuppressWarnings("unchecked") - InMemoryState> - existingState = ( - InMemoryStateInternals - .InMemoryState>) underlying.get().get(namespace, address, c); - return existingState.copy(); - } else { - return new InMemoryStateInternals.InMemoryCombiningValue<>(key, combineFn); - } - } - - @Override - public AccumulatorCombiningState - bindKeyedCombiningValueWithContext( - StateTag> address, - Coder accumCoder, - KeyedCombineFnWithContext combineFn) { - return bindKeyedCombiningValue( - address, accumCoder, CombineFnUtil.bindContext(combineFn, c)); - } - }; - } - } - - /** - * {@link StateBinderFactory} that reads directly from the underlying table. Used during calls - * to {@link CopyOnAccessInMemoryStateTable#commit()} to read all values from - * the underlying table. - */ - private static class ReadThroughBinderFactory implements StateBinderFactory { - private final StateTable underlying; - - public ReadThroughBinderFactory(StateTable underlying) { - this.underlying = underlying; - } - - public Instant readThroughAndGetEarliestHold(StateTable readTo) { - Instant earliestHold = BoundedWindow.TIMESTAMP_MAX_VALUE; - for (StateNamespace namespace : underlying.getNamespacesInUse()) { - for (Map.Entry, ? extends State> existingState : - underlying.getTagsInUse(namespace).entrySet()) { - if (!((InMemoryState) existingState.getValue()).isCleared()) { - // Only read through non-cleared values to ensure that completed windows are - // eventually discarded, and remember the earliest watermark hold from among those - // values. - State state = - readTo.get(namespace, existingState.getKey(), StateContexts.nullContext()); - if (state instanceof WatermarkHoldState) { - Instant hold = ((WatermarkHoldState) state).read(); - if (hold != null && hold.isBefore(earliestHold)) { - earliestHold = hold; - } - } - } - } - } - return earliestHold; - } - - @Override - public StateBinder forNamespace(final StateNamespace namespace, final StateContext c) { - return new StateBinder() { - @Override - public WatermarkHoldState bindWatermark( - StateTag> address, - OutputTimeFn outputTimeFn) { - return underlying.get(namespace, address, c); - } - - @Override - public ValueState bindValue( - StateTag> address, Coder coder) { - return underlying.get(namespace, address, c); - } - - @Override - public AccumulatorCombiningState - bindCombiningValue( - StateTag> address, - Coder accumCoder, CombineFn combineFn) { - return underlying.get(namespace, address, c); - } - - @Override - public BagState bindBag( - StateTag> address, Coder elemCoder) { - return underlying.get(namespace, address, c); - } - - @Override - public AccumulatorCombiningState - bindKeyedCombiningValue( - StateTag> address, - Coder accumCoder, - KeyedCombineFn combineFn) { - return underlying.get(namespace, address, c); - } - - @Override - public AccumulatorCombiningState - bindKeyedCombiningValueWithContext( - StateTag> address, - Coder accumCoder, - KeyedCombineFnWithContext combineFn) { - return bindKeyedCombiningValue( - address, accumCoder, CombineFnUtil.bindContext(combineFn, c)); - } - }; - } - } - - private static class InMemoryStateBinderFactory implements StateBinderFactory { - private final K key; - - public InMemoryStateBinderFactory(K key) { - this.key = key; - } - - @Override - public StateBinder forNamespace(StateNamespace namespace, StateContext c) { - return new InMemoryStateInternals.InMemoryStateBinder<>(key, c); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/InMemoryStateInternals.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/InMemoryStateInternals.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/InMemoryStateInternals.java deleted file mode 100644 index 8404801..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/InMemoryStateInternals.java +++ /dev/null @@ -1,414 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package com.google.cloud.dataflow.sdk.util.state; - -import com.google.cloud.dataflow.sdk.annotations.Experimental; -import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn; -import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn; -import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; -import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn; -import com.google.cloud.dataflow.sdk.util.CombineFnUtil; -import com.google.cloud.dataflow.sdk.util.state.StateTag.StateBinder; - -import org.joda.time.Instant; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; - -import javax.annotation.Nullable; - -/** - * In-memory implementation of {@link StateInternals}. Used in {@code BatchModeExecutionContext} - * and for running tests that need state. - */ -@Experimental(Kind.STATE) -public class InMemoryStateInternals implements StateInternals { - - public static InMemoryStateInternals forKey(K key) { - return new InMemoryStateInternals<>(key); - } - - private final K key; - - protected InMemoryStateInternals(K key) { - this.key = key; - } - - @Override - public K getKey() { - return key; - } - - interface InMemoryState> { - boolean isCleared(); - T copy(); - } - - protected final StateTable inMemoryState = new StateTable() { - @Override - protected StateBinder binderForNamespace(StateNamespace namespace, StateContext c) { - return new InMemoryStateBinder(key, c); - } - }; - - public void clear() { - inMemoryState.clear(); - } - - /** - * Return true if the given state is empty. This is used by the test framework to make sure - * that the state has been properly cleaned up. - */ - protected boolean isEmptyForTesting(State state) { - return ((InMemoryState) state).isCleared(); - } - - @Override - public T state(StateNamespace namespace, StateTag address) { - return inMemoryState.get(namespace, address, StateContexts.nullContext()); - } - - @Override - public T state( - StateNamespace namespace, StateTag address, final StateContext c) { - return inMemoryState.get(namespace, address, c); - } - - /** - * A {@link StateBinder} that returns In Memory {@link State} objects. - */ - static class InMemoryStateBinder implements StateBinder { - private final K key; - private final StateContext c; - - InMemoryStateBinder(K key, StateContext c) { - this.key = key; - this.c = c; - } - - @Override - public ValueState bindValue( - StateTag> address, Coder coder) { - return new InMemoryValue(); - } - - @Override - public BagState bindBag( - final StateTag> address, Coder elemCoder) { - return new InMemoryBag(); - } - - @Override - public AccumulatorCombiningState - bindCombiningValue( - StateTag> address, - Coder accumCoder, - final CombineFn combineFn) { - return new InMemoryCombiningValue(key, combineFn.asKeyedFn()); - } - - @Override - public WatermarkHoldState bindWatermark( - StateTag> address, - OutputTimeFn outputTimeFn) { - return new InMemoryWatermarkHold(outputTimeFn); - } - - @Override - public AccumulatorCombiningState - bindKeyedCombiningValue( - StateTag> address, - Coder accumCoder, - KeyedCombineFn combineFn) { - return new InMemoryCombiningValue(key, combineFn); - } - - @Override - public AccumulatorCombiningState - bindKeyedCombiningValueWithContext( - StateTag> address, - Coder accumCoder, - KeyedCombineFnWithContext combineFn) { - return bindKeyedCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c)); - } - } - - static final class InMemoryValue implements ValueState, InMemoryState> { - private boolean isCleared = true; - private T value = null; - - @Override - public void clear() { - // Even though we're clearing we can't remove this from the in-memory state map, since - // other users may already have a handle on this Value. - value = null; - isCleared = true; - } - - @Override - public InMemoryValue readLater() { - return this; - } - - @Override - public T read() { - return value; - } - - @Override - public void write(T input) { - isCleared = false; - this.value = input; - } - - @Override - public InMemoryValue copy() { - InMemoryValue that = new InMemoryValue<>(); - if (!this.isCleared) { - that.isCleared = this.isCleared; - that.value = this.value; - } - return that; - } - - @Override - public boolean isCleared() { - return isCleared; - } - } - - static final class InMemoryWatermarkHold - implements WatermarkHoldState, InMemoryState> { - - private final OutputTimeFn outputTimeFn; - - @Nullable - private Instant combinedHold = null; - - public InMemoryWatermarkHold(OutputTimeFn outputTimeFn) { - this.outputTimeFn = outputTimeFn; - } - - @Override - public InMemoryWatermarkHold readLater() { - return this; - } - - @Override - public void clear() { - // Even though we're clearing we can't remove this from the in-memory state map, since - // other users may already have a handle on this WatermarkBagInternal. - combinedHold = null; - } - - @Override - public Instant read() { - return combinedHold; - } - - @Override - public void add(Instant outputTime) { - combinedHold = combinedHold == null ? outputTime - : outputTimeFn.combine(combinedHold, outputTime); - } - - @Override - public boolean isCleared() { - return combinedHold == null; - } - - @Override - public ReadableState isEmpty() { - return new ReadableState() { - @Override - public ReadableState readLater() { - return this; - } - @Override - public Boolean read() { - return combinedHold == null; - } - }; - } - - @Override - public OutputTimeFn getOutputTimeFn() { - return outputTimeFn; - } - - @Override - public String toString() { - return Objects.toString(combinedHold); - } - - @Override - public InMemoryWatermarkHold copy() { - InMemoryWatermarkHold that = - new InMemoryWatermarkHold<>(outputTimeFn); - that.combinedHold = this.combinedHold; - return that; - } - } - - static final class InMemoryCombiningValue - implements AccumulatorCombiningState, - InMemoryState> { - private final K key; - private boolean isCleared = true; - private final KeyedCombineFn combineFn; - private AccumT accum; - - InMemoryCombiningValue( - K key, KeyedCombineFn combineFn) { - this.key = key; - this.combineFn = combineFn; - accum = combineFn.createAccumulator(key); - } - - @Override - public InMemoryCombiningValue readLater() { - return this; - } - - @Override - public void clear() { - // Even though we're clearing we can't remove this from the in-memory state map, since - // other users may already have a handle on this CombiningValue. - accum = combineFn.createAccumulator(key); - isCleared = true; - } - - @Override - public OutputT read() { - return combineFn.extractOutput(key, accum); - } - - @Override - public void add(InputT input) { - isCleared = false; - accum = combineFn.addInput(key, accum, input); - } - - @Override - public AccumT getAccum() { - return accum; - } - - @Override - public ReadableState isEmpty() { - return new ReadableState() { - @Override - public ReadableState readLater() { - return this; - } - @Override - public Boolean read() { - return isCleared; - } - }; - } - - @Override - public void addAccum(AccumT accum) { - isCleared = false; - this.accum = combineFn.mergeAccumulators(key, Arrays.asList(this.accum, accum)); - } - - @Override - public AccumT mergeAccumulators(Iterable accumulators) { - return combineFn.mergeAccumulators(key, accumulators); - } - - @Override - public boolean isCleared() { - return isCleared; - } - - @Override - public InMemoryCombiningValue copy() { - InMemoryCombiningValue that = - new InMemoryCombiningValue<>(key, combineFn); - if (!this.isCleared) { - that.isCleared = this.isCleared; - that.addAccum(accum); - } - return that; - } - } - - static final class InMemoryBag implements BagState, InMemoryState> { - private List contents = new ArrayList<>(); - - @Override - public void clear() { - // Even though we're clearing we can't remove this from the in-memory state map, since - // other users may already have a handle on this Bag. - // The result of get/read below must be stable for the lifetime of the bundle within which it - // was generated. In batch and direct runners the bundle lifetime can be - // greater than the window lifetime, in which case this method can be called while - // the result is still in use. We protect against this by hot-swapping instead of - // clearing the contents. - contents = new ArrayList<>(); - } - - @Override - public InMemoryBag readLater() { - return this; - } - - @Override - public Iterable read() { - return contents; - } - - @Override - public void add(T input) { - contents.add(input); - } - - @Override - public boolean isCleared() { - return contents.isEmpty(); - } - - @Override - public ReadableState isEmpty() { - return new ReadableState() { - @Override - public ReadableState readLater() { - return this; - } - - @Override - public Boolean read() { - return contents.isEmpty(); - } - }; - } - - @Override - public InMemoryBag copy() { - InMemoryBag that = new InMemoryBag<>(); - that.contents.addAll(this.contents); - return that; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/MergingStateAccessor.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/MergingStateAccessor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/MergingStateAccessor.java deleted file mode 100644 index 40211d7..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/MergingStateAccessor.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.util.state; - -import com.google.cloud.dataflow.sdk.annotations.Experimental; -import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind; -import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; - -import java.util.Map; - -/** - * Interface for accessing persistent state while windows are merging. - * - *

For internal use only. - */ -@Experimental(Kind.STATE) -public interface MergingStateAccessor - extends StateAccessor { - /** - * Analogous to {@link #access}, but returned as a map from each window which is - * about to be merged to the corresponding state. Only includes windows which - * are known to have state. - */ - Map accessInEachMergingWindow( - StateTag address); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/ReadableState.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/ReadableState.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/ReadableState.java deleted file mode 100644 index 8f690a3..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/ReadableState.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package com.google.cloud.dataflow.sdk.util.state; - -import com.google.cloud.dataflow.sdk.annotations.Experimental; -import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind; - -/** - * A {@code StateContents} is produced by the read methods on all {@link State} objects. - * Calling {@link #read} returns the associated value. - * - *

This class is similar to {@link java.util.concurrent.Future}, but each invocation of - * {@link #read} need not return the same value. - * - *

Getting the {@code StateContents} from a read method indicates the desire to eventually - * read a value. Depending on the runner this may or may not immediately start the read. - * - * @param The type of value returned by {@link #read}. - */ -@Experimental(Kind.STATE) -public interface ReadableState { - /** - * Read the current value, blocking until it is available. - * - *

If there will be many calls to {@link #read} for different state in short succession, - * you should first call {@link #readLater} for all of them so the reads can potentially be - * batched (depending on the underlying {@link StateInternals} implementation}. - */ - T read(); - - /** - * Indicate that the value will be read later. - * - *

This allows a {@link StateInternals} implementation to start an asynchronous prefetch or - * to include this state in the next batch of reads. - * - * @return this for convenient chaining - */ - ReadableState readLater(); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/State.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/State.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/State.java deleted file mode 100644 index 0cef786..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/State.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package com.google.cloud.dataflow.sdk.util.state; - -/** - * Base interface for all state locations. - * - *

Specific types of state add appropriate accessors for reading and writing values, see - * {@link ValueState}, {@link BagState}, and {@link CombiningState}. - */ -public interface State { - - /** - * Clear out the state location. - */ - void clear(); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateAccessor.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateAccessor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateAccessor.java deleted file mode 100644 index 6cfbecf..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateAccessor.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.util.state; - -import com.google.cloud.dataflow.sdk.annotations.Experimental; -import com.google.cloud.dataflow.sdk.annotations.Experimental.Kind; - -/** - * Interface for accessing a {@link StateTag} in the current context. - * - *

For internal use only. - */ -@Experimental(Kind.STATE) -public interface StateAccessor { - /** - * Access the storage for the given {@code address} in the current window. - * - *

Never accounts for merged windows. When windows are merged, any state accessed via - * this method must be eagerly combined and written into the result window. - */ - StateT access(StateTag address); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateContext.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateContext.java deleted file mode 100644 index 96387d8..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/state/StateContext.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (C) 2016 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package com.google.cloud.dataflow.sdk.util.state; - -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; -import com.google.cloud.dataflow.sdk.values.PCollectionView; - -/** - * Information accessible the state API. - */ -public interface StateContext { - /** - * Returns the {@code PipelineOptions} specified with the - * {@link com.google.cloud.dataflow.sdk.runners.PipelineRunner}. - */ - public abstract PipelineOptions getPipelineOptions(); - - /** - * Returns the value of the side input for the corresponding state window. - */ - public abstract T sideInput(PCollectionView view); - - /** - * Returns the window corresponding to the state. - */ - public abstract W window(); -}