From: davor@apache.org
To: commits@beam.incubator.apache.org
Date: Thu, 14 Apr 2016 04:47:58 -0000
Message-Id: <0593cb496a4b4492aaa3a41fafc4c606@git.apache.org>
Subject: [11/74] [partial] incubator-beam git commit: Rename com/google/cloud/dataflow->org/apache/beam

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKeyRange.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKeyRange.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKeyRange.java deleted file mode 100644 index c92e7e9..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKeyRange.java +++ /dev/null @@ -1,377 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.io.range; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; -import static com.google.common.base.Verify.verify; - -import com.google.common.base.MoreObjects; -import com.google.common.collect.ImmutableList; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Serializable; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; - -/** - * A class representing a range of {@link ByteKey ByteKeys}. - * - *
Instances of {@link ByteKeyRange} are immutable. - * - *
A {@link ByteKeyRange} enforces the restriction that its start and end keys must form a valid, - * non-empty range {@code [startKey, endKey)} that is inclusive of the start key and exclusive of - * the end key. - * - *
When the end key is empty, it is treated as the largest possible key. - * - *
Interpreting {@link ByteKey} in a {@link ByteKeyRange} - * - *
The primary role of {@link ByteKeyRange} is to provide functionality for - * {@link #estimateFractionForKey(ByteKey)}, {@link #interpolateKey(double)}, and - * {@link #split(int)}, which are used for Google Cloud Dataflow's - * Autoscaling - * and Dynamic Work Rebalancing features. - * - *
{@link ByteKeyRange} implements these features by treating a {@link ByteKey}'s underlying - * {@code byte[]} as the binary expansion of floating point numbers in the range {@code [0.0, 1.0]}. - * For example, the keys {@code ByteKey.of(0x80)}, {@code ByteKey.of(0xc0)}, and - * {@code ByteKey.of(0xe0)} are interpreted as {@code 0.5}, {@code 0.75}, and {@code 0.875} - * respectively. The empty {@code ByteKey.EMPTY} is interpreted as {@code 0.0} when used as the - * start of a range and {@code 1.0} when used as the end key. - * - *
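To make the binary-expansion reading concrete, here is a minimal standalone sketch (keyAsFraction is an invented helper for illustration, not a method of this class):

    import java.math.BigInteger;

    static double keyAsFraction(byte[] key) {
      // Treat the bytes as an unsigned integer and divide by 2^(8 * length),
      // i.e., read them as the binary expansion of a number in [0.0, 1.0).
      BigInteger value = new BigInteger(1, key);
      BigInteger scale = BigInteger.ONE.shiftLeft(8 * key.length);
      return value.doubleValue() / scale.doubleValue();
    }

    // keyAsFraction(new byte[] {(byte) 0x80}) -> 0.5
    // keyAsFraction(new byte[] {(byte) 0xc0}) -> 0.75
    // keyAsFraction(new byte[] {(byte) 0xe0}) -> 0.875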
Key interpolation, fraction estimation, and range splitting are all interpreted in these - * floating-point semantics. See the respective implementations for further details. Note: - * the underlying implementations of these functions use {@link BigInteger} and {@link BigDecimal}, - * so they can be slow and should not be called in hot loops. Dataflow's dynamic work - * rebalancing will only invoke these functions during periodic control operations, so they are not - * called on the critical path. - * - * @see ByteKey - */ -public final class ByteKeyRange implements Serializable { - private static final Logger logger = LoggerFactory.getLogger(ByteKeyRange.class); - - /** The range of all keys, with empty start and end keys. */ - public static final ByteKeyRange ALL_KEYS = ByteKeyRange.of(ByteKey.EMPTY, ByteKey.EMPTY); - - /** - * Creates a new {@link ByteKeyRange} with the given start and end keys. - * - *
Note that if {@code endKey} is empty, it is treated as the largest possible key. - * - * @see ByteKeyRange - * - * @throws IllegalArgumentException if {@code endKey} is less than or equal to {@code startKey}, - * unless {@code endKey} is empty indicating the maximum possible {@link ByteKey}. - */ - public static ByteKeyRange of(ByteKey startKey, ByteKey endKey) { - return new ByteKeyRange(startKey, endKey); - } - - /** - * Returns the {@link ByteKey} representing the lower bound of this {@link ByteKeyRange}. - */ - public ByteKey getStartKey() { - return startKey; - } - - /** - * Returns the {@link ByteKey} representing the upper bound of this {@link ByteKeyRange}. - * - *
Note that if {@code endKey} is empty, it is treated as the largest possible key. - */ - public ByteKey getEndKey() { - return endKey; - } - - /** - * Returns {@code true} if the specified {@link ByteKey} is contained within this range. - */ - public Boolean containsKey(ByteKey key) { - return key.compareTo(startKey) >= 0 && endsAfterKey(key); - } - - /** - * Returns {@code true} if the specified {@link ByteKeyRange} overlaps this range. - */ - public Boolean overlaps(ByteKeyRange other) { - // If each range starts before the other range ends, then they must overlap. - // { [] } -- one range inside the other OR { [ } ] -- partial overlap. - return endsAfterKey(other.startKey) && other.endsAfterKey(startKey); - } - - /** - * Returns a list of up to {@code numSplits + 1} {@link ByteKey ByteKeys} in ascending order, - * where the keys have been interpolated to form roughly equal sub-ranges of this - * {@link ByteKeyRange}, assuming a uniform distribution of keys within this range. - * - *
The first {@link ByteKey} in the result is guaranteed to be equal to {@link #getStartKey}, - * and the last {@link ByteKey} in the result is guaranteed to be equal to {@link #getEndKey}. - * Thus the resulting list exactly spans the same key range as this {@link ByteKeyRange}. - * - *
Note that the number of keys returned is not always equal to {@code numSplits + 1}. - * Specifically, if this range is unsplittable (e.g., because the start and end keys are equal - * up to padding by zero bytes), the list returned will only contain the start and end key. - * - * @throws IllegalArgumentException if the specified number of splits is < 1 - * @see ByteKeyRange the ByteKeyRange class Javadoc for more information about split semantics. - */ - public List split(int numSplits) { - checkArgument(numSplits > 0, "numSplits %s must be a positive integer", numSplits); - - try { - ImmutableList.Builder ret = ImmutableList.builder(); - ret.add(startKey); - for (int i = 1; i < numSplits; ++i) { - ret.add(interpolateKey(i / (double) numSplits)); - } - ret.add(endKey); - return ret.build(); - } catch (IllegalStateException e) { - // The range is not splittable -- just return - return ImmutableList.of(startKey, endKey); - } - } - - /** - * Returns the fraction of this range {@code [startKey, endKey)} that is in the interval - * {@code [startKey, key)}. - * - * @throws IllegalArgumentException if {@code key} does not fall within this range - * @see ByteKeyRange the ByteKeyRange class Javadoc for more information about fraction semantics. - */ - public double estimateFractionForKey(ByteKey key) { - checkNotNull(key, "key"); - checkArgument(!key.isEmpty(), "Cannot compute fraction for an empty key"); - checkArgument( - key.compareTo(startKey) >= 0, "Expected key %s >= range start key %s", key, startKey); - - if (key.equals(endKey)) { - return 1.0; - } - checkArgument(containsKey(key), "Cannot compute fraction for %s outside this %s", key, this); - - byte[] startBytes = startKey.getBytes(); - byte[] endBytes = endKey.getBytes(); - byte[] keyBytes = key.getBytes(); - // If the endKey is unspecified, add a leading 1 byte to it and a leading 0 byte to all other - // keys, to get a concrete least upper bound for the desired range. - if (endKey.isEmpty()) { - startBytes = addHeadByte(startBytes, (byte) 0); - endBytes = addHeadByte(endBytes, (byte) 1); - keyBytes = addHeadByte(keyBytes, (byte) 0); - } - - // Pad to the longest of all 3 keys. - int paddedKeyLength = Math.max(Math.max(startBytes.length, endBytes.length), keyBytes.length); - BigInteger rangeStartInt = paddedPositiveInt(startBytes, paddedKeyLength); - BigInteger rangeEndInt = paddedPositiveInt(endBytes, paddedKeyLength); - BigInteger keyInt = paddedPositiveInt(keyBytes, paddedKeyLength); - - // Keys are equal subject to padding by 0. - BigInteger range = rangeEndInt.subtract(rangeStartInt); - if (range.equals(BigInteger.ZERO)) { - logger.warn( - "Using 0.0 as the default fraction for this near-empty range {} where start and end keys" - + " differ only by trailing zeros.", - this); - return 0.0; - } - - // Compute the progress (key-start)/(end-start) scaling by 2^64, dividing (which rounds), - // and then scaling down after the division. This gives ample precision when converted to - // double. - BigInteger progressScaled = keyInt.subtract(rangeStartInt).shiftLeft(64); - return progressScaled.divide(range).doubleValue() / Math.pow(2, 64); - } - - /** - * Returns a {@link ByteKey} {@code key} such that {@code [startKey, key)} represents - * approximately the specified fraction of the range {@code [startKey, endKey)}. The interpolation - * is computed assuming a uniform distribution of keys. - * - *
For example, given the largest possible range (defined by empty start and end keys), the - * fraction {@code 0.5} will return the {@code ByteKey.of(0x80)}, which will also be returned for - * ranges {@code [0x40, 0xc0)} and {@code [0x6f, 0x91)}. - * - *
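As a usage sketch, the expected values below follow from the semantics described above; ByteKey.of(int...) is the factory from the companion ByteKey class:

    ByteKeyRange range = ByteKeyRange.ALL_KEYS;
    ByteKey mid = range.interpolateKey(0.5);      // ByteKey.of(0x80)
    List<ByteKey> splits = range.split(4);        // start, 0x40, 0x80, 0xc0, end
    double f = range.estimateFractionForKey(ByteKey.of(0xc0));  // ~0.75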
The key returned will never be empty. - * - * @throws IllegalArgumentException if {@code fraction} is outside the range [0, 1) - * @throws IllegalStateException if this range cannot be interpolated - * @see ByteKeyRange the ByteKeyRange class Javadoc for more information about fraction semantics. - */ - public ByteKey interpolateKey(double fraction) { - checkArgument( - fraction >= 0.0 && fraction < 1.0, "Fraction %s must be in the range [0, 1)", fraction); - byte[] startBytes = startKey.getBytes(); - byte[] endBytes = endKey.getBytes(); - // If the endKey is unspecified, add a leading 1 byte to it and a leading 0 byte to all other - // keys, to get a concrete least upper bound for the desired range. - if (endKey.isEmpty()) { - startBytes = addHeadByte(startBytes, (byte) 0); - endBytes = addHeadByte(endBytes, (byte) 1); - } - - // Pad to the longest key. - int paddedKeyLength = Math.max(startBytes.length, endBytes.length); - BigInteger rangeStartInt = paddedPositiveInt(startBytes, paddedKeyLength); - BigInteger rangeEndInt = paddedPositiveInt(endBytes, paddedKeyLength); - - // If the keys are equal subject to padding by 0, we can't interpolate. - BigInteger range = rangeEndInt.subtract(rangeStartInt); - checkState( - !range.equals(BigInteger.ZERO), - "Refusing to interpolate for near-empty %s where start and end keys differ only by trailing" - + " zero bytes.", - this); - - // Add precision so that range is at least 53 (double mantissa length) bits long. This way, we - // can interpolate small ranges finely, e.g., split the range key 3 to key 4 into 1024 parts. - // We add precision to range by adding zero bytes to the end of the keys, aka shifting the - // underlying BigInteger left by a multiple of 8 bits. - int bytesNeeded = ((53 - range.bitLength()) + 7) / 8; - if (bytesNeeded > 0) { - range = range.shiftLeft(bytesNeeded * 8); - rangeStartInt = rangeStartInt.shiftLeft(bytesNeeded * 8); - paddedKeyLength += bytesNeeded; - } - - BigInteger interpolatedOffset = - new BigDecimal(range).multiply(BigDecimal.valueOf(fraction)).toBigInteger(); - - int outputKeyLength = endKey.isEmpty() ? (paddedKeyLength - 1) : paddedKeyLength; - return ByteKey.copyFrom( - fixupHeadZeros(rangeStartInt.add(interpolatedOffset).toByteArray(), outputKeyLength)); - } - - /** - * Returns new {@link ByteKeyRange} like this one, but with the specified start key. - */ - public ByteKeyRange withStartKey(ByteKey startKey) { - return new ByteKeyRange(startKey, endKey); - } - - /** - * Returns new {@link ByteKeyRange} like this one, but with the specified end key. 
- */ - public ByteKeyRange withEndKey(ByteKey endKey) { - return new ByteKeyRange(startKey, endKey); - } - - //////////////////////////////////////////////////////////////////////////////////// - private final ByteKey startKey; - private final ByteKey endKey; - - private ByteKeyRange(ByteKey startKey, ByteKey endKey) { - this.startKey = checkNotNull(startKey, "startKey"); - this.endKey = checkNotNull(endKey, "endKey"); - checkArgument(endsAfterKey(startKey), "Start %s must be less than end %s", startKey, endKey); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(ByteKeyRange.class) - .add("startKey", startKey) - .add("endKey", endKey) - .toString(); - } - - @Override - public boolean equals(Object o) { - if (o == this) { - return true; - } - if (!(o instanceof ByteKeyRange)) { - return false; - } - ByteKeyRange other = (ByteKeyRange) o; - return Objects.equals(startKey, other.startKey) && Objects.equals(endKey, other.endKey); - } - - @Override - public int hashCode() { - return Objects.hash(startKey, endKey); - } - - /** - * Returns a copy of the specified array with the specified byte added at the front. - */ - private static byte[] addHeadByte(byte[] array, byte b) { - byte[] ret = new byte[array.length + 1]; - ret[0] = b; - System.arraycopy(array, 0, ret, 1, array.length); - return ret; - } - - /** - * Ensures the array is exactly {@code size} bytes long. Returns the input array if the condition - * is met, otherwise either adds or removes zero bytes from the beginning of {@code array}. - */ - private static byte[] fixupHeadZeros(byte[] array, int size) { - int padding = size - array.length; - if (padding == 0) { - return array; - } - - if (padding < 0) { - // There is one zero byte at the beginning, added by BigInteger to make there be a sign - // bit when converting to bytes. - verify( - padding == -1, - "key %s: expected length %d with exactly one byte of padding, found %d", - ByteKey.copyFrom(array), - size, - -padding); - verify( - (array[0] == 0) && ((array[1] & 0x80) == 0x80), - "key %s: is 1 byte longer than expected, indicating BigInteger padding. Expect first byte" - + " to be zero with set MSB in second byte.", - ByteKey.copyFrom(array)); - return Arrays.copyOfRange(array, 1, array.length); - } - - byte[] ret = new byte[size]; - System.arraycopy(array, 0, ret, padding, array.length); - return ret; - } - - /** - * Returns {@code true} when the specified {@code key} is smaller this range's end key. The only - * semantic change from {@code (key.compareTo(getEndKey()) < 0)} is that the empty end key is - * treated as larger than all possible {@link ByteKey keys}. - */ - boolean endsAfterKey(ByteKey key) { - return endKey.isEmpty() || key.compareTo(endKey) < 0; - } - - /** Builds a BigInteger out of the specified array, padded to the desired byte length. */ - private static BigInteger paddedPositiveInt(byte[] bytes, int length) { - int bytePaddingNeeded = length - bytes.length; - checkArgument( - bytePaddingNeeded >= 0, "Required bytes.length {} < length {}", bytes.length, length); - BigInteger ret = new BigInteger(1, bytes); - return (bytePaddingNeeded == 0) ? 
ret : ret.shiftLeft(8 * bytePaddingNeeded); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKeyRangeTracker.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKeyRangeTracker.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKeyRangeTracker.java deleted file mode 100644 index 0d01f8f..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/ByteKeyRangeTracker.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.io.range; - -import static com.google.common.base.MoreObjects.toStringHelper; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; - -/** - * A {@link RangeTracker} for {@link ByteKey ByteKeys} in {@link ByteKeyRange ByteKeyRanges}. - * - * @see ByteKey - * @see ByteKeyRange - */ -public final class ByteKeyRangeTracker implements RangeTracker { - private static final Logger logger = LoggerFactory.getLogger(ByteKeyRangeTracker.class); - - /** Instantiates a new {@link ByteKeyRangeTracker} with the specified range. */ - public static ByteKeyRangeTracker of(ByteKeyRange range) { - return new ByteKeyRangeTracker(range); - } - - @Override - public synchronized ByteKey getStartPosition() { - return range.getStartKey(); - } - - @Override - public synchronized ByteKey getStopPosition() { - return range.getEndKey(); - } - - @Override - public synchronized boolean tryReturnRecordAt(boolean isAtSplitPoint, ByteKey recordStart) { - if (isAtSplitPoint && !range.containsKey(recordStart)) { - return false; - } - position = recordStart; - return true; - } - - @Override - public synchronized boolean trySplitAtPosition(ByteKey splitPosition) { - // Unstarted. - if (position == null) { - logger.warn( - "{}: Rejecting split request at {} because no records have been returned.", - this, - splitPosition); - return false; - } - - // Started, but not after current position. - if (splitPosition.compareTo(position) <= 0) { - logger.warn( - "{}: Rejecting split request at {} because it is not after current position {}.", - this, - splitPosition, - position); - return false; - } - - // Sanity check. 
- if (!range.containsKey(splitPosition)) { - logger.warn( - "{}: Rejecting split request at {} because it is not within the range.", - this, - splitPosition); - return false; - } - - range = range.withEndKey(splitPosition); - return true; - } - - @Override - public synchronized double getFractionConsumed() { - if (position == null) { - return 0; - } - return range.estimateFractionForKey(position); - } - - /////////////////////////////////////////////////////////////////////////////// - private ByteKeyRange range; - @Nullable private ByteKey position; - - private ByteKeyRangeTracker(ByteKeyRange range) { - this.range = range; - this.position = null; - } - - @Override - public String toString() { - return toStringHelper(ByteKeyRangeTracker.class) - .add("range", range) - .add("position", position) - .toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/OffsetRangeTracker.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/OffsetRangeTracker.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/OffsetRangeTracker.java deleted file mode 100644 index 3cebb77..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/OffsetRangeTracker.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.io.range; - -import com.google.common.annotations.VisibleForTesting; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A {@link RangeTracker} for non-negative positions of type {@code long}. - */ -public class OffsetRangeTracker implements RangeTracker { - private static final Logger LOG = LoggerFactory.getLogger(OffsetRangeTracker.class); - - private final long startOffset; - private long stopOffset; - private long lastRecordStart = -1L; - private long offsetOfLastSplitPoint = -1L; - - /** - * Offset corresponding to infinity. This can only be used as the upper-bound of a range, and - * indicates reading all of the records until the end without specifying exactly what the end is. - * - *
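An unbounded range behaves as follows (a sketch; as the next paragraph notes, such ranges refuse splits and report no progress):

    OffsetRangeTracker tracker =
        new OffsetRangeTracker(0L, OffsetRangeTracker.OFFSET_INFINITY);
    tracker.trySplitAtPosition(1000L);  // false: stop position unspecified
    tracker.getFractionConsumed();      // 0.0 for an unbounded range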
Infinite ranges cannot be split because it is impossible to estimate progress within them. - */ - public static final long OFFSET_INFINITY = Long.MAX_VALUE; - - /** - * Creates an {@code OffsetRangeTracker} for the specified range. - */ - public OffsetRangeTracker(long startOffset, long stopOffset) { - this.startOffset = startOffset; - this.stopOffset = stopOffset; - } - - @Override - public synchronized Long getStartPosition() { - return startOffset; - } - - @Override - public synchronized Long getStopPosition() { - return stopOffset; - } - - @Override - public boolean tryReturnRecordAt(boolean isAtSplitPoint, Long recordStart) { - return tryReturnRecordAt(isAtSplitPoint, recordStart.longValue()); - } - - public synchronized boolean tryReturnRecordAt(boolean isAtSplitPoint, long recordStart) { - if (lastRecordStart == -1 && !isAtSplitPoint) { - throw new IllegalStateException( - String.format("The first record [starting at %d] must be at a split point", recordStart)); - } - if (recordStart < lastRecordStart) { - throw new IllegalStateException( - String.format( - "Trying to return record [starting at %d] " - + "which is before the last-returned record [starting at %d]", - recordStart, - lastRecordStart)); - } - if (isAtSplitPoint) { - if (offsetOfLastSplitPoint != -1L && recordStart == offsetOfLastSplitPoint) { - throw new IllegalStateException( - String.format( - "Record at a split point has same offset as the previous split point: " - + "previous split point at %d, current record starts at %d", - offsetOfLastSplitPoint, recordStart)); - } - if (recordStart >= stopOffset) { - return false; - } - offsetOfLastSplitPoint = recordStart; - } - - lastRecordStart = recordStart; - return true; - } - - @Override - public boolean trySplitAtPosition(Long splitOffset) { - return trySplitAtPosition(splitOffset.longValue()); - } - - public synchronized boolean trySplitAtPosition(long splitOffset) { - if (stopOffset == OFFSET_INFINITY) { - LOG.debug("Refusing to split {} at {}: stop position unspecified", this, splitOffset); - return false; - } - if (lastRecordStart == -1) { - LOG.debug("Refusing to split {} at {}: unstarted", this, splitOffset); - return false; - } - - // Note: technically it is correct to split at any position after the last returned - // split point, not just the last returned record. - // TODO: Investigate whether in practice this is useful or, rather, confusing. - if (splitOffset <= lastRecordStart) { - LOG.debug( - "Refusing to split {} at {}: already past proposed split position", this, splitOffset); - return false; - } - if (splitOffset < startOffset || splitOffset >= stopOffset) { - LOG.debug( - "Refusing to split {} at {}: proposed split position out of range", this, splitOffset); - return false; - } - LOG.debug("Agreeing to split {} at {}", this, splitOffset); - this.stopOffset = splitOffset; - return true; - } - - /** - * Returns a position {@code P} such that the range {@code [start, P)} represents approximately - * the given fraction of the range {@code [start, end)}. Assumes that the density of records - * in the range is approximately uniform. 
- */ - public synchronized long getPositionForFractionConsumed(double fraction) { - if (stopOffset == OFFSET_INFINITY) { - throw new IllegalArgumentException( - "getPositionForFractionConsumed is not applicable to an unbounded range: " + this); - } - return (long) Math.ceil(startOffset + fraction * (stopOffset - startOffset)); - } - - @Override - public synchronized double getFractionConsumed() { - if (stopOffset == OFFSET_INFINITY) { - return 0.0; - } - if (lastRecordStart == -1) { - return 0.0; - } - // E.g., when reading [3, 6) and lastRecordStart is 4, that means we consumed 3,4 of 3,4,5 - // which is (4 - 3 + 1) / (6 - 3) = 67%. - // Also, clamp to at most 1.0 because the last consumed position can extend past the - // stop position. - return Math.min(1.0, 1.0 * (lastRecordStart - startOffset + 1) / (stopOffset - startOffset)); - } - - @Override - public synchronized String toString() { - String stopString = (stopOffset == OFFSET_INFINITY) ? "infinity" : String.valueOf(stopOffset); - if (lastRecordStart >= 0) { - return String.format( - "", - lastRecordStart, - startOffset, - stopString); - } else { - return String.format("", startOffset, stopString); - } - } - - /** - * Returns a copy of this tracker for testing purposes (to simplify testing methods with - * side effects). - */ - @VisibleForTesting - OffsetRangeTracker copy() { - OffsetRangeTracker res = new OffsetRangeTracker(startOffset, stopOffset); - res.lastRecordStart = this.lastRecordStart; - return res; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/RangeTracker.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/RangeTracker.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/RangeTracker.java deleted file mode 100644 index dad9edc..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/RangeTracker.java +++ /dev/null @@ -1,221 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.io.range; - -/** - * A {@code RangeTracker} is a thread-safe helper object for implementing dynamic work rebalancing - * in position-based {@link com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader} - * subclasses. - * - *
Usage of the RangeTracker class hierarchy
- * The abstract {@code RangeTracker} interface should not be used per se - all users should use its - * subclasses directly. We declare it here because all subclasses have roughly the same interface - * and the same properties, to centralize the documentation. Currently we provide one - * implementation - {@link OffsetRangeTracker}. - * - *
Position-based sources
- * A position-based source is one where the source can be described by a range of positions of - * an ordered type and the records returned by the reader can be described by positions of the - * same type. - * - *
In case a record occupies a range of positions in the source, the most important thing about - * the record is the position where it starts. - * - *
Defining the semantics of positions for a source is entirely up to the source class, however - * the chosen definitions have to obey certain properties in order to make it possible to correctly - * split the source into parts, including dynamic splitting. Two main aspects need to be defined: - *
  • How to assign starting positions to records.
  • Which records should be read by a source with a range {@code [A, B)}.
- * Moreover, reading a range must be efficient, i.e., the performance of reading a range - * should not significantly depend on the location of the range. For example, reading the range - * {@code [A, B)} should not require reading all data before {@code A}. - * - *
The sections below explain exactly what properties these definitions must satisfy, and - * how to use a {@code RangeTracker} with a properly defined source. - * - *
Properties of position-based sources
- * The main requirement for position-based sources is associativity: reading records from - * {@code [A, B)} and records from {@code [B, C)} should give the same records as reading from - * {@code [A, C)}, where {@code A <= B <= C}. This property ensures that no matter how a range - * of positions is split into arbitrarily many sub-ranges, the total set of records described by - * them stays the same. - * - *
The other important property is how the source's range relates to positions of records in - * the source. In many sources each record can be identified by a unique starting position. - * In this case: - *
  • All records returned by a source {@code [A, B)} must have starting positions - * in this range.
  • All but the last record should end within this range. The last record may or may not - * extend past the end of the range.
  • Records should not overlap.
- * Such sources should define "read {@code [A, B)}" as "read from the first record starting at or - * after A, up to but not including the first record starting at or after B". - * - *
Some examples of such sources include reading lines or CSV from a text file, reading keys and - * values from a BigTable, etc. - * - *
The concept of split points allows us to extend the definitions for dealing with sources - * where some records cannot be identified by a unique starting position. - * - *
In all cases, all records returned by a source {@code [A, B)} must start at or after - * {@code A}. - * - *
Split points - * - *
Some sources may have records that are not directly addressable. For example, imagine a file - * format consisting of a sequence of compressed blocks. Each block can be assigned an offset, but - * records within the block cannot be directly addressed without decompressing the block. Let us - * refer to this hypothetical format as CBF (Compressed Blocks Format). - * - *
Many such formats can still satisfy the associativity property. For example, in CBF, reading - * {@code [A, B)} can mean "read all the records in all blocks whose starting offset is in - * {@code [A, B)}". - * - *
To support such complex formats, we introduce the notion of split points. We say that - * a record is a split point if there exists a position {@code A} such that the record is the first - * one to be returned when reading the range {@code [A, infinity)}. In CBF, the only split points - * would be the first records in each block. - * - *
Split points allow us to define the meaning of a record's position and a source's range - * in all cases: - *
  • For a record that is at a split point, its position is defined to be the largest - * {@code A} such that reading a source with the range {@code [A, infinity)} returns this record;
  • Positions of other records are only required to be non-decreasing;
  • Reading the source {@code [A, B)} must return records starting from the first split point - * at or after {@code A}, up to but not including the first split point at or after {@code B}. - * In particular, this means that the first record returned by a source MUST always be - * a split point.
  • Positions of split points must be unique.
- * As a result, for any decomposition of the full range of the source into position ranges, the - * total set of records will be the full set of records in the source, and each record - * will be read exactly once. - * - *
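Driving a tracker through the CBF scenario makes these rules concrete (a sketch; the block offsets 100 and 200 are invented for illustration):

    OffsetRangeTracker tracker = new OffsetRangeTracker(100, 200);
    tracker.tryReturnRecordAt(true, 100);   // true: first record of block 100, a split point
    tracker.tryReturnRecordAt(false, 100);  // true: later records in the block reuse its offset
    tracker.tryReturnRecordAt(false, 100);  // true: positions are non-decreasing
    tracker.tryReturnRecordAt(true, 200);   // false: next split point is outside [100, 200)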
Consumed positions
- * As the source is being read, and records read from it are being passed to the downstream - * transforms in the pipeline, we say that positions in the source are being consumed. - * When a reader has read a record (or promised to a caller that a record will be returned), - * positions up to and including the record's start position are considered consumed. - * - *
Dynamic splitting can happen only at unconsumed positions. If the reader just - * returned a record at offset 42 in a file, dynamic splitting can happen only at offset 43 or - * beyond, as otherwise that record could be read twice (by the current reader and by a reader - * of the task starting at 43). - * - *
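With an OffsetRangeTracker the offset-42 scenario looks like this (a sketch; the bounds are invented):

    OffsetRangeTracker tracker = new OffsetRangeTracker(0, 100);
    tracker.tryReturnRecordAt(true, 42);  // record at offset 42 is now consumed
    tracker.trySplitAtPosition(42L);      // false: 42 is not after the last-returned record
    tracker.trySplitAtPosition(43L);      // true: range becomes [0, 43)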
Example
- * The following example uses an {@link OffsetRangeTracker} to support dynamically splitting - * a source with integer positions (offsets). - *
{@code
- *   class MyReader implements BoundedReader {
- *     private MySource currentSource;
- *     private final OffsetRangeTracker tracker;
- *     ...
- *     MyReader(MySource source) {
- *       this.currentSource = source;
- *       this.tracker = new OffsetRangeTracker(source.getStartOffset(), source.getEndOffset());
- *     }
- *     ...
- *     boolean start() {
- *       ... (general logic for locating the first record) ...
- *       if (!tracker.tryReturnRecordAt(true, recordStartOffset)) return false;
- *       ... (any logic that depends on the record being returned, e.g. counting returned records)
- *       return true;
- *     }
- *     boolean advance() {
- *       ... (general logic for locating the next record) ...
- *       if (!tracker.tryReturnRecordAt(isAtSplitPoint, recordStartOffset)) return false;
- *       ... (any logic that depends on the record being returned, e.g. counting returned records)
- *       return true;
- *     }
- *
- *     double getFractionConsumed() {
- *       return tracker.getFractionConsumed();
- *     }
- *   }
- * } 
- * - *
Usage with different models of iteration
- * When using this class to protect a - * {@link com.google.cloud.dataflow.sdk.io.BoundedSource.BoundedReader}, follow the pattern - * described above. - * - *
When using this class to protect iteration in the {@code hasNext()/next()} - * model, consider the record consumed when {@code hasNext()} is about to return true, rather than - * when {@code next()} is called, because {@code hasNext()} returning true is promising the caller - * that {@code next()} will have an element to return - so {@link #trySplitAtPosition} must not - * split the range in a way that would make the record promised by {@code hasNext()} belong to - * a different range. - * - *
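A sketch of that hasNext()-style discipline (an invented adapter, not SDK code; it assumes every record is a split point at a distinct offset):

    class TrackedIterator {
      private final OffsetRangeTracker tracker;
      private Long peeked;       // offset claimed by hasNext() but not yet returned
      private long nextOffset;   // where the next record would start

      TrackedIterator(OffsetRangeTracker tracker, long firstOffset) {
        this.tracker = tracker;
        this.nextOffset = firstOffset;
      }

      boolean hasNext() {
        // Claim the record from the tracker exactly once, at hasNext() time, so a
        // concurrent trySplitAtPosition cannot split away a record already promised.
        if (peeked == null && tracker.tryReturnRecordAt(true, nextOffset)) {
          peeked = nextOffset;
        }
        return peeked != null;
      }

      long next() {
        if (!hasNext()) {
          throw new java.util.NoSuchElementException();
        }
        long result = peeked;
        peeked = null;
        nextOffset = result + 1;  // toy advance rule, for illustration only
        return result;
      }
    }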
Also note that implementations of {@code hasNext()} need to ensure - * that they call {@link #tryReturnRecordAt} only once even if {@code hasNext()} is called - * repeatedly, due to the requirement on uniqueness of split point positions. - * - * @param Type of positions used by the source to define ranges and identify records. - */ -public interface RangeTracker { - /** - * Returns the starting position of the current range, inclusive. - */ - PositionT getStartPosition(); - - /** - * Returns the ending position of the current range, exclusive. - */ - PositionT getStopPosition(); - - /** - * Atomically determines whether a record at the given position can be returned and updates - * internal state. In particular: - *
  • If {@code isAtSplitPoint} is {@code true}, and {@code recordStart} is outside the current - * range, returns {@code false};
  • Otherwise, updates the last-consumed position to {@code recordStart} and returns - * {@code true}.
This method MUST be called on all split point records. It may be called on every record. - */ - boolean tryReturnRecordAt(boolean isAtSplitPoint, PositionT recordStart); - - /** - * Atomically splits the current range [{@link #getStartPosition}, {@link #getStopPosition}) - * into a "primary" part [{@link #getStartPosition}, {@code splitPosition}) - * and a "residual" part [{@code splitPosition}, {@link #getStopPosition}), assuming the current - * last-consumed position is within [{@link #getStartPosition}, splitPosition) - * (i.e., {@code splitPosition} has not been consumed yet). - * - *
Updates the current range to be the primary and returns {@code true}. This means that - * all further calls on the current object will interpret their arguments relative to the - * primary range. - * - *
If the split position has already been consumed, or if no {@link #tryReturnRecordAt} call - * was made yet, returns {@code false}. The second condition is to prevent dynamic splitting - * during reader start-up. - */ - boolean trySplitAtPosition(PositionT splitPosition); - - /** - * Returns the approximate fraction of positions in the source that have been consumed by - * successful {@link #tryReturnRecordAt} calls, or 0.0 if no such calls have happened. - */ - double getFractionConsumed(); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/package-info.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/package-info.java deleted file mode 100644 index 9117e43..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/range/package-info.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Provides thread-safe helpers for implementing dynamic work rebalancing in position-based - * bounded sources. - * - *
See {@link com.google.cloud.dataflow.sdk.io.range.RangeTracker} to get started. - */ -package com.google.cloud.dataflow.sdk.io.range; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/ApplicationNameOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/ApplicationNameOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/ApplicationNameOptions.java deleted file mode 100644 index 2860fbe..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/ApplicationNameOptions.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.options; - -/** - * Options that allow setting the application name. - */ -public interface ApplicationNameOptions extends PipelineOptions { - /** - * Name of application, for display purposes. - * - *
Defaults to the name of the class that constructs the {@link PipelineOptions} - * via the {@link PipelineOptionsFactory}. - */ - @Description("Name of application for display purposes. Defaults to the name of the class that " - + "constructs the PipelineOptions via the PipelineOptionsFactory.") - String getAppName(); - void setAppName(String value); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/BigQueryOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/BigQueryOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/BigQueryOptions.java deleted file mode 100644 index 5576b5f..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/BigQueryOptions.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.options; - -/** - * Properties needed when using BigQuery with the Dataflow SDK. - */ -@Description("Options that are used to configure BigQuery. See " - + "https://cloud.google.com/bigquery/what-is-bigquery for details on BigQuery.") -public interface BigQueryOptions extends ApplicationNameOptions, GcpOptions, - PipelineOptions, StreamingOptions { - @Description("Temporary dataset for BigQuery table operations. " - + "Supported values are \"bigquery.googleapis.com/{dataset}\"") - @Default.String("bigquery.googleapis.com/cloud_dataflow") - String getTempDatasetId(); - void setTempDatasetId(String value); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/Default.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/Default.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/Default.java deleted file mode 100644 index 843eba1..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/Default.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.options; - -import java.lang.annotation.Documented; -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -/** - * {@link Default} represents a set of annotations that can be used to annotate getter properties - * on {@link PipelineOptions} with information representing the default value to be returned - * if no value is specified. - */ -public @interface Default { - /** - * This represents that the default of the option is the specified {@link java.lang.Class} value. - */ - @Target(ElementType.METHOD) - @Retention(RetentionPolicy.RUNTIME) - @Documented - public @interface Class { - java.lang.Class value(); - } - - /** - * This represents that the default of the option is the specified {@link java.lang.String} - * value. - */ - @Target(ElementType.METHOD) - @Retention(RetentionPolicy.RUNTIME) - @Documented - public @interface String { - java.lang.String value(); - } - - /** - * This represents that the default of the option is the specified boolean primitive value. - */ - @Target(ElementType.METHOD) - @Retention(RetentionPolicy.RUNTIME) - @Documented - public @interface Boolean { - boolean value(); - } - - /** - * This represents that the default of the option is the specified char primitive value. - */ - @Target(ElementType.METHOD) - @Retention(RetentionPolicy.RUNTIME) - @Documented - public @interface Character { - char value(); - } - - /** - * This represents that the default of the option is the specified byte primitive value. - */ - @Target(ElementType.METHOD) - @Retention(RetentionPolicy.RUNTIME) - @Documented - public @interface Byte { - byte value(); - } - /** - * This represents that the default of the option is the specified short primitive value. - */ - @Target(ElementType.METHOD) - @Retention(RetentionPolicy.RUNTIME) - @Documented - public @interface Short { - short value(); - } - /** - * This represents that the default of the option is the specified int primitive value. - */ - @Target(ElementType.METHOD) - @Retention(RetentionPolicy.RUNTIME) - @Documented - public @interface Integer { - int value(); - } - - /** - * This represents that the default of the option is the specified long primitive value. - */ - @Target(ElementType.METHOD) - @Retention(RetentionPolicy.RUNTIME) - @Documented - public @interface Long { - long value(); - } - - /** - * This represents that the default of the option is the specified float primitive value. - */ - @Target(ElementType.METHOD) - @Retention(RetentionPolicy.RUNTIME) - @Documented - public @interface Float { - float value(); - } - - /** - * This represents that the default of the option is the specified double primitive value. - */ - @Target(ElementType.METHOD) - @Retention(RetentionPolicy.RUNTIME) - @Documented - public @interface Double { - double value(); - } - - /** - * This represents that the default of the option is the specified enum. - * The value should equal the enum's {@link java.lang.Enum#name() name}. 
- */ - @Target(ElementType.METHOD) - @Retention(RetentionPolicy.RUNTIME) - @Documented - public @interface Enum { - java.lang.String value(); - } - - /** - * Value must be of type {@link DefaultValueFactory} and have a default constructor. - * Value is instantiated and then used as a factory to generate the default. - * - *
See {@link DefaultValueFactory} for more details. - */ - @Target(ElementType.METHOD) - @Retention(RetentionPolicy.RUNTIME) - @Documented - public @interface InstanceFactory { - java.lang.Class> value(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DefaultValueFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DefaultValueFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DefaultValueFactory.java deleted file mode 100644 index a6e7856..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DefaultValueFactory.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.options; - -/** - * An interface used with the {@link Default.InstanceFactory} annotation to specify the class that - * will be an instance factory to produce default values for a given getter on - * {@link PipelineOptions}. When a property on a {@link PipelineOptions} is fetched, and is - * currently unset, the default value factory will be instantiated and invoked. - * - *
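A sketch of how the pieces compose (TempDirFactory and MyOptions are invented for illustration):

    // The factory derives a default from *other* options; see the caution below
    // about not creating an infinite loop between options.
    class TempDirFactory implements DefaultValueFactory<String> {
      @Override
      public String create(PipelineOptions options) {
        return "/tmp/" + options.as(ApplicationNameOptions.class).getAppName();
      }
    }

    interface MyOptions extends PipelineOptions {
      @Default.InstanceFactory(TempDirFactory.class)
      String getTempDir();
      void setTempDir(String value);
    }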
Care must be taken to not produce an infinite loop when accessing other fields on the - * {@link PipelineOptions} object. - * - * @param The type of object this factory produces. - */ -public interface DefaultValueFactory { - /** - * Creates a default value for a getter marked with {@link Default.InstanceFactory}. - * - * @param options The current pipeline options. - * @return The default value to be used for the annotated getter. - */ - T create(PipelineOptions options); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/Description.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/Description.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/Description.java deleted file mode 100644 index 4f90c2a..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/Description.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.options; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -/** - * Descriptions are used to generate human readable output when the {@code --help} - * command is specified. Description annotations placed on interfaces that extend - * {@link PipelineOptions} will describe groups of related options. Description annotations - * placed on getter methods will be used to provide human readable information - * for the specific option. - */ -@Target({ElementType.METHOD, ElementType.TYPE}) -@Retention(RetentionPolicy.RUNTIME) -public @interface Description { - String value(); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DirectPipelineOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DirectPipelineOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DirectPipelineOptions.java deleted file mode 100644 index a505632..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DirectPipelineOptions.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DirectPipelineOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DirectPipelineOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DirectPipelineOptions.java deleted file mode 100644 index a505632..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DirectPipelineOptions.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.options; - -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.runners.DirectPipeline; -import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; -import com.google.cloud.dataflow.sdk.values.PCollection; - -import com.fasterxml.jackson.annotation.JsonIgnore; - -/** - * Options that can be used to configure the {@link DirectPipeline}. - */ -public interface DirectPipelineOptions extends - ApplicationNameOptions, BigQueryOptions, GcsOptions, GcpOptions, - PipelineOptions, StreamingOptions { - - /** - * The random seed to use for pseudorandom behaviors in the {@link DirectPipelineRunner}. - * If not explicitly specified, a random seed will be generated. - */ - @JsonIgnore - @Description("The random seed to use for pseudorandom behaviors in the DirectPipelineRunner." - + " If not explicitly specified, a random seed will be generated.") - Long getDirectPipelineRunnerRandomSeed(); - void setDirectPipelineRunnerRandomSeed(Long value); - - /** - * Controls whether the runner should ensure that all of the elements of - * the pipeline, such as DoFns, can be serialized. - */ - @JsonIgnore - @Description("Controls whether the runner should ensure that all of the elements of the " - + "pipeline, such as DoFns, can be serialized.") - @Default.Boolean(true) - boolean isTestSerializability(); - void setTestSerializability(boolean testSerializability); - - /** - * Controls whether the runner should ensure that all of the elements of - * every {@link PCollection} can be encoded using the appropriate - * {@link Coder}. - */ - @JsonIgnore - @Description("Controls whether the runner should ensure that all of the elements of every " - + "PCollection can be encoded using the appropriate Coder.") - @Default.Boolean(true) - boolean isTestEncodability(); - void setTestEncodability(boolean testEncodability); - - /** - * Controls whether the runner should randomize the order of each - * {@link PCollection}. - */ - @JsonIgnore - @Description("Controls whether the runner should randomize the order of each PCollection.") - @Default.Boolean(true) - boolean isTestUnorderedness(); - void setTestUnorderedness(boolean testUnorderedness); -}
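For context, a typical way these options are constructed and adjusted before running a pipeline; an illustrative sketch with arbitrary flag values, where args are the program's command-line arguments:

    DirectPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(DirectPipelineOptions.class);
    options.setDirectPipelineRunnerRandomSeed(42L); // reproducible pseudorandom behavior
    options.setTestUnorderedness(false);            // skip PCollection order randomization
    Pipeline p = Pipeline.create(options);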
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/GcpOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/GcpOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/GcpOptions.java deleted file mode 100644 index 52028cd..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/GcpOptions.java +++ /dev/null @@ -1,292 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.options; - -import com.google.api.client.auth.oauth2.Credential; -import com.google.api.client.googleapis.auth.oauth2.GoogleOAuthConstants; -import com.google.cloud.dataflow.sdk.util.CredentialFactory; -import com.google.cloud.dataflow.sdk.util.GcpCredentialFactory; -import com.google.cloud.dataflow.sdk.util.InstanceBuilder; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.io.Files; - -import com.fasterxml.jackson.annotation.JsonIgnore; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.security.GeneralSecurityException; -import java.util.Locale; -import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * Options used to configure Google Cloud Platform project and credentials. - *
- * <p>These options configure which of the following three different mechanisms for obtaining a - * credential are used: - * <ol> - *   <li>It can fetch the application default credentials.</li> - *   <li>The user can specify a client secrets file and go through the OAuth2 - *   webflow. The credential will then be cached in the user's home - *   directory for reuse.</li> - *   <li>The user can specify a file containing a service account private key along - *   with the service account name.</li> - * </ol> - * - * <p>The default mechanism is to use the application default credentials. The other options can be - * used by setting the corresponding properties. - */ -@Description("Options used to configure Google Cloud Platform project and credentials.") -public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions { - /** - * Project id to use when launching jobs. - */ - @Description("Project id. Required when running a Dataflow in the cloud. " - + "See https://cloud.google.com/storage/docs/projects for further details.") - @Default.InstanceFactory(DefaultProjectFactory.class) - String getProject(); - void setProject(String value); - - /** - * This option controls which file to use when attempting to create the credentials using the - * service account method. - * - * <p>
This option, if specified, needs to be combined with the - * {@link GcpOptions#getServiceAccountName() serviceAccountName}. - */ - @JsonIgnore - @Description("Controls which file to use when attempting to create the credentials " - + "using the service account method. This option, if specified, needs to be combined with " - + "the serviceAccountName option.") - String getServiceAccountKeyfile(); - void setServiceAccountKeyfile(String value); - - /** - * This option controls which service account to use when attempting to create the credentials - * using the service account method. - * - * <p>
This option, if specified, needs to be combined with the - * {@link GcpOptions#getServiceAccountKeyfile() serviceAccountKeyfile}. - */ - @JsonIgnore - @Description("Controls which service account to use when attempting to create the credentials " - + "using the service account method. This option, if specified, needs to be combined with " - + "the serviceAccountKeyfile option.") - String getServiceAccountName(); - void setServiceAccountName(String value); - - /** - * This option controls which file to use when attempting to create the credentials - * using the OAuth 2 webflow. After the OAuth2 webflow, the credentials will be stored - * within credentialDir. - */ - @JsonIgnore - @Description("This option controls which file to use when attempting to create the credentials " - + "using the OAuth 2 webflow. After the OAuth2 webflow, the credentials will be stored " - + "within credentialDir.") - String getSecretsFile(); - void setSecretsFile(String value); - - /** - * This option controls which credential store to use when creating the credentials - * using the OAuth 2 webflow. - */ - @Description("This option controls which credential store to use when creating the credentials " - + "using the OAuth 2 webflow.") - @Default.String("cloud_dataflow") - String getCredentialId(); - void setCredentialId(String value); - - /** - * Directory for storing dataflow credentials after execution of the OAuth 2 webflow. Defaults - * to using the $HOME/.store/data-flow directory. - */ - @Description("Directory for storing dataflow credentials after execution of the OAuth 2 webflow. " - + "Defaults to using the $HOME/.store/data-flow directory.") - @Default.InstanceFactory(CredentialDirFactory.class) - String getCredentialDir(); - void setCredentialDir(String value); - - /** - * Returns the default credential directory of ${user.home}/.store/data-flow. - */ - public static class CredentialDirFactory implements DefaultValueFactory<String> { - @Override - public String create(PipelineOptions options) { - File home = new File(System.getProperty("user.home")); - File store = new File(home, ".store"); - File dataflow = new File(store, "data-flow"); - return dataflow.getPath(); - } - } - - /** - * The class of the credential factory that should be created and used to create - * credentials. If gcpCredential has not been set explicitly, an instance of this class will - * be constructed and used as a credential factory. - */ - @Description("The class of the credential factory that should be created and used to create " - + "credentials. If gcpCredential has not been set explicitly, an instance of this class will " - + "be constructed and used as a credential factory.") - @Default.Class(GcpCredentialFactory.class) - Class<? extends CredentialFactory> getCredentialFactoryClass(); - void setCredentialFactoryClass( - Class<? extends CredentialFactory> credentialFactoryClass); - - /** - * The credential instance that should be used to authenticate against GCP services. - * If no credential has been set explicitly, the default is to use the instance factory - * that constructs a credential based upon the currently set credentialFactoryClass. - */ - @JsonIgnore - @Description("The credential instance that should be used to authenticate against GCP services. " - + "If no credential has been set explicitly, the default is to use the instance factory " - + "that constructs a credential based upon the currently set credentialFactoryClass.") - @Default.InstanceFactory(GcpUserCredentialsFactory.class) - @Hidden - Credential getGcpCredential(); - void setGcpCredential(Credential value); - - /** - * Attempts to infer the default project based upon the environment this application - * is executing within. Currently this only supports getting the default project from gcloud. - */ - public static class DefaultProjectFactory implements DefaultValueFactory<String> { - private static final Logger LOG = LoggerFactory.getLogger(DefaultProjectFactory.class); - - @Override - public String create(PipelineOptions options) { - try { - File configFile; - if (getEnvironment().containsKey("CLOUDSDK_CONFIG")) { - configFile = new File(getEnvironment().get("CLOUDSDK_CONFIG"), "properties"); - } else if (isWindows() && getEnvironment().containsKey("APPDATA")) { - configFile = new File(getEnvironment().get("APPDATA"), "gcloud/properties"); - } else { - // New versions of gcloud use this file - configFile = new File( - System.getProperty("user.home"), - ".config/gcloud/configurations/config_default"); - if (!configFile.exists()) { - // Old versions of gcloud use this file - configFile = new File(System.getProperty("user.home"), ".config/gcloud/properties"); - } - } - String section = null; - Pattern projectPattern = Pattern.compile("^project\\s*=\\s*(.*)$"); - Pattern sectionPattern = Pattern.compile("^\\[(.*)\\]$"); - for (String line : Files.readLines(configFile, StandardCharsets.UTF_8)) { - line = line.trim(); - if (line.isEmpty() || line.startsWith(";")) { - continue; - } - Matcher matcher = sectionPattern.matcher(line); - if (matcher.matches()) { - section = matcher.group(1); - } else if (section == null || section.equals("core")) { - matcher = projectPattern.matcher(line); - if (matcher.matches()) { - String project = matcher.group(1).trim(); - LOG.info("Inferred default GCP project '{}' from gcloud. If this is the incorrect " - + "project, please cancel this Pipeline and specify the command-line " - + "argument --project.", project); - return project; - } - } - } - } catch (IOException expected) { - LOG.debug("Failed to find default project.", expected); - } - // return null if can't determine - return null; - } - - /** - * Returns true if running on the Windows OS. - */ - private static boolean isWindows() { - return System.getProperty("os.name").toLowerCase(Locale.ENGLISH).contains("windows"); - } - - /** - * Used to mock out getting environment variables. - */ - @VisibleForTesting - Map<String, String> getEnvironment() { - return System.getenv(); - } - } - - /** - * Attempts to load the GCP credentials. See - * {@link CredentialFactory#getCredential()} for more details. - */ - public static class GcpUserCredentialsFactory implements DefaultValueFactory<Credential> { - @Override - public Credential create(PipelineOptions options) { - GcpOptions gcpOptions = options.as(GcpOptions.class); - try { - CredentialFactory factory = InstanceBuilder.ofType(CredentialFactory.class) - .fromClass(gcpOptions.getCredentialFactoryClass()) - .fromFactoryMethod("fromOptions") - .withArg(PipelineOptions.class, options) - .build(); - return factory.getCredential(); - } catch (IOException | GeneralSecurityException e) { - throw new RuntimeException("Unable to obtain credential", e); - } - } - } - - /** - * The token server URL to use for OAuth 2 authentication.
Normally, the default is sufficient, - * but some specialized use cases may want to override this value. - */ - @Description("The token server URL to use for OAuth 2 authentication. Normally, the default " - + "is sufficient, but some specialized use cases may want to override this value.") - @Default.String(GoogleOAuthConstants.TOKEN_SERVER_URL) - @Hidden - String getTokenServerUrl(); - void setTokenServerUrl(String value); - - /** - * The authorization server URL to use for OAuth 2 authentication. Normally, the default is - * sufficient, but some specialized use cases may want to override this value. - */ - @Description("The authorization server URL to use for OAuth 2 authentication. Normally, the " - + "default is sufficient, but some specialized use cases may want to override this value.") - @Default.String(GoogleOAuthConstants.AUTHORIZATION_SERVER_URL) - @Hidden - String getAuthorizationServerEncodedUrl(); - void setAuthorizationServerEncodedUrl(String value); -}
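A sketch of exercising the service-account mechanism described above; the project id, account name, and key path are placeholders, not values from this commit:

    String[] args = new String[] {
        "--project=my-project",
        "--serviceAccountName=robot@my-project.iam.gserviceaccount.com",
        "--serviceAccountKeyfile=/path/to/key.p12"};
    GcpOptions options = PipelineOptionsFactory.fromArgs(args).as(GcpOptions.class);
    // Resolved lazily: GcpUserCredentialsFactory instantiates the configured
    // CredentialFactory and asks it for a Credential on first access.
    Credential credential = options.getGcpCredential();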
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/GcsOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/GcsOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/GcsOptions.java deleted file mode 100644 index 130310a..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/GcsOptions.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.options; - -import com.google.cloud.dataflow.sdk.util.AppEngineEnvironment; -import com.google.cloud.dataflow.sdk.util.GcsUtil; -import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -import com.fasterxml.jackson.annotation.JsonIgnore; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -/** - * Options used to configure Google Cloud Storage. - */ -public interface GcsOptions extends - ApplicationNameOptions, GcpOptions, PipelineOptions { - /** - * The GcsUtil instance that should be used to communicate with Google Cloud Storage. - */ - @JsonIgnore - @Description("The GcsUtil instance that should be used to communicate with Google Cloud Storage.") - @Default.InstanceFactory(GcsUtil.GcsUtilFactory.class) - @Hidden - GcsUtil getGcsUtil(); - void setGcsUtil(GcsUtil value); - - /** - * The ExecutorService instance to use to create threads. Can be overridden to specify an - * ExecutorService that is compatible with the user's environment. If unset, the - * default is to create an ExecutorService with an unbounded number of threads; this - * is compatible with Google AppEngine. - */ - @JsonIgnore - @Description("The ExecutorService instance to use to create multiple threads. Can be overridden " - + "to specify an ExecutorService that is compatible with the user's environment. If unset, " - + "the default is to create an ExecutorService with an unbounded number of threads; this " - + "is compatible with Google AppEngine.") - @Default.InstanceFactory(ExecutorServiceFactory.class) - @Hidden - ExecutorService getExecutorService(); - void setExecutorService(ExecutorService value); - - /** - * GCS endpoint to use. If unspecified, uses the default endpoint. - */ - @JsonIgnore - @Hidden - @Description("The URL for the GCS API.") - String getGcsEndpoint(); - void setGcsEndpoint(String value); - - /** - * The buffer size (in bytes) to use when uploading files to GCS. Please see the documentation for - * {@link AbstractGoogleAsyncWriteChannel#setUploadBufferSize} for more information on the - * restrictions and performance implications of this value. - */ - @Description("The buffer size (in bytes) to use when uploading files to GCS. Please see the " - + "documentation for AbstractGoogleAsyncWriteChannel.setUploadBufferSize for more " - + "information on the restrictions and performance implications of this value.\n\n" - + "https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/util/src/main/java/" - + "com/google/cloud/hadoop/util/AbstractGoogleAsyncWriteChannel.java") - Integer getGcsUploadBufferSizeBytes(); - void setGcsUploadBufferSizeBytes(Integer bytes); - - /** - * Returns the default {@link ExecutorService} to use within the Dataflow SDK. The - * {@link ExecutorService} is compatible with AppEngine. - */ - public static class ExecutorServiceFactory implements DefaultValueFactory<ExecutorService> { - @SuppressWarnings("deprecation") // IS_APP_ENGINE is deprecated for internal use only. - @Override - public ExecutorService create(PipelineOptions options) { - ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder(); - threadFactoryBuilder.setThreadFactory(MoreExecutors.platformThreadFactory()); - if (!AppEngineEnvironment.IS_APP_ENGINE) { - // AppEngine doesn't allow modification of threads to be daemon threads. - threadFactoryBuilder.setDaemon(true); - } - /* The SDK requires an unbounded thread pool because a step may create X writers, - * each requiring their own thread to perform the writes; otherwise a writer may - * block, causing deadlock for the step because the writer's buffer is full. - * Also, the MapTaskExecutor launches the steps in reverse order and completes - * them in forward order, thus requiring enough threads so that each step's writers - * can be active. - */ - return new ThreadPoolExecutor( - 0, Integer.MAX_VALUE, // Allow an unlimited number of re-usable threads. - Long.MAX_VALUE, TimeUnit.NANOSECONDS, // Keep non-core threads alive forever.
- new SynchronousQueue<Runnable>(), - threadFactoryBuilder.build()); - } - } -}
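A brief sketch of tuning these options in code; the buffer size is arbitrary, and the replacement pool is kept unbounded to respect the deadlock caveat in ExecutorServiceFactory (Executors is java.util.concurrent.Executors):

    GcsOptions options = PipelineOptionsFactory.create().as(GcsOptions.class);
    options.setGcsUploadBufferSizeBytes(8 * 1024 * 1024); // 8 MiB upload chunks
    options.setExecutorService(
        Executors.newCachedThreadPool()); // custom, but still unbounded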
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/GoogleApiDebugOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/GoogleApiDebugOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/GoogleApiDebugOptions.java deleted file mode 100644 index 1863141..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/GoogleApiDebugOptions.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.options; - -import com.google.api.client.googleapis.services.AbstractGoogleClient; -import com.google.api.client.googleapis.services.AbstractGoogleClientRequest; -import com.google.api.client.googleapis.services.GoogleClientRequestInitializer; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -/** - * These options configure debug settings for Google API clients created within the Dataflow SDK. - */ -public interface GoogleApiDebugOptions extends PipelineOptions { - /** - * This option enables tracing of API calls to Google services used within the - * Dataflow SDK. Values are expected in JSON format {"ApiName":"TraceDestination",...} - * where the {@code ApiName} represents the request class's canonical name. The - * {@code TraceDestination} is a logical trace consumer to whom the trace will be reported. - * Typically, "producer" is the right destination to use: this makes API traces available to the - * team offering the API. Note that by enabling this option, the contents of the requests to and - * from Google Cloud services will be made available to Google. For example, by specifying - * {"Dataflow":"producer"}, all calls to the Dataflow service will be made available - * to Google, specifically to the Google Cloud Dataflow team. - */ - @Description("This option enables tracing of API calls to Google services used within the " - + "Dataflow SDK. Values are expected in JSON format {\"ApiName\":\"TraceDestination\",...} " - + "where the ApiName represents the request class's canonical name. The TraceDestination is " - + "a logical trace consumer to whom the trace will be reported. Typically, \"producer\" is " - + "the right destination to use: this makes API traces available to the team offering the " - + "API. Note that by enabling this option, the contents of the requests to and from " - + "Google Cloud services will be made available to Google. For example, by specifying " - + "{\"Dataflow\":\"producer\"}, all calls to the Dataflow service will be made available to " - + "Google, specifically to the Google Cloud Dataflow team.") - GoogleApiTracer getGoogleApiTrace(); - void setGoogleApiTrace(GoogleApiTracer commands); - - /** - * A {@link GoogleClientRequestInitializer} that adds the trace destination to Google API calls. - */ - public static class GoogleApiTracer extends HashMap<String, String> - implements GoogleClientRequestInitializer { - /** - * Creates a {@link GoogleApiTracer} that sets the trace destination on all - * calls that match the given client type. - */ - public GoogleApiTracer addTraceFor(AbstractGoogleClient client, String traceDestination) { - put(client.getClass().getCanonicalName(), traceDestination); - return this; - } - - /** - * Creates a {@link GoogleApiTracer} that sets the given {@code traceDestination} on all - * calls that match the given request type. - */ - public GoogleApiTracer addTraceFor( - AbstractGoogleClientRequest<?> request, String traceDestination) { - put(request.getClass().getCanonicalName(), traceDestination); - return this; - } - - @Override - public void initialize(AbstractGoogleClientRequest<?> request) throws IOException { - for (Map.Entry<String, String> entry : this.entrySet()) { - if (request.getClass().getCanonicalName().contains(entry.getKey())) { - request.set("$trace", entry.getValue()); - } - } - } - } -}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/Hidden.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/Hidden.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/Hidden.java deleted file mode 100644 index fd25db8..0000000 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/Hidden.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.options; - -import java.lang.annotation.Documented; -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -/** - * Methods and/or interfaces annotated with {@code @Hidden} will be suppressed from - * being output when {@code --help} is specified on the command-line. - */ -@Target({ElementType.METHOD, ElementType.TYPE}) -@Retention(RetentionPolicy.RUNTIME) -@Documented -public @interface Hidden { -}
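Tracing via GoogleApiDebugOptions above can be enabled either with a flag such as --googleApiTrace={"Dataflow":"producer"} or programmatically, as in this sketch; the client variable stands in for any AbstractGoogleClient instance and is hypothetical:

    GoogleApiDebugOptions options =
        PipelineOptionsFactory.create().as(GoogleApiDebugOptions.class);
    options.setGoogleApiTrace(
        new GoogleApiDebugOptions.GoogleApiTracer().addTraceFor(client, "producer"));
    // During request initialization, the tracer stamps a "$trace" parameter onto
    // every request whose canonical class name matches a registered key.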