From: dhalperi@apache.org
To: commits@beam.incubator.apache.org
Date: Thu, 24 Mar 2016 02:47:45 -0000
Subject: [21/67] [partial] incubator-beam git commit: Directory reorganization
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/MapElements.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/MapElements.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/MapElements.java deleted file mode 100644 index 8997050..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/MapElements.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License.
- */ - -package com.google.cloud.dataflow.sdk.transforms; - -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.TypeDescriptor; - -/** - * {@code PTransform}s for mapping a simple function over the elements of a {@link PCollection}. - */ -public class MapElements -extends PTransform, PCollection> { - - /** - * For a {@code SerializableFunction} {@code fn} and output type descriptor, - * returns a {@code PTransform} that takes an input {@code PCollection} and returns - * a {@code PCollection} containing {@code fn.apply(v)} for every element {@code v} in - * the input. - * - *
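The contract described above (exactly one output `fn.apply(v)` for each input element `v`) can be mimicked outside a pipeline. The following is a minimal local sketch in plain Java, assuming an in-memory `List` stands in for a `PCollection`; `MapElementsSketch` and `mapElements` are hypothetical names, not SDK API. The list keeps its order only because a `List` is ordered; a `PCollection` makes no ordering guarantee.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical, in-memory illustration of the MapElements contract; not SDK code.
final class MapElementsSketch {
  // Applies fn to every element; a List stands in for a PCollection here.
  static <InputT, OutputT> List<OutputT> mapElements(
      List<InputT> input, Function<? super InputT, ? extends OutputT> fn) {
    List<OutputT> output = new ArrayList<>(input.size());
    for (InputT v : input) {
      output.add(fn.apply(v)); // exactly one output per input element
    }
    return output;
  }

  public static void main(String[] args) {
    List<String> words = Arrays.asList("hello", "beam", "");
    List<Integer> wordLengths = mapElements(words, String::length);
    System.out.println(wordLengths); // prints [5, 4, 0]
  }
}
```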

Example of use in Java 8: - *

{@code
-   * PCollection<Integer> wordLengths = words.apply(
-   *     MapElements.via((String word) -> word.length())
-   *         .withOutputType(new TypeDescriptor<Integer>() {}));
-   * }
- * - *

In Java 7, the overload {@link #via(SimpleFunction)} is more concise as the output type - * descriptor need not be provided. - */ - public static MissingOutputTypeDescriptor - via(SerializableFunction fn) { - return new MissingOutputTypeDescriptor<>(fn); - } - - /** - * For a {@code SimpleFunction} {@code fn}, returns a {@code PTransform} that - * takes an input {@code PCollection} and returns a {@code PCollection} - * containing {@code fn.apply(v)} for every element {@code v} in the input. - * - *

This overload is intended primarily for use in Java 7. In Java 8, the overload - * {@link #via(SerializableFunction)} supports the use of lambdas for greater concision. - * - *
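Why an anonymous `SimpleFunction` subclass can skip the output type descriptor while a lambda cannot: Java erases generic type arguments at runtime, but an anonymous subclass such as `new SimpleFunction<String, Integer>() {...}` (or `new TypeDescriptor<Integer>() {}`) records its type arguments in its class metadata, where reflection can read them back. The sketch below shows that general "super type token" technique with a hypothetical `TypeCapture` class; it illustrates the mechanism under that assumption and is not a copy of the SDK's `TypeDescriptor` implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

// Hypothetical illustration of the "super type token" pattern; not SDK code.
abstract class TypeCapture<T> {
  // Reads T from the generic superclass of the anonymous subclass that was instantiated.
  final Type capturedType() {
    Type superclass = getClass().getGenericSuperclass();
    if (!(superclass instanceof ParameterizedType)) {
      throw new IllegalStateException("Instantiate as an anonymous subclass: new TypeCapture<X>() {}");
    }
    return ((ParameterizedType) superclass).getActualTypeArguments()[0];
  }
}

final class TypeCaptureDemo {
  public static void main(String[] args) {
    // The trailing {} creates an anonymous subclass, so Integer survives erasure.
    Type t = new TypeCapture<Integer>() {}.capturedType();
    System.out.println(t); // prints class java.lang.Integer

    // A lambda's generated class typically exposes only the raw Function interface,
    // with no record of the Integer output type.
    Function<String, Integer> lambda = String::length;
    System.out.println(lambda.getClass().getGenericInterfaces()[0]);
  }
}
```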

Example of use in Java 7: - *

{@code
-   * PCollection<String> words = ...;
-   * PCollection<Integer> wordLengths = words.apply(MapElements.via(
-   *     new SimpleFunction<String, Integer>() {
-   *       public Integer apply(String word) {
-   *         return word.length();
-   *       }
-   *     }));
-   * }
- */ - public static MapElements - via(final SimpleFunction fn) { - return new MapElements<>(fn, fn.getOutputTypeDescriptor()); - } - - /** - * An intermediate builder for a {@link MapElements} transform. To complete the transform, provide - * an output type descriptor to {@link MissingOutputTypeDescriptor#withOutputType}. See - * {@link #via(SerializableFunction)} for a full example of use. - */ - public static final class MissingOutputTypeDescriptor { - - private final SerializableFunction fn; - - private MissingOutputTypeDescriptor(SerializableFunction fn) { - this.fn = fn; - } - - public MapElements withOutputType(TypeDescriptor outputType) { - return new MapElements<>(fn, outputType); - } - } - - /////////////////////////////////////////////////////////////////// - - private final SerializableFunction fn; - private final transient TypeDescriptor outputType; - - private MapElements( - SerializableFunction fn, - TypeDescriptor outputType) { - this.fn = fn; - this.outputType = outputType; - } - - @Override - public PCollection apply(PCollection input) { - return input.apply(ParDo.named("Map").of(new DoFn() { - @Override - public void processElement(ProcessContext c) { - c.output(fn.apply(c.element())); - } - })).setTypeDescriptorInternal(outputType); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Max.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Max.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Max.java deleted file mode 100644 index 8678e4f..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Max.java +++ /dev/null @@ -1,255 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. 
You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.transforms; - -import com.google.cloud.dataflow.sdk.transforms.Combine.BinaryCombineFn; -import com.google.cloud.dataflow.sdk.util.common.Counter; -import com.google.cloud.dataflow.sdk.util.common.Counter.AggregationKind; -import com.google.cloud.dataflow.sdk.util.common.CounterProvider; - -import java.io.Serializable; -import java.util.Comparator; - -/** - * {@code PTransform}s for computing the maximum of the elements in a {@code PCollection}, or the - * maximum of the values associated with each key in a {@code PCollection} of {@code KV}s. - * - *
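A local, in-memory sketch of the two shapes described above: a global maximum over all elements, and one maximum per distinct key. It assumes a `List<Integer>` and a `List` of `Map.Entry` pairs stand in for `PCollection<Integer>` and `PCollection<KV<String, Integer>>`; `MaxSketch` and its methods are hypothetical. The empty-input default of `Integer.MIN_VALUE` mirrors what `Max.integersGlobally()` documents.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, in-memory model of Max.integersGlobally() / Max.integersPerKey(); not SDK code.
final class MaxSketch {
  // Global maximum; Integer.MIN_VALUE when the input is empty.
  static int maxGlobally(List<Integer> input) {
    int max = Integer.MIN_VALUE;
    for (int v : input) {
      max = Math.max(max, v);
    }
    return max;
  }

  // One output per distinct key: the maximum of that key's values (TreeMap sorts keys for printing).
  static Map<String, Integer> maxPerKey(List<Map.Entry<String, Integer>> input) {
    Map<String, Integer> result = new TreeMap<>();
    for (Map.Entry<String, Integer> kv : input) {
      result.merge(kv.getKey(), kv.getValue(), Integer::max);
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(maxGlobally(Arrays.asList(3, 9, 4))); // prints 9
    List<Map.Entry<String, Integer>> pairs = Arrays.<Map.Entry<String, Integer>>asList(
        new SimpleEntry<String, Integer>("a", 1),
        new SimpleEntry<String, Integer>("b", 7),
        new SimpleEntry<String, Integer>("a", 5));
    System.out.println(maxPerKey(pairs)); // prints {a=5, b=7}
  }
}
```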

Example 1: get the maximum of a {@code PCollection} of {@code Double}s. - *

 {@code
- * PCollection<Double> input = ...;
- * PCollection<Double> max = input.apply(Max.doublesGlobally());
- * } 
- * - *

Example 2: calculate the maximum of the {@code Integer}s - * associated with each unique key (which is of type {@code String}). - *

 {@code
- * PCollection<KV<String, Integer>> input = ...;
- * PCollection<KV<String, Integer>> maxPerKey = input
- *     .apply(Max.<String>integersPerKey());
- * } 
- */ -public class Max { - - private Max() { - // do not instantiate - } - - /** - * Returns a {@code PTransform} that takes an input {@code PCollection} and returns a - * {@code PCollection} containing the maximum of the input {@code PCollection}'s - * elements, or {@code Integer.MIN_VALUE} if there are no elements. - */ - public static Combine.Globally integersGlobally() { - return Combine.globally(new MaxIntegerFn()).named("Max.Globally"); - } - - /** - * Returns a {@code PTransform} that takes an input {@code PCollection>} and - * returns a {@code PCollection>} that contains an output element mapping each - * distinct key in the input {@code PCollection} to the maximum of the values associated with that - * key in the input {@code PCollection}. - *

See {@link Combine.PerKey} for how this affects timestamps and windowing. - */ - public static Combine.PerKey integersPerKey() { - return Combine.perKey(new MaxIntegerFn()).named("Max.PerKey"); - } - - /** - * Returns a {@code PTransform} that takes an input {@code PCollection} and returns a {@code - * PCollection} containing the maximum of the input {@code PCollection}'s elements, - * or {@code Long.MIN_VALUE} if there are no elements. - */ - public static Combine.Globally longsGlobally() { - return Combine.globally(new MaxLongFn()).named("Max.Globally"); - } - - /** - * Returns a {@code PTransform} that takes an input {@code PCollection>} and returns a - * {@code PCollection>} that contains an output element mapping each distinct key in - * the input {@code PCollection} to the maximum of the values associated with that key in the - * input {@code PCollection}. - *

See {@link Combine.PerKey} for how this affects timestamps and windowing. - */ - public static Combine.PerKey longsPerKey() { - return Combine.perKey(new MaxLongFn()).named("Max.PerKey"); - } - - /** - * Returns a {@code PTransform} that takes an input {@code PCollection} and returns a - * {@code PCollection} containing the maximum of the input {@code PCollection}'s - * elements, or {@code Double.NEGATIVE_INFINITY} if there are no elements. - */ - public static Combine.Globally doublesGlobally() { - return Combine.globally(new MaxDoubleFn()).named("Max.Globally"); - } - - /** - * Returns a {@code PTransform} that takes an input {@code PCollection>} and returns - * a {@code PCollection>} that contains an output element mapping each distinct key - * in the input {@code PCollection} to the maximum of the values associated with that key in the - * input {@code PCollection}. - *

See {@link Combine.PerKey} for how this affects timestamps and windowing. - */ - public static Combine.PerKey doublesPerKey() { - return Combine.perKey(new MaxDoubleFn()).named("Max.PerKey"); - } - - /** - * Returns a {@code PTransform} that takes an input {@code PCollection} and returns a {@code - * PCollection} containing the maximum according to the natural ordering of {@code T} - * of the input {@code PCollection}'s elements, or {@code null} if there are no elements. - */ - public static > - Combine.Globally globally() { - return Combine.globally(MaxFn.naturalOrder()).named("Max.Globally"); - } - - /** - * Returns a {@code PTransform} that takes an input {@code PCollection>} and returns a - * {@code PCollection>} that contains an output element mapping each distinct key in the - * input {@code PCollection} to the maximum according to the natural ordering of {@code T} of the - * values associated with that key in the input {@code PCollection}. - *

See {@link Combine.PerKey} for how this affects timestamps and windowing. - */ - public static > - Combine.PerKey perKey() { - return Combine.perKey(MaxFn.naturalOrder()).named("Max.PerKey"); - } - - /** - * Returns a {@code PTransform} that takes an input {@code PCollection} and returns a {@code - * PCollection} containing the maximum of the input {@code PCollection}'s elements, or - * {@code null} if there are no elements. - */ - public static & Serializable> - Combine.Globally globally(ComparatorT comparator) { - return Combine.globally(MaxFn.of(comparator)).named("Max.Globally"); - } - - /** - * Returns a {@code PTransform} that takes an input {@code PCollection>} and returns a - * {@code PCollection>} that contains one output element per key mapping each - * to the maximum of the values associated with that key in the input {@code PCollection}. - *

See {@link Combine.PerKey} for how this affects timestamps and windowing. - */ - public static & Serializable> - Combine.PerKey perKey(ComparatorT comparator) { - return Combine.perKey(MaxFn.of(comparator)).named("Max.PerKey"); - } - - ///////////////////////////////////////////////////////////////////////////// - - /** - * A {@code CombineFn} that computes the maximum of a collection of elements of type {@code T} - * using an arbitrary {@link Comparator}, useful as an argument to {@link Combine#globally} or - * {@link Combine#perKey}. - * - * @param the type of the values being compared - */ - public static class MaxFn extends BinaryCombineFn { - - private final T identity; - private final Comparator comparator; - - private & Serializable> MaxFn( - T identity, ComparatorT comparator) { - this.identity = identity; - this.comparator = comparator; - } - - public static & Serializable> - MaxFn of(T identity, ComparatorT comparator) { - return new MaxFn(identity, comparator); - } - - public static & Serializable> - MaxFn of(ComparatorT comparator) { - return new MaxFn(null, comparator); - } - - public static > MaxFn naturalOrder(T identity) { - return new MaxFn(identity, new Top.Largest()); - } - - public static > MaxFn naturalOrder() { - return new MaxFn(null, new Top.Largest()); - } - - @Override - public T identity() { - return identity; - } - - @Override - public T apply(T left, T right) { - return comparator.compare(left, right) >= 0 ? left : right; - } - } - - /** - * A {@code CombineFn} that computes the maximum of a collection of {@code Integer}s, useful as an - * argument to {@link Combine#globally} or {@link Combine#perKey}. 
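`MaxFn` above is a binary combine: elements are reduced pairwise with `apply(left, right)`, which keeps `left` whenever `comparator.compare(left, right) >= 0` (so ties favor the left operand), and `identity()` supplies the value for an empty input. The sketch below reproduces that reduction locally. It is an assumption-level model: `BinaryMaxSketch` and `combine` are hypothetical names, and it does not model how a runner splits or merges the work. The sketch uses the identity only as the empty-input result, which also keeps a `null` identity (as in `MaxFn.of(comparator)`) away from the comparator.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Hypothetical local model of a MaxFn-style binary combine; not SDK code.
final class BinaryMaxSketch<T> {
  private final T identity;
  private final Comparator<? super T> comparator;

  BinaryMaxSketch(T identity, Comparator<? super T> comparator) {
    this.identity = identity;
    this.comparator = comparator;
  }

  // Same rule as MaxFn.apply: ties keep the left operand.
  T apply(T left, T right) {
    return comparator.compare(left, right) >= 0 ? left : right;
  }

  // Pairwise left-to-right reduction; the identity is returned only for an empty input.
  T combine(List<T> elements) {
    if (elements.isEmpty()) {
      return identity;
    }
    T acc = elements.get(0);
    for (T next : elements.subList(1, elements.size())) {
      acc = apply(acc, next);
    }
    return acc;
  }

  public static void main(String[] args) {
    BinaryMaxSketch<Integer> maxInts =
        new BinaryMaxSketch<>(Integer.MIN_VALUE, Comparator.<Integer>naturalOrder());
    System.out.println(maxInts.combine(Arrays.asList(2, 8, 5))); // prints 8
    System.out.println(maxInts.combine(Collections.<Integer>emptyList())); // prints -2147483648
  }
}
```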
- */ - public static class MaxIntegerFn extends MaxFn implements - CounterProvider { - public MaxIntegerFn() { - super(Integer.MIN_VALUE, new Top.Largest()); - } - - @Override - public Counter getCounter(String name) { - return Counter.ints(name, AggregationKind.MAX); - } - } - - /** - * A {@code CombineFn} that computes the maximum of a collection of {@code Long}s, useful as an - * argument to {@link Combine#globally} or {@link Combine#perKey}. - */ - public static class MaxLongFn extends MaxFn implements - CounterProvider { - public MaxLongFn() { - super(Long.MIN_VALUE, new Top.Largest()); - } - - @Override - public Counter getCounter(String name) { - return Counter.longs(name, AggregationKind.MAX); - } - } - - /** - * A {@code CombineFn} that computes the maximum of a collection of {@code Double}s, useful as an - * argument to {@link Combine#globally} or {@link Combine#perKey}. - */ - public static class MaxDoubleFn extends MaxFn implements - CounterProvider { - public MaxDoubleFn() { - super(Double.NEGATIVE_INFINITY, new Top.Largest()); - } - - @Override - public Counter getCounter(String name) { - return Counter.doubles(name, AggregationKind.MAX); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Mean.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Mean.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Mean.java deleted file mode 100644 index 7dccfb6..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Mean.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. 
You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.transforms; - -import com.google.cloud.dataflow.sdk.coders.AtomicCoder; -import com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.coders.CoderException; -import com.google.cloud.dataflow.sdk.coders.CoderRegistry; -import com.google.cloud.dataflow.sdk.coders.DoubleCoder; -import com.google.cloud.dataflow.sdk.transforms.Combine.AccumulatingCombineFn.Accumulator; -import com.google.common.base.MoreObjects; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Objects; - -/** - * {@code PTransform}s for computing the arithmetic mean - * (a.k.a. average) of the elements in a {@code PCollection}, or the - * mean of the values associated with each key in a - * {@code PCollection} of {@code KV}s. - * - *

Example 1: get the mean of a {@code PCollection} of {@code Long}s. - *

 {@code
- * PCollection<Long> input = ...;
- * PCollection<Double> mean = input.apply(Mean.<Long>globally());
- * } 
- * - *

Example 2: calculate the mean of the {@code Integer}s - * associated with each unique key (which is of type {@code String}). - *

 {@code
- * PCollection<KV<String, Integer>> input = ...;
- * PCollection<KV<String, Double>> meanPerKey =
- *     input.apply(Mean.<String, Integer>perKey());
- * } 
- */ -public class Mean { - - private Mean() { } // Namespace only - - /** - * Returns a {@code PTransform} that takes an input - * {@code PCollection} and returns a - * {@code PCollection} containing the mean of the - * input {@code PCollection}'s elements, or - * {@code Double.NaN} if there are no elements. - * - * @param the type of the {@code Number}s being combined - */ - public static Combine.Globally globally() { - return Combine.globally(new MeanFn<>()).named("Mean.Globally"); - } - - /** - * Returns a {@code PTransform} that takes an input - * {@code PCollection>} and returns a - * {@code PCollection>} that contains an output - * element mapping each distinct key in the input - * {@code PCollection} to the mean of the values associated with - * that key in the input {@code PCollection}. - *

See {@link Combine.PerKey} for how this affects timestamps and windowing. - * - * @param the type of the keys - * @param the type of the {@code Number}s being combined - */ - public static Combine.PerKey perKey() { - return Combine.perKey(new MeanFn<>()).named("Mean.PerKey"); - } - - - ///////////////////////////////////////////////////////////////////////////// - - /** - * A {@code Combine.CombineFn} that computes the arithmetic mean - * (a.k.a. average) of an {@code Iterable} of numbers of type - * {@code N}, useful as an argument to {@link Combine#globally} or - * {@link Combine#perKey}. - *
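`MeanFn` keeps a running `(count, sum)` pair per accumulator: `addInput` increments both, `mergeAccumulator` adds two accumulators component-wise, and `extractOutput` divides, returning `Double.NaN` for an empty accumulator. A plain-Java sketch of that accumulator follows; `CountSumSketch` is a hypothetical stand-in for the package-private `CountSum` class, and it omits the coder and the `Combine` integration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the CountSum accumulator; not SDK code.
final class CountSumSketch {
  long count = 0;
  double sum = 0.0;

  // Like addInput: any Number is widened to double.
  void addInput(Number element) {
    count++;
    sum += element.doubleValue();
  }

  // Like mergeAccumulator: partial accumulators (e.g. from different bundles) simply add.
  void merge(CountSumSketch other) {
    count += other.count;
    sum += other.sum;
  }

  // Like extractOutput: NaN rather than 0 when nothing was added.
  double extractOutput() {
    return count == 0 ? Double.NaN : sum / count;
  }

  public static void main(String[] args) {
    List<Long> part1 = Arrays.asList(1L, 2L);
    List<Long> part2 = Arrays.asList(3L, 6L);
    CountSumSketch a = new CountSumSketch();
    CountSumSketch b = new CountSumSketch();
    part1.forEach(a::addInput);
    part2.forEach(b::addInput);
    a.merge(b);
    System.out.println(a.extractOutput()); // prints 3.0
    System.out.println(new CountSumSketch().extractOutput()); // prints NaN
  }
}
```

Carrying the sum as a `double` lets one accumulator handle any `Number` subtype, at the cost of floating-point rounding for very large `long` sums.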

Returns {@code Double.NaN} if combining zero elements. - * - * @param the type of the {@code Number}s being combined - */ - static class MeanFn - extends Combine.AccumulatingCombineFn, Double> { - /** - * Constructs a combining function that computes the mean over - * a collection of values of type {@code N}. - */ - public MeanFn() {} - - @Override - public CountSum createAccumulator() { - return new CountSum<>(); - } - - @Override - public Coder> getAccumulatorCoder( - CoderRegistry registry, Coder inputCoder) { - return new CountSumCoder<>(); - } - } - - /** - * Accumulator class for {@link MeanFn}. - */ - static class CountSum - implements Accumulator, Double> { - - long count = 0; - double sum = 0.0; - - public CountSum() { - this(0, 0); - } - - public CountSum(long count, double sum) { - this.count = count; - this.sum = sum; - } - - @Override - public void addInput(NumT element) { - count++; - sum += element.doubleValue(); - } - - @Override - public void mergeAccumulator(CountSum accumulator) { - count += accumulator.count; - sum += accumulator.sum; - } - - @Override - public Double extractOutput() { - return count == 0 ? 
Double.NaN : sum / count; - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof CountSum)) { - return false; - } - @SuppressWarnings("unchecked") - CountSum otherCountSum = (CountSum) other; - return (count == otherCountSum.count) - && (sum == otherCountSum.sum); - } - - @Override - public int hashCode() { - return Objects.hash(count, sum); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("count", count) - .add("sum", sum) - .toString(); - } - } - - static class CountSumCoder - extends AtomicCoder> { - private static final Coder LONG_CODER = BigEndianLongCoder.of(); - private static final Coder DOUBLE_CODER = DoubleCoder.of(); - - @Override - public void encode(CountSum value, OutputStream outStream, Coder.Context context) - throws CoderException, IOException { - Coder.Context nestedContext = context.nested(); - LONG_CODER.encode(value.count, outStream, nestedContext); - DOUBLE_CODER.encode(value.sum, outStream, nestedContext); - } - - @Override - public CountSum decode(InputStream inStream, Coder.Context context) - throws CoderException, IOException { - Coder.Context nestedContext = context.nested(); - return new CountSum<>( - LONG_CODER.decode(inStream, nestedContext), - DOUBLE_CODER.decode(inStream, nestedContext)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Min.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Min.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Min.java deleted file mode 100644 index 47ab3a0..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Min.java +++ /dev/null @@ -1,255 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. 
- * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.transforms; - -import com.google.cloud.dataflow.sdk.transforms.Combine.BinaryCombineFn; -import com.google.cloud.dataflow.sdk.util.common.Counter; -import com.google.cloud.dataflow.sdk.util.common.Counter.AggregationKind; -import com.google.cloud.dataflow.sdk.util.common.CounterProvider; - -import java.io.Serializable; -import java.util.Comparator; - -/** - * {@code PTransform}s for computing the minimum of the elements in a {@code PCollection}, or the - * minimum of the values associated with each key in a {@code PCollection} of {@code KV}s. - * - *

Example 1: get the minimum of a {@code PCollection} of {@code Double}s. - *

 {@code
- * PCollection<Double> input = ...;
- * PCollection<Double> min = input.apply(Min.doublesGlobally());
- * } 
- * - *

Example 2: calculate the minimum of the {@code Integer}s - * associated with each unique key (which is of type {@code String}). - *

 {@code
- * PCollection<KV<String, Integer>> input = ...;
- * PCollection<KV<String, Integer>> minPerKey = input
- *     .apply(Min.<String>integersPerKey());
- * } 
- */ -public class Min { - - private Min() { - // do not instantiate - } - - /** - * Returns a {@code PTransform} that takes an input {@code PCollection} and returns a - * {@code PCollection} containing a single value: the minimum of the input - * {@code PCollection}'s elements, or {@code Integer.MAX_VALUE} if there are no elements. - */ - public static Combine.Globally integersGlobally() { - return Combine.globally(new MinIntegerFn()).named("Min.Globally"); - } - - /** - * Returns a {@code PTransform} that takes an input {@code PCollection>} and - * returns a {@code PCollection>} that contains an output element mapping each - * distinct key in the input {@code PCollection} to the minimum of the values associated with that - * key in the input {@code PCollection}. - *

See {@link Combine.PerKey} for how this affects timestamps and windowing. - */ - public static Combine.PerKey integersPerKey() { - return Combine.perKey(new MinIntegerFn()).named("Min.PerKey"); - } - - /** - * Returns a {@code PTransform} that takes an input {@code PCollection} and returns a {@code - * PCollection} containing the minimum of the input {@code PCollection}'s elements, - * or {@code Long.MAX_VALUE} if there are no elements. - */ - public static Combine.Globally longsGlobally() { - return Combine.globally(new MinLongFn()).named("Min.Globally"); - } - - /** - * Returns a {@code PTransform} that takes an input {@code PCollection>} and returns a - * {@code PCollection>} that contains an output element mapping each distinct key in - * the input {@code PCollection} to the minimum of the values associated with that key in the - * input {@code PCollection}. - *

See {@link Combine.PerKey} for how this affects timestamps and windowing. - */ - public static Combine.PerKey longsPerKey() { - return Combine.perKey(new MinLongFn()).named("Min.PerKey"); - } - - /** - * Returns a {@code PTransform} that takes an input {@code PCollection} and returns a - * {@code PCollection} containing the minimum of the input {@code PCollection}'s - * elements, or {@code Double.POSITIVE_INFINITY} if there are no elements. - */ - public static Combine.Globally doublesGlobally() { - return Combine.globally(new MinDoubleFn()).named("Min.Globally"); - } - - /** - * Returns a {@code PTransform} that takes an input {@code PCollection>} and returns - * a {@code PCollection>} that contains an output element mapping each distinct key - * in the input {@code PCollection} to the minimum of the values associated with that key in the - * input {@code PCollection}. - *

See {@link Combine.PerKey} for how this affects timestamps and windowing. - */ - public static Combine.PerKey doublesPerKey() { - return Combine.perKey(new MinDoubleFn()).named("Min.PerKey"); - } - - /** - * Returns a {@code PTransform} that takes an input {@code PCollection} and returns a {@code - * PCollection} containing the minimum according to the natural ordering of {@code T} - * of the input {@code PCollection}'s elements, or {@code null} if there are no elements. - */ - public static > - Combine.Globally globally() { - return Combine.globally(MinFn.naturalOrder()).named("Min.Globally"); - } - - /** - * Returns a {@code PTransform} that takes an input {@code PCollection>} and returns a - * {@code PCollection>} that contains an output element mapping each distinct key in the - * input {@code PCollection} to the minimum according to the natural ordering of {@code T} of the - * values associated with that key in the input {@code PCollection}. - *

See {@link Combine.PerKey} for how this affects timestamps and windowing. - */ - public static > - Combine.PerKey perKey() { - return Combine.perKey(MinFn.naturalOrder()).named("Min.PerKey"); - } - - /** - * Returns a {@code PTransform} that takes an input {@code PCollection} and returns a {@code - * PCollection} containing the minimum of the input {@code PCollection}'s elements, or - * {@code null} if there are no elements. - */ - public static & Serializable> - Combine.Globally globally(ComparatorT comparator) { - return Combine.globally(MinFn.of(comparator)).named("Min.Globally"); - } - - /** - * Returns a {@code PTransform} that takes an input {@code PCollection>} and returns a - * {@code PCollection>} that contains one output element per key mapping each - * to the minimum of the values associated with that key in the input {@code PCollection}. - *
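The comparator-taking overloads require a comparator type that is both a `Comparator` and `Serializable` (the tail of that intersection bound, `& Serializable>`, survives in the signatures above), presumably so the comparator can be serialized along with the combine function. A local sketch of a per-key minimum under such a comparator follows; `MinPerKeySketch`, `ByLength`, `minPerKey`, and `roundTrip` are hypothetical names, and the Java serialization round trip only demonstrates that the comparator is serializable, not how a runner transmits it. Ties keep the earlier value, matching `MinFn.apply`'s `<= 0 ? left : right`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, in-memory model of Min.perKey(comparator); not SDK code.
final class MinPerKeySketch {
  // A comparator that is also Serializable, mirroring the intersection bound.
  static final class ByLength implements Comparator<String>, Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public int compare(String a, String b) {
      return Integer.compare(a.length(), b.length());
    }
  }

  // Per-key minimum; on ties the earlier (left) value wins.
  static <K, T, C extends Comparator<? super T> & Serializable> Map<K, T> minPerKey(
      List<Map.Entry<K, T>> input, C comparator) {
    Map<K, T> result = new LinkedHashMap<>();
    for (Map.Entry<K, T> kv : input) {
      result.merge(kv.getKey(), kv.getValue(),
          (left, right) -> comparator.compare(left, right) <= 0 ? left : right);
    }
    return result;
  }

  // Serializes and deserializes a value with plain Java serialization.
  @SuppressWarnings("unchecked")
  static <S extends Serializable> S roundTrip(S value) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (S) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    ByLength byLength = roundTrip(new ByLength());
    List<Map.Entry<String, String>> pairs = Arrays.<Map.Entry<String, String>>asList(
        new SimpleEntry<String, String>("fruit", "banana"),
        new SimpleEntry<String, String>("fruit", "fig"),
        new SimpleEntry<String, String>("veg", "leek"));
    System.out.println(minPerKey(pairs, byLength)); // prints {fruit=fig, veg=leek}
  }
}
```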

See {@link Combine.PerKey} for how this affects timestamps and windowing. - */ - public static & Serializable> - Combine.PerKey perKey(ComparatorT comparator) { - return Combine.perKey(MinFn.of(comparator)).named("Min.PerKey"); - } - - ///////////////////////////////////////////////////////////////////////////// - - /** - * A {@code CombineFn} that computes the minimum of a collection of elements of type {@code T} - * using an arbitrary {@link Comparator}, useful as an argument to {@link Combine#globally} or - * {@link Combine#perKey}. - * - * @param the type of the values being compared - */ - public static class MinFn extends BinaryCombineFn { - - private final T identity; - private final Comparator comparator; - - private & Serializable> MinFn( - T identity, ComparatorT comparator) { - this.identity = identity; - this.comparator = comparator; - } - - public static & Serializable> - MinFn of(T identity, ComparatorT comparator) { - return new MinFn(identity, comparator); - } - - public static & Serializable> - MinFn of(ComparatorT comparator) { - return new MinFn(null, comparator); - } - - public static > MinFn naturalOrder(T identity) { - return new MinFn(identity, new Top.Largest()); - } - - public static > MinFn naturalOrder() { - return new MinFn(null, new Top.Largest()); - } - - @Override - public T identity() { - return identity; - } - - @Override - public T apply(T left, T right) { - return comparator.compare(left, right) <= 0 ? left : right; - } - } - - /** - * A {@code CombineFn} that computes the minimum of a collection of {@code Integer}s, useful as an - * argument to {@link Combine#globally} or {@link Combine#perKey}.
- */ - public static class MinIntegerFn extends MinFn implements - CounterProvider { - public MinIntegerFn() { - super(Integer.MAX_VALUE, new Top.Largest()); - } - - @Override - public Counter getCounter(String name) { - return Counter.ints(name, AggregationKind.MIN); - } - } - - /** - * A {@code CombineFn} that computes the minimum of a collection of {@code Long}s, useful as an - * argument to {@link Combine#globally} or {@link Combine#perKey}. - */ - public static class MinLongFn extends MinFn implements - CounterProvider { - public MinLongFn() { - super(Long.MAX_VALUE, new Top.Largest()); - } - - @Override - public Counter getCounter(String name) { - return Counter.longs(name, AggregationKind.MIN); - } - } - - /** - * A {@code CombineFn} that computes the minimum of a collection of {@code Double}s, useful as an - * argument to {@link Combine#globally} or {@link Combine#perKey}. - */ - public static class MinDoubleFn extends MinFn implements - CounterProvider { - public MinDoubleFn() { - super(Double.POSITIVE_INFINITY, new Top.Largest()); - } - - @Override - public Counter getCounter(String name) { - return Counter.doubles(name, AggregationKind.MIN); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/PTransform.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/PTransform.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/PTransform.java deleted file mode 100644 index d4496b8..0000000 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/PTransform.java +++ /dev/null @@ -1,324 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. 
You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.transforms; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.transforms.display.DisplayData.Builder; -import com.google.cloud.dataflow.sdk.transforms.display.HasDisplayData; -import com.google.cloud.dataflow.sdk.util.StringUtils; -import com.google.cloud.dataflow.sdk.values.PInput; -import com.google.cloud.dataflow.sdk.values.POutput; -import com.google.cloud.dataflow.sdk.values.TypedPValue; - -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; - -/** - * A {@code PTransform} is an operation that takes an - * {@code InputT} (some subtype of {@link PInput}) and produces an - * {@code OutputT} (some subtype of {@link POutput}). - * - *

Common PTransforms include root PTransforms like - * {@link com.google.cloud.dataflow.sdk.io.TextIO.Read}, - * {@link Create}, processing and - * conversion operations like {@link ParDo}, - * {@link GroupByKey}, - * {@link com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey}, - * {@link Combine}, and {@link Count}, and outputting - * PTransforms like - * {@link com.google.cloud.dataflow.sdk.io.TextIO.Write}. Users also - * define their own application-specific composite PTransforms. - * - *

Each {@code PTransform} has a single - * {@code InputT} type and a single {@code OutputT} type. Many - * PTransforms conceptually transform one input value to one output - * value, and in this case {@code InputT} and {@code OutputT} are - * typically instances of - * {@link com.google.cloud.dataflow.sdk.values.PCollection}. - * A root - * PTransform conceptually has no input; in this case, conventionally - * a {@link com.google.cloud.dataflow.sdk.values.PBegin} object - * produced by calling {@link Pipeline#begin} is used as the input. - * An outputting PTransform conceptually has no output; in this case, - * conventionally {@link com.google.cloud.dataflow.sdk.values.PDone} - * is used as its output type. Some PTransforms conceptually have - * multiple inputs and/or outputs; in these cases special "bundling" - * classes like - * {@link com.google.cloud.dataflow.sdk.values.PCollectionList} and - * {@link com.google.cloud.dataflow.sdk.values.PCollectionTuple} - * are used - * to combine multiple values into a single bundle for passing into or - * returning from the PTransform. - *

A {@code PTransform} is invoked by calling - * {@code apply()} on its {@code InputT}, returning its {@code OutputT}. - * Calls can be chained to concisely create linear pipeline segments. - * For example: - * - *

 {@code
- * PCollection<T1> pc1 = ...;
- * PCollection<T2> pc2 =
- *     pc1.apply(ParDo.of(new MyDoFn<T1,KV<K,V>>()))
- *        .apply(GroupByKey.<K, V>create())
- *        .apply(Combine.perKey(new MyKeyedCombineFn<K,V>()))
- *        .apply(ParDo.of(new MyDoFn2<KV<K,V>,T2>()));
- * } 
- * - *

PTransform operations have unique names, which are used by the - * system when explaining what's going on during optimization and - * execution. Each PTransform gets a system-provided default name, - * but it's a good practice to specify an explicit name, where - * possible, using the {@code named()} method offered by some - * PTransforms such as {@link ParDo}. For example: - * - *

 {@code
- * ...
- * .apply(ParDo.named("Step1").of(new MyDoFn3()))
- * ...
- * } 
- * - *

Each PCollection output produced by a PTransform, - * either directly or within a "bundling" class, automatically gets - * its own name derived from the name of its producing PTransform. - * - *

Each PCollection output produced by a PTransform - * also records a {@link com.google.cloud.dataflow.sdk.coders.Coder} - * that specifies how the elements of that PCollection - * are to be encoded as a byte string, if necessary. The - * PTransform may provide a default Coder for any of its outputs, for - * instance by deriving it from the PTransform input's Coder. If the - * PTransform does not specify the Coder for an output PCollection, - * the system will attempt to infer a Coder for it, based on - * what's known at run-time about the Java type of the output's - * elements. The enclosing {@link Pipeline}'s - * {@link com.google.cloud.dataflow.sdk.coders.CoderRegistry} - * (accessible via {@link Pipeline#getCoderRegistry}) defines the - * mapping from Java types to the default Coder to use, for a standard - * set of Java types; users can extend this mapping for additional - * types, via - * {@link com.google.cloud.dataflow.sdk.coders.CoderRegistry#registerCoder}. - * If this inference process fails, either because the Java type was - * not known at run-time (e.g., due to Java's "erasure" of generic - * types) or there was no default Coder registered, then the Coder - * should be specified manually by calling - * {@link com.google.cloud.dataflow.sdk.values.TypedPValue#setCoder} - * on the output PCollection. The Coder of every output - * PCollection must be determined one way or another - * before that output is used as an input to another PTransform, or - * before the enclosing Pipeline is run. - * - *

A small number of PTransforms are implemented natively by the - * Google Cloud Dataflow SDK; such PTransforms simply return an - * output value as their apply implementation. - * The majority of PTransforms are - * implemented as composites of other PTransforms. Such a PTransform - * subclass typically just implements {@link #apply}, computing its - * {@code OutputT} value from its {@code InputT} value. User programs are encouraged to - * use this mechanism to modularize their own code. Such composite - * abstractions get their own name, and navigating through the - * composition hierarchy of PTransforms is supported by the monitoring - * interface. Examples of composite PTransforms can be found in this - * directory and in examples. From the caller's point of view, there - * is no distinction between a PTransform implemented natively and one - * implemented in terms of other PTransforms; both kinds of PTransform - * are invoked in the same way, using {@code apply()}. - *

Note on Serialization

- * - *

{@code PTransform} doesn't actually support serialization, despite - * implementing {@code Serializable}. - * - *

{@code PTransform} is marked {@code Serializable} solely - * because it is common for an anonymous {@code DoFn} - * instance to be created within an - * {@code apply()} method of a composite {@code PTransform}. - *

Each of those {@code *Fn}s is {@code Serializable}, but - * unfortunately its instance state will contain a reference to the - * enclosing {@code PTransform} instance, so serializing the {@code *Fn} - * also attempts to serialize the {@code PTransform} instance, even though - * the {@code *Fn} instance never references anything about the enclosing - * {@code PTransform}. - *

To allow such anonymous {@code *Fn}s to be written - * conveniently, {@code PTransform} is marked as {@code Serializable}, - * and includes dummy {@code writeObject()} and {@code readObject()} - * operations that do not save or restore any state. - * - * @see Applying Transformations - * - * @param <InputT> the type of the input to this PTransform - * @param <OutputT> the type of the output of this PTransform - */ -public abstract class PTransform<InputT extends PInput, OutputT extends POutput> - implements Serializable /* See the note above */, HasDisplayData { - /** - * Applies this {@code PTransform} to the given {@code InputT}, and returns its - * {@code OutputT}. - *

Composite transforms, which are defined in terms of other transforms, - * should return the output of one of the composed transforms. Non-composite - * transforms, which do not apply any transforms internally, should return - * a new unbound output and register evaluators (via backend-specific - * registration methods). - * - *

The default implementation throws an exception. A derived class must - * either implement apply, or else each runner must supply a custom - * implementation via - * {@link com.google.cloud.dataflow.sdk.runners.PipelineRunner#apply}. - */ - public OutputT apply(InputT input) { - throw new IllegalArgumentException( - "Runner " + input.getPipeline().getRunner() - + " has not registered an implementation for the required primitive operation " - + this); - } - - /** - * Called before invoking apply (which may be intercepted by the runner) to - * verify this transform is fully specified and applicable to the specified - * input. - * - *

By default, does nothing. - */ - public void validate(InputT input) { } - - /** - * Returns the transform name. - * - *

This name is provided by the transform creator and is not required to be unique. - */ - public String getName() { - return name != null ? name : getKindString(); - } - - ///////////////////////////////////////////////////////////////////////////// - - // See the note above about PTransform's fake Serializability, to - // understand why all of its instance state is transient. - - /** - * The base name of this {@code PTransform}, e.g., from - * {@link ParDo#named(String)}, or from defaults, or {@code null} if not - * yet assigned. - */ - protected final transient String name; - - protected PTransform() { - this.name = null; - } - - protected PTransform(String name) { - this.name = name; - } - - @Override - public String toString() { - if (name == null) { - return getKindString(); - } else { - return getName() + " [" + getKindString() + "]"; - } - } - - /** - * Returns the name to use by default for this {@code PTransform} - * (not including the names of any enclosing {@code PTransform}s). - *

By default, returns the base name of this {@code PTransform}'s class. - * - *

The caller is responsible for ensuring that names of applied - * {@code PTransform}s are unique, e.g., by adding a uniquifying - * suffix when needed. - */ - protected String getKindString() { - if (getClass().isAnonymousClass()) { - return "AnonymousTransform"; - } else { - return StringUtils.approximatePTransformName(getClass()); - } - } - - private void writeObject(ObjectOutputStream oos) { - // We don't really want to be serializing this object, but we - // often have serializable anonymous DoFns nested within a - // PTransform. - } - - private void readObject(ObjectInputStream oos) { - // We don't really want to be serializing this object, but we - // often have serializable anonymous DoFns nested within a - // PTransform. - } - - /** - * Returns the default {@code Coder} to use for the output of this - * single-output {@code PTransform}. - * - *

By default, always throws. - * - * @throws CannotProvideCoderException if no coder can be inferred - */ - protected Coder getDefaultOutputCoder() throws CannotProvideCoderException { - throw new CannotProvideCoderException( - "PTransform.getDefaultOutputCoder called."); - } - - /** - * Returns the default {@code Coder} to use for the output of this - * single-output {@code PTransform} when applied to the given input. - * - * @throws CannotProvideCoderException if none can be inferred. - *

By default, always throws. - */ - protected Coder getDefaultOutputCoder(@SuppressWarnings("unused") InputT input) - throws CannotProvideCoderException { - return getDefaultOutputCoder(); - } - - /** - * Returns the default {@code Coder} to use for the given output of - * this single-output {@code PTransform} when applied to the given input. - * - * @throws CannotProvideCoderException if none can be inferred. - * - *

By default, always throws. - */ - public Coder getDefaultOutputCoder( - InputT input, @SuppressWarnings("unused") TypedPValue output) - throws CannotProvideCoderException { - @SuppressWarnings("unchecked") - Coder defaultOutputCoder = (Coder) getDefaultOutputCoder(input); - return defaultOutputCoder; - } - - /** - * {@inheritDoc} - * - *

By default, does not register any display data. Implementors may override this method - * to provide their own display metadata. - */ - @Override - public void populateDisplayData(Builder builder) { - } -}
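The deleted `MinFn` folds values pairwise with `comparator.compare(left, right) <= 0 ? left : right`, starting from an identity such as `Integer.MAX_VALUE` in `MinIntegerFn`. The sketch below reproduces only that combining rule in plain Java so it runs without the Dataflow SDK. `MinFnSketch`, `MinCombiner` and `minOfInts` are hypothetical names, and `Comparator.naturalOrder()` is swapped in for the SDK's `Top.Largest`. It also assumes a non-null identity; the SDK's handling of `MinFn.of(comparator)` with a `null` identity is not modelled here.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Hypothetical, SDK-free sketch of the MinFn combining rule (not the SDK class).
public class MinFnSketch {
  public static final class MinCombiner<T> {
    private final T identity;
    private final Comparator<? super T> comparator;

    public MinCombiner(T identity, Comparator<? super T> comparator) {
      this.identity = identity;
      this.comparator = comparator;
    }

    // Same rule as MinFn.apply: on a tie, the left value is kept.
    public T apply(T left, T right) {
      return comparator.compare(left, right) <= 0 ? left : right;
    }

    // Folds every input into the identity; an empty input yields the identity.
    public T combine(Iterable<? extends T> values) {
      T acc = identity;
      for (T v : values) {
        acc = apply(acc, v);
      }
      return acc;
    }
  }

  // Analogue of MinIntegerFn: Integer.MAX_VALUE is the identity for "min".
  public static int minOfInts(List<Integer> values) {
    Comparator<Integer> natural = Comparator.naturalOrder();
    return new MinCombiner<Integer>(Integer.MAX_VALUE, natural).combine(values);
  }

  public static void main(String[] args) {
    System.out.println(minOfInts(Arrays.asList(5, 3, 9))); // 3
    System.out.println(minOfInts(Collections.<Integer>emptyList())); // 2147483647
  }
}
```

Choosing the largest representable value as the identity is what lets an empty input (or an empty bundle on a worker) combine cleanly with any non-empty partial result.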
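The "Note on Serialization" in the deleted `PTransform` Javadoc describes a pattern: an anonymous serializable function created inside a transform typically drags the enclosing instance along, so the enclosing class is made `Serializable` with no-op `writeObject()`/`readObject()` hooks. A minimal plain-Java sketch of that pattern follows. `SerializableFn`, `CompositeTransform` and `roundTrip` are hypothetical stand-ins, not Dataflow SDK types, and the deliberately non-serializable `lock` field exists only to show that the dummy hooks skip it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class FakeSerializableSketch {
  // Hypothetical stand-in for a serializable *Fn.
  public interface SerializableFn<A, B> extends Function<A, B>, Serializable {}

  // Hypothetical stand-in for a composite PTransform.
  public static class CompositeTransform implements Serializable {
    // Not Serializable: default serialization of this field would fail.
    private final Object lock = new Object();

    public SerializableFn<Integer, Integer> makeFn() {
      // An anonymous class created in an instance method typically holds a
      // reference to the enclosing CompositeTransform, even though unused.
      return new SerializableFn<Integer, Integer>() {
        @Override
        public Integer apply(Integer x) {
          return x + 1;
        }
      };
    }

    // Dummy hooks, as in PTransform: save and restore no state at all.
    private void writeObject(ObjectOutputStream oos) {}

    private void readObject(ObjectInputStream ois) {}
  }

  // Serializes and deserializes a value in memory.
  @SuppressWarnings("unchecked")
  public static <T> T roundTrip(T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    SerializableFn<Integer, Integer> fn = new CompositeTransform().makeFn();
    System.out.println(roundTrip(fn).apply(41)); // 42
  }
}
```

The trade-off is the one the Javadoc states: the enclosing object survives the round trip only as an empty shell (its fields come back as defaults), which is acceptable precisely because the function never reads them.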
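The deleted `getName()`, `toString()` and `getKindString()` methods define how a transform is labelled: an explicit name wins, otherwise the class-derived kind is used, and anonymous subclasses are reported as `AnonymousTransform`. The sketch below mirrors that logic in plain Java. `NamingSketch`, `Transform` and `MyTransform` are hypothetical, and `getClass().getSimpleName()` is a simplified substitute for the SDK's `StringUtils.approximatePTransformName`, so real kind strings may differ.

```java
// Hypothetical, SDK-free mirror of PTransform's naming logic.
public class NamingSketch {
  public abstract static class Transform {
    protected final String name;

    protected Transform() {
      this(null);
    }

    protected Transform(String name) {
      this.name = name;
    }

    // An explicit name wins; otherwise fall back to the kind string.
    public String getName() {
      return name != null ? name : getKindString();
    }

    // Simplified: the SDK uses StringUtils.approximatePTransformName instead.
    protected String getKindString() {
      return getClass().isAnonymousClass() ? "AnonymousTransform" : getClass().getSimpleName();
    }

    @Override
    public String toString() {
      return name == null ? getKindString() : getName() + " [" + getKindString() + "]";
    }
  }

  public static class MyTransform extends Transform {
    public MyTransform() {}

    public MyTransform(String name) {
      super(name);
    }
  }

  public static void main(String[] args) {
    System.out.println(new MyTransform()); // MyTransform
    System.out.println(new MyTransform("Step1")); // Step1 [MyTransform]
    System.out.println(new Transform() {}); // AnonymousTransform
  }
}
```

Because nothing here enforces uniqueness, callers remain responsible for picking distinct names, as the `getKindString()` Javadoc notes.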