beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject incubator-beam git commit: Add CountingInput as a PTransform
Date Thu, 03 Mar 2016 22:39:34 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 365029863 -> 5a7bd8083


Add CountingInput as a PTransform

This transform produces a bounded or unbounded PCollection containing
longs, based on a CountingSource.

Deprecate the methods in CountingSource that produce a Source.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5a7bd808
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5a7bd808
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5a7bd808

Branch: refs/heads/master
Commit: 5a7bd80832d72ed8a287d4aab1f1f9cfa6d18c8a
Parents: 3650298
Author: Thomas Groh <tgroh@google.com>
Authored: Wed Mar 2 14:03:28 2016 -0800
Committer: Dan Halperin <dhalperi@google.com>
Committed: Thu Mar 3 14:33:01 2016 -0800

----------------------------------------------------------------------
 .../cloud/dataflow/sdk/io/CountingInput.java    | 191 +++++++++++++++++++
 .../cloud/dataflow/sdk/io/CountingSource.java   |  31 ++-
 .../dataflow/sdk/io/CountingInputTest.java      | 122 ++++++++++++
 3 files changed, 334 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a7bd808/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java
new file mode 100644
index 0000000..07609ba
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java
@@ -0,0 +1,191 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.sdk.io;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.cloud.dataflow.sdk.io.CountingSource.NowTimestampFn;
+import com.google.cloud.dataflow.sdk.io.Read.Unbounded;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
+import com.google.cloud.dataflow.sdk.values.PBegin;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
+import com.google.common.base.Optional;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A {@link PTransform} that produces longs. When used to produce a
+ * {@link IsBounded#BOUNDED bounded} {@link PCollection}, {@link CountingInput} starts at {@code 0}
+ * and counts up to a specified maximum. When used to produce an
+ * {@link IsBounded#UNBOUNDED unbounded} {@link PCollection}, it counts up to {@link Long#MAX_VALUE}
+ * and then never produces more output. (In practice, this limit should never be reached.)
+ *
+ * <p>The bounded {@link CountingInput} is implemented based on {@link OffsetBasedSource} and
+ * {@link OffsetBasedSource.OffsetBasedReader}, so it performs efficient initial splitting and it
+ * supports dynamic work rebalancing.
+ *
+ * <p>To produce a bounded {@code PCollection<Long>}, use {@link CountingInput#upTo(long)}:
+ *
+ * <pre>{@code
+ * Pipeline p = ...
+ * PTransform<PBegin, PCollection<Long>> producer = CountingInput.upTo(1000);
+ * PCollection<Long> bounded = p.apply(producer);
+ * }</pre>
+ *
+ * <p>To produce an unbounded {@code PCollection<Long>}, use {@link CountingInput#unbounded()},
+ * calling {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to provide values
+ * with timestamps other than {@link Instant#now}.
+ *
+ * <pre>{@code
+ * Pipeline p = ...
+ *
+ * // To create an unbounded producer that uses processing time as the element timestamp.
+ * PCollection<Long> unbounded = p.apply(CountingInput.unbounded());
+ * // Or, to create an unbounded source that uses a provided function to set the element timestamp.
+ * PCollection<Long> unboundedWithTimestamps =
+ *     p.apply(CountingInput.unbounded().withTimestampFn(someFn));
+ * }</pre>
+ */
+public class CountingInput {
+  /**
+   * Creates a {@link BoundedCountingInput} that will produce the specified number of elements,
+   * from {@code 0} to {@code numElements - 1}.
+   */
+  public static BoundedCountingInput upTo(long numElements) {
+    checkArgument(numElements > 0, "numElements (%s) must be greater than 0", numElements);
+    return new BoundedCountingInput(numElements);
+  }
+
+  /**
+   * Creates an {@link UnboundedCountingInput} that will produce numbers starting from {@code 0} up
+   * to {@link Long#MAX_VALUE}.
+   *
+   * <p>After {@link Long#MAX_VALUE}, the transform never produces more output. (In practice, this
+   * limit should never be reached.)
+   *
+   * <p>Elements in the resulting {@link PCollection PCollection&lt;Long&gt;} will by default have
+   * timestamps corresponding to processing time at element generation, provided by
+   * {@link Instant#now}. Use the transform returned by
+   * {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to control the output
+   * timestamps.
+   */
+  public static UnboundedCountingInput unbounded() {
+    return new UnboundedCountingInput(
+        new NowTimestampFn(), Optional.<Long>absent(), Optional.<Duration>absent());
+  }
+
+  /**
+   * A {@link PTransform} that will produce a specified number of {@link Long Longs} starting from
+   * {@code 0}.
+   */
+  public static class BoundedCountingInput extends PTransform<PBegin, PCollection<Long>> {
+    private final long numElements;
+
+    private BoundedCountingInput(long numElements) {
+      this.numElements = numElements;
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public PCollection<Long> apply(PBegin begin) {
+      return begin.apply(Read.from(CountingSource.upTo(numElements)));
+    }
+  }
+
+  /**
+   * A {@link PTransform} that will produce numbers starting from {@code 0} up to
+   * {@link Long#MAX_VALUE}.
+   *
+   * <p>After {@link Long#MAX_VALUE}, the transform never produces more output. (In practice, this
+   * limit should never be reached.)
+   *
+   * <p>Elements in the resulting {@link PCollection PCollection&lt;Long&gt;} will by default have
+   * timestamps corresponding to processing time at element generation, provided by
+   * {@link Instant#now}. Use the transform returned by
+   * {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to control the output
+   * timestamps.
+   */
+  public static class UnboundedCountingInput extends PTransform<PBegin, PCollection<Long>> {
+    private final SerializableFunction<Long, Instant> timestampFn;
+    private final Optional<Long> maxNumRecords;
+    private final Optional<Duration> maxReadTime;
+
+    private UnboundedCountingInput(
+        SerializableFunction<Long, Instant> timestampFn,
+        Optional<Long> maxNumRecords,
+        Optional<Duration> maxReadTime) {
+      this.timestampFn = checkNotNull(timestampFn, "TimestampFn cannot be null");
+      this.maxNumRecords = maxNumRecords;
+      this.maxReadTime = maxReadTime;
+    }
+
+    /**
+     * Returns an {@link UnboundedCountingInput} like this one, but where output elements have the
+     * timestamp specified by {@code timestampFn}, which must not be null.
+     *
+     * <p>Note that the timestamps produced by {@code timestampFn} must not decrease.
+     */
+    public UnboundedCountingInput withTimestampFn(SerializableFunction<Long, Instant> timestampFn) {
+      return new UnboundedCountingInput(timestampFn, maxNumRecords, maxReadTime);
+    }
+
+    /**
+     * Returns an {@link UnboundedCountingInput} like this one, but that will read at most the
+     * specified number of elements.
+     *
+     * <p>A bounded amount of elements will be produced by the result transform, and the result
+     * {@link PCollection} will be {@link IsBounded#BOUNDED bounded}.
+     */
+    public UnboundedCountingInput withMaxNumRecords(long maxRecords) {
+      checkArgument(
+          maxRecords > 0, "MaxRecords must be a positive (nonzero) value. Got %s", maxRecords);
+      return new UnboundedCountingInput(timestampFn, Optional.of(maxRecords), maxReadTime);
+    }
+
+    /**
+     * Returns an {@link UnboundedCountingInput} like this one, but that will read for at most the
+     * specified amount of time.
+     *
+     * <p>A bounded amount of elements will be produced by the result transform, and the result
+     * {@link PCollection} will be {@link IsBounded#BOUNDED bounded}.
+     */
+    public UnboundedCountingInput withMaxReadTime(Duration readTime) {
+      checkNotNull(readTime, "ReadTime cannot be null");
+      return new UnboundedCountingInput(timestampFn, maxNumRecords, Optional.of(readTime));
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public PCollection<Long> apply(PBegin begin) {
+      Unbounded<Long> read = Read.from(CountingSource.unboundedWithTimestampFn(timestampFn));
+      if (!maxNumRecords.isPresent() && !maxReadTime.isPresent()) {
+        return begin.apply(read);
+      } else if (maxNumRecords.isPresent() && !maxReadTime.isPresent()) {
+        return begin.apply(read.withMaxNumRecords(maxNumRecords.get()));
+      } else if (!maxNumRecords.isPresent() && maxReadTime.isPresent()) {
+        return begin.apply(read.withMaxReadTime(maxReadTime.get()));
+      } else {
+        return begin.apply(
+            read.withMaxReadTime(maxReadTime.get()).withMaxNumRecords(maxNumRecords.get()));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a7bd808/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java
index 2938534..412f3a7 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java
@@ -22,6 +22,7 @@ import com.google.cloud.dataflow.sdk.coders.AvroCoder;
 import com.google.cloud.dataflow.sdk.coders.Coder;
 import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
 import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
+import com.google.cloud.dataflow.sdk.io.CountingInput.UnboundedCountingInput;
 import com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader;
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
@@ -48,29 +49,33 @@ import java.util.NoSuchElementException;
  *
  * <pre>{@code
  * Pipeline p = ...
- * BoundedSource<Long> source = CountingSource.upTo(1000);
- * PCollection<Long> bounded = p.apply(Read.from(source));
+ * PTransform<PBegin, PCollection<Long>> producer = CountingInput.upTo(1000);
+ * PCollection<Long> bounded = p.apply(producer);
  * }</pre>
  *
- * <p>To produce an unbounded {@code PCollection<Long>}, use {@link CountingSource#unbounded} or
- * {@link CountingSource#unboundedWithTimestampFn}:
+ * <p>To produce an unbounded {@code PCollection<Long>}, use {@link CountingInput#unbounded()},
+ * calling {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to provide values
+ * with timestamps other than {@link Instant#now}.
  *
  * <pre>{@code
  * Pipeline p = ...
  *
- * // To create an unbounded source that uses processing time as the element timestamp.
- * UnboundedSource<Long, CounterMark> source = CountingSource.unbounded();
+ * // To create an unbounded PCollection that uses processing time as the element timestamp.
+ * PCollection<Long> unbounded = p.apply(CountingInput.unbounded());
  * // Or, to create an unbounded source that uses a provided function to set the element timestamp.
- * UnboundedSource<Long, CounterMark> source = CountingSource.unboundedWithTimestampFn(someFn);
+ * PCollection<Long> unboundedWithTimestamps =
+ *     p.apply(CountingInput.unbounded().withTimestampFn(someFn));
  *
- * PCollection<Long> unbounded = p.apply(Read.from(source));
  * }</pre>
  */
 public class CountingSource {
   /**
    * Creates a {@link BoundedSource} that will produce the specified number of elements,
    * from {@code 0} to {@code numElements - 1}.
+   *
+   * @deprecated use {@link CountingInput#upTo(long)} instead
    */
+  @Deprecated
   public static BoundedSource<Long> upTo(long numElements) {
     checkArgument(numElements > 0, "numElements (%s) must be greater than 0", numElements);
     return new BoundedCountingSource(0, numElements);
@@ -85,7 +90,10 @@ public class CountingSource {
    *
    * <p>Elements in the resulting {@link PCollection PCollection&lt;Long&gt;} will have timestamps
    * corresponding to processing time at element generation, provided by {@link Instant#now}.
+   *
+   * @deprecated use {@link CountingInput#unbounded()} instead
    */
+  @Deprecated
   public static UnboundedSource<Long, CounterMark> unbounded() {
     return unboundedWithTimestampFn(new NowTimestampFn());
   }
@@ -98,7 +106,11 @@ public class CountingSource {
    * limit should never be reached.)
    *
    * <p>Note that the timestamps produced by {@code timestampFn} may not decrease.
+   *
+   * @deprecated use {@link CountingInput#unbounded()} and call
+   *             {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} instead
    */
+  @Deprecated
   public static UnboundedSource<Long, CounterMark> unboundedWithTimestampFn(
       SerializableFunction<Long, Instant> timestampFn) {
     return new UnboundedCountingSource(0, 1, timestampFn);
@@ -109,11 +121,10 @@ public class CountingSource {
   /** Prevent instantiation. */
   private CountingSource() {}
 
-
   /**
    * A function that returns {@link Instant#now} as the timestamp for each generated element.
    */
-  private static class NowTimestampFn implements SerializableFunction<Long, Instant> {
+  static class NowTimestampFn implements SerializableFunction<Long, Instant> {
     @Override
     public Instant apply(Long input) {
       return Instant.now();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a7bd808/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java
new file mode 100644
index 0000000..948a892
--- /dev/null
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.sdk.io;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.CountingInput.UnboundedCountingInput;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.Max;
+import com.google.cloud.dataflow.sdk.transforms.Min;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
+import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests for {@link CountingInput}.
+ */
+public class CountingInputTest {
+  public static void addCountingAsserts(PCollection<Long> input, long numElements) {
+    // Count == numElements
+    DataflowAssert.thatSingleton(input.apply("Count", Count.<Long>globally()))
+        .isEqualTo(numElements);
+    // Unique count == numElements
+    DataflowAssert.thatSingleton(
+            input
+                .apply(RemoveDuplicates.<Long>create())
+                .apply("UniqueCount", Count.<Long>globally()))
+        .isEqualTo(numElements);
+    // Min == 0
+    DataflowAssert.thatSingleton(input.apply("Min", Min.<Long>globally())).isEqualTo(0L);
+    // Max == numElements-1
+    DataflowAssert.thatSingleton(input.apply("Max", Max.<Long>globally()))
+        .isEqualTo(numElements - 1);
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testBoundedInput() {
+    Pipeline p = TestPipeline.create();
+    long numElements = 1000;
+    PCollection<Long> input = p.apply(CountingInput.upTo(numElements));
+
+    addCountingAsserts(input, numElements);
+    p.run();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testUnboundedInput() {
+    Pipeline p = TestPipeline.create();
+    long numElements = 1000;
+
+    PCollection<Long> input = p.apply(CountingInput.unbounded().withMaxNumRecords(numElements));
+
+    addCountingAsserts(input, numElements);
+    p.run();
+  }
+
+  private static class ElementValueDiff extends DoFn<Long, Long> {
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      c.output(c.element() - c.timestamp().getMillis());
+    }
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testUnboundedInputTimestamps() {
+    Pipeline p = TestPipeline.create();
+    long numElements = 1000;
+
+    PCollection<Long> input =
+        p.apply(
+            CountingInput.unbounded()
+                .withTimestampFn(new ValueAsTimestampFn())
+                .withMaxNumRecords(numElements));
+    addCountingAsserts(input, numElements);
+
+    PCollection<Long> diffs =
+        input
+            .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
+            .apply("RemoveDuplicateTimestamps", RemoveDuplicates.<Long>create());
+    // This assert also confirms that diffs only has one unique value.
+    DataflowAssert.thatSingleton(diffs).isEqualTo(0L);
+
+    p.run();
+  }
+
+  /**
+   * A timestamp function that uses the given value as the timestamp. Because the input values will
+   * not wrap, this function is non-decreasing and meets the timestamp function criteria laid out
+   * in {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)}.
+   */
+  private static class ValueAsTimestampFn implements SerializableFunction<Long, Instant> {
+    @Override
+    public Instant apply(Long input) {
+      return new Instant(input);
+    }
+  }
+}


Mime
View raw message