beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [7/8] beam git commit: [BEAM-1722] Move PubsubIO into the google-cloud-platform module
Date Wed, 12 Apr 2017 15:17:04 GMT
[BEAM-1722] Move PubsubIO into the google-cloud-platform module


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5bcb8c57
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5bcb8c57
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5bcb8c57

Branch: refs/heads/master
Commit: 5bcb8c579656bc2d6e4d2a8dd5dcb2a46875812f
Parents: 82f2f2c
Author: Ismaël Mejía <iemejia@apache.org>
Authored: Wed Apr 12 14:49:58 2017 +0200
Committer: Ismaël Mejía <iemejia@apache.org>
Committed: Wed Apr 12 14:49:58 2017 +0200

----------------------------------------------------------------------
 .../beam/examples/complete/game/GameStats.java  |    2 +-
 .../examples/complete/game/LeaderBoard.java     |    2 +-
 .../triggers/AfterWatermarkStateMachine.java    |   14 +-
 runners/google-cloud-dataflow-java/pom.xml      |    5 +-
 .../beam/runners/dataflow/DataflowRunner.java   |    4 +-
 sdks/java/core/pom.xml                          |   44 -
 .../java/org/apache/beam/sdk/io/PubsubIO.java   | 1016 ------------
 .../apache/beam/sdk/io/PubsubUnboundedSink.java |  494 ------
 .../beam/sdk/io/PubsubUnboundedSource.java      | 1463 ------------------
 .../apache/beam/sdk/transforms/GroupByKey.java  |    4 +-
 .../transforms/windowing/AfterWatermark.java    |   14 +-
 .../org/apache/beam/sdk/util/PubsubClient.java  |  544 -------
 .../apache/beam/sdk/util/PubsubGrpcClient.java  |  424 -----
 .../apache/beam/sdk/util/PubsubJsonClient.java  |  317 ----
 .../apache/beam/sdk/util/PubsubTestClient.java  |  436 ------
 .../org/apache/beam/sdk/util/Transport.java     |    3 +-
 .../org/apache/beam/sdk/io/PubsubIOTest.java    |  189 ---
 .../beam/sdk/io/PubsubUnboundedSinkTest.java    |  190 ---
 .../beam/sdk/io/PubsubUnboundedSourceTest.java  |  411 -----
 .../apache/beam/sdk/util/PubsubClientTest.java  |  189 ---
 .../beam/sdk/util/PubsubGrpcClientTest.java     |  207 ---
 .../beam/sdk/util/PubsubJsonClientTest.java     |  140 --
 .../beam/sdk/util/PubsubTestClientTest.java     |  114 --
 sdks/java/io/google-cloud-platform/pom.xml      |   44 +
 .../beam/sdk/io/gcp/pubsub/PubsubClient.java    |  544 +++++++
 .../sdk/io/gcp/pubsub/PubsubGrpcClient.java     |  424 +++++
 .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 1014 ++++++++++++
 .../sdk/io/gcp/pubsub/PubsubJsonClient.java     |  319 ++++
 .../sdk/io/gcp/pubsub/PubsubTestClient.java     |  436 ++++++
 .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java  |  490 ++++++
 .../io/gcp/pubsub/PubsubUnboundedSource.java    | 1463 ++++++++++++++++++
 .../beam/sdk/io/gcp/pubsub/package-info.java    |   24 +
 .../beam/sdk/io/gcp/GcpApiSurfaceTest.java      |    5 +-
 .../sdk/io/gcp/pubsub/PubsubClientTest.java     |  189 +++
 .../sdk/io/gcp/pubsub/PubsubGrpcClientTest.java |  208 +++
 .../beam/sdk/io/gcp/pubsub/PubsubIOTest.java    |  189 +++
 .../sdk/io/gcp/pubsub/PubsubJsonClientTest.java |  139 ++
 .../sdk/io/gcp/pubsub/PubsubTestClientTest.java |  114 ++
 .../io/gcp/pubsub/PubsubUnboundedSinkTest.java  |  188 +++
 .../gcp/pubsub/PubsubUnboundedSourceTest.java   |  409 +++++
 40 files changed, 6218 insertions(+), 6207 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index 93e8254..6874953 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -25,7 +25,7 @@ import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;

http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index 35b586b..96f4291 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -28,7 +28,7 @@ import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;

http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
index 0b12005..1b117d2 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
@@ -31,18 +31,16 @@ import org.apache.beam.sdk.util.TimeDomain;
  * lower-bound, sometimes heuristically established, on event times that have been fully processed
  * by the pipeline.
  *
- * <p>For sources that provide non-heuristic watermarks (e.g.
- * {@link org.apache.beam.sdk.io.PubsubIO} when using arrival times as event times), the
- * watermark is a strict guarantee that no data with an event time earlier than
+ * <p>For sources that provide non-heuristic watermarks (e.g. PubsubIO when using arrival times as
+ * event times), the watermark is a strict guarantee that no data with an event time earlier than
  * that watermark will ever be observed in the pipeline. In this case, it's safe to assume that any
  * pane triggered by an {@code AfterWatermark} trigger with a reference point at or beyond the end
  * of the window will be the last pane ever for that window.
  *
- * <p>For sources that provide heuristic watermarks (e.g.
- * {@link org.apache.beam.sdk.io.PubsubIO} when using user-supplied event times), the
- * watermark itself becomes an <i>estimate</i> that no data with an event time earlier than that
- * watermark (i.e. "late data") will ever be observed in the pipeline. These heuristics can
- * often be quite accurate, but the chance of seeing late data for any given window is non-zero.
+ * <p>For sources that provide heuristic watermarks (e.g. PubsubIO when using user-supplied event
+ * times), the watermark itself becomes an <i>estimate</i> that no data with an event time earlier
+ * than that watermark (i.e. "late data") will ever be observed in the pipeline. These heuristics
+ * can often be quite accurate, but the chance of seeing late data for any given window is non-zero.
  * Thus, if absolute correctness over time is important to your use case, you may want to consider
  * using a trigger that accounts for late data. The default trigger,
  * {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}, which fires

http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index a57744c..96eced8 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -188,13 +188,12 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
-      <artifactId>beam-runners-core-construction-java</artifactId>
+      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
     </dependency>
 
     <dependency>
       <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
-      <scope>test</scope>
+      <artifactId>beam-runners-core-construction-java</artifactId>
     </dependency>
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 8726635..684dc14 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -82,11 +82,11 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.FileBasedSink;
-import org.apache.beam.sdk.io.PubsubUnboundedSink;
-import org.apache.beam.sdk.io.PubsubUnboundedSource;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;

http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index d117d5a..0ac40f4 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -140,40 +140,6 @@
     </dependency>
 
     <dependency>
-      <groupId>io.grpc</groupId>
-      <artifactId>grpc-auth</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>io.grpc</groupId>
-      <artifactId>grpc-core</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>io.grpc</groupId>
-      <artifactId>grpc-netty</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>io.grpc</groupId>
-      <artifactId>grpc-stub</artifactId>
-    </dependency>
-
-    <!-- grpc-all does not obey IWYU, so we need to exclude from compile
-         scope and depend on it at runtime. -->
-    <dependency>
-      <groupId>io.grpc</groupId>
-      <artifactId>grpc-all</artifactId>
-      <scope>runtime</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>io.grpc</groupId>
-      <artifactId>grpc-protobuf</artifactId>
-      <scope>runtime</scope>
-    </dependency>
-
-    <dependency>
       <groupId>com.google.auth</groupId>
       <artifactId>google-auth-library-credentials</artifactId>
     </dependency>
@@ -184,16 +150,6 @@
     </dependency>
 
     <dependency>
-      <groupId>io.netty</groupId>
-      <artifactId>netty-handler</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.api.grpc</groupId>
-      <artifactId>grpc-google-pubsub-v1</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>com.google.api-client</groupId>
       <artifactId>google-api-client</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
deleted file mode 100644
index 67ab2ec..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ /dev/null
@@ -1,1016 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.options.PubsubOptions;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.PubsubClient;
-import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
-import org.apache.beam.sdk.util.PubsubClient.ProjectPath;
-import org.apache.beam.sdk.util.PubsubClient.SubscriptionPath;
-import org.apache.beam.sdk.util.PubsubClient.TopicPath;
-import org.apache.beam.sdk.util.PubsubJsonClient;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Read and Write {@link PTransform}s for Cloud Pub/Sub streams. These transforms create
- * and consume unbounded {@link PCollection PCollections}.
- *
- * <h3>Permissions</h3>
- *
- * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the
- * Beam pipeline. Please refer to the documentation of corresponding
- * {@link PipelineRunner PipelineRunners} for more details.
- */
-public class PubsubIO {
-
-  private static final Logger LOG = LoggerFactory.getLogger(PubsubIO.class);
-
-  /** Factory for creating pubsub client to manage transport. */
-  private static final PubsubClient.PubsubClientFactory FACTORY = PubsubJsonClient.FACTORY;
-
-  /**
-   * Project IDs must contain 6-63 lowercase letters, digits, or dashes.
-   * IDs must start with a letter and may not end with a dash.
-   * This regex isn't exact - this allows for patterns that would be rejected by
-   * the service, but this is sufficient for basic parsing of table references.
-   */
-  private static final Pattern PROJECT_ID_REGEXP =
-      Pattern.compile("[a-z][-a-z0-9:.]{4,61}[a-z0-9]");
-
-  private static final Pattern SUBSCRIPTION_REGEXP =
-      Pattern.compile("projects/([^/]+)/subscriptions/(.+)");
-
-  private static final Pattern TOPIC_REGEXP = Pattern.compile("projects/([^/]+)/topics/(.+)");
-
-  private static final Pattern V1BETA1_SUBSCRIPTION_REGEXP =
-      Pattern.compile("/subscriptions/([^/]+)/(.+)");
-
-  private static final Pattern V1BETA1_TOPIC_REGEXP = Pattern.compile("/topics/([^/]+)/(.+)");
-
-  private static final Pattern PUBSUB_NAME_REGEXP = Pattern.compile("[a-zA-Z][-._~%+a-zA-Z0-9]+");
-
-  private static final int PUBSUB_NAME_MIN_LENGTH = 3;
-  private static final int PUBSUB_NAME_MAX_LENGTH = 255;
-
-  private static final String SUBSCRIPTION_RANDOM_TEST_PREFIX = "_random/";
-  private static final String SUBSCRIPTION_STARTING_SIGNAL = "_starting_signal/";
-  private static final String TOPIC_DEV_NULL_TEST_NAME = "/topics/dev/null";
-
-  private static void validateProjectName(String project) {
-    Matcher match = PROJECT_ID_REGEXP.matcher(project);
-    if (!match.matches()) {
-      throw new IllegalArgumentException(
-          "Illegal project name specified in Pubsub subscription: " + project);
-    }
-  }
-
-  private static void validatePubsubName(String name) {
-    if (name.length() < PUBSUB_NAME_MIN_LENGTH) {
-      throw new IllegalArgumentException(
-          "Pubsub object name is shorter than 3 characters: " + name);
-    }
-    if (name.length() > PUBSUB_NAME_MAX_LENGTH) {
-      throw new IllegalArgumentException(
-          "Pubsub object name is longer than 255 characters: " + name);
-    }
-
-    if (name.startsWith("goog")) {
-      throw new IllegalArgumentException("Pubsub object name cannot start with goog: " + name);
-    }
-
-    Matcher match = PUBSUB_NAME_REGEXP.matcher(name);
-    if (!match.matches()) {
-      throw new IllegalArgumentException("Illegal Pubsub object name specified: " + name
-          + " Please see Javadoc for naming rules.");
-    }
-  }
-
-  /**
-   * Populate common {@link DisplayData} between Pubsub source and sink.
-   */
-  private static void populateCommonDisplayData(DisplayData.Builder builder,
-      String timestampLabel, String idLabel, ValueProvider<PubsubTopic> topic) {
-    builder
-        .addIfNotNull(DisplayData.item("timestampLabel", timestampLabel)
-            .withLabel("Timestamp Label Attribute"))
-        .addIfNotNull(DisplayData.item("idLabel", idLabel)
-            .withLabel("ID Label Attribute"));
-
-    if (topic != null) {
-      String topicString = topic.isAccessible() ? topic.get().asPath()
-          : topic.toString();
-      builder.add(DisplayData.item("topic", topicString)
-          .withLabel("Pubsub Topic"));
-    }
-  }
-
-  /**
-   * Class representing a Pub/Sub message. Each message contains a single message payload and
-   * a map of attached attributes.
-   */
-  public static class PubsubMessage {
-
-    private byte[] message;
-    private Map<String, String> attributes;
-
-    public PubsubMessage(byte[] message, Map<String, String> attributes) {
-      this.message = message;
-      this.attributes = attributes;
-    }
-
-    /**
-     * Returns the main PubSub message.
-     */
-    public byte[] getMessage() {
-      return message;
-    }
-
-    /**
-     * Returns the given attribute value. If not such attribute exists, returns null.
-     */
-    @Nullable
-    public String getAttribute(String attribute) {
-      checkNotNull(attribute, "attribute");
-      return attributes.get(attribute);
-    }
-
-    /**
-     * Returns the full map of attributes. This is an unmodifiable map.
-     */
-    public Map<String, String> getAttributeMap() {
-      return attributes;
-    }
-  }
-
-  /**
-   * Class representing a Cloud Pub/Sub Subscription.
-   */
-  public static class PubsubSubscription implements Serializable {
-
-    private enum Type {NORMAL, FAKE}
-
-    private final Type type;
-    private final String project;
-    private final String subscription;
-
-    private PubsubSubscription(Type type, String project, String subscription) {
-      this.type = type;
-      this.project = project;
-      this.subscription = subscription;
-    }
-
-    /**
-     * Creates a class representing a Pub/Sub subscription from the specified subscription path.
-     *
-     * <p>Cloud Pub/Sub subscription names should be of the form
-     * {@code projects/<project>/subscriptions/<subscription>}, where {@code <project>} is the name
-     * of the project the subscription belongs to. The {@code <subscription>} component must comply
-     * with the following requirements:
-     *
-     * <ul>
-     * <li>Can only contain lowercase letters, numbers, dashes ('-'), underscores ('_') and periods
-     * ('.').</li>
-     * <li>Must be between 3 and 255 characters.</li>
-     * <li>Must begin with a letter.</li>
-     * <li>Must end with a letter or a number.</li>
-     * <li>Cannot begin with {@code 'goog'} prefix.</li>
-     * </ul>
-     */
-    public static PubsubSubscription fromPath(String path) {
-      if (path.startsWith(SUBSCRIPTION_RANDOM_TEST_PREFIX)
-          || path.startsWith(SUBSCRIPTION_STARTING_SIGNAL)) {
-        return new PubsubSubscription(Type.FAKE, "", path);
-      }
-
-      String projectName, subscriptionName;
-
-      Matcher v1beta1Match = V1BETA1_SUBSCRIPTION_REGEXP.matcher(path);
-      if (v1beta1Match.matches()) {
-        LOG.warn("Saw subscription in v1beta1 format. Subscriptions should be in the format "
-            + "projects/<project_id>/subscriptions/<subscription_name>");
-        projectName = v1beta1Match.group(1);
-        subscriptionName = v1beta1Match.group(2);
-      } else {
-        Matcher match = SUBSCRIPTION_REGEXP.matcher(path);
-        if (!match.matches()) {
-          throw new IllegalArgumentException("Pubsub subscription is not in "
-              + "projects/<project_id>/subscriptions/<subscription_name> format: " + path);
-        }
-        projectName = match.group(1);
-        subscriptionName = match.group(2);
-      }
-
-      validateProjectName(projectName);
-      validatePubsubName(subscriptionName);
-      return new PubsubSubscription(Type.NORMAL, projectName, subscriptionName);
-    }
-
-    /**
-     * Returns the string representation of this subscription as a path used in the Cloud Pub/Sub
-     * v1beta1 API.
-     *
-     * @deprecated the v1beta1 API for Cloud Pub/Sub is deprecated.
-     */
-    @Deprecated
-    public String asV1Beta1Path() {
-      if (type == Type.NORMAL) {
-        return "/subscriptions/" + project + "/" + subscription;
-      } else {
-        return subscription;
-      }
-    }
-
-    /**
-     * Returns the string representation of this subscription as a path used in the Cloud Pub/Sub
-     * v1beta2 API.
-     *
-     * @deprecated the v1beta2 API for Cloud Pub/Sub is deprecated.
-     */
-    @Deprecated
-    public String asV1Beta2Path() {
-      if (type == Type.NORMAL) {
-        return "projects/" + project + "/subscriptions/" + subscription;
-      } else {
-        return subscription;
-      }
-    }
-
-    /**
-     * Returns the string representation of this subscription as a path used in the Cloud Pub/Sub
-     * API.
-     */
-    public String asPath() {
-      if (type == Type.NORMAL) {
-        return "projects/" + project + "/subscriptions/" + subscription;
-      } else {
-        return subscription;
-      }
-    }
-  }
-
-  /**
-   * Used to build a {@link ValueProvider} for {@link PubsubSubscription}.
-   */
-  private static class SubscriptionTranslator
-      implements SerializableFunction<String, PubsubSubscription> {
-
-    @Override
-    public PubsubSubscription apply(String from) {
-      return PubsubSubscription.fromPath(from);
-    }
-  }
-
-  /**
-   * Used to build a {@link ValueProvider} for {@link SubscriptionPath}.
-   */
-  private static class SubscriptionPathTranslator
-      implements SerializableFunction<PubsubSubscription, SubscriptionPath> {
-
-    @Override
-    public SubscriptionPath apply(PubsubSubscription from) {
-      return PubsubClient.subscriptionPathFromName(from.project, from.subscription);
-    }
-  }
-
-  /**
-   * Used to build a {@link ValueProvider} for {@link PubsubTopic}.
-   */
-  private static class TopicTranslator
-      implements SerializableFunction<String, PubsubTopic> {
-
-    @Override
-    public PubsubTopic apply(String from) {
-      return PubsubTopic.fromPath(from);
-    }
-  }
-
-  /**
-   * Used to build a {@link ValueProvider} for {@link TopicPath}.
-   */
-  private static class TopicPathTranslator
-      implements SerializableFunction<PubsubTopic, TopicPath> {
-
-    @Override
-    public TopicPath apply(PubsubTopic from) {
-      return PubsubClient.topicPathFromName(from.project, from.topic);
-    }
-  }
-
-  /**
-   * Used to build a {@link ValueProvider} for {@link ProjectPath}.
-   */
-  private static class ProjectPathTranslator
-      implements SerializableFunction<PubsubTopic, ProjectPath> {
-
-    @Override
-    public ProjectPath apply(PubsubTopic from) {
-      return PubsubClient.projectPathFromId(from.project);
-    }
-  }
-
-  /**
-   * Class representing a Cloud Pub/Sub Topic.
-   */
-  public static class PubsubTopic implements Serializable {
-
-    private enum Type {NORMAL, FAKE}
-
-    private final Type type;
-    private final String project;
-    private final String topic;
-
-    private PubsubTopic(Type type, String project, String topic) {
-      this.type = type;
-      this.project = project;
-      this.topic = topic;
-    }
-
-    /**
-     * Creates a class representing a Cloud Pub/Sub topic from the specified topic path.
-     *
-     * <p>Cloud Pub/Sub topic names should be of the form
-     * {@code /topics/<project>/<topic>}, where {@code <project>} is the name of
-     * the publishing project. The {@code <topic>} component must comply with
-     * the following requirements:
-     *
-     * <ul>
-     * <li>Can only contain lowercase letters, numbers, dashes ('-'), underscores ('_') and periods
-     * ('.').</li>
-     * <li>Must be between 3 and 255 characters.</li>
-     * <li>Must begin with a letter.</li>
-     * <li>Must end with a letter or a number.</li>
-     * <li>Cannot begin with 'goog' prefix.</li>
-     * </ul>
-     */
-    public static PubsubTopic fromPath(String path) {
-      if (path.equals(TOPIC_DEV_NULL_TEST_NAME)) {
-        return new PubsubTopic(Type.FAKE, "", path);
-      }
-
-      String projectName, topicName;
-
-      Matcher v1beta1Match = V1BETA1_TOPIC_REGEXP.matcher(path);
-      if (v1beta1Match.matches()) {
-        LOG.warn("Saw topic in v1beta1 format.  Topics should be in the format "
-            + "projects/<project_id>/topics/<topic_name>");
-        projectName = v1beta1Match.group(1);
-        topicName = v1beta1Match.group(2);
-      } else {
-        Matcher match = TOPIC_REGEXP.matcher(path);
-        if (!match.matches()) {
-          throw new IllegalArgumentException(
-              "Pubsub topic is not in projects/<project_id>/topics/<topic_name> format: " + path);
-        }
-        projectName = match.group(1);
-        topicName = match.group(2);
-      }
-
-      validateProjectName(projectName);
-      validatePubsubName(topicName);
-      return new PubsubTopic(Type.NORMAL, projectName, topicName);
-    }
-
-    /**
-     * Returns the string representation of this topic as a path used in the Cloud Pub/Sub
-     * v1beta1 API.
-     *
-     * @deprecated the v1beta1 API for Cloud Pub/Sub is deprecated.
-     */
-    @Deprecated
-    public String asV1Beta1Path() {
-      if (type == Type.NORMAL) {
-        return "/topics/" + project + "/" + topic;
-      } else {
-        return topic;
-      }
-    }
-
-    /**
-     * Returns the string representation of this topic as a path used in the Cloud Pub/Sub
-     * v1beta2 API.
-     *
-     * @deprecated the v1beta2 API for Cloud Pub/Sub is deprecated.
-     */
-    @Deprecated
-    public String asV1Beta2Path() {
-      if (type == Type.NORMAL) {
-        return "projects/" + project + "/topics/" + topic;
-      } else {
-        return topic;
-      }
-    }
-
-    /**
-     * Returns the string representation of this topic as a path used in the Cloud Pub/Sub
-     * API.
-     */
-    public String asPath() {
-      if (type == Type.NORMAL) {
-        return "projects/" + project + "/topics/" + topic;
-      } else {
-        return topic;
-      }
-    }
-  }
-
-  public static <T> Read<T> read() {
-    return new Read<>();
-  }
-
-  public static <T> Write<T> write() {
-    return new Write<>();
-  }
-
-  /**
-   * A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream and
-   * returns a {@link PCollection} of {@link String Strings} containing the items from
-   * the stream.
-   */
-  public static class Read<T> extends PTransform<PBegin, PCollection<T>> {
-
-    /** The Cloud Pub/Sub topic to read from. */
-    @Nullable
-    private final ValueProvider<PubsubTopic> topic;
-
-    /** The Cloud Pub/Sub subscription to read from. */
-    @Nullable
-    private final ValueProvider<PubsubSubscription> subscription;
-
-    /** The name of the message attribute to read timestamps from. */
-    @Nullable
-    private final String timestampLabel;
-
-    /** The name of the message attribute to read unique message IDs from. */
-    @Nullable
-    private final String idLabel;
-
-    /** The coder used to decode each record. */
-    @Nullable
-    private final Coder<T> coder;
-
-    /** User function for parsing PubsubMessage object. */
-    SimpleFunction<PubsubMessage, T> parseFn;
-
-    private Read() {
-      this(null, null, null, null, null, null, null);
-    }
-
-    private Read(String name, ValueProvider<PubsubSubscription> subscription,
-        ValueProvider<PubsubTopic> topic, String timestampLabel, Coder<T> coder,
-        String idLabel,
-        SimpleFunction<PubsubMessage, T> parseFn) {
-      super(name);
-      this.subscription = subscription;
-      this.topic = topic;
-      this.timestampLabel = timestampLabel;
-      this.coder = coder;
-      this.idLabel = idLabel;
-      this.parseFn = parseFn;
-    }
-
-    /**
-     * Returns a copy of this transform that reads from the named subscription.
-     *
-     * <p>The accepted formats for {@code subscription} are described by
-     * {@link PubsubIO.PubsubSubscription#fromPath(String)}.
-     *
-     * <p>When several readers share one subscription, each of them receives only an arbitrary
-     * portion of the data, so separate readers should normally use their own subscriptions.
-     *
-     * <p>This object is left unmodified.
-     */
-    public Read<T> subscription(String subscription) {
-      ValueProvider<String> provider = StaticValueProvider.of(subscription);
-      return subscription(provider);
-    }
-
-    /**
-     * Same as {@link #subscription(String)}, but the subscription is supplied through a
-     * {@link ValueProvider}. An accessible value is validated immediately, and any previously
-     * configured topic is cleared.
-     */
-    public Read<T> subscription(ValueProvider<String> subscription) {
-      if (subscription.isAccessible()) {
-        // Validate eagerly when the value is already known; the parsed result is discarded.
-        PubsubSubscription.fromPath(subscription.get());
-      }
-      ValueProvider<PubsubSubscription> translated =
-          NestedValueProvider.of(subscription, new SubscriptionTranslator());
-      // Topic and subscription are mutually exclusive (see expand), so the topic is reset.
-      ValueProvider<PubsubTopic> noTopic = null;
-      return new Read<>(name, translated, noTopic, timestampLabel, coder, idLabel, parseFn);
-    }
-
-    /**
-     * Returns a copy of this transform that reads from a Cloud Pub/Sub topic. This setting is
-     * mutually exclusive with {@link #subscription(String)}.
-     *
-     * <p>The accepted formats for {@code topic} are described by
-     * {@link PubsubIO.PubsubTopic#fromPath(String)}.
-     *
-     * <p>The runner reads only data published after the pipeline starts. Anything published to
-     * the topic before then is not read.
-     */
-    public Read<T> topic(String topic) {
-      ValueProvider<String> provider = StaticValueProvider.of(topic);
-      return topic(provider);
-    }
-
-    /**
-     * Same as {@link #topic(String)}, but the topic is supplied through a {@link ValueProvider}.
-     * An accessible value is validated immediately, and any previously configured subscription
-     * is cleared.
-     */
-    public Read<T> topic(ValueProvider<String> topic) {
-      if (topic.isAccessible()) {
-        // Validate eagerly when the value is already known; the parsed result is discarded.
-        PubsubTopic.fromPath(topic.get());
-      }
-      ValueProvider<PubsubTopic> translated =
-          NestedValueProvider.of(topic, new TopicTranslator());
-      // Topic and subscription are mutually exclusive (see expand), so the subscription is reset.
-      ValueProvider<PubsubSubscription> noSubscription = null;
-      return new Read<>(
-          name, noSubscription, translated, timestampLabel, coder, idLabel, parseFn);
-    }
-
-    /**
-     * Returns a copy of this transform that takes each record's timestamp from a Pub/Sub message
-     * attribute. {@code timestampLabel} names that attribute.
-     *
-     * <p>The attribute value must be one of:
-     *
-     * <ul>
-     * <li>a number giving milliseconds since the Unix epoch. With the Joda time classes,
-     * {@link Instant#getMillis()} produces such a value.
-     * <li>an RFC 3339 string such as {@code 2015-10-29T23:41:41.123Z}. Fractional seconds are
-     * optional, and any digits past the third (i.e. below millisecond precision) are ignored.
-     * </ul>
-     *
-     * <p>Without a {@code timestampLabel}, the system assigns each record a timestamp the first
-     * time it sees that record, and all windowing is relative to those timestamps.
-     *
-     * <p>By default, windows are emitted according to an estimate of when this source has
-     * finished producing data for a given timestamp (the watermark; see {@link AfterWatermark}).
-     * Late data is handled by the trigger of the windowing strategy, which by default outputs it
-     * immediately.
-     *
-     * <p>When timestamps are assigned by arrival time (no {@code timestampLabel}), the system can
-     * guarantee that no late data will ever be seen.
-     *
-     * @see <a href="https://www.ietf.org/rfc/rfc3339.txt">RFC 3339</a>
-     */
-    public Read<T> timestampLabel(String timestampLabel) {
-      return new Read<>(name, subscription, topic, timestampLabel, coder, idLabel, parseFn);
-    }
-
-    /**
-     * Returns a copy of this transform that takes each record's unique identifier from the
-     * Pub/Sub message attribute named {@code idLabel}. The attribute value may be any string
-     * that uniquely identifies the record.
-     *
-     * <p>Pub/Sub itself may deliver duplicates. Without an {@code idLabel}, Beam cannot promise
-     * duplicate-free delivery, and deduplication of the stream is only best effort.
-     */
-    public Read<T> idLabel(String idLabel) {
-      return new Read<>(name, subscription, topic, timestampLabel, coder, idLabel, parseFn);
-    }
-
-    /**
-     * Returns a copy of this transform that decodes each record into a {@code T} with the given
-     * {@link Coder}. This object is left unmodified.
-     */
-    public Read<T> withCoder(Coder<T> coder) {
-      return new Read<>(name, subscription, topic, timestampLabel, coder, idLabel, parseFn);
-    }
-
-    /**
-     * Returns a copy of this transform whose source yields {@link PubsubMessage}s that include
-     * their Pubsub attributes. {@code parseFn} converts each message into the output type. A
-     * {@link Coder} for {@code T} must be registered or set on the output via
-     * {@link PCollection#setCoder(Coder)}.
-     */
-    public Read<T> withAttributes(SimpleFunction<PubsubMessage, T> parseFn) {
-      return new Read<>(name, subscription, topic, timestampLabel, coder, idLabel, parseFn);
-    }
-
-    @Override
-    public PCollection<T> expand(PBegin input) {
-      if (topic == null && subscription == null) {
-        throw new IllegalStateException("Need to set either the topic or the subscription for "
-            + "a PubsubIO.Read transform");
-      }
-      if (topic != null && subscription != null) {
-        throw new IllegalStateException("Can't set both the topic and the subscription for "
-            + "a PubsubIO.Read transform");
-      }
-      if (coder == null) {
-        throw new IllegalStateException("PubsubIO.Read requires that a coder be set using "
-            + "the withCoder method.");
-      }
-
-      @Nullable ValueProvider<ProjectPath> projectPath =
-          topic == null ? null : NestedValueProvider.of(topic, new ProjectPathTranslator());
-      @Nullable ValueProvider<TopicPath> topicPath =
-          topic == null ? null : NestedValueProvider.of(topic, new TopicPathTranslator());
-      @Nullable ValueProvider<SubscriptionPath> subscriptionPath =
-          subscription == null
-              ? null
-              : NestedValueProvider.of(subscription, new SubscriptionPathTranslator());
-      PubsubUnboundedSource<T> source = new PubsubUnboundedSource<T>(
-              FACTORY, projectPath, topicPath, subscriptionPath,
-              coder, timestampLabel, idLabel, parseFn);
-      return input.getPipeline().apply(source);
-    }
-
-    /** Adds the shared Pubsub display items, plus the subscription when one is configured. */
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      populateCommonDisplayData(builder, timestampLabel, idLabel, topic);
-      if (subscription == null) {
-        return;
-      }
-      String subscriptionString;
-      if (subscription.isAccessible()) {
-        subscriptionString = subscription.get().asPath();
-      } else {
-        // Runtime-only value: show the provider's own description instead.
-        subscriptionString = subscription.toString();
-      }
-      builder.add(
-          DisplayData.item("subscription", subscriptionString).withLabel("Pubsub Subscription"));
-    }
-
-    /** The configured coder (possibly {@code null}) doubles as the default output coder. */
-    @Override
-    protected Coder<T> getDefaultOutputCoder() {
-      return this.coder;
-    }
-
-    /**
-     * Returns the topic being read from, or {@code null} if none is set. This calls
-     * {@code get()} on the underlying {@link ValueProvider}.
-     */
-    @Nullable
-    public PubsubTopic getTopic() {
-      return topic == null ? null : topic.get();
-    }
-
-    /**
-     * Returns the {@link ValueProvider} for the topic being read from, or {@code null} if no
-     * topic is set.
-     */
-    @Nullable
-    public ValueProvider<PubsubTopic> getTopicProvider() {
-      return topic;
-    }
-
-    /**
-     * Returns the subscription being read from, or {@code null} if none is set. This calls
-     * {@code get()} on the underlying {@link ValueProvider}.
-     */
-    @Nullable
-    public PubsubSubscription getSubscription() {
-      return subscription == null ? null : subscription.get();
-    }
-
-    /**
-     * Returns the {@link ValueProvider} for the subscription being read from, or {@code null} if
-     * no subscription is set.
-     */
-    @Nullable
-    public ValueProvider<PubsubSubscription> getSubscriptionProvider() {
-      return subscription;
-    }
-
-    /** Returns the timestamp label, or {@code null} if none is set. */
-    @Nullable
-    public String getTimestampLabel() {
-      return timestampLabel;
-    }
-
-    /** Returns the id label, or {@code null} if none is set. */
-    @Nullable
-    public String getIdLabel() {
-      return idLabel;
-    }
-
-    /** Returns the {@link Coder} used for the transform's output, or {@code null} if unset. */
-    @Nullable
-    public Coder<T> getCoder() {
-      return coder;
-    }
-
-    /** Returns the parse function used for PubSub attributes, or {@code null} if unset. */
-    @Nullable
-    public SimpleFunction<PubsubMessage, T> getPubSubMessageParseFn() {
-      return parseFn;
-    }
-
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /** PubsubIO is a utility class; this private constructor prevents instantiation. */
-  private PubsubIO() {
-  }
-
-
-  /**
-   * A {@link PTransform} that writes an unbounded {@link PCollection} of {@link String Strings}
-   * to a Cloud Pub/Sub stream.
-   */
-  public static class Write<T> extends PTransform<PCollection<T>, PDone> {
-
-    /** The Cloud Pub/Sub topic to publish to. */
-    @Nullable
-    private final ValueProvider<PubsubTopic> topic;
-    /** The name of the message attribute to publish message timestamps in. */
-    @Nullable
-    private final String timestampLabel;
-    /** The name of the message attribute to publish unique message IDs in. */
-    @Nullable
-    private final String idLabel;
-    /** The input type Coder. */
-    private final Coder<T> coder;
-    /** The format function for input PubsubMessage objects. */
-    SimpleFunction<T, PubsubMessage> formatFn;
-
-    private Write() {
-      this(null, null, null, null, null, null);
-    }
-
-    private Write(
-        String name, ValueProvider<PubsubTopic> topic, String timestampLabel,
-        String idLabel, Coder<T> coder, SimpleFunction<T, PubsubMessage> formatFn) {
-      super(name);
-      this.topic = topic;
-      this.timestampLabel = timestampLabel;
-      this.idLabel = idLabel;
-      this.coder = coder;
-      this.formatFn = formatFn;
-    }
-
-    /**
-     * Creates a transform that publishes to the specified topic.
-     *
-     * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
-     * {@code topic} string.
-     */
-    public Write<T> topic(String topic) {
-      return topic(StaticValueProvider.of(topic));
-    }
-
-    /**
-     * Like {@code topic()} but with a {@link ValueProvider}.
-     */
-    public Write<T> topic(ValueProvider<String> topic) {
-      return new Write<>(name, NestedValueProvider.of(topic, new TopicTranslator()),
-          timestampLabel, idLabel, coder, formatFn);
-    }
-
-    /**
-     * Creates a transform that writes to Pub/Sub, adds each record's timestamp to the published
-     * messages in an attribute with the specified name. The value of the attribute will be a number
-     * representing the number of milliseconds since the Unix epoch. For example, if using the Joda
-     * time classes, {@link Instant#Instant(long)} can be used to parse this value.
-     *
-     * <p>If the output from this sink is being read by another Beam pipeline, then
-     * {@link PubsubIO.Read#timestampLabel(String)} can be used to ensure the other source reads
-     * these timestamps from the appropriate attribute.
-     */
-    public Write<T> timestampLabel(String timestampLabel) {
-      return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn);
-    }
-
-    /**
-     * Creates a transform that writes to Pub/Sub, adding each record's unique identifier to the
-     * published messages in an attribute with the specified name. The value of the attribute is an
-     * opaque string.
-     *
-     * <p>If the the output from this sink is being read by another Beam pipeline, then
-     * {@link PubsubIO.Read#idLabel(String)} can be used to ensure that* the other source reads
-     * these unique identifiers from the appropriate attribute.
-     */
-    public Write<T> idLabel(String idLabel) {
-      return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn);
-    }
-
-    /**
-     * Returns a new transform that's like this one
-     * but that uses the given {@link Coder} to encode each of
-     * the elements of the input {@link PCollection} into an
-     * output record.
-     *
-     * <p>Does not modify this object.
-     */
-    public Write<T> withCoder(Coder<T> coder) {
-      return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn);
-    }
-
-    /**
-     * Used to write a PubSub message together with PubSub attributes. The user-supplied format
-     * function translates the input type T to a PubsubMessage object, which is used by the sink
-     * to separately set the PubSub message's payload and attributes.
-     */
-    public Write<T> withAttributes(SimpleFunction<T, PubsubMessage> formatFn) {
-      return new Write<T>(name, topic, timestampLabel, idLabel, coder, formatFn);
-    }
-
-    @Override
-    public PDone expand(PCollection<T> input) {
-      if (topic == null) {
-        throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform");
-      }
-      switch (input.isBounded()) {
-        case BOUNDED:
-          input.apply(ParDo.of(new PubsubBoundedWriter()));
-          return PDone.in(input.getPipeline());
-        case UNBOUNDED:
-          return input.apply(new PubsubUnboundedSink<T>(
-              FACTORY,
-              NestedValueProvider.of(topic, new TopicPathTranslator()),
-              coder,
-              timestampLabel,
-              idLabel,
-              formatFn,
-              100 /* numShards */));
-      }
-      throw new RuntimeException(); // cases are exhaustive.
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      populateCommonDisplayData(builder, timestampLabel, idLabel, topic);
-    }
-
-    @Override
-    protected Coder<Void> getDefaultOutputCoder() {
-      return VoidCoder.of();
-    }
-
-    /**
-     * Returns the PubSub topic being written to.
-     */
-    @Nullable
-    public PubsubTopic getTopic() {
-      return (topic == null) ? null : topic.get();
-    }
-
-    /**
-     * Returns the {@link ValueProvider} for the topic being written to.
-     */
-    @Nullable
-    public ValueProvider<PubsubTopic> getTopicProvider() {
-      return topic;
-    }
-
-    /**
-     * Returns the timestamp label.
-     */
-    @Nullable
-    public String getTimestampLabel() {
-      return timestampLabel;
-    }
-
-    /**
-     * Returns the id label.
-     */
-    @Nullable
-    public String getIdLabel() {
-      return idLabel;
-    }
-
-    /**
-     * Returns the output coder.
-     */
-    @Nullable
-    public Coder<T> getCoder() {
-      return coder;
-    }
-
-    /**
-     * Returns the formatting function used if publishing attributes.
-     */
-    @Nullable
-    public SimpleFunction<T, PubsubMessage> getFormatFn() {
-      return formatFn;
-    }
-
-    /**
-     * Writer to Pubsub which batches messages from bounded collections.
-     *
-     * <p>Public so can be suppressed by runners.
-     */
-    public class PubsubBoundedWriter extends DoFn<T, Void> {
-
-      private static final int MAX_PUBLISH_BATCH_SIZE = 100;
-      private transient List<OutgoingMessage> output;
-      private transient PubsubClient pubsubClient;
-
-      @StartBundle
-      public void startBundle(Context c) throws IOException {
-        this.output = new ArrayList<>();
-        // NOTE: idLabel is ignored.
-        this.pubsubClient =
-            FACTORY.newClient(timestampLabel, null,
-                c.getPipelineOptions().as(PubsubOptions.class));
-      }
-
-      @ProcessElement
-      public void processElement(ProcessContext c) throws IOException {
-        byte[] payload = null;
-        Map<String, String> attributes = null;
-        if (formatFn != null) {
-          PubsubMessage message = formatFn.apply(c.element());
-          payload = message.getMessage();
-          attributes = message.getAttributeMap();
-        } else {
-          payload = CoderUtils.encodeToByteArray(getCoder(), c.element());
-        }
-        // NOTE: The record id is always null.
-        OutgoingMessage message =
-            new OutgoingMessage(payload, attributes, c.timestamp().getMillis(), null);
-        output.add(message);
-
-        if (output.size() >= MAX_PUBLISH_BATCH_SIZE) {
-          publish();
-        }
-      }
-
-      @FinishBundle
-      public void finishBundle(Context c) throws IOException {
-        if (!output.isEmpty()) {
-          publish();
-        }
-        output = null;
-        pubsubClient.close();
-        pubsubClient = null;
-      }
-
-      private void publish() throws IOException {
-        int n = pubsubClient.publish(
-            PubsubClient.topicPathFromName(getTopic().project, getTopic().topic),
-            output);
-        checkState(n == output.size());
-        output.clear();
-      }
-
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        super.populateDisplayData(builder);
-        builder.delegate(Write.this);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/5bcb8c57/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
deleted file mode 100644
index 55605b3..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
+++ /dev/null
@@ -1,494 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.hash.Hashing;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-import javax.annotation.Nullable;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.BigEndianLongCoder;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.MapCoder;
-import org.apache.beam.sdk.coders.NullableCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.io.PubsubIO.PubsubMessage;
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.options.PubsubOptions;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
-import org.apache.beam.sdk.transforms.windowing.AfterFirst;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
-import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.Repeatedly;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.PubsubClient;
-import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
-import org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory;
-import org.apache.beam.sdk.util.PubsubClient.TopicPath;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.joda.time.Duration;
-
-/**
- * A PTransform which streams messages to Pubsub.
- * <ul>
- * <li>The underlying implementation is just a {@link GroupByKey} followed by a {@link ParDo} which
- * publishes as a side effect. (In the future we want to design and switch to a custom
- * {@code UnboundedSink} implementation so as to gain access to system watermark and
- * end-of-pipeline cleanup.)
- * <li>We try to send messages in batches while also limiting send latency.
- * <li>No stats are logged. Rather some counters are used to keep track of elements and batches.
- * <li>Though some background threads are used by the underlying netty system all actual Pubsub
- * calls are blocking. We rely on the underlying runner to allow multiple {@link DoFn} instances
- * to execute concurrently and hide latency.
- * <li>A failed bundle will cause messages to be resent. Thus we rely on the Pubsub consumer
- * to dedup messages.
- * </ul>
- */
-public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
-  /** Default cap on the number of messages sent in one publish call. */
-  private static final int DEFAULT_PUBLISH_BATCH_SIZE = 1000;
-
-  /** Default cap, in bytes, on the size of one publish batch. */
-  private static final int DEFAULT_PUBLISH_BATCH_BYTES = 400_000;
-
-  /** Default upper bound on the delay between receiving a message and pushing it to Pubsub. */
-  private static final Duration DEFAULT_MAX_LATENCY = Duration.standardSeconds(2);
-
-  /**
-   * Coder for the {@link OutgoingMessage}s passed between the sink's internal stages. Fields are
-   * written in this order: payload bytes, nullable attribute map, big-endian timestamp, nullable
-   * record id.
-   */
-  private static class OutgoingMessageCoder extends AtomicCoder<OutgoingMessage> {
-    private static final NullableCoder<Map<String, String>> ATTRIBUTES_CODER =
-        NullableCoder.of(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
-    private static final NullableCoder<String> RECORD_ID_CODER =
-        NullableCoder.of(StringUtf8Coder.of());
-
-    @Override
-    public void encode(OutgoingMessage value, OutputStream outStream, Context context)
-        throws CoderException, IOException {
-      Context nested = context.nested();
-      ByteArrayCoder.of().encode(value.elementBytes, outStream, nested);
-      ATTRIBUTES_CODER.encode(value.attributes, outStream, nested);
-      BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream, nested);
-      RECORD_ID_CODER.encode(value.recordId, outStream, nested);
-    }
-
-    @Override
-    public OutgoingMessage decode(InputStream inStream, Context context)
-        throws CoderException, IOException {
-      Context nested = context.nested();
-      byte[] payload = ByteArrayCoder.of().decode(inStream, nested);
-      Map<String, String> attributeMap = ATTRIBUTES_CODER.decode(inStream, nested);
-      long timestampMillis = BigEndianLongCoder.of().decode(inStream, nested);
-      @Nullable String id = RECORD_ID_CODER.decode(inStream, nested);
-      return new OutgoingMessage(payload, attributeMap, timestampMillis, id);
-    }
-  }
-
-  /** Shared {@link OutgoingMessage} coder instance, exposed for tests. */
-  @VisibleForTesting
-  static final Coder<OutgoingMessage> CODER = new OutgoingMessageCoder();
-
-  // ================================================================================
-  // RecordIdMethod
-  // ================================================================================
-
-  /** Strategy used by the sink to assign a record id to each outgoing message. */
-  @VisibleForTesting
-  enum RecordIdMethod {
-    /** No record id is attached; the id stays {@code null}. */
-    NONE,
-    /** A random UUID string is used as the record id. */
-    RANDOM,
-    /** A hash of the message payload is used as the record id. For testing only. */
-    DETERMINISTIC
-  }
-
-  // ================================================================================
-  // ShardFn
-  // ================================================================================
-
-  /**
-   * Turns each input element into an {@link OutgoingMessage} and keys it by a shard number
-   * chosen uniformly at random from {@code [0, numShards)}.
-   */
-  private static class ShardFn<T> extends DoFn<T, KV<Integer, OutgoingMessage>> {
-    private final Counter elementCounter = Metrics.counter(ShardFn.class, "elements");
-    private final Coder<T> elementCoder;
-    private final int numShards;
-    private final RecordIdMethod recordIdMethod;
-    private final SimpleFunction<T, PubsubMessage> formatFn;
-
-    ShardFn(Coder<T> elementCoder, int numShards,
-            SimpleFunction<T, PubsubIO.PubsubMessage> formatFn, RecordIdMethod recordIdMethod) {
-      this.recordIdMethod = recordIdMethod;
-      this.formatFn = formatFn;
-      this.numShards = numShards;
-      this.elementCoder = elementCoder;
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c) throws Exception {
-      elementCounter.inc();
-
-      // Payload and attributes come from formatFn when present, else from the element coder.
-      T element = c.element();
-      byte[] payload;
-      Map<String, String> attributes;
-      if (formatFn == null) {
-        payload = CoderUtils.encodeToByteArray(elementCoder, element);
-        attributes = ImmutableMap.<String, String>of();
-      } else {
-        PubsubIO.PubsubMessage formatted = formatFn.apply(element);
-        payload = formatted.getMessage();
-        attributes = formatted.getAttributeMap();
-      }
-
-      long timestampMsSinceEpoch = c.timestamp().getMillis();
-      @Nullable String recordId = null;
-      switch (recordIdMethod) {
-        case DETERMINISTIC:
-          recordId = Hashing.murmur3_128().hashBytes(payload).toString();
-          break;
-        case RANDOM:
-          // The messages pass through a GroupByKey, so a failed send to Pubsub is retried
-          // without regenerating the id. A message sent twice therefore keeps one record id.
-          recordId = UUID.randomUUID().toString();
-          break;
-        case NONE:
-        default:
-          break;
-      }
-
-      int shard = ThreadLocalRandom.current().nextInt(numShards);
-      OutgoingMessage outgoing =
-          new OutgoingMessage(payload, attributes, timestampMsSinceEpoch, recordId);
-      c.output(KV.of(shard, outgoing));
-    }
-
-    @Override
-    public void populateDisplayData(Builder builder) {
-      super.populateDisplayData(builder);
-      builder.add(DisplayData.item("numShards", numShards));
-    }
-  }
-
-  // ================================================================================
-  // WriterFn
-  // ================================================================================
-
-  /**
-   * Publishes each shard's grouped messages to Pubsub. A shard's messages are split into
-   * batches whose summed payload size stays within {@code publishBatchBytes}; a single message
-   * larger than that is still sent, alone.
-   */
-  private static class WriterFn
-      extends DoFn<KV<Integer, Iterable<OutgoingMessage>>, Void> {
-    private final PubsubClientFactory pubsubFactory;
-    private final ValueProvider<TopicPath> topic;
-    private final String timestampLabel;
-    private final String idLabel;
-    private final int publishBatchSize;
-    private final int publishBatchBytes;
-
-    /** Client used to talk to Pubsub; {@code null} outside of a bundle. */
-    @Nullable
-    private transient PubsubClient pubsubClient;
-
-    private final Counter batchCounter = Metrics.counter(WriterFn.class, "batches");
-    private final Counter elementCounter = Metrics.counter(WriterFn.class, "elements");
-    private final Counter byteCounter = Metrics.counter(WriterFn.class, "bytes");
-
-    WriterFn(
-        PubsubClientFactory pubsubFactory, ValueProvider<TopicPath> topic,
-        String timestampLabel, String idLabel, int publishBatchSize, int publishBatchBytes) {
-      this.publishBatchBytes = publishBatchBytes;
-      this.publishBatchSize = publishBatchSize;
-      this.idLabel = idLabel;
-      this.timestampLabel = timestampLabel;
-      this.topic = topic;
-      this.pubsubFactory = pubsubFactory;
-    }
-
-    /** Sends {@code messages} to Pubsub as one batch. BLOCKS until the call returns. */
-    private void publishBatch(List<OutgoingMessage> messages, int bytes) throws IOException {
-      int published = pubsubClient.publish(topic.get(), messages);
-      checkState(published == messages.size(),
-          "Attempted to publish %s messages but %s were successful", messages.size(), published);
-      batchCounter.inc();
-      elementCounter.inc(messages.size());
-      byteCounter.inc(bytes);
-    }
-
-    @StartBundle
-    public void startBundle(Context c) throws Exception {
-      checkState(pubsubClient == null, "startBundle invoked without prior finishBundle");
-      PubsubOptions options = c.getPipelineOptions().as(PubsubOptions.class);
-      pubsubClient = pubsubFactory.newClient(timestampLabel, idLabel, options);
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c) throws Exception {
-      List<OutgoingMessage> batch = new ArrayList<>(publishBatchSize);
-      int batchBytes = 0;
-      for (OutgoingMessage message : c.element().getValue()) {
-        int size = message.elementBytes.length;
-        // The trigger already limits batches by count (possibly overshooting slightly). Pubsub's
-        // hard limit is by bytes, so oversized batches are split here. BLOCKS until published.
-        if (batchBytes + size > publishBatchBytes && !batch.isEmpty()) {
-          publishBatch(batch, batchBytes);
-          batch.clear();
-          batchBytes = 0;
-        }
-        batch.add(message);
-        batchBytes += size;
-      }
-      if (!batch.isEmpty()) {
-        // BLOCKS until published.
-        publishBatch(batch, batchBytes);
-      }
-    }
-
-    @FinishBundle
-    public void finishBundle(Context c) throws Exception {
-      pubsubClient.close();
-      pubsubClient = null;
-    }
-
-    @Override
-    public void populateDisplayData(Builder builder) {
-      super.populateDisplayData(builder);
-      String topicString;
-      if (topic == null) {
-        topicString = null;
-      } else if (topic.isAccessible()) {
-        topicString = topic.get().getPath();
-      } else {
-        topicString = topic.toString();
-      }
-      builder.add(DisplayData.item("topic", topicString));
-      builder.add(DisplayData.item("transport", pubsubFactory.getKind()));
-      builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel));
-      builder.addIfNotNull(DisplayData.item("idLabel", idLabel));
-    }
-  }
-
-  // ================================================================================
-  // PubsubUnboundedSink
-  // ================================================================================
-
-  /**
-   * Which factory to use for creating Pubsub transport.
-   */
-  private final PubsubClientFactory pubsubFactory;
-
-  /**
-   * Pubsub topic to publish to.
-   */
-  private final ValueProvider<TopicPath> topic;
-
-  /**
-   * Coder for elements. It is the responsibility of the underlying Pubsub transport to
-   * re-encode element bytes if necessary, e.g. as Base64 strings.
-   */
-  private final Coder<T> elementCoder;
-
-  /**
-   * Pubsub metadata field holding timestamp of each element, or {@literal null} if should use
-   * Pubsub message publish timestamp instead.
-   */
-  @Nullable
-  private final String timestampLabel;
-
-  /**
-   * Pubsub metadata field holding id for each element, or {@literal null} if need to generate
-   * a unique id ourselves.
-   */
-  @Nullable
-  private final String idLabel;
-
-  /**
-   * Number of 'shards' to use so that latency in Pubsub publish can be hidden. Generally this
-   * should be a small multiple of the number of available cores. Too small a number results
-   * in too much time lost to blocking Pubsub calls. Too large a number results in too many
-   * single-element batches being sent to Pubsub with high per-batch overhead.
-   */
-  private final int numShards;
-
-  /**
-   * Maximum number of messages per publish.
-   */
-  private final int publishBatchSize;
-
-  /**
-   * Maximum size of a publish batch, in bytes.
-   */
-  private final int publishBatchBytes;
-
-  /**
-   * Longest delay between receiving a message and pushing it to Pubsub.
-   */
-  private final Duration maxLatency;
-
-  /**
-   * How record ids should be generated for each record (if {@link #idLabel} is non-{@literal
-   * null}).
-   */
-  private final RecordIdMethod recordIdMethod;
-
-  /**
-   * In order to publish attributes, a formatting function is used to format the output into
-   * a {@link PubsubIO.PubsubMessage}. May be {@literal null}; see ShardFn for how a missing
-   * format function is handled.
-   */
-  @Nullable
-  private final SimpleFunction<T, PubsubIO.PubsubMessage> formatFn;
-
-  /**
-   * Creates a sink with every tuning parameter supplied explicitly.
-   *
-   * @param pubsubFactory factory used to create the Pubsub transport client
-   * @param topic topic to publish to
-   * @param elementCoder coder for input elements
-   * @param timestampLabel metadata field holding each element's timestamp, or {@literal null}
-   *     to use the Pubsub publish timestamp
-   * @param idLabel metadata field holding each element's id, or {@literal null} if none
-   * @param numShards number of shards used to hide Pubsub publish latency
-   * @param publishBatchSize maximum number of messages per publish
-   * @param publishBatchBytes maximum size of a publish batch, in bytes
-   * @param maxLatency longest delay between receiving a message and pushing it to Pubsub
-   * @param formatFn function formatting each element into a {@link PubsubIO.PubsubMessage}
-   * @param recordIdMethod how record ids are generated; replaced by
-   *     {@code RecordIdMethod.NONE} when {@code idLabel} is {@literal null}
-   */
-  @VisibleForTesting
-  PubsubUnboundedSink(
-      PubsubClientFactory pubsubFactory,
-      ValueProvider<TopicPath> topic,
-      Coder<T> elementCoder,
-      @Nullable String timestampLabel,
-      @Nullable String idLabel,
-      int numShards,
-      int publishBatchSize,
-      int publishBatchBytes,
-      Duration maxLatency,
-      SimpleFunction<T, PubsubIO.PubsubMessage> formatFn,
-      RecordIdMethod recordIdMethod) {
-    this.pubsubFactory = pubsubFactory;
-    this.topic = topic;
-    this.elementCoder = elementCoder;
-    this.timestampLabel = timestampLabel;
-    this.idLabel = idLabel;
-    this.numShards = numShards;
-    this.publishBatchSize = publishBatchSize;
-    this.publishBatchBytes = publishBatchBytes;
-    this.maxLatency = maxLatency;
-    this.formatFn = formatFn;
-    // Without an id label there is nowhere to put a record id, so ignore the requested method.
-    this.recordIdMethod = idLabel == null ? RecordIdMethod.NONE : recordIdMethod;
-  }
-
-  /**
-   * Creates a sink using {@code DEFAULT_PUBLISH_BATCH_SIZE}, {@code DEFAULT_PUBLISH_BATCH_BYTES}
-   * and {@code DEFAULT_MAX_LATENCY}, with {@code RecordIdMethod.RANDOM} as the record id method.
-   *
-   * @param pubsubFactory factory used to create the Pubsub transport client
-   * @param topic topic to publish to
-   * @param elementCoder coder for input elements
-   * @param timestampLabel metadata field holding each element's timestamp, or {@literal null}
-   *     to use the Pubsub publish timestamp
-   * @param idLabel metadata field holding each element's id, or {@literal null} if none (in
-   *     which case no record id method is applied)
-   * @param formatFn function formatting each element into a {@link PubsubIO.PubsubMessage}
-   * @param numShards number of shards used to hide Pubsub publish latency
-   */
-  public PubsubUnboundedSink(
-      PubsubClientFactory pubsubFactory,
-      ValueProvider<TopicPath> topic,
-      Coder<T> elementCoder,
-      @Nullable String timestampLabel,
-      @Nullable String idLabel,
-      SimpleFunction<T, PubsubIO.PubsubMessage> formatFn,
-      int numShards) {
-    this(pubsubFactory, topic, elementCoder, timestampLabel, idLabel, numShards,
-         DEFAULT_PUBLISH_BATCH_SIZE, DEFAULT_PUBLISH_BATCH_BYTES, DEFAULT_MAX_LATENCY,
-         formatFn, RecordIdMethod.RANDOM);
-  }
-
-  /**
-   * Returns the resolved topic this sink publishes to.
-   *
-   * <p>This calls {@code get()} on the topic {@link ValueProvider}; use
-   * {@link #getTopicProvider()} to obtain the provider without resolving it.
-   */
-  public TopicPath getTopic() {
-    ValueProvider<TopicPath> provider = this.topic;
-    return provider.get();
-  }
-
-  /**
-   * Returns the (possibly unresolved) {@link ValueProvider} for the destination topic.
-   */
-  public ValueProvider<TopicPath> getTopicProvider() {
-    return this.topic;
-  }
-
-  /**
-   * Returns the Pubsub metadata field holding each element's timestamp, or {@literal null}
-   * when none was configured.
-   */
-  @Nullable
-  public String getTimestampLabel() {
-    return this.timestampLabel;
-  }
-
-  /**
-   * Returns the Pubsub metadata field holding each element's id, or {@literal null} when
-   * none was configured.
-   */
-  @Nullable
-  public String getIdLabel() {
-    return this.idLabel;
-  }
-
-  /**
-   * Returns the function that formats each element into a {@link PubsubIO.PubsubMessage} so
-   * that attributes can be published; may be {@literal null}.
-   */
-  @Nullable
-  public SimpleFunction<T, PubsubIO.PubsubMessage> getFormatFn() {
-    return this.formatFn;
-  }
-
-  /**
-   * Returns the {@link Coder} applied to the elements written by this sink.
-   */
-  public Coder<T> getElementCoder() {
-    return this.elementCoder;
-  }
-
-  /**
-   * Writes {@code input} to Pubsub.
-   *
-   * <p>Elements are moved into the global window with a repeating trigger that fires as soon
-   * as {@code publishBatchSize} elements have arrived or {@code maxLatency} has passed since
-   * the first element of the pane, whichever happens first; fired panes are discarded. A
-   * {@code ShardFn} then keys each element by an integer shard (presumably derived from
-   * {@code numShards}), the keyed messages are grouped, and a {@code WriterFn} publishes each
-   * group using the configured batch limits.
-   *
-   * @param input the elements to publish
-   * @return a {@link PDone} for the input's pipeline
-   */
-  @Override
-  public PDone expand(PCollection<T> input) {
-    // Fire on whichever comes first: a full batch or the latency deadline.
-    Repeatedly batchTrigger =
-        Repeatedly.forever(
-            AfterFirst.of(
-                AfterPane.elementCountAtLeast(publishBatchSize),
-                AfterProcessingTime.pastFirstElementInPane().plusDelayOf(maxLatency)));
-
-    PCollection<T> windowed =
-        input.apply(
-            "PubsubUnboundedSink.Window",
-            Window.<T>into(new GlobalWindows())
-                .triggering(batchTrigger)
-                .discardingFiredPanes());
-
-    windowed
-        .apply(
-            "PubsubUnboundedSink.Shard",
-            ParDo.of(new ShardFn<T>(elementCoder, numShards, formatFn, recordIdMethod)))
-        .setCoder(KvCoder.of(VarIntCoder.of(), CODER))
-        .apply(GroupByKey.<Integer, OutgoingMessage>create())
-        .apply(
-            "PubsubUnboundedSink.Writer",
-            ParDo.of(
-                new WriterFn(
-                    pubsubFactory,
-                    topic,
-                    timestampLabel,
-                    idLabel,
-                    publishBatchSize,
-                    publishBatchBytes)));
-    return PDone.in(input.getPipeline());
-  }
-}


Mime
View raw message