beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject [11/74] [partial] incubator-beam git commit: Rename com/google/cloud/dataflow->org/apache/beam
Date Thu, 14 Apr 2016 04:47:58 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKeyRange.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKeyRange.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKeyRange.java
deleted file mode 100644
index c92e7e9..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKeyRange.java
+++ /dev/null
@@ -1,377 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io.range;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.base.Verify.verify;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-
-/**
- * A class representing a range of {@link ByteKey ByteKeys}.
- *
- * <p>Instances of {@link ByteKeyRange} are immutable.
- *
- * <p>A {@link ByteKeyRange} enforces the restriction that its start and end keys must form a valid,
- * non-empty range {@code [startKey, endKey)} that is inclusive of the start key and exclusive of
- * the end key.
- *
- * <p>When the end key is empty, it is treated as the largest possible key.
- *
- * <h3>Interpreting {@link ByteKey} in a {@link ByteKeyRange}</h3>
- *
- * <p>The primary role of {@link ByteKeyRange} is to provide functionality for
- * {@link #estimateFractionForKey(ByteKey)}, {@link #interpolateKey(double)}, and
- * {@link #split(int)}, which are used for Google Cloud Dataflow's
- * <a href="https://cloud.google.com/dataflow/service/dataflow-service-desc#AutoScaling">Autoscaling
- * and Dynamic Work Rebalancing</a> features.
- *
- * <p>{@link ByteKeyRange} implements these features by treating a {@link ByteKey}'s underlying
- * {@code byte[]} as the binary expansion of floating point numbers in the range {@code [0.0, 1.0]}.
- * For example, the keys {@code ByteKey.of(0x80)}, {@code ByteKey.of(0xc0)}, and
- * {@code ByteKey.of(0xe0)} are interpreted as {@code 0.5}, {@code 0.75}, and {@code 0.875}
- * respectively. The empty {@code ByteKey.EMPTY} is interpreted as {@code 0.0} when used as the
- * start of a range and {@code 1.0} when used as the end key.
- *
- * <p>Key interpolation, fraction estimation, and range splitting are all interpreted in these
- * floating-point semantics. See the respective implementations for further details. <b>Note:</b>
- * the underlying implementations of these functions use {@link BigInteger} and {@link BigDecimal},
- * so they can be slow and should not be called in hot loops. Dataflow's dynamic work
- * rebalancing will only invoke these functions during periodic control operations, so they are not
- * called on the critical path.
- *
- * @see ByteKey
- */
-public final class ByteKeyRange implements Serializable {
-  private static final Logger logger = LoggerFactory.getLogger(ByteKeyRange.class);
-
-  /** The range of all keys, with empty start and end keys. */
-  public static final ByteKeyRange ALL_KEYS = ByteKeyRange.of(ByteKey.EMPTY, ByteKey.EMPTY);
-
-  /**
-   * Creates a new {@link ByteKeyRange} with the given start and end keys.
-   *
-   * <p>Note that if {@code endKey} is empty, it is treated as the largest possible key.
-   *
-   * @see ByteKeyRange
-   *
-   * @throws IllegalArgumentException if {@code endKey} is less than or equal to {@code startKey},
-   *     unless {@code endKey} is empty indicating the maximum possible {@link ByteKey}.
-   */
-  public static ByteKeyRange of(ByteKey startKey, ByteKey endKey) {
-    return new ByteKeyRange(startKey, endKey);
-  }
-
-  /**
-   * Returns the {@link ByteKey} representing the lower bound of this {@link ByteKeyRange}.
-   */
-  public ByteKey getStartKey() {
-    return startKey;
-  }
-
-  /**
-   * Returns the {@link ByteKey} representing the upper bound of this {@link ByteKeyRange}.
-   *
-   * <p>Note that if {@code endKey} is empty, it is treated as the largest possible key.
-   */
-  public ByteKey getEndKey() {
-    return endKey;
-  }
-
-  /**
-   * Returns {@code true} if the specified {@link ByteKey} is contained within this range.
-   */
-  public Boolean containsKey(ByteKey key) {
-    return key.compareTo(startKey) >= 0 && endsAfterKey(key);
-  }
-
-  /**
-   * Returns {@code true} if the specified {@link ByteKeyRange} overlaps this range.
-   */
-  public Boolean overlaps(ByteKeyRange other) {
-    // If each range starts before the other range ends, then they must overlap.
-    //     { [] } -- one range inside the other   OR   { [ } ] -- partial overlap.
-    return endsAfterKey(other.startKey) && other.endsAfterKey(startKey);
-  }
-
-  /**
-   * Returns a list of up to {@code numSplits + 1} {@link ByteKey ByteKeys} in ascending order,
-   * where the keys have been interpolated to form roughly equal sub-ranges of this
-   * {@link ByteKeyRange}, assuming a uniform distribution of keys within this range.
-   *
-   * <p>The first {@link ByteKey} in the result is guaranteed to be equal to {@link #getStartKey},
-   * and the last {@link ByteKey} in the result is guaranteed to be equal to {@link #getEndKey}.
-   * Thus the resulting list exactly spans the same key range as this {@link ByteKeyRange}.
-   *
-   * <p>Note that the number of keys returned is not always equal to {@code numSplits + 1}.
-   * Specifically, if this range is unsplittable (e.g., because the start and end keys are equal
-   * up to padding by zero bytes), the list returned will only contain the start and end key.
-   *
-   * @throws IllegalArgumentException if the specified number of splits is less than 1
-   * @see ByteKeyRange the ByteKeyRange class Javadoc for more information about split semantics.
-   */
-  public List<ByteKey> split(int numSplits) {
-    checkArgument(numSplits > 0, "numSplits %s must be a positive integer", numSplits);
-
-    try {
-      ImmutableList.Builder<ByteKey> ret = ImmutableList.builder();
-      ret.add(startKey);
-      for (int i = 1; i < numSplits; ++i) {
-        ret.add(interpolateKey(i / (double) numSplits));
-      }
-      ret.add(endKey);
-      return ret.build();
-    } catch (IllegalStateException e) {
-      // The range is not splittable -- just return the start and end keys.
-      return ImmutableList.of(startKey, endKey);
-    }
-  }
-
-  /**
-   * Returns the fraction of this range {@code [startKey, endKey)} that is in the interval
-   * {@code [startKey, key)}.
-   *
-   * @throws IllegalArgumentException if {@code key} does not fall within this range
-   * @see ByteKeyRange the ByteKeyRange class Javadoc for more information about fraction semantics.
-   */
-  public double estimateFractionForKey(ByteKey key) {
-    checkNotNull(key, "key");
-    checkArgument(!key.isEmpty(), "Cannot compute fraction for an empty key");
-    checkArgument(
-        key.compareTo(startKey) >= 0, "Expected key %s >= range start key %s", key, startKey);
-
-    if (key.equals(endKey)) {
-      return 1.0;
-    }
-    checkArgument(containsKey(key), "Cannot compute fraction for %s outside this %s", key, this);
-
-    byte[] startBytes = startKey.getBytes();
-    byte[] endBytes = endKey.getBytes();
-    byte[] keyBytes = key.getBytes();
-    // If the endKey is unspecified, add a leading 1 byte to it and a leading 0 byte to all other
-    // keys, to get a concrete least upper bound for the desired range.
-    if (endKey.isEmpty()) {
-      startBytes = addHeadByte(startBytes, (byte) 0);
-      endBytes = addHeadByte(endBytes, (byte) 1);
-      keyBytes = addHeadByte(keyBytes, (byte) 0);
-    }
-
-    // Pad to the longest of all 3 keys.
-    int paddedKeyLength = Math.max(Math.max(startBytes.length, endBytes.length), keyBytes.length);
-    BigInteger rangeStartInt = paddedPositiveInt(startBytes, paddedKeyLength);
-    BigInteger rangeEndInt = paddedPositiveInt(endBytes, paddedKeyLength);
-    BigInteger keyInt = paddedPositiveInt(keyBytes, paddedKeyLength);
-
-    // Keys are equal subject to padding by 0.
-    BigInteger range = rangeEndInt.subtract(rangeStartInt);
-    if (range.equals(BigInteger.ZERO)) {
-      logger.warn(
-          "Using 0.0 as the default fraction for this near-empty range {} where start and end keys"
-              + " differ only by trailing zeros.",
-          this);
-      return 0.0;
-    }
-
-    // Compute the progress (key-start)/(end-start) scaling by 2^64, dividing (which rounds),
-    // and then scaling down after the division. This gives ample precision when converted to
-    // double.
-    BigInteger progressScaled = keyInt.subtract(rangeStartInt).shiftLeft(64);
-    return progressScaled.divide(range).doubleValue() / Math.pow(2, 64);
-  }
-
-  /**
-   * Returns a {@link ByteKey} {@code key} such that {@code [startKey, key)} represents
-   * approximately the specified fraction of the range {@code [startKey, endKey)}. The interpolation
-   * is computed assuming a uniform distribution of keys.
-   *
-   * <p>For example, given the largest possible range (defined by empty start and end keys), the
-   * fraction {@code 0.5} will return the {@code ByteKey.of(0x80)}, which will also be returned for
-   * ranges {@code [0x40, 0xc0)} and {@code [0x6f, 0x91)}.
-   *
-   * <p>The key returned will never be empty.
-   *
-   * @throws IllegalArgumentException if {@code fraction} is outside the range [0, 1)
-   * @throws IllegalStateException if this range cannot be interpolated
-   * @see ByteKeyRange the ByteKeyRange class Javadoc for more information about fraction semantics.
-   */
-  public ByteKey interpolateKey(double fraction) {
-    checkArgument(
-        fraction >= 0.0 && fraction < 1.0, "Fraction %s must be in the range [0, 1)", fraction);
-    byte[] startBytes = startKey.getBytes();
-    byte[] endBytes = endKey.getBytes();
-    // If the endKey is unspecified, add a leading 1 byte to it and a leading 0 byte to all other
-    // keys, to get a concrete least upper bound for the desired range.
-    if (endKey.isEmpty()) {
-      startBytes = addHeadByte(startBytes, (byte) 0);
-      endBytes = addHeadByte(endBytes, (byte) 1);
-    }
-
-    // Pad to the longest key.
-    int paddedKeyLength = Math.max(startBytes.length, endBytes.length);
-    BigInteger rangeStartInt = paddedPositiveInt(startBytes, paddedKeyLength);
-    BigInteger rangeEndInt = paddedPositiveInt(endBytes, paddedKeyLength);
-
-    // If the keys are equal subject to padding by 0, we can't interpolate.
-    BigInteger range = rangeEndInt.subtract(rangeStartInt);
-    checkState(
-        !range.equals(BigInteger.ZERO),
-        "Refusing to interpolate for near-empty %s where start and end keys differ only by trailing"
-            + " zero bytes.",
-        this);
-
-    // Add precision so that range is at least 53 (double mantissa length) bits long. This way, we
-    // can interpolate small ranges finely, e.g., split the range key 3 to key 4 into 1024 parts.
-    // We add precision to range by adding zero bytes to the end of the keys, aka shifting the
-    // underlying BigInteger left by a multiple of 8 bits.
-    int bytesNeeded = ((53 - range.bitLength()) + 7) / 8;
-    if (bytesNeeded > 0) {
-      range = range.shiftLeft(bytesNeeded * 8);
-      rangeStartInt = rangeStartInt.shiftLeft(bytesNeeded * 8);
-      paddedKeyLength += bytesNeeded;
-    }
-
-    BigInteger interpolatedOffset =
-        new BigDecimal(range).multiply(BigDecimal.valueOf(fraction)).toBigInteger();
-
-    int outputKeyLength = endKey.isEmpty() ? (paddedKeyLength - 1) : paddedKeyLength;
-    return ByteKey.copyFrom(
-        fixupHeadZeros(rangeStartInt.add(interpolatedOffset).toByteArray(), outputKeyLength));
-  }
-
-  /**
-   * Returns a new {@link ByteKeyRange} like this one, but with the specified start key.
-   */
-  public ByteKeyRange withStartKey(ByteKey startKey) {
-    return new ByteKeyRange(startKey, endKey);
-  }
-
-  /**
-   * Returns a new {@link ByteKeyRange} like this one, but with the specified end key.
-   */
-  public ByteKeyRange withEndKey(ByteKey endKey) {
-    return new ByteKeyRange(startKey, endKey);
-  }
-
-  ////////////////////////////////////////////////////////////////////////////////////
-  private final ByteKey startKey;
-  private final ByteKey endKey;
-
-  private ByteKeyRange(ByteKey startKey, ByteKey endKey) {
-    this.startKey = checkNotNull(startKey, "startKey");
-    this.endKey = checkNotNull(endKey, "endKey");
-    checkArgument(endsAfterKey(startKey), "Start %s must be less than end %s", startKey, endKey);
-  }
-
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(ByteKeyRange.class)
-        .add("startKey", startKey)
-        .add("endKey", endKey)
-        .toString();
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (o == this) {
-      return true;
-    }
-    if (!(o instanceof ByteKeyRange)) {
-      return false;
-    }
-    ByteKeyRange other = (ByteKeyRange) o;
-    return Objects.equals(startKey, other.startKey) && Objects.equals(endKey, other.endKey);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(startKey, endKey);
-  }
-
-  /**
-   * Returns a copy of the specified array with the specified byte added at the front.
-   */
-  private static byte[] addHeadByte(byte[] array, byte b) {
-    byte[] ret = new byte[array.length + 1];
-    ret[0] = b;
-    System.arraycopy(array, 0, ret, 1, array.length);
-    return ret;
-  }
-
-  /**
-   * Ensures the array is exactly {@code size} bytes long. Returns the input array if the condition
-   * is met, otherwise either adds or removes zero bytes from the beginning of {@code array}.
-   */
-  private static byte[] fixupHeadZeros(byte[] array, int size) {
-    int padding = size - array.length;
-    if (padding == 0) {
-      return array;
-    }
-
-    if (padding < 0) {
-      // There is one zero byte at the beginning, added by BigInteger so that the result has a
-      // sign bit when converting to bytes.
-      verify(
-          padding == -1,
-          "key %s: expected length %d with exactly one byte of padding, found %d",
-          ByteKey.copyFrom(array),
-          size,
-          -padding);
-      verify(
-          (array[0] == 0) && ((array[1] & 0x80) == 0x80),
-          "key %s: is 1 byte longer than expected, indicating BigInteger padding. Expect first byte"
-              + " to be zero with set MSB in second byte.",
-          ByteKey.copyFrom(array));
-      return Arrays.copyOfRange(array, 1, array.length);
-    }
-
-    byte[] ret = new byte[size];
-    System.arraycopy(array, 0, ret, padding, array.length);
-    return ret;
-  }
-
-  /**
-   * Returns {@code true} when the specified {@code key} is smaller than this range's end key. The
-   * only semantic change from {@code (key.compareTo(getEndKey()) < 0)} is that the empty end key
-   * is treated as larger than all possible {@link ByteKey keys}.
-   */
-  boolean endsAfterKey(ByteKey key) {
-    return endKey.isEmpty() || key.compareTo(endKey) < 0;
-  }
-
-  /** Builds a BigInteger out of the specified array, padded to the desired byte length. */
-  private static BigInteger paddedPositiveInt(byte[] bytes, int length) {
-    int bytePaddingNeeded = length - bytes.length;
-    checkArgument(
-        bytePaddingNeeded >= 0, "Required bytes.length {} < length {}", bytes.length, length);
-    BigInteger ret = new BigInteger(1, bytes);
-    return (bytePaddingNeeded == 0) ? ret : ret.shiftLeft(8 * bytePaddingNeeded);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKeyRangeTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKeyRangeTracker.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKeyRangeTracker.java
deleted file mode 100644
index 0d01f8f..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKeyRangeTracker.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io.range;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link RangeTracker} for {@link ByteKey ByteKeys} in {@link ByteKeyRange ByteKeyRanges}.
- *
- * @see ByteKey
- * @see ByteKeyRange
- */
-public final class ByteKeyRangeTracker implements RangeTracker<ByteKey> {
-  private static final Logger logger = LoggerFactory.getLogger(ByteKeyRangeTracker.class);
-
-  /** Instantiates a new {@link ByteKeyRangeTracker} with the specified range. */
-  public static ByteKeyRangeTracker of(ByteKeyRange range) {
-    return new ByteKeyRangeTracker(range);
-  }
-
-  @Override
-  public synchronized ByteKey getStartPosition() {
-    return range.getStartKey();
-  }
-
-  @Override
-  public synchronized ByteKey getStopPosition() {
-    return range.getEndKey();
-  }
-
-  @Override
-  public synchronized boolean tryReturnRecordAt(boolean isAtSplitPoint, ByteKey recordStart) {
-    if (isAtSplitPoint && !range.containsKey(recordStart)) {
-      return false;
-    }
-    position = recordStart;
-    return true;
-  }
-
-  @Override
-  public synchronized boolean trySplitAtPosition(ByteKey splitPosition) {
-    // Unstarted.
-    if (position == null) {
-      logger.warn(
-          "{}: Rejecting split request at {} because no records have been returned.",
-          this,
-          splitPosition);
-      return false;
-    }
-
-    // Started, but not after current position.
-    if (splitPosition.compareTo(position) <= 0) {
-      logger.warn(
-          "{}: Rejecting split request at {} because it is not after current position {}.",
-          this,
-          splitPosition,
-          position);
-      return false;
-    }
-
-    // Sanity check.
-    if (!range.containsKey(splitPosition)) {
-      logger.warn(
-          "{}: Rejecting split request at {} because it is not within the range.",
-          this,
-          splitPosition);
-      return false;
-    }
-
-    range = range.withEndKey(splitPosition);
-    return true;
-  }
-
-  @Override
-  public synchronized double getFractionConsumed() {
-    if (position == null) {
-      return 0;
-    }
-    return range.estimateFractionForKey(position);
-  }
-
-  ///////////////////////////////////////////////////////////////////////////////
-  private ByteKeyRange range;
-  @Nullable private ByteKey position;
-
-  private ByteKeyRangeTracker(ByteKeyRange range) {
-    this.range = range;
-    this.position = null;
-  }
-
-  @Override
-  public String toString() {
-    return toStringHelper(ByteKeyRangeTracker.class)
-        .add("range", range)
-        .add("position", position)
-        .toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/OffsetRangeTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/OffsetRangeTracker.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/OffsetRangeTracker.java
deleted file mode 100644
index 3cebb77..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/OffsetRangeTracker.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io.range;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A {@link RangeTracker} for non-negative positions of type {@code long}.
- */
-public class OffsetRangeTracker implements RangeTracker<Long> {
-  private static final Logger LOG = LoggerFactory.getLogger(OffsetRangeTracker.class);
-
-  private final long startOffset;
-  private long stopOffset;
-  private long lastRecordStart = -1L;
-  private long offsetOfLastSplitPoint = -1L;
-
-  /**
-   * Offset corresponding to infinity. This can only be used as the upper-bound of a range, and
-   * indicates reading all of the records until the end without specifying exactly what the end is.
-   *
-   * <p>Infinite ranges cannot be split because it is impossible to estimate progress within them.
-   */
-  public static final long OFFSET_INFINITY = Long.MAX_VALUE;
-
-  /**
-   * Creates an {@code OffsetRangeTracker} for the specified range.
-   */
-  public OffsetRangeTracker(long startOffset, long stopOffset) {
-    this.startOffset = startOffset;
-    this.stopOffset = stopOffset;
-  }
-
-  @Override
-  public synchronized Long getStartPosition() {
-    return startOffset;
-  }
-
-  @Override
-  public synchronized Long getStopPosition() {
-    return stopOffset;
-  }
-
-  @Override
-  public boolean tryReturnRecordAt(boolean isAtSplitPoint, Long recordStart) {
-    return tryReturnRecordAt(isAtSplitPoint, recordStart.longValue());
-  }
-
-  public synchronized boolean tryReturnRecordAt(boolean isAtSplitPoint, long recordStart) {
-    if (lastRecordStart == -1 && !isAtSplitPoint) {
-      throw new IllegalStateException(
-          String.format("The first record [starting at %d] must be at a split point", recordStart));
-    }
-    if (recordStart < lastRecordStart) {
-      throw new IllegalStateException(
-          String.format(
-              "Trying to return record [starting at %d] "
-                  + "which is before the last-returned record [starting at %d]",
-              recordStart,
-              lastRecordStart));
-    }
-    if (isAtSplitPoint) {
-      if (offsetOfLastSplitPoint != -1L && recordStart == offsetOfLastSplitPoint) {
-        throw new IllegalStateException(
-            String.format(
-                "Record at a split point has same offset as the previous split point: "
-                    + "previous split point at %d, current record starts at %d",
-                offsetOfLastSplitPoint, recordStart));
-      }
-      if (recordStart >= stopOffset) {
-        return false;
-      }
-      offsetOfLastSplitPoint = recordStart;
-    }
-
-    lastRecordStart = recordStart;
-    return true;
-  }
-
-  @Override
-  public boolean trySplitAtPosition(Long splitOffset) {
-    return trySplitAtPosition(splitOffset.longValue());
-  }
-
-  public synchronized boolean trySplitAtPosition(long splitOffset) {
-    if (stopOffset == OFFSET_INFINITY) {
-      LOG.debug("Refusing to split {} at {}: stop position unspecified", this, splitOffset);
-      return false;
-    }
-    if (lastRecordStart == -1) {
-      LOG.debug("Refusing to split {} at {}: unstarted", this, splitOffset);
-      return false;
-    }
-
-    // Note: technically it is correct to split at any position after the last returned
-    // split point, not just the last returned record.
-    // TODO: Investigate whether in practice this is useful or, rather, confusing.
-    if (splitOffset <= lastRecordStart) {
-      LOG.debug(
-          "Refusing to split {} at {}: already past proposed split position", this, splitOffset);
-      return false;
-    }
-    if (splitOffset < startOffset || splitOffset >= stopOffset) {
-      LOG.debug(
-          "Refusing to split {} at {}: proposed split position out of range", this, splitOffset);
-      return false;
-    }
-    LOG.debug("Agreeing to split {} at {}", this, splitOffset);
-    this.stopOffset = splitOffset;
-    return true;
-  }
-
-  /**
-   * Returns a position {@code P} such that the range {@code [start, P)} represents approximately
-   * the given fraction of the range {@code [start, end)}. Assumes that the density of records
-   * in the range is approximately uniform.
-   */
-  public synchronized long getPositionForFractionConsumed(double fraction) {
-    if (stopOffset == OFFSET_INFINITY) {
-      throw new IllegalArgumentException(
-          "getPositionForFractionConsumed is not applicable to an unbounded range: " + this);
-    }
-    return (long) Math.ceil(startOffset + fraction * (stopOffset - startOffset));
-  }
-
-  @Override
-  public synchronized double getFractionConsumed() {
-    if (stopOffset == OFFSET_INFINITY) {
-      return 0.0;
-    }
-    if (lastRecordStart == -1) {
-      return 0.0;
-    }
-    // E.g., when reading [3, 6) and lastRecordStart is 4, that means we consumed 3,4 of 3,4,5
-    // which is (4 - 3 + 1) / (6 - 3) = 67%.
-    // Also, clamp to at most 1.0 because the last consumed position can extend past the
-    // stop position.
-    return Math.min(1.0, 1.0 * (lastRecordStart - startOffset + 1) / (stopOffset - startOffset));
-  }
-
-  @Override
-  public synchronized String toString() {
-    String stopString = (stopOffset == OFFSET_INFINITY) ? "infinity" : String.valueOf(stopOffset);
-    if (lastRecordStart >= 0) {
-      return String.format(
-          "<at [starting at %d] of offset range [%d, %s)>",
-          lastRecordStart,
-          startOffset,
-          stopString);
-    } else {
-      return String.format("<unstarted in offset range [%d, %s)>", startOffset, stopString);
-    }
-  }
-
-  /**
-   * Returns a copy of this tracker for testing purposes (to simplify testing methods with
-   * side effects).
-   */
-  @VisibleForTesting
-  OffsetRangeTracker copy() {
-    OffsetRangeTracker res = new OffsetRangeTracker(startOffset, stopOffset);
-    res.lastRecordStart = this.lastRecordStart;
-    return res;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/RangeTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/RangeTracker.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/RangeTracker.java
deleted file mode 100644
index dad9edc..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/RangeTracker.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io.range;
-
-/**
- * A {@code RangeTracker} is a thread-safe helper object for implementing dynamic work rebalancing
- * in position-based {@link com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader}
- * subclasses.
- *
- * <h3>Usage of the RangeTracker class hierarchy</h3>
- * The abstract {@code RangeTracker} interface should not be used per se - all users should use its
- * subclasses directly. We declare it here because all subclasses have roughly the same interface
- * and the same properties, to centralize the documentation. Currently we provide one
- * implementation - {@link OffsetRangeTracker}.
- *
- * <h3>Position-based sources</h3>
- * A position-based source is one where the source can be described by a range of positions of
- * an ordered type and the records returned by the reader can be described by positions of the
- * same type.
- *
- * <p>In case a record occupies a range of positions in the source, the most important thing about
- * the record is the position where it starts.
- *
- * <p>Defining the semantics of positions for a source is entirely up to the source class, however
- * the chosen definitions have to obey certain properties in order to make it possible to correctly
- * split the source into parts, including dynamic splitting. Two main aspects need to be defined:
- * <ul>
- *   <li>How to assign starting positions to records.
- *   <li>Which records should be read by a source with a range {@code [A, B)}.
- * </ul>
- * Moreover, reading a range must be <i>efficient</i>, i.e., the performance of reading a range
- * should not significantly depend on the location of the range. For example, reading the range
- * {@code [A, B)} should not require reading all data before {@code A}.
- *
- * <p>The sections below explain exactly what properties these definitions must satisfy, and
- * how to use a {@code RangeTracker} with a properly defined source.
- *
- * <h3>Properties of position-based sources</h3>
- * The main requirement for position-based sources is <i>associativity</i>: reading records from
- * {@code [A, B)} and records from {@code [B, C)} should give the same records as reading from
- * {@code [A, C)}, where {@code A <= B <= C}. This property ensures that no matter how a range
- * of positions is split into arbitrarily many sub-ranges, the total set of records described by
- * them stays the same.
- *
- * <p>The other important property is how the source's range relates to positions of records in
- * the source. In many sources each record can be identified by a unique starting position.
- * In this case:
- * <ul>
- *   <li>All records returned by a source {@code [A, B)} must have starting positions
- *   in this range.
- *   <li>All but the last record should end within this range. The last record may or may not
- *   extend past the end of the range.
- *   <li>Records should not overlap.
- * </ul>
- * Such sources should define "read {@code [A, B)}" as "read from the first record starting at or
- * after A, up to but not including the first record starting at or after B".
- *
- * <p>Some examples of such sources include reading lines or CSV from a text file, reading keys and
- * values from a BigTable, etc.
- *
- * <p>The concept of <i>split points</i> allows us to extend these definitions to sources
- * where some records cannot be identified by a unique starting position.
- *
- * <p>In all cases, all records returned by a source {@code [A, B)} must <i>start</i> at or after
- * {@code A}.
- *
- * <h3>Split points</h3>
- *
- * <p>Some sources may have records that are not directly addressable. For example, imagine a file
- * format consisting of a sequence of compressed blocks. Each block can be assigned an offset, but
- * records within the block cannot be directly addressed without decompressing the block. Let us
- * refer to this hypothetical format as <i>CBF (Compressed Blocks Format)</i>.
- *
- * <p>Many such formats can still satisfy the associativity property. For example, in CBF, reading
- * {@code [A, B)} can mean "read all the records in all blocks whose starting offset is in
- * {@code [A, B)}".
- *
- * <p>To support such complex formats, we introduce the notion of <i>split points</i>. We say that
- * a record is a split point if there exists a position {@code A} such that the record is the first
- * one to be returned when reading the range {@code [A, infinity)}. In CBF, the only split points
- * would be the first records in each block.
- *
- * <p>Split points allow us to define the meaning of a record's position and a source's range
- * in all cases:
- * <ul>
- *   <li>For a record that is at a split point, its position is defined to be the largest
- *   {@code A} such that reading a source with the range {@code [A, infinity)} returns this record;
- *   <li>Positions of other records are only required to be non-decreasing;
- *   <li>Reading the source {@code [A, B)} must return records starting from the first split point
- *   at or after {@code A}, up to but not including the first split point at or after {@code B}.
- *   In particular, this means that the first record returned by a source MUST always be
- *   a split point.
- *   <li>Positions of split points must be unique.
- * </ul>
- * As a result, for any decomposition of the full range of the source into position ranges, the
- * total set of records will be the full set of records in the source, and each record
- * will be read exactly once.
- *
- * <h3>Consumed positions</h3>
- * As the source is being read, and records read from it are being passed to the downstream
- * transforms in the pipeline, we say that positions in the source are being <i>consumed</i>.
- * When a reader has read a record (or promised to a caller that a record will be returned),
- * positions up to and including the record's start position are considered <i>consumed</i>.
- *
- * <p>Dynamic splitting can happen only at <i>unconsumed</i> positions. If the reader just
- * returned a record at offset 42 in a file, dynamic splitting can happen only at offset 43 or
- * beyond, as otherwise that record could be read twice (by the current reader and by a reader
- * of the task starting at 43).
- *
- * <h3>Example</h3>
- * The following example uses an {@link OffsetRangeTracker} to support dynamically splitting
- * a source with integer positions (offsets).
- * <pre> {@code
- *   class MyReader implements BoundedReader<Foo> {
- *     private MySource currentSource;
- *     private final OffsetRangeTracker tracker;
- *     ...
- *     MyReader(MySource source) {
- *       this.currentSource = source;
- *       this.tracker = new OffsetRangeTracker(source.getStartOffset(), source.getEndOffset());
- *     }
- *     ...
- *     boolean start() {
- *       ... (general logic for locating the first record) ...
- *       if (!tracker.tryReturnRecordAt(true, recordStartOffset)) return false;
- *       ... (any logic that depends on the record being returned, e.g. counting returned records)
- *       return true;
- *     }
- *     boolean advance() {
- *       ... (general logic for locating the next record) ...
- *       if (!tracker.tryReturnRecordAt(isAtSplitPoint, recordStartOffset)) return false;
- *       ... (any logic that depends on the record being returned, e.g. counting returned records)
- *       return true;
- *     }
- *
- *     double getFractionConsumed() {
- *       return tracker.getFractionConsumed();
- *     }
- *   }
- * } </pre>
- *
- * <h3>Usage with different models of iteration</h3>
- * When using this class to protect a
- * {@link com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader}, follow the pattern
- * described above.
- *
- * <p>When using this class to protect iteration in the {@code hasNext()/next()}
- * model, consider the record consumed when {@code hasNext()} is about to return true, rather than
- * when {@code next()} is called, because {@code hasNext()} returning true is promising the caller
- * that {@code next()} will have an element to return - so {@link #trySplitAtPosition} must not
- * split the range in a way that would make the record promised by {@code hasNext()} belong to
- * a different range.
- *
- * <p>Also note that implementations of {@code hasNext()} need to ensure
- * that they call {@link #tryReturnRecordAt} only once even if {@code hasNext()} is called
- * repeatedly, due to the requirement on uniqueness of split point positions.
- *
- * @param <PositionT> Type of positions used by the source to define ranges and identify records.
- */
-public interface RangeTracker<PositionT> {
-  /**
-   * Returns the starting position of the current range, inclusive.
-   */
-  PositionT getStartPosition();
-
-  /**
-   * Returns the ending position of the current range, exclusive.
-   */
-  PositionT getStopPosition();
-
-  /**
-   * Atomically determines whether a record at the given position can be returned and updates
-   * internal state. In particular:
-   * <ul>
-   *   <li>If {@code isAtSplitPoint} is {@code true}, and {@code recordStart} is outside the current
-   *   range, returns {@code false};
-   *   <li>Otherwise, updates the last-consumed position to {@code recordStart} and returns
-   *   {@code true}.
-   * </ul>
-   * <p>This method MUST be called on all split point records. It may be called on every record.
-   */
-  boolean tryReturnRecordAt(boolean isAtSplitPoint, PositionT recordStart);
-
-  /**
-   * Atomically splits the current range [{@link #getStartPosition}, {@link #getStopPosition})
-   * into a "primary" part [{@link #getStartPosition}, {@code splitPosition})
-   * and a "residual" part [{@code splitPosition}, {@link #getStopPosition}), assuming the current
-   * last-consumed position is within [{@link #getStartPosition}, splitPosition)
-   * (i.e., {@code splitPosition} has not been consumed yet).
-   *
-   * <p>Updates the current range to be the primary and returns {@code true}. This means that
-   * all further calls on the current object will interpret their arguments relative to the
-   * primary range.
-   *
-   * <p>If the split position has already been consumed, or if no {@link #tryReturnRecordAt} call
-   * was made yet, returns {@code false}. The second condition is to prevent dynamic splitting
-   * during reader start-up.
-   */
-  boolean trySplitAtPosition(PositionT splitPosition);
-
-  /**
-   * Returns the approximate fraction of positions in the source that have been consumed by
-   * successful {@link #tryReturnRecordAt} calls, or 0.0 if no such calls have happened.
-   */
-  double getFractionConsumed();
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/package-info.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/package-info.java
deleted file mode 100644
index 9117e43..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Provides thread-safe helpers for implementing dynamic work rebalancing in position-based
- * bounded sources.
- *
- * <p>See {@link com.google.cloud.dataflow.sdk.io.range.RangeTracker} to get started.
- */
-package com.google.cloud.dataflow.sdk.io.range;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/ApplicationNameOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/ApplicationNameOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/ApplicationNameOptions.java
deleted file mode 100644
index 2860fbe..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/ApplicationNameOptions.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-/**
- * Options that allow setting the application name.
- */
-public interface ApplicationNameOptions extends PipelineOptions {
-  /**
-   * Name of application, for display purposes.
-   *
-   * <p>Defaults to the name of the class that constructs the {@link PipelineOptions}
-   * via the {@link PipelineOptionsFactory}.
-   */
-  @Description("Name of application for display purposes. Defaults to the name of the class that "
-      + "constructs the PipelineOptions via the PipelineOptionsFactory.")
-  String getAppName();
-  void setAppName(String value);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/BigQueryOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/BigQueryOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/BigQueryOptions.java
deleted file mode 100644
index 5576b5f..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/BigQueryOptions.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-/**
- * Properties needed when using BigQuery with the Dataflow SDK.
- */
-@Description("Options that are used to configure BigQuery. See "
-    + "https://cloud.google.com/bigquery/what-is-bigquery for details on BigQuery.")
-public interface BigQueryOptions extends ApplicationNameOptions, GcpOptions,
-    PipelineOptions, StreamingOptions {
-  @Description("Temporary dataset for BigQuery table operations. "
-      + "Supported values are \"bigquery.googleapis.com/{dataset}\"")
-  @Default.String("bigquery.googleapis.com/cloud_dataflow")
-  String getTempDatasetId();
-  void setTempDatasetId(String value);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/Default.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/Default.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/Default.java
deleted file mode 100644
index 843eba1..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/Default.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import java.lang.annotation.Documented;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * {@link Default} represents a set of annotations that can be used to annotate getter properties
- * on {@link PipelineOptions} with information representing the default value to be returned
- * if no value is specified.
- */
-public @interface Default {
-  /**
-   * This represents that the default of the option is the specified {@link java.lang.Class} value.
-   */
-  @Target(ElementType.METHOD)
-  @Retention(RetentionPolicy.RUNTIME)
-  @Documented
-  public @interface Class {
-    java.lang.Class<?> value();
-  }
-
-  /**
-   * This represents that the default of the option is the specified {@link java.lang.String}
-   * value.
-   */
-  @Target(ElementType.METHOD)
-  @Retention(RetentionPolicy.RUNTIME)
-  @Documented
-  public @interface String {
-    java.lang.String value();
-  }
-
-  /**
-   * This represents that the default of the option is the specified boolean primitive value.
-   */
-  @Target(ElementType.METHOD)
-  @Retention(RetentionPolicy.RUNTIME)
-  @Documented
-  public @interface Boolean {
-    boolean value();
-  }
-
-  /**
-   * This represents that the default of the option is the specified char primitive value.
-   */
-  @Target(ElementType.METHOD)
-  @Retention(RetentionPolicy.RUNTIME)
-  @Documented
-  public @interface Character {
-    char value();
-  }
-
-  /**
-   * This represents that the default of the option is the specified byte primitive value.
-   */
-  @Target(ElementType.METHOD)
-  @Retention(RetentionPolicy.RUNTIME)
-  @Documented
-  public @interface Byte {
-    byte value();
-  }
-  /**
-   * This represents that the default of the option is the specified short primitive value.
-   */
-  @Target(ElementType.METHOD)
-  @Retention(RetentionPolicy.RUNTIME)
-  @Documented
-  public @interface Short {
-    short value();
-  }
-  /**
-   * This represents that the default of the option is the specified int primitive value.
-   */
-  @Target(ElementType.METHOD)
-  @Retention(RetentionPolicy.RUNTIME)
-  @Documented
-  public @interface Integer {
-    int value();
-  }
-
-  /**
-   * This represents that the default of the option is the specified long primitive value.
-   */
-  @Target(ElementType.METHOD)
-  @Retention(RetentionPolicy.RUNTIME)
-  @Documented
-  public @interface Long {
-    long value();
-  }
-
-  /**
-   * This represents that the default of the option is the specified float primitive value.
-   */
-  @Target(ElementType.METHOD)
-  @Retention(RetentionPolicy.RUNTIME)
-  @Documented
-  public @interface Float {
-    float value();
-  }
-
-  /**
-   * This represents that the default of the option is the specified double primitive value.
-   */
-  @Target(ElementType.METHOD)
-  @Retention(RetentionPolicy.RUNTIME)
-  @Documented
-  public @interface Double {
-    double value();
-  }
-
-  /**
-   * This represents that the default of the option is the specified enum.
-   * The value should equal the enum's {@link java.lang.Enum#name() name}.
-   */
-  @Target(ElementType.METHOD)
-  @Retention(RetentionPolicy.RUNTIME)
-  @Documented
-  public @interface Enum {
-    java.lang.String value();
-  }
-
-  /**
-   * Value must be of type {@link DefaultValueFactory} and have a default constructor.
-   * Value is instantiated and then used as a factory to generate the default.
-   *
-   * <p>See {@link DefaultValueFactory} for more details.
-   */
-  @Target(ElementType.METHOD)
-  @Retention(RetentionPolicy.RUNTIME)
-  @Documented
-  public @interface InstanceFactory {
-    java.lang.Class<? extends DefaultValueFactory<?>> value();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DefaultValueFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DefaultValueFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DefaultValueFactory.java
deleted file mode 100644
index a6e7856..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DefaultValueFactory.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-/**
- * An interface used with the {@link Default.InstanceFactory} annotation to specify the class that
- * will be an instance factory to produce default values for a given getter on
- * {@link PipelineOptions}. When a property on a {@link PipelineOptions} is fetched, and is
- * currently unset, the default value factory will be instantiated and invoked.
- *
- * <p>Care must be taken to not produce an infinite loop when accessing other fields on the
- * {@link PipelineOptions} object.
- *
- * @param <T> The type of object this factory produces.
- */
-public interface DefaultValueFactory<T> {
-  /**
-   * Creates a default value for a getter marked with {@link Default.InstanceFactory}.
-   *
-   * @param options The current pipeline options.
-   * @return The default value to be used for the annotated getter.
-   */
-  T create(PipelineOptions options);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/Description.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/Description.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/Description.java
deleted file mode 100644
index 4f90c2a..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/Description.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * Descriptions are used to generate human readable output when the {@code --help}
- * command is specified. Description annotations placed on interfaces that extend
- * {@link PipelineOptions} will describe groups of related options. Description annotations
- * placed on getter methods will be used to provide human readable information
- * for the specific option.
- */
-@Target({ElementType.METHOD, ElementType.TYPE})
-@Retention(RetentionPolicy.RUNTIME)
-public @interface Description {
-  String value();
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DirectPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DirectPipelineOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DirectPipelineOptions.java
deleted file mode 100644
index a505632..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DirectPipelineOptions.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.runners.DirectPipeline;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-/**
- * Options that can be used to configure the {@link DirectPipeline}.
- */
-public interface DirectPipelineOptions extends
-    ApplicationNameOptions, BigQueryOptions, GcsOptions, GcpOptions,
-    PipelineOptions, StreamingOptions {
-
-  /**
-   * The random seed to use for pseudorandom behaviors in the {@link DirectPipelineRunner}.
-   * If not explicitly specified, a random seed will be generated.
-   */
-  @JsonIgnore
-  @Description("The random seed to use for pseudorandom behaviors in the DirectPipelineRunner."
-      + " If not explicitly specified, a random seed will be generated.")
-  Long getDirectPipelineRunnerRandomSeed();
-  void setDirectPipelineRunnerRandomSeed(Long value);
-
-  /**
-   * Controls whether the runner should ensure that all of the elements of
-   * the pipeline, such as DoFns, can be serialized.
-   */
-  @JsonIgnore
-  @Description("Controls whether the runner should ensure that all of the elements of the "
-      + "pipeline, such as DoFns, can be serialized.")
-  @Default.Boolean(true)
-  boolean isTestSerializability();
-  void setTestSerializability(boolean testSerializability);
-
-  /**
-   * Controls whether the runner should ensure that all of the elements of
-   * every {@link PCollection} can be encoded using the appropriate
-   * {@link Coder}.
-   */
-  @JsonIgnore
-  @Description("Controls whether the runner should ensure that all of the elements of every "
-      + "PCollection can be encoded using the appropriate Coder.")
-  @Default.Boolean(true)
-  boolean isTestEncodability();
-  void setTestEncodability(boolean testEncodability);
-
-  /**
-   * Controls whether the runner should randomize the order of each
-   * {@link PCollection}.
-   */
-  @JsonIgnore
-  @Description("Controls whether the runner should randomize the order of each PCollection.")
-  @Default.Boolean(true)
-  boolean isTestUnorderedness();
-  void setTestUnorderedness(boolean testUnorderedness);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/GcpOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/GcpOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/GcpOptions.java
deleted file mode 100644
index 52028cd..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/GcpOptions.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.api.client.auth.oauth2.Credential;
-import com.google.api.client.googleapis.auth.oauth2.GoogleOAuthConstants;
-import com.google.cloud.dataflow.sdk.util.CredentialFactory;
-import com.google.cloud.dataflow.sdk.util.GcpCredentialFactory;
-import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.io.Files;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.security.GeneralSecurityException;
-import java.util.Locale;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * Options used to configure Google Cloud Platform project and credentials.
- *
- * <p>These options configure which of the following three different mechanisms for obtaining a
- * credential are used:
- * <ol>
- *   <li>
- *     It can fetch the
- *     <a href="https://developers.google.com/accounts/docs/application-default-credentials">
- *     application default credentials</a>.
- *   </li>
- *   <li>
- *     The user can specify a client secrets file and go through the OAuth2
- *     webflow. The credential will then be cached in the user's home
- *     directory for reuse.
- *   </li>
- *   <li>
- *     The user can specify a file containing a service account private key along
- *     with the service account name.
- *   </li>
- * </ol>
- *
- * <p>The default mechanism is to use the
- * <a href="https://developers.google.com/accounts/docs/application-default-credentials">
- * application default credentials</a>. The other options can be
- * used by setting the corresponding properties.
- */
-@Description("Options used to configure Google Cloud Platform project and credentials.")
-public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions {
-  /**
-   * Project id to use when launching jobs.
-   */
-  @Description("Project id. Required when running a Dataflow in the cloud. "
-      + "See https://cloud.google.com/storage/docs/projects for further details.")
-  @Default.InstanceFactory(DefaultProjectFactory.class)
-  String getProject();
-  void setProject(String value);
-
-  /**
-   * This option controls which file to use when attempting to create the credentials using the
-   * service account method.
-   *
-   * <p>This option, if specified, needs to be combined with the
-   * {@link GcpOptions#getServiceAccountName() serviceAccountName}.
-   */
-  @JsonIgnore
-  @Description("Controls which file to use when attempting to create the credentials "
-      + "using the service account method. This option if specified, needs to be combined with "
-      + "the serviceAccountName option.")
-  String getServiceAccountKeyfile();
-  void setServiceAccountKeyfile(String value);
-
-  /**
-   * This option controls which service account to use when attempting to create the credentials
-   * using the service account method.
-   *
-   * <p>This option, if specified, needs to be combined with the
-   * {@link GcpOptions#getServiceAccountKeyfile() serviceAccountKeyfile}.
-   */
-  @JsonIgnore
-  @Description("Controls which service account to use when attempting to create the credentials "
-      + "using the service account method. This option if specified, needs to be combined with "
-      + "the serviceAccountKeyfile option.")
-  String getServiceAccountName();
-  void setServiceAccountName(String value);
-
-  /**
-   * This option controls which file to use when attempting to create the credentials
-   * using the OAuth 2 webflow. After the OAuth2 webflow, the credentials will be stored
-   * within credentialDir.
-   */
-  @JsonIgnore
-  @Description("This option controls which file to use when attempting to create the credentials "
-      + "using the OAuth 2 webflow. After the OAuth2 webflow, the credentials will be stored "
-      + "within credentialDir.")
-  String getSecretsFile();
-  void setSecretsFile(String value);
-
-  /**
-   * This option controls which credential store to use when creating the credentials
-   * using the OAuth 2 webflow.
-   */
-  @Description("This option controls which credential store to use when creating the credentials "
-      + "using the OAuth 2 webflow.")
-  @Default.String("cloud_dataflow")
-  String getCredentialId();
-  void setCredentialId(String value);
-
-  /**
-   * Directory for storing dataflow credentials after execution of the OAuth 2 webflow. Defaults
-   * to using the $HOME/.store/data-flow directory.
-   */
-  @Description("Directory for storing dataflow credentials after execution of the OAuth 2 webflow. "
-      + "Defaults to using the $HOME/.store/data-flow directory.")
-  @Default.InstanceFactory(CredentialDirFactory.class)
-  String getCredentialDir();
-  void setCredentialDir(String value);
-
-  /**
-   * Returns the default credential directory of ${user.home}/.store/data-flow.
-   */
-  public static class CredentialDirFactory implements DefaultValueFactory<String> {
-    @Override
-    public String create(PipelineOptions options) {
-      File home = new File(System.getProperty("user.home"));
-      File store = new File(home, ".store");
-      File dataflow = new File(store, "data-flow");
-      return dataflow.getPath();
-    }
-  }
-
-  /**
-   * The class of the credential factory that should be created and used to create
-   * credentials. If gcpCredential has not been set explicitly, an instance of this class will
-   * be constructed and used as a credential factory.
-   */
-  @Description("The class of the credential factory that should be created and used to create "
-      + "credentials. If gcpCredential has not been set explicitly, an instance of this class will "
-      + "be constructed and used as a credential factory.")
-  @Default.Class(GcpCredentialFactory.class)
-  Class<? extends CredentialFactory> getCredentialFactoryClass();
-  void setCredentialFactoryClass(
-      Class<? extends CredentialFactory> credentialFactoryClass);
-
-  /**
-   * The credential instance that should be used to authenticate against GCP services.
-   * If no credential has been set explicitly, the default is to use the instance factory
-   * that constructs a credential based upon the currently set credentialFactoryClass.
-   */
-  @JsonIgnore
-  @Description("The credential instance that should be used to authenticate against GCP services. "
-      + "If no credential has been set explicitly, the default is to use the instance factory "
-      + "that constructs a credential based upon the currently set credentialFactoryClass.")
-  @Default.InstanceFactory(GcpUserCredentialsFactory.class)
-  @Hidden
-  Credential getGcpCredential();
-  void setGcpCredential(Credential value);
-
-  /**
-   * Attempts to infer the default project based upon the environment this application
-   * is executing within. Currently this only supports getting the default project from gcloud.
-   */
-  public static class DefaultProjectFactory implements DefaultValueFactory<String> {
-    private static final Logger LOG = LoggerFactory.getLogger(DefaultProjectFactory.class);
-
-    @Override
-    public String create(PipelineOptions options) {
-      try {
-        File configFile;
-        if (getEnvironment().containsKey("CLOUDSDK_CONFIG")) {
-          configFile = new File(getEnvironment().get("CLOUDSDK_CONFIG"), "properties");
-        } else if (isWindows() && getEnvironment().containsKey("APPDATA")) {
-          configFile = new File(getEnvironment().get("APPDATA"), "gcloud/properties");
-        } else {
-          // New versions of gcloud use this file
-          configFile = new File(
-              System.getProperty("user.home"),
-              ".config/gcloud/configurations/config_default");
-          if (!configFile.exists()) {
-            // Old versions of gcloud use this file
-            configFile = new File(System.getProperty("user.home"), ".config/gcloud/properties");
-          }
-        }
-        String section = null;
-        Pattern projectPattern = Pattern.compile("^project\\s*=\\s*(.*)$");
-        Pattern sectionPattern = Pattern.compile("^\\[(.*)\\]$");
-        for (String line : Files.readLines(configFile, StandardCharsets.UTF_8)) {
-          line = line.trim();
-          if (line.isEmpty() || line.startsWith(";")) {
-            continue;
-          }
-          Matcher matcher = sectionPattern.matcher(line);
-          if (matcher.matches()) {
-            section = matcher.group(1);
-          } else if (section == null || section.equals("core")) {
-            matcher = projectPattern.matcher(line);
-            if (matcher.matches()) {
-              String project = matcher.group(1).trim();
-              LOG.info("Inferred default GCP project '{}' from gcloud. If this is the incorrect "
-                  + "project, please cancel this Pipeline and specify the command-line "
-                  + "argument --project.", project);
-              return project;
-            }
-          }
-        }
-      } catch (IOException expected) {
-        LOG.debug("Failed to find default project.", expected);
-      }
-      // return null if can't determine
-      return null;
-    }
-
-    /**
-     * Returns true if running on the Windows OS.
-     */
-    private static boolean isWindows() {
-      return System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("windows");
-    }
-
-    /**
-     * Used to mock out getting environment variables.
-     */
-    @VisibleForTesting
-    Map<String, String> getEnvironment() {
-        return System.getenv();
-    }
-  }
-
-  /**
-   * Attempts to load the GCP credentials. See
-   * {@link CredentialFactory#getCredential()} for more details.
-   */
-  public static class GcpUserCredentialsFactory implements DefaultValueFactory<Credential> {
-    @Override
-    public Credential create(PipelineOptions options) {
-      GcpOptions gcpOptions = options.as(GcpOptions.class);
-      try {
-        CredentialFactory factory = InstanceBuilder.ofType(CredentialFactory.class)
-            .fromClass(gcpOptions.getCredentialFactoryClass())
-            .fromFactoryMethod("fromOptions")
-            .withArg(PipelineOptions.class, options)
-            .build();
-        return factory.getCredential();
-      } catch (IOException | GeneralSecurityException e) {
-        throw new RuntimeException("Unable to obtain credential", e);
-      }
-    }
-  }
-
-  /**
-   * The token server URL to use for OAuth 2 authentication. Normally, the default is sufficient,
-   * but some specialized use cases may want to override this value.
-   */
-  @Description("The token server URL to use for OAuth 2 authentication. Normally, the default "
-      + "is sufficient, but some specialized use cases may want to override this value.")
-  @Default.String(GoogleOAuthConstants.TOKEN_SERVER_URL)
-  @Hidden
-  String getTokenServerUrl();
-  void setTokenServerUrl(String value);
-
-  /**
-   * The authorization server URL to use for OAuth 2 authentication. Normally, the default is
-   * sufficient, but some specialized use cases may want to override this value.
-   */
-  @Description("The authorization server URL to use for OAuth 2 authentication. Normally, the "
-      + "default is sufficient, but some specialized use cases may want to override this value.")
-  @Default.String(GoogleOAuthConstants.AUTHORIZATION_SERVER_URL)
-  @Hidden
-  String getAuthorizationServerEncodedUrl();
-  void setAuthorizationServerEncodedUrl(String value);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/GcsOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/GcsOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/GcsOptions.java
deleted file mode 100644
index 130310a..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/GcsOptions.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.cloud.dataflow.sdk.util.AppEngineEnvironment;
-import com.google.cloud.dataflow.sdk.util.GcsUtil;
-import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Options used to configure Google Cloud Storage.
- */
-public interface GcsOptions extends
-    ApplicationNameOptions, GcpOptions, PipelineOptions {
-  /**
-   * The GcsUtil instance that should be used to communicate with Google Cloud Storage.
-   */
-  @JsonIgnore
-  @Description("The GcsUtil instance that should be used to communicate with Google Cloud Storage.")
-  @Default.InstanceFactory(GcsUtil.GcsUtilFactory.class)
-  @Hidden
-  GcsUtil getGcsUtil();
-  void setGcsUtil(GcsUtil value);
-
-  /**
-   * The ExecutorService instance to use to create threads, can be overridden to specify an
-   * ExecutorService that is compatible with the user's environment. If unset, the
-   * default is to create an ExecutorService with an unbounded number of threads; this
-   * is compatible with Google AppEngine.
-   */
-  @JsonIgnore
-  @Description("The ExecutorService instance to use to create multiple threads. Can be overridden "
-      + "to specify an ExecutorService that is compatible with the users environment. If unset, "
-      + "the default is to create an ExecutorService with an unbounded number of threads; this "
-      + "is compatible with Google AppEngine.")
-  @Default.InstanceFactory(ExecutorServiceFactory.class)
-  @Hidden
-  ExecutorService getExecutorService();
-  void setExecutorService(ExecutorService value);
-
-  /**
-   * GCS endpoint to use. If unspecified, uses the default endpoint.
-   */
-  @JsonIgnore
-  @Hidden
-  @Description("The URL for the GCS API.")
-  String getGcsEndpoint();
-  void setGcsEndpoint(String value);
-
-  /**
-   * The buffer size (in bytes) to use when uploading files to GCS. Please see the documentation for
-   * {@link AbstractGoogleAsyncWriteChannel#setUploadBufferSize} for more information on the
-   * restrictions and performance implications of this value.
-   */
-  @Description("The buffer size (in bytes) to use when uploading files to GCS. Please see the "
-      + "documentation for AbstractGoogleAsyncWriteChannel.setUploadBufferSize for more "
-      + "information on the restrictions and performance implications of this value.\n\n"
-      + "https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/util/src/main/java/"
-      + "com/google/cloud/hadoop/util/AbstractGoogleAsyncWriteChannel.java")
-  Integer getGcsUploadBufferSizeBytes();
-  void setGcsUploadBufferSizeBytes(Integer bytes);
-
-  /**
-   * Returns the default {@link ExecutorService} to use within the Dataflow SDK. The
-   * {@link ExecutorService} is compatible with AppEngine.
-   */
-  public static class ExecutorServiceFactory implements DefaultValueFactory<ExecutorService> {
-    @SuppressWarnings("deprecation")  // IS_APP_ENGINE is deprecated for internal use only.
-    @Override
-    public ExecutorService create(PipelineOptions options) {
-      ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
-      threadFactoryBuilder.setThreadFactory(MoreExecutors.platformThreadFactory());
-      if (!AppEngineEnvironment.IS_APP_ENGINE) {
-        // AppEngine doesn't allow modification of threads to be daemon threads.
-        threadFactoryBuilder.setDaemon(true);
-      }
-      /* The SDK requires an unbounded thread pool because a step may create X writers
-       * each requiring their own thread to perform the writes; otherwise a writer may
-       * block, causing deadlock for the step because the writer's buffer is full.
-       * Also, the MapTaskExecutor launches the steps in reverse order and completes
-       * them in forward order thus requiring enough threads so that each step's writers
-       * can be active.
-       */
-      return new ThreadPoolExecutor(
-          0, Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads.
-          Long.MAX_VALUE, TimeUnit.NANOSECONDS, // Keep non-core threads alive forever.
-          new SynchronousQueue<Runnable>(),
-          threadFactoryBuilder.build());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/GoogleApiDebugOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/GoogleApiDebugOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/GoogleApiDebugOptions.java
deleted file mode 100644
index 1863141..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/GoogleApiDebugOptions.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import com.google.api.client.googleapis.services.AbstractGoogleClient;
-import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
-import com.google.api.client.googleapis.services.GoogleClientRequestInitializer;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * These options configure debug settings for Google API clients created within the Dataflow SDK.
- */
-public interface GoogleApiDebugOptions extends PipelineOptions {
-  /**
-   * This option enables tracing of API calls to Google services used within the
-   * Dataflow SDK. Values are expected in JSON format <code>{"ApiName":"TraceDestination",...}
-   * </code> where the {@code ApiName} represents the request class's canonical name. The
-   * {@code TraceDestination} is a logical trace consumer to whom the trace will be reported.
-   * Typically, "producer" is the right destination to use: this makes API traces available to the
-   * team offering the API. Note that by enabling this option, the contents of the requests to and
-   * from Google Cloud services will be made available to Google. For example, by specifying
-   * <code>{"Dataflow":"producer"}</code>, all calls to the Dataflow service will be made available
-   * to Google, specifically to the Google Cloud Dataflow team.
-   */
-  @Description("This option enables tracing of API calls to Google services used within the "
-      + "Dataflow SDK. Values are expected in JSON format {\"ApiName\":\"TraceDestination\",...} "
-      + "where the ApiName represents the request classes canonical name. The TraceDestination is "
-      + "a logical trace consumer to whom the trace will be reported. Typically, \"producer\" is "
-      + "the right destination to use: this makes API traces available to the team offering the "
-      + "API. Note that by enabling this option, the contents of the requests to and from "
-      + "Google Cloud services will be made available to Google. For example, by specifying "
-      + "{\"Dataflow\":\"producer\"}, all calls to the Dataflow service will be made available to "
-      + "Google, specifically to the Google Cloud Dataflow team.")
-  GoogleApiTracer getGoogleApiTrace();
-  void setGoogleApiTrace(GoogleApiTracer commands);
-
-  /**
-   * A {@link GoogleClientRequestInitializer} that adds the trace destination to Google API calls.
-   */
-  public static class GoogleApiTracer extends HashMap<String, String>
-      implements GoogleClientRequestInitializer {
-    /**
-     * Creates a {@link GoogleApiTracer} that sets the trace destination on all
-     * calls that match the given client type.
-     */
-    public GoogleApiTracer addTraceFor(AbstractGoogleClient client, String traceDestination) {
-      put(client.getClass().getCanonicalName(), traceDestination);
-      return this;
-    }
-
-    /**
-     * Creates a {@link GoogleApiTracer} that sets the trace {@code traceDestination} on all
-     * calls that match for the given request type.
-     */
-    public GoogleApiTracer addTraceFor(
-        AbstractGoogleClientRequest<?> request, String traceDestination) {
-      put(request.getClass().getCanonicalName(), traceDestination);
-      return this;
-    }
-
-    @Override
-    public void initialize(AbstractGoogleClientRequest<?> request) throws IOException {
-      for (Map.Entry<String, String> entry : this.entrySet()) {
-        if (request.getClass().getCanonicalName().contains(entry.getKey())) {
-          request.set("$trace", entry.getValue());
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/Hidden.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/Hidden.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/Hidden.java
deleted file mode 100644
index fd25db8..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/Hidden.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.options;
-
-import java.lang.annotation.Documented;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * Methods and/or interfaces annotated with {@code @Hidden} will be suppressed from
- * being output when {@code --help} is specified on the command-line.
- */
-@Target({ElementType.METHOD, ElementType.TYPE})
-@Retention(RetentionPolicy.RUNTIME)
-@Documented
-public @interface Hidden {
-}


Mime
View raw message