beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [2/3] beam git commit: Add PubSub attributes support to PubsubIO.
Date Fri, 20 Jan 2017 22:58:25 GMT
Add PubSub attributes support to PubsubIO.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f032facb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f032facb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f032facb

Branch: refs/heads/master
Commit: f032facb4889f9183f8c6bfeda9fee9c0e4b7979
Parents: a681037
Author: Reuven Lax <relax@relax-macbookpro.roam.corp.google.com>
Authored: Sun Nov 13 20:29:31 2016 -0800
Committer: Dan Halperin <dhalperi@google.com>
Committed: Fri Jan 20 14:57:56 2017 -0800

----------------------------------------------------------------------
 .../beam/examples/complete/game/GameStats.java  |   5 +-
 .../examples/complete/game/LeaderBoard.java     |   5 +-
 .../beam/runners/dataflow/DataflowRunner.java   |  30 +-
 .../java/org/apache/beam/sdk/io/PubsubIO.java   | 783 +++++++++----------
 .../apache/beam/sdk/io/PubsubUnboundedSink.java |  72 +-
 .../beam/sdk/io/PubsubUnboundedSource.java      |  67 +-
 .../apache/beam/sdk/transforms/GroupByKey.java  |   2 +-
 .../org/apache/beam/sdk/util/PropertyNames.java |   1 +
 .../org/apache/beam/sdk/util/PubsubClient.java  |  28 +-
 .../apache/beam/sdk/util/PubsubGrpcClient.java  |   6 +-
 .../apache/beam/sdk/util/PubsubJsonClient.java  |   4 +-
 .../apache/beam/sdk/util/PubsubTestClient.java  |   6 +-
 .../org/apache/beam/sdk/io/PubsubIOTest.java    |  41 +-
 .../beam/sdk/io/PubsubUnboundedSinkTest.java    |  41 +-
 .../beam/sdk/io/PubsubUnboundedSourceTest.java  |  10 +-
 .../beam/sdk/util/PubsubGrpcClientTest.java     |   8 +-
 .../beam/sdk/util/PubsubJsonClientTest.java     |   3 +-
 .../beam/sdk/util/PubsubTestClientTest.java     |   4 +-
 18 files changed, 643 insertions(+), 473 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f032facb/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index 74f1b30..c880061 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -24,6 +24,7 @@ import org.apache.beam.examples.common.ExampleUtils;
 import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.PubsubIO;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
@@ -252,7 +253,9 @@ public class GameStats extends LeaderBoard {
 
     // Read Events from Pub/Sub using custom timestamps
     PCollection<GameActionInfo> rawEvents = pipeline
-        .apply(PubsubIO.Read.timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic()))
+        .apply(PubsubIO.<String>read()
+            .timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic())
+            .withCoder(StringUtf8Coder.of()))
         .apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
 
     // Extract username/score pairs from the event stream

http://git-wip-us.apache.org/repos/asf/beam/blob/f032facb/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index 519bd5f..35b586b 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -27,6 +27,7 @@ import org.apache.beam.examples.complete.game.utils.WriteToBigQuery;
 import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.PubsubIO;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
@@ -190,7 +191,9 @@ public class LeaderBoard extends HourlyTeamScore {
     // Read game events from Pub/Sub using custom timestamps, which are extracted from the pubsub
     // data elements, and parse the data.
     PCollection<GameActionInfo> gameEvents = pipeline
-        .apply(PubsubIO.Read.timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic()))
+        .apply(PubsubIO.<String>read()
+            .timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic())
+            .withCoder(StringUtf8Coder.of()))
         .apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
 
     gameEvents.apply("CalculateTeamScores",

http://git-wip-us.apache.org/repos/asf/beam/blob/f032facb/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index d21da59..5fdbc83 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -21,6 +21,8 @@ import static com.google.common.base.MoreObjects.firstNonNull;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
+import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
 import static org.apache.beam.sdk.util.WindowedValue.valueInEmptyWindows;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -99,6 +101,7 @@ import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.PubsubIO;
@@ -336,8 +339,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       builder.put(Window.Bound.class, AssignWindows.class);
       // In streaming mode must use either the custom Pubsub unbounded source/sink or
       // defer to Windmill's built-in implementation.
-      builder.put(PubsubIO.Read.Bound.PubsubBoundedReader.class, UnsupportedIO.class);
-      builder.put(PubsubIO.Write.Bound.PubsubBoundedWriter.class, UnsupportedIO.class);
+      builder.put(PubsubIO.Read.PubsubBoundedReader.class, UnsupportedIO.class);
+      builder.put(PubsubIO.Write.PubsubBoundedWriter.class, UnsupportedIO.class);
       if (options.getExperiments() == null
           || !options.getExperiments().contains("enable_custom_pubsub_source")) {
         builder.put(PubsubUnboundedSource.class, StreamingPubsubIORead.class);
@@ -2149,6 +2152,12 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       if (overriddenTransform.getIdLabel() != null) {
         stepContext.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel());
       }
+      if (overriddenTransform.getWithAttributesParseFn() != null) {
+        stepContext.addInput(
+            PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN,
+            byteArrayToJsonString(
+                serializeToByteArray(overriddenTransform.getWithAttributesParseFn())));
+      }
       stepContext.addOutput(context.getOutput(transform));
     }
   }
@@ -2218,8 +2227,17 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       if (overriddenTransform.getIdLabel() != null) {
         stepContext.addInput(PropertyNames.PUBSUB_ID_LABEL, overriddenTransform.getIdLabel());
       }
-      stepContext.addEncodingInput(
-          WindowedValue.getValueOnlyCoder(overriddenTransform.getElementCoder()));
+      if (overriddenTransform.getFormatFn() != null) {
+        stepContext.addInput(
+            PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN,
+            byteArrayToJsonString(serializeToByteArray(overriddenTransform.getFormatFn())));
+        // No coder is needed in this case since the formatFn formats directly into a byte[],
+        // however the Dataflow backend requires a coder to be set.
+        stepContext.addEncodingInput(WindowedValue.getValueOnlyCoder(VoidCoder.of()));
+      } else if (overriddenTransform.getElementCoder() != null) {
+        stepContext.addEncodingInput(WindowedValue.getValueOnlyCoder(
+            overriddenTransform.getElementCoder()));
+      }
       stepContext.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
     }
   }
@@ -2738,7 +2756,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
      */
     @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
     public UnsupportedIO(DataflowRunner runner,
-                         PubsubIO.Read.Bound<?>.PubsubBoundedReader doFn) {
+                         PubsubIO.Read<?>.PubsubBoundedReader doFn) {
       this.doFn = doFn;
     }
 
@@ -2747,7 +2765,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
      */
     @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
     public UnsupportedIO(DataflowRunner runner,
-                         PubsubIO.Write.Bound<?>.PubsubBoundedWriter doFn) {
+                         PubsubIO.Write<?>.PubsubBoundedWriter doFn) {
       this.doFn = doFn;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f032facb/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index 9b5edd1..2802871 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.base.Strings;
@@ -24,11 +25,11 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.options.PubsubOptions;
 import org.apache.beam.sdk.options.ValueProvider;
@@ -40,6 +41,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.util.CoderUtils;
@@ -74,9 +76,6 @@ public class PubsubIO {
   /** Factory for creating pubsub client to manage transport. */
   private static final PubsubClient.PubsubClientFactory FACTORY = PubsubJsonClient.FACTORY;
 
-  /** The default {@link Coder} used to translate to/from Cloud Pub/Sub messages. */
-  public static final Coder<String> DEFAULT_PUBSUB_CODER = StringUtf8Coder.of();
-
   /**
    * Project IDs must contain 6-63 lowercase letters, digits, or dashes.
    * IDs must start with a letter and may not end with a dash.
@@ -154,6 +153,43 @@ public class PubsubIO {
   }
 
   /**
+   * Class representing a Pub/Sub message. Each message contains a single message payload and
+   * a map of attached attributes.
+   */
+  public static class PubsubMessage {
+    private byte[] message;
+    private Map<String, String> attributes;
+
+    public PubsubMessage(byte[] message, Map<String, String> attributes) {
+      this.message = message;
+      this.attributes = attributes;
+    }
+
+    /**
+     * Returns the main PubSub message.
+     */
+    public byte[] getMessage() {
+      return message;
+    }
+
+    /**
+     * Returns the value of the given attribute. If no such attribute exists, returns null.
+     */
+    @Nullable
+    public String getAttribute(String attribute) {
+      checkNotNull(attribute, "attribute");
+      return attributes.get(attribute);
+    }
+
+    /**
+     * Returns the full attribute map as supplied to the constructor; do not modify it.
+     */
+    public Map<String, String> getAttributeMap() {
+      return attributes;
+    }
+  }
+
+  /**
    * Class representing a Cloud Pub/Sub Subscription.
    */
   public static class PubsubSubscription implements Serializable {
@@ -417,6 +453,14 @@ public class PubsubIO {
     }
   }
 
+  public static <T> Read<T> read() {
+    return new Read<>();
+  }
+
+  public static <T> Write<T> write() {
+    return new Write<>();
+  }
+
   /**
    * A {@link PTransform} that continuously reads from a Cloud Pub/Sub stream and
    * returns a {@link PCollection} of {@link String Strings} containing the items from
@@ -424,140 +468,10 @@ public class PubsubIO {
    *
    * <p>When running with a {@link PipelineRunner} that only supports bounded
    * {@link PCollection PCollections}, only a bounded portion of the input Pub/Sub stream
-   * can be processed. As such, either {@link Bound#maxNumRecords(int)} or
-   * {@link Bound#maxReadTime(Duration)} must be set.
+   * can be processed. As such, either {@link PubsubIO.Read#maxNumRecords(int)} or
+   * {@link PubsubIO.Read#maxReadTime(Duration)} must be set.
    */
-  public static class Read {
-
-    /**
-     * Creates and returns a transform for reading from a Cloud Pub/Sub topic. Mutually exclusive
-     * with {@link #subscription(String)}.
-     *
-     * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format
-     * of the {@code topic} string.
-     *
-     * <p>The Beam runner will start reading data published on this topic from the time the pipeline
-     * is started. Any data published on the topic before the pipeline is started will not be read
-     * by the runner.
-     */
-    public static Bound<String> topic(String topic) {
-      return new Bound<>(DEFAULT_PUBSUB_CODER).topic(StaticValueProvider.of(topic));
-    }
-
-    /**
-     * Like {@code topic()} but with a {@link ValueProvider}.
-     */
-    public static Bound<String> topic(ValueProvider<String> topic) {
-      return new Bound<>(DEFAULT_PUBSUB_CODER).topic(topic);
-    }
-
-    /**
-     * Creates and returns a transform for reading from a specific Cloud Pub/Sub subscription.
-     * Mutually exclusive with {@link #topic(String)}.
-     *
-     * <p>See {@link PubsubIO.PubsubSubscription#fromPath(String)} for more details on the format
-     * of the {@code subscription} string.
-     */
-    public static Bound<String> subscription(String subscription) {
-      return new Bound<>(DEFAULT_PUBSUB_CODER).subscription(StaticValueProvider.of(subscription));
-    }
-
-    /**
-     * Like {@code topic()} but with a {@link ValueProvider}.
-     */
-    public static Bound<String> subscription(ValueProvider<String> subscription) {
-      return new Bound<>(DEFAULT_PUBSUB_CODER).subscription(subscription);
-    }
-
-    /**
-     * Creates and returns a transform reading from Cloud Pub/Sub where record timestamps are
-     * expected to be provided as Pub/Sub message attributes. The {@code timestampLabel}
-     * parameter specifies the name of the attribute that contains the timestamp.
-     *
-     * <p>The timestamp value is expected to be represented in the attribute as either:
-     *
-     * <ul>
-     * <li>a numerical value representing the number of milliseconds since the Unix epoch. For
-     * example, if using the Joda time classes, {@link Instant#getMillis()} returns the correct
-     * value for this attribute.
-     * <li>a String in RFC 3339 format. For example, {@code 2015-10-29T23:41:41.123Z}. The
-     * sub-second component of the timestamp is optional, and digits beyond the first three
-     * (i.e., time units smaller than milliseconds) will be ignored.
-     * </ul>
-     *
-     * <p>If {@code timestampLabel} is not provided, the system will generate record timestamps
-     * the first time it sees each record. All windowing will be done relative to these timestamps.
-     *
-     * <p>By default, windows are emitted based on an estimate of when this source is likely
-     * done producing data for a given timestamp (referred to as the Watermark; see
-     * {@link AfterWatermark} for more details). Any late data will be handled by the trigger
-     * specified with the windowing strategy &ndash; by default it will be output immediately.
-     *
-     * <p>Note that the system can guarantee that no late data will ever be seen when it assigns
-     * timestamps by arrival time (i.e. {@code timestampLabel} is not provided).
-     *
-     * @see <a href="https://www.ietf.org/rfc/rfc3339.txt">RFC 3339</a>
-     */
-    public static Bound<String> timestampLabel(String timestampLabel) {
-      return new Bound<>(DEFAULT_PUBSUB_CODER).timestampLabel(timestampLabel);
-    }
-
-    /**
-     * Creates and returns a transform for reading from Cloud Pub/Sub where unique record
-     * identifiers are expected to be provided as Pub/Sub message attributes. The {@code idLabel}
-     * parameter specifies the attribute name. The value of the attribute can be any string
-     * that uniquely identifies this record.
-     *
-     * <p>Pub/Sub cannot guarantee that no duplicate data will be delivered on the Pub/Sub stream.
-     * If {@code idLabel} is not provided, Beam cannot guarantee that no duplicate data will
-     * be delivered, and deduplication of the stream will be strictly best effort.
-     */
-    public static Bound<String> idLabel(String idLabel) {
-      return new Bound<>(DEFAULT_PUBSUB_CODER).idLabel(idLabel);
-    }
-
-    /**
-     * Creates and returns a transform for reading from Cloud Pub/Sub that uses the given
-     * {@link Coder} to decode Pub/Sub messages into a value of type {@code T}.
-     *
-     * <p>By default, uses {@link StringUtf8Coder}, which just
-     * returns the text lines as Java strings.
-     *
-     * @param <T> the type of the decoded elements, and the elements
-     * of the resulting PCollection.
-     */
-    public static <T> Bound<T> withCoder(Coder<T> coder) {
-      return new Bound<>(coder);
-    }
-
-    /**
-     * Creates and returns a transform for reading from Cloud Pub/Sub with a maximum number of
-     * records that will be read. The transform produces a <i>bounded</i> {@link PCollection}.
-     *
-     * <p>Either this option or {@link #maxReadTime(Duration)} must be set in order to create a
-     * bounded source.
-     */
-    public static Bound<String> maxNumRecords(int maxNumRecords) {
-      return new Bound<>(DEFAULT_PUBSUB_CODER).maxNumRecords(maxNumRecords);
-    }
-
-    /**
-     * Creates and returns a transform for reading from Cloud Pub/Sub with a maximum number of
-     * duration during which records will be read.  The transform produces a <i>bounded</i>
-     * {@link PCollection}.
-     *
-     * <p>Either this option or {@link #maxNumRecords(int)} must be set in order to create a bounded
-     * source.
-     */
-    public static Bound<String> maxReadTime(Duration maxReadTime) {
-      return new Bound<>(DEFAULT_PUBSUB_CODER).maxReadTime(maxReadTime);
-    }
-
-    /**
-     * A {@link PTransform} that reads from a Cloud Pub/Sub source and returns
-     * a unbounded {@link PCollection} containing the items from the stream.
-     */
-    public static class Bound<T> extends PTransform<PBegin, PCollection<T>> {
+   public static class Read<T> extends PTransform<PBegin, PCollection<T>> {
       /** The Cloud Pub/Sub topic to read from. */
       @Nullable private final ValueProvider<PubsubTopic> topic;
 
@@ -579,13 +493,17 @@ public class PubsubIO {
       /** Stop after reading for this much time. */
       @Nullable private final Duration maxReadTime;
 
-      private Bound(Coder<T> coder) {
-        this(null, null, null, null, coder, null, 0, null);
+      /** User function for parsing a PubsubMessage into the output type. */
+      SimpleFunction<PubsubMessage, T> parseFn;
+
+      private Read() {
+        this(null, null, null, null, null, null, 0, null, null);
       }
 
-      private Bound(String name, ValueProvider<PubsubSubscription> subscription,
+      private Read(String name, ValueProvider<PubsubSubscription> subscription,
           ValueProvider<PubsubTopic> topic, String timestampLabel, Coder<T> coder,
-          String idLabel, int maxNumRecords, Duration maxReadTime) {
+          String idLabel, int maxNumRecords, Duration maxReadTime,
+          SimpleFunction<PubsubMessage, T> parseFn) {
         super(name);
         this.subscription = subscription;
         this.topic = topic;
@@ -594,6 +512,7 @@ public class PubsubIO {
         this.idLabel = idLabel;
         this.maxNumRecords = maxNumRecords;
         this.maxReadTime = maxReadTime;
+        this.parseFn = parseFn;
       }
 
       /**
@@ -609,104 +528,152 @@ public class PubsubIO {
        *
        * <p>Does not modify this object.
        */
-      public Bound<T> subscription(String subscription) {
+      public Read<T> subscription(String subscription) {
         return subscription(StaticValueProvider.of(subscription));
       }
 
       /**
        * Like {@code subscription()} but with a {@link ValueProvider}.
        */
-      public Bound<T> subscription(ValueProvider<String> subscription) {
+      public Read<T> subscription(ValueProvider<String> subscription) {
         if (subscription.isAccessible()) {
           // Validate.
           PubsubSubscription.fromPath(subscription.get());
         }
-        return new Bound<>(name,
+        return new Read<>(name,
             NestedValueProvider.of(subscription, new SubscriptionTranslator()),
-            topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
+            topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime, parseFn);
       }
 
       /**
-       * Returns a transform that's like this one but that reads from the specified topic.
+       * Creates and returns a transform for reading from a Cloud Pub/Sub topic. Mutually exclusive
+       * with {@link #subscription(String)}.
        *
-       * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the
-       * format of the {@code topic} string.
+       * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format
+       * of the {@code topic} string.
        *
-       * <p>Does not modify this object.
+       * <p>The Beam runner will start reading data published on this topic from the time the
+       * pipeline is started. Any data published on the topic before the pipeline is started will
+       * not be read by the runner.
        */
-      public Bound<T> topic(String topic) {
+      public Read<T> topic(String topic) {
         return topic(StaticValueProvider.of(topic));
       }
 
       /**
        * Like {@code topic()} but with a {@link ValueProvider}.
        */
-      public Bound<T> topic(ValueProvider<String> topic) {
+      public Read<T> topic(ValueProvider<String> topic) {
         if (topic.isAccessible()) {
           // Validate.
-          PubsubTopic.fromPath(topic.get());
+            PubsubTopic.fromPath(topic.get());
         }
-        return new Bound<>(name, subscription,
+        return new Read<>(name, subscription,
             NestedValueProvider.of(topic, new TopicTranslator()),
-            timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
+            timestampLabel, coder, idLabel, maxNumRecords, maxReadTime, parseFn);
       }
 
       /**
-       * Returns a transform that's like this one but that reads message timestamps
-       * from the given message attribute. See {@link PubsubIO.Read#timestampLabel(String)} for
-       * more details on the format of the timestamp attribute.
+       * Creates and returns a transform reading from Cloud Pub/Sub where record timestamps are
+       * expected to be provided as Pub/Sub message attributes. The {@code timestampLabel}
+       * parameter specifies the name of the attribute that contains the timestamp.
        *
-       * <p>Does not modify this object.
+       * <p>The timestamp value is expected to be represented in the attribute as either:
+       *
+       * <ul>
+       * <li>a numerical value representing the number of milliseconds since the Unix epoch. For
+       * example, if using the Joda time classes, {@link Instant#getMillis()} returns the correct
+       * value for this attribute.
+       * <li>a String in RFC 3339 format. For example, {@code 2015-10-29T23:41:41.123Z}. The
+       * sub-second component of the timestamp is optional, and digits beyond the first three
+       * (i.e., time units smaller than milliseconds) will be ignored.
+       * </ul>
+       *
+       * <p>If {@code timestampLabel} is not provided, the system will generate record timestamps
+       * the first time it sees each record. All windowing will be done relative to these
+       * timestamps.
+       *
+       * <p>By default, windows are emitted based on an estimate of when this source is likely
+       * done producing data for a given timestamp (referred to as the Watermark; see
+       * {@link AfterWatermark} for more details). Any late data will be handled by the trigger
+       * specified with the windowing strategy &ndash; by default it will be output immediately.
+       *
+       * <p>Note that the system can guarantee that no late data will ever be seen when it assigns
+       * timestamps by arrival time (i.e. {@code timestampLabel} is not provided).
+       *
+       * @see <a href="https://www.ietf.org/rfc/rfc3339.txt">RFC 3339</a>
        */
-      public Bound<T> timestampLabel(String timestampLabel) {
-        return new Bound<>(
-            name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
+      public Read<T> timestampLabel(String timestampLabel) {
+        return new Read<>(
+            name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
+            parseFn);
       }
 
-      /**
-       * Returns a transform that's like this one but that reads unique message IDs
-       * from the given message attribute. See {@link PubsubIO.Read#idLabel(String)} for more
-       * details on the format of the ID attribute.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> idLabel(String idLabel) {
-        return new Bound<>(
-            name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
+     /**
+      * Creates and returns a transform for reading from Cloud Pub/Sub where unique record
+      * identifiers are expected to be provided as Pub/Sub message attributes. The {@code idLabel}
+      * parameter specifies the attribute name. The value of the attribute can be any string
+      * that uniquely identifies this record.
+      *
+      * <p>Pub/Sub cannot guarantee that no duplicate data will be delivered on the Pub/Sub stream.
+      * If {@code idLabel} is not provided, Beam cannot guarantee that no duplicate data will
+      * be delivered, and deduplication of the stream will be strictly best effort.
+      */
+      public Read<T> idLabel(String idLabel) {
+        return new Read<>(
+            name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
+            parseFn);
       }
 
       /**
        * Returns a transform that's like this one but that uses the given
-       * {@link Coder} to decode each record into a value of type {@code X}.
+       * {@link Coder} to decode each record into a value of type {@code T}.
        *
        * <p>Does not modify this object.
-       *
-       * @param <X> the type of the decoded elements, and the
-       * elements of the resulting PCollection.
        */
-      public <X> Bound<X> withCoder(Coder<X> coder) {
-        return new Bound<>(
-            name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
+      public Read<T> withCoder(Coder<T> coder) {
+        return new Read<>(
+            name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
+            parseFn);
       }
 
       /**
-       * Returns a transform that's like this one but will only read up to the specified
-       * maximum number of records from Cloud Pub/Sub. The transform produces a <i>bounded</i>
-       * {@link PCollection}. See {@link PubsubIO.Read#maxNumRecords(int)} for more details.
+       * Causes the source to return a PubsubMessage that includes Pubsub attributes.
+       * The user must supply a parsing function to transform the PubsubMessage into an output type.
+       * A Coder for the output type T must be registered or set on the output via
+       * {@link PCollection#setCoder(Coder)}.
        */
-      public Bound<T> maxNumRecords(int maxNumRecords) {
-        return new Bound<>(
-            name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
+      public Read<T> withAttributes(SimpleFunction<PubsubMessage, T> parseFn) {
+        return new Read<T>(
+            name, subscription, topic, timestampLabel, coder, idLabel,
+            maxNumRecords, maxReadTime, parseFn);
       }
 
-      /**
-       * Returns a transform that's like this one but will only read during the specified
-       * duration from Cloud Pub/Sub. The transform produces a <i>bounded</i> {@link PCollection}.
-       * See {@link PubsubIO.Read#maxReadTime(Duration)} for more details.
-       */
-      public Bound<T> maxReadTime(Duration maxReadTime) {
-        return new Bound<>(
-            name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
+     /**
+      * Creates and returns a transform for reading from Cloud Pub/Sub with a maximum number of
+      * records that will be read. The transform produces a <i>bounded</i> {@link PCollection}.
+      *
+      * <p>Either this option or {@link #maxReadTime(Duration)} must be set in order to create a
+      * bounded source.
+      */
+      public Read<T> maxNumRecords(int maxNumRecords) {
+        return new Read<>(
+            name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
+            parseFn);
+      }
+
+     /**
+      * Creates and returns a transform for reading from Cloud Pub/Sub with a maximum
+      * duration during which records will be read. The transform produces a <i>bounded</i>
+      * {@link PCollection}.
+      *
+      * <p>Either this option or {@link #maxNumRecords(int)} must be set in order to create a
+      * bounded source.
+      */
+      public Read<T> maxReadTime(Duration maxReadTime) {
+        return new Read<>(
+            name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime,
+            parseFn);
       }
 
       @Override
@@ -719,6 +686,10 @@ public class PubsubIO {
           throw new IllegalStateException("Can't set both the topic and the subscription for "
               + "a PubsubIO.Read transform");
         }
+        if (coder == null)  {
+          throw new IllegalStateException("PubsubIO.Read requires that a coder be set using "
+              + "the withCoder method.");
+        }
 
         boolean boundedOutput = getMaxNumRecords() > 0 || getMaxReadTime() != null;
 
@@ -739,7 +710,7 @@ public class PubsubIO {
           return input.getPipeline().begin()
                       .apply(new PubsubUnboundedSource<T>(
                           FACTORY, projectPath, topicPath, subscriptionPath,
-                          coder, timestampLabel, idLabel));
+                          coder, timestampLabel, idLabel, parseFn));
         }
       }
 
@@ -767,43 +738,78 @@ public class PubsubIO {
         return coder;
       }
 
+     /**
+      * Get the topic being read from.
+      */
       public PubsubTopic getTopic() {
         return topic == null ? null : topic.get();
       }
 
+     /**
+      * Get the {@link ValueProvider} for the topic being read from.
+      */
       public ValueProvider<PubsubTopic> getTopicProvider() {
         return topic;
       }
 
+     /**
+      * Get the subscription being read from.
+      */
       public PubsubSubscription getSubscription() {
         return subscription == null ? null : subscription.get();
       }
 
+     /**
+      * Get the {@link ValueProvider} for the subscription being read from.
+      */
       public ValueProvider<PubsubSubscription> getSubscriptionProvider() {
         return subscription;
       }
 
+     /**
+      * Get the timestamp label.
+      */
       public String getTimestampLabel() {
         return timestampLabel;
       }
 
-      public Coder<T> getCoder() {
-        return coder;
-      }
-
+     /**
+      * Get the id label.
+      */
       public String getIdLabel() {
         return idLabel;
       }
 
+
+      /**
+       * Get the {@link Coder} used for the transform's output elements.
+       */
+      public Coder<T> getCoder() {
+        return coder;
+      }
+
+     /**
+      * Get the maximum number of records to read.
+      */
       public int getMaxNumRecords() {
         return maxNumRecords;
       }
 
+     /**
+      * Get the maximum read time.
+      */
       public Duration getMaxReadTime() {
         return maxReadTime;
       }
 
       /**
+       * Get the parse function used for PubSub attributes, or {@code null} if the coder is used.
+       */
+      public SimpleFunction<PubsubMessage, T> getPubSubMessageParseFn() {
+        return parseFn;
+      }
+
+      /**
        * Default reader when Pubsub subscription has some form of upper bound.
        *
        * <p>TODO: Consider replacing with BoundedReadFromUnboundedSource on top
@@ -891,51 +897,79 @@ public class PubsubIO {
             }
 
             for (IncomingMessage message : messages) {
-              c.outputWithTimestamp(
-                  CoderUtils.decodeFromByteArray(getCoder(), message.elementBytes),
-                  new Instant(message.timestampMsSinceEpoch));
+              T element = null;
+              if (parseFn != null) {
+                element = parseFn.apply(new PubsubMessage(
+                        message.elementBytes, message.attributes));
+              } else {
+                element = CoderUtils.decodeFromByteArray(getCoder(), message.elementBytes);
+              }
+              c.outputWithTimestamp(element, new Instant(message.timestampMsSinceEpoch));
             }
           }
         }
 
         @Override
         public void populateDisplayData(DisplayData.Builder builder) {
-          builder.delegate(Bound.this);
+          builder.delegate(Read.this);
         }
       }
     }
 
-    /** Disallow construction of utility class. */
-    private Read() {}
-  }
-
 
   /////////////////////////////////////////////////////////////////////////////
 
   /** Disallow construction of utility class. */
   private PubsubIO() {}
 
+
   /**
-   * A {@link PTransform} that continuously writes a
-   * {@link PCollection} of {@link String Strings} to a Cloud Pub/Sub stream.
+   * A {@link PTransform} that writes a {@link PCollection} of elements, bounded or unbounded,
+   * to a Cloud Pub/Sub topic.
    */
-  // TODO: Support non-String encodings.
-  public static class Write {
+  public static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    /** The Cloud Pub/Sub topic to publish to. */
+    @Nullable private final ValueProvider<PubsubTopic> topic;
+    /** The name of the message attribute to publish message timestamps in. */
+    @Nullable private final String timestampLabel;
+    /** The name of the message attribute to publish unique message IDs in. */
+    @Nullable private final String idLabel;
+    /** The input type Coder. */
+    private final Coder<T> coder;
+    /** Optional function formatting each input element as a PubsubMessage; may be null. */
+    @Nullable private final SimpleFunction<T, PubsubMessage> formatFn;
+
+    private Write() {
+      this(null, null, null, null, null, null);
+    }
+
+    private Write(
+        String name, ValueProvider<PubsubTopic> topic, String timestampLabel,
+        String idLabel, Coder<T> coder, SimpleFunction<T, PubsubMessage> formatFn) {
+      super(name);
+      this.topic = topic;
+      this.timestampLabel = timestampLabel;
+      this.idLabel = idLabel;
+      this.coder = coder;
+      this.formatFn = formatFn;
+    }
+
     /**
      * Creates a transform that publishes to the specified topic.
      *
      * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
      * {@code topic} string.
      */
-    public static Bound<String> topic(String topic) {
-      return new Bound<>(DEFAULT_PUBSUB_CODER).topic(StaticValueProvider.of(topic));
+    public Write<T> topic(String topic) {
+      return topic(StaticValueProvider.of(topic));
     }
 
     /**
      * Like {@code topic()} but with a {@link ValueProvider}.
      */
-    public static Bound<String> topic(ValueProvider<String> topic) {
-      return new Bound<>(DEFAULT_PUBSUB_CODER).topic(topic);
+    public Write<T> topic(ValueProvider<String> topic) {
+      return new Write<>(name, NestedValueProvider.of(topic, new TopicTranslator()),
+          timestampLabel, idLabel, coder, formatFn);
     }
 
     /**
@@ -948,8 +982,8 @@ public class PubsubIO {
      * {@link PubsubIO.Read#timestampLabel(String)} can be used to ensure the other source reads
      * these timestamps from the appropriate attribute.
      */
-    public static Bound<String> timestampLabel(String timestampLabel) {
-      return new Bound<>(DEFAULT_PUBSUB_CODER).timestampLabel(timestampLabel);
+    public Write<T> timestampLabel(String timestampLabel) {
+      return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn);
     }
 
     /**
@@ -961,221 +995,172 @@ public class PubsubIO {
      * {@link PubsubIO.Read#idLabel(String)} can be used to ensure that* the other source reads
      * these unique identifiers from the appropriate attribute.
      */
-    public static Bound<String> idLabel(String idLabel) {
-      return new Bound<>(DEFAULT_PUBSUB_CODER).idLabel(idLabel);
+    public Write<T> idLabel(String idLabel) {
+      return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn);
     }
 
     /**
-     * Creates a transform that  uses the given {@link Coder} to encode each of the
-     * elements of the input collection into an output message.
-     *
-     * <p>By default, uses {@link StringUtf8Coder}, which writes input Java strings directly as
-     * records.
+     * Returns a new transform that's like this one
+     * but that uses the given {@link Coder} to encode each of
+     * the elements of the input {@link PCollection} into an
+     * output record.
      *
-     * @param <T> the type of the elements of the input PCollection
+     * <p>Does not modify this object.
      */
-    public static <T> Bound<T> withCoder(Coder<T> coder) {
-      return new Bound<>(coder);
+    public Write<T> withCoder(Coder<T> coder) {
+      return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn);
     }
 
     /**
-     * A {@link PTransform} that writes an unbounded {@link PCollection} of {@link String Strings}
-     * to a Cloud Pub/Sub stream.
+     * Used to write a PubSub message together with PubSub attributes. The user-supplied format
+     * function translates the input type T to a PubsubMessage object, which is used by the sink
+     * to separately set the PubSub message's payload and attributes.
      */
-    public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
-      /** The Cloud Pub/Sub topic to publish to. */
-      @Nullable private final ValueProvider<PubsubTopic> topic;
-      /** The name of the message attribute to publish message timestamps in. */
-      @Nullable private final String timestampLabel;
-      /** The name of the message attribute to publish unique message IDs in. */
-      @Nullable private final String idLabel;
-      private final Coder<T> coder;
-
-      private Bound(Coder<T> coder) {
-        this(null, null, null, null, coder);
-      }
-
-      private Bound(
-          String name, ValueProvider<PubsubTopic> topic, String timestampLabel,
-          String idLabel, Coder<T> coder) {
-        super(name);
-        this.topic = topic;
-        this.timestampLabel = timestampLabel;
-        this.idLabel = idLabel;
-        this.coder = coder;
-      }
-
-      /**
-       * Returns a new transform that's like this one but that writes to the specified
-       * topic.
-       *
-       * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
-       * {@code topic} string.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> topic(String topic) {
-        return topic(StaticValueProvider.of(topic));
-      }
-
-      /**
-       * Like {@code topic()} but with a {@link ValueProvider}.
-       */
-      public Bound<T> topic(ValueProvider<String> topic) {
-        return new Bound<>(name, NestedValueProvider.of(topic, new TopicTranslator()),
-            timestampLabel, idLabel, coder);
-      }
+    public Write<T> withAttributes(SimpleFunction<T, PubsubMessage> formatFn) {
+      return new Write<>(name, topic, timestampLabel, idLabel, coder, formatFn);
+    }
 
-      /**
-       * Returns a new transform that's like this one but that publishes record timestamps
-       * to a message attribute with the specified name. See
-       * {@link PubsubIO.Write#timestampLabel(String)} for more details.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> timestampLabel(String timestampLabel) {
-        return new Bound<>(name, topic, timestampLabel, idLabel, coder);
+    @Override
+    public PDone expand(PCollection<T> input) {
+      if (topic == null) {
+        throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform");
       }
-
-      /**
-       * Returns a new transform that's like this one but that publishes unique record IDs
-       * to a message attribute with the specified name. See {@link PubsubIO.Write#idLabel(String)}
-       * for more details.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> idLabel(String idLabel) {
-        return new Bound<>(name, topic, timestampLabel, idLabel, coder);
+      switch (input.isBounded()) {
+        case BOUNDED:
+          input.apply(ParDo.of(new PubsubBoundedWriter()));
+          return PDone.in(input.getPipeline());
+        case UNBOUNDED:
+          return input.apply(new PubsubUnboundedSink<T>(
+              FACTORY,
+              NestedValueProvider.of(topic, new TopicPathTranslator()),
+              coder,
+              timestampLabel,
+              idLabel,
+              formatFn,
+              100 /* numShards */));
       }
+      throw new RuntimeException(); // cases are exhaustive.
+    }
 
-      /**
-       * Returns a new transform that's like this one
-       * but that uses the given {@link Coder} to encode each of
-       * the elements of the input {@link PCollection} into an
-       * output record.
-       *
-       * <p>Does not modify this object.
-       *
-       * @param <X> the type of the elements of the input {@link PCollection}
-       */
-      public <X> Bound<X> withCoder(Coder<X> coder) {
-        return new Bound<>(name, topic, timestampLabel, idLabel, coder);
-      }
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      populateCommonDisplayData(builder, timestampLabel, idLabel, topic);
+    }
 
-      @Override
-      public PDone expand(PCollection<T> input) {
-        if (topic == null) {
-          throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform");
-        }
-        switch (input.isBounded()) {
-          case BOUNDED:
-            input.apply(ParDo.of(new PubsubBoundedWriter()));
-            return PDone.in(input.getPipeline());
-          case UNBOUNDED:
-            return input.apply(new PubsubUnboundedSink<T>(
-                FACTORY,
-                NestedValueProvider.of(topic, new TopicPathTranslator()),
-                coder,
-                timestampLabel,
-                idLabel,
-                100 /* numShards */));
-        }
-        throw new RuntimeException(); // cases are exhaustive.
-      }
+    @Override
+    protected Coder<Void> getDefaultOutputCoder() {
+      return VoidCoder.of();
+    }
 
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        super.populateDisplayData(builder);
-        populateCommonDisplayData(builder, timestampLabel, idLabel, topic);
-      }
+    /**
+     * Returns the PubSub topic being written to, or {@code null} if no topic has been set.
+     */
+    public PubsubTopic getTopic() {
+      return topic == null ? null : topic.get();
+    }
 
-      @Override
-      protected Coder<Void> getDefaultOutputCoder() {
-        return VoidCoder.of();
-      }
+    /**
+     * Returns the {@link ValueProvider} for the topic being written to.
+     */
+    public ValueProvider<PubsubTopic> getTopicProvider() {
+      return topic;
+    }
 
-      public PubsubTopic getTopic() {
-        return topic.get();
-      }
+    /**
+     * Returns the timestamp label.
+     */
+    public String getTimestampLabel() {
+      return timestampLabel;
+    }
 
-      public ValueProvider<PubsubTopic> getTopicProvider() {
-        return topic;
-      }
+    /**
+     * Returns the id label.
+     */
+    public String getIdLabel() {
+      return idLabel;
+    }
 
-      public String getTimestampLabel() {
-        return timestampLabel;
-      }
+    /**
+     * Returns the coder used to encode input elements when no format function is set.
+     */
+    public Coder<T> getCoder() {
+      return coder;
+    }
 
-      public String getIdLabel() {
-        return idLabel;
-      }
+    /**
+     * Returns the formatting function used if publishing attributes, or {@code null} if none.
+     */
+    public SimpleFunction<T, PubsubMessage> getFormatFn() {
+      return formatFn;
+    }
 
-      public Coder<T> getCoder() {
-        return coder;
+    /**
+     * Writer to Pubsub which batches messages from bounded collections.
+     *
+     * <p>NOTE: This is not the implementation used when running on the Google Cloud Dataflow
+     * service in streaming mode.
+     *
+     * <p>Public so can be suppressed by runners.
+     */
+    public class PubsubBoundedWriter extends DoFn<T, Void> {
+      private static final int MAX_PUBLISH_BATCH_SIZE = 100;
+      private transient List<OutgoingMessage> output;
+      private transient PubsubClient pubsubClient;
+
+      @StartBundle
+      public void startBundle(Context c) throws IOException {
+        this.output = new ArrayList<>();
+        // NOTE: idLabel is ignored.
+        this.pubsubClient =
+            FACTORY.newClient(timestampLabel, null,
+                              c.getPipelineOptions().as(PubsubOptions.class));
       }
 
-      /**
-       * Writer to Pubsub which batches messages from bounded collections.
-       *
-       * <p>NOTE: This is not the implementation used when running on the Google Cloud Dataflow
-       * service in streaming mode.
-       *
-       * <p>Public so can be suppressed by runners.
-       */
-      public class PubsubBoundedWriter extends DoFn<T, Void> {
-        private static final int MAX_PUBLISH_BATCH_SIZE = 100;
-        private transient List<OutgoingMessage> output;
-        private transient PubsubClient pubsubClient;
-
-        @StartBundle
-        public void startBundle(Context c) throws IOException {
-          this.output = new ArrayList<>();
-          // NOTE: idLabel is ignored.
-          this.pubsubClient =
-              FACTORY.newClient(timestampLabel, null,
-                                c.getPipelineOptions().as(PubsubOptions.class));
+      @ProcessElement
+      public void processElement(ProcessContext c) throws IOException {
+        byte[] payload = null;
+        Map<String, String> attributes = null;
+        if (formatFn != null) {
+          PubsubMessage message = formatFn.apply(c.element());
+          payload = message.getMessage();
+          attributes = message.getAttributeMap();
+        } else {
+          payload = CoderUtils.encodeToByteArray(getCoder(), c.element());
         }
+        // NOTE: The record id is always null.
+        OutgoingMessage message =
+            new OutgoingMessage(payload, attributes, c.timestamp().getMillis(), null);
+        output.add(message);
 
-        @ProcessElement
-        public void processElement(ProcessContext c) throws IOException {
-          // NOTE: The record id is always null.
-          OutgoingMessage message =
-              new OutgoingMessage(CoderUtils.encodeToByteArray(getCoder(), c.element()),
-                                  c.timestamp().getMillis(), null);
-          output.add(message);
-
-          if (output.size() >= MAX_PUBLISH_BATCH_SIZE) {
-            publish();
-          }
+        if (output.size() >= MAX_PUBLISH_BATCH_SIZE) {
+          publish();
         }
+      }
 
-        @FinishBundle
-        public void finishBundle(Context c) throws IOException {
-          if (!output.isEmpty()) {
-            publish();
-          }
-          output = null;
-          pubsubClient.close();
-          pubsubClient = null;
+      @FinishBundle
+      public void finishBundle(Context c) throws IOException {
+        if (!output.isEmpty()) {
+          publish();
         }
+        output = null;
+        pubsubClient.close();
+        pubsubClient = null;
+      }
 
-        private void publish() throws IOException {
-          int n = pubsubClient.publish(
-              PubsubClient.topicPathFromName(getTopic().project, getTopic().topic),
-              output);
-          checkState(n == output.size());
-          output.clear();
-        }
+      private void publish() throws IOException {
+        int n = pubsubClient.publish(
+            PubsubClient.topicPathFromName(getTopic().project, getTopic().topic),
+            output);
+        checkState(n == output.size());
+        output.clear();
+      }
 
-        @Override
-        public void populateDisplayData(DisplayData.Builder builder) {
-          super.populateDisplayData(builder);
-          builder.delegate(Bound.this);
-        }
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        super.populateDisplayData(builder);
+        builder.delegate(Write.this);
       }
     }
-
-    /** Disallow construction of utility class. */
-    private Write() {}
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f032facb/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
index 75f6b7d..c726fd7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
@@ -21,24 +21,30 @@ package org.apache.beam.sdk.io;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.hash.Hashing;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
+
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.PubsubIO.PubsubMessage;
 import org.apache.beam.sdk.options.PubsubOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.Aggregator;
@@ -46,6 +52,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
@@ -105,23 +112,27 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
   private static class OutgoingMessageCoder extends AtomicCoder<OutgoingMessage> {
     private static final NullableCoder<String> RECORD_ID_CODER =
         NullableCoder.of(StringUtf8Coder.of());
+    private static final NullableCoder<Map<String, String>> ATTRIBUTES_CODER =
+            NullableCoder.of(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
 
     @Override
     public void encode(
         OutgoingMessage value, OutputStream outStream, Context context)
         throws CoderException, IOException {
       ByteArrayCoder.of().encode(value.elementBytes, outStream, context.nested());
+      ATTRIBUTES_CODER.encode(value.attributes, outStream, context.nested());
       BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream, context.nested());
-      RECORD_ID_CODER.encode(value.recordId, outStream, context);
+      RECORD_ID_CODER.encode(value.recordId, outStream, context.nested());
     }
 
     @Override
     public OutgoingMessage decode(
         InputStream inStream, Context context) throws CoderException, IOException {
       byte[] elementBytes = ByteArrayCoder.of().decode(inStream, context.nested());
+      Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream, context.nested());
       long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream, context.nested());
-      @Nullable String recordId = RECORD_ID_CODER.decode(inStream, context);
-      return new OutgoingMessage(elementBytes, timestampMsSinceEpoch, recordId);
+      @Nullable String recordId = RECORD_ID_CODER.decode(inStream, context.nested());
+      return new OutgoingMessage(elementBytes, attributes, timestampMsSinceEpoch, recordId);
     }
   }
 
@@ -158,17 +169,29 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
     private final Coder<T> elementCoder;
     private final int numShards;
     private final RecordIdMethod recordIdMethod;
+    private final SimpleFunction<T, PubsubMessage> formatFn;
 
-    ShardFn(Coder<T> elementCoder, int numShards, RecordIdMethod recordIdMethod) {
+    ShardFn(Coder<T> elementCoder, int numShards,
+            SimpleFunction<T, PubsubIO.PubsubMessage> formatFn, RecordIdMethod recordIdMethod) {
       this.elementCoder = elementCoder;
       this.numShards = numShards;
+      this.formatFn = formatFn;
       this.recordIdMethod = recordIdMethod;
     }
 
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       elementCounter.addValue(1L);
-      byte[] elementBytes = CoderUtils.encodeToByteArray(elementCoder, c.element());
+      byte[] elementBytes = null;
+      Map<String, String> attributes = ImmutableMap.<String, String>of();
+      if (formatFn != null) {
+        PubsubIO.PubsubMessage message = formatFn.apply(c.element());
+        elementBytes = message.getMessage();
+        attributes = message.getAttributeMap();
+      } else {
+        elementBytes = CoderUtils.encodeToByteArray(elementCoder, c.element());
+      }
+
       long timestampMsSinceEpoch = c.timestamp().getMillis();
       @Nullable String recordId = null;
       switch (recordIdMethod) {
@@ -186,7 +209,8 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
           break;
       }
       c.output(KV.of(ThreadLocalRandom.current().nextInt(numShards),
-                     new OutgoingMessage(elementBytes, timestampMsSinceEpoch, recordId)));
+                     new OutgoingMessage(elementBytes, attributes, timestampMsSinceEpoch,
+                             recordId)));
     }
 
     @Override
@@ -365,6 +389,12 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
    */
   private final RecordIdMethod recordIdMethod;
 
+  /**
+   * If not {@literal null}, formats each input element into a {@link PubsubIO.PubsubMessage} so
+   * that attributes can be published; otherwise elements are encoded with the element coder.
+   */
+  @Nullable private final SimpleFunction<T, PubsubIO.PubsubMessage> formatFn;
+
   @VisibleForTesting
   PubsubUnboundedSink(
       PubsubClientFactory pubsubFactory,
@@ -376,6 +406,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
       int publishBatchSize,
       int publishBatchBytes,
       Duration maxLatency,
+      SimpleFunction<T, PubsubIO.PubsubMessage> formatFn,
       RecordIdMethod recordIdMethod) {
     this.pubsubFactory = pubsubFactory;
     this.topic = topic;
@@ -386,6 +417,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
     this.publishBatchSize = publishBatchSize;
     this.publishBatchBytes = publishBatchBytes;
     this.maxLatency = maxLatency;
+    this.formatFn = formatFn;
     this.recordIdMethod = idLabel == null ? RecordIdMethod.NONE : recordIdMethod;
   }
 
@@ -395,30 +427,54 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
       Coder<T> elementCoder,
       String timestampLabel,
       String idLabel,
+      SimpleFunction<T, PubsubIO.PubsubMessage> formatFn,
       int numShards) {
     this(pubsubFactory, topic, elementCoder, timestampLabel, idLabel, numShards,
          DEFAULT_PUBLISH_BATCH_SIZE, DEFAULT_PUBLISH_BATCH_BYTES, DEFAULT_MAX_LATENCY,
-         RecordIdMethod.RANDOM);
+         formatFn, RecordIdMethod.RANDOM);
   }
 
+  /**
+   * Get the topic being written to.
+   */
   public TopicPath getTopic() {
     return topic.get();
   }
 
+  /**
+   * Get the {@link ValueProvider} for the topic being written to.
+   */
   public ValueProvider<TopicPath> getTopicProvider() {
     return topic;
   }
 
+  /**
+   * Get the timestamp label.
+   */
   @Nullable
   public String getTimestampLabel() {
     return timestampLabel;
   }
 
+  /**
+   * Get the id label.
+   */
   @Nullable
   public String getIdLabel() {
     return idLabel;
   }
 
+  /**
+   * Get the format function used for PubSub attributes.
+   */
+  @Nullable
+  public SimpleFunction<T, PubsubIO.PubsubMessage> getFormatFn() {
+    return formatFn;
+  }
+
+  /**
+   * Get the Coder used to encode output elements.
+   */
   public Coder<T> getElementCoder() {
     return elementCoder;
   }
@@ -433,7 +489,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
                     .plusDelayOf(maxLatency))))
             .discardingFiredPanes())
          .apply("PubsubUnboundedSink.Shard",
-             ParDo.of(new ShardFn<T>(elementCoder, numShards, recordIdMethod)))
+             ParDo.of(new ShardFn<T>(elementCoder, numShards, formatFn, recordIdMethod)))
          .setCoder(KvCoder.of(VarIntCoder.of(), CODER))
          .apply(GroupByKey.<Integer, OutgoingMessage>create())
          .apply("PubsubUnboundedSink.Writer",

http://git-wip-us.apache.org/repos/asf/beam/blob/f032facb/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
index 4b3792d..c1f8720 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
@@ -50,6 +50,7 @@ import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.PubsubIO.PubsubMessage;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PubsubOptions;
 import org.apache.beam.sdk.options.ValueProvider;
@@ -58,6 +59,7 @@ import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
@@ -394,6 +396,8 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
     @VisibleForTesting
     final SubscriptionPath subscription;
 
+    private final SimpleFunction<PubsubIO.PubsubMessage, T> parseFn;
+
     /**
      * Client on which to talk to Pubsub. Null if closed.
      */
@@ -580,10 +584,12 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
     /**
      * Construct a reader.
      */
-    public PubsubReader(PubsubOptions options, PubsubSource<T> outer, SubscriptionPath subscription)
+    public PubsubReader(PubsubOptions options, PubsubSource<T> outer, SubscriptionPath subscription,
+                        SimpleFunction<PubsubIO.PubsubMessage, T> parseFn)
         throws IOException, GeneralSecurityException {
       this.outer = outer;
       this.subscription = subscription;
+      this.parseFn = parseFn;
       pubsubClient =
           outer.outer.pubsubFactory.newClient(outer.outer.timestampLabel, outer.outer.idLabel,
                                               options);
@@ -959,7 +965,12 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
         throw new NoSuchElementException();
       }
       try {
-        return CoderUtils.decodeFromByteArray(outer.outer.elementCoder, current.elementBytes);
+        if (parseFn != null) {
+          return parseFn.apply(new PubsubIO.PubsubMessage(
+                  current.elementBytes, current.attributes));
+        } else {
+          return CoderUtils.decodeFromByteArray(outer.outer.elementCoder, current.elementBytes);
+        }
       } catch (CoderException e) {
         throw new RuntimeException("Unable to decode element from Pubsub message: ", e);
       }
@@ -1110,7 +1121,8 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
         }
       }
       try {
-        reader = new PubsubReader<>(options.as(PubsubOptions.class), this, subscription);
+        reader = new PubsubReader<>(options.as(PubsubOptions.class), this, subscription,
+                outer.parseFn);
       } catch (GeneralSecurityException | IOException e) {
         throw new RuntimeException("Unable to subscribe to " + subscriptionPath + ": ", e);
       }
@@ -1260,6 +1272,13 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
   @Nullable
   private final String idLabel;
 
+  /**
+   * If not {@literal null}, the user is asking for PubSub attributes. This parse function will be
+   * used to parse {@link PubsubIO.PubsubMessage}s containing a payload and attributes.
+   */
+  @Nullable
+  SimpleFunction<PubsubMessage, T> parseFn;
+
   @VisibleForTesting
   PubsubUnboundedSource(
       Clock clock,
@@ -1269,7 +1288,8 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
       @Nullable ValueProvider<SubscriptionPath> subscription,
       Coder<T> elementCoder,
       @Nullable String timestampLabel,
-      @Nullable String idLabel) {
+      @Nullable String idLabel,
+      @Nullable SimpleFunction<PubsubIO.PubsubMessage, T> parseFn) {
     checkArgument((topic == null) != (subscription == null),
                   "Exactly one of topic and subscription must be given");
     checkArgument((topic == null) == (project == null),
@@ -1282,6 +1302,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
     this.elementCoder = checkNotNull(elementCoder);
     this.timestampLabel = timestampLabel;
     this.idLabel = idLabel;
+    this.parseFn = parseFn;
   }
 
   /**
@@ -1294,49 +1315,83 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
       @Nullable ValueProvider<SubscriptionPath> subscription,
       Coder<T> elementCoder,
       @Nullable String timestampLabel,
-      @Nullable String idLabel) {
-    this(null, pubsubFactory, project, topic, subscription, elementCoder, timestampLabel, idLabel);
+      @Nullable String idLabel,
+      @Nullable SimpleFunction<PubsubIO.PubsubMessage, T> parseFn) {
+    this(null, pubsubFactory, project, topic, subscription, elementCoder, timestampLabel, idLabel,
+        parseFn);
   }
 
+  /**
+   * Get the coder used for elements.
+   */
   public Coder<T> getElementCoder() {
     return elementCoder;
   }
 
+  /**
+   * Get the project path.
+   */
   @Nullable
   public ProjectPath getProject() {
     return project == null ? null : project.get();
   }
 
+  /**
+   * Get the topic being read from.
+   */
   @Nullable
   public TopicPath getTopic() {
     return topic == null ? null : topic.get();
   }
 
+  /**
+   * Get the {@link ValueProvider} for the topic being read from.
+   */
   @Nullable
   public ValueProvider<TopicPath> getTopicProvider() {
     return topic;
   }
 
+  /**
+   * Get the subscription being read from.
+   */
   @Nullable
   public SubscriptionPath getSubscription() {
     return subscription == null ? null : subscription.get();
   }
 
+  /**
+   * Get the {@link ValueProvider} for the subscription being read from.
+   */
   @Nullable
   public ValueProvider<SubscriptionPath> getSubscriptionProvider() {
     return subscription;
   }
 
+  /**
+   * Get the timestamp label.
+   */
   @Nullable
   public String getTimestampLabel() {
     return timestampLabel;
   }
 
+  /**
+   * Get the id label.
+   */
   @Nullable
   public String getIdLabel() {
     return idLabel;
   }
 
+  /**
+   * Get the parsing function for PubSub attributes.
+   */
+  @Nullable
+  public SimpleFunction<PubsubIO.PubsubMessage, T> getWithAttributesParseFn() {
+    return parseFn;
+  }
+
   @Override
   public PCollection<T> expand(PBegin input) {
     return input.getPipeline().begin()

http://git-wip-us.apache.org/repos/asf/beam/blob/f032facb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
index a339af7..1541059 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
@@ -102,7 +102,7 @@ import org.apache.beam.sdk.values.PCollection.IsBounded;
  * as the input.
  *
  * <p>If the input {@code PCollection} contains late data (see
- * {@link org.apache.beam.sdk.io.PubsubIO.Read.Bound#timestampLabel}
+ * {@link org.apache.beam.sdk.io.PubsubIO.Read#timestampLabel}
  * for an example of how this can occur) or the
  * {@link Window#triggering requested TriggerFn} can fire before
  * the watermark, then there may be multiple elements

http://git-wip-us.apache.org/repos/asf/beam/blob/f032facb/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
index 49a2b87..ee25448 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java
@@ -83,6 +83,7 @@ public class PropertyNames {
   public static final String PARALLEL_INPUT = "parallel_input";
   public static final String PHASE = "phase";
   public static final String PUBSUB_ID_LABEL = "pubsub_id_label";
+  public static final String PUBSUB_SERIALIZED_ATTRIBUTES_FN = "pubsub_serialized_attributes_fn";
   public static final String PUBSUB_SUBSCRIPTION = "pubsub_subscription";
   public static final String PUBSUB_SUBSCRIPTION_OVERRIDE = "pubsub_subscription_runtime_override";
   public static final String PUBSUB_TIMESTAMP_LABEL = "pubsub_timestamp_label";

http://git-wip-us.apache.org/repos/asf/beam/blob/f032facb/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
index 06b776b..fc84057 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
@@ -310,6 +310,8 @@ public abstract class PubsubClient implements Closeable {
      */
     public final byte[] elementBytes;
 
+    public final Map<String, String> attributes;
+
     /**
      * Timestamp for element (ms since epoch).
      */
@@ -322,9 +324,10 @@ public abstract class PubsubClient implements Closeable {
     @Nullable
     public final String recordId;
 
-    public OutgoingMessage(
-        byte[] elementBytes, long timestampMsSinceEpoch, @Nullable String recordId) {
+    public OutgoingMessage(byte[] elementBytes, Map<String, String> attributes,
+                           long timestampMsSinceEpoch, @Nullable String recordId) {
       this.elementBytes = elementBytes;
+      this.attributes = attributes;
       this.timestampMsSinceEpoch = timestampMsSinceEpoch;
       this.recordId = recordId;
     }
@@ -347,13 +350,15 @@ public abstract class PubsubClient implements Closeable {
       OutgoingMessage that = (OutgoingMessage) o;
 
       return timestampMsSinceEpoch == that.timestampMsSinceEpoch
-             && Arrays.equals(elementBytes, that.elementBytes)
-             && Objects.equal(recordId, that.recordId);
+              && Arrays.equals(elementBytes, that.elementBytes)
+              && Objects.equal(attributes, that.attributes)
+              && Objects.equal(recordId, that.recordId);
     }
 
     @Override
     public int hashCode() {
-      return Objects.hashCode(Arrays.hashCode(elementBytes), timestampMsSinceEpoch, recordId);
+      return Objects.hashCode(Arrays.hashCode(elementBytes), attributes, timestampMsSinceEpoch,
+              recordId);
     }
   }
 
@@ -369,6 +374,8 @@ public abstract class PubsubClient implements Closeable {
      */
     public final byte[] elementBytes;
 
+    public Map<String, String> attributes;
+
     /**
      * Timestamp for element (ms since epoch). Either Pubsub's processing time,
      * or the custom timestamp associated with the message.
@@ -392,11 +399,13 @@ public abstract class PubsubClient implements Closeable {
 
     public IncomingMessage(
         byte[] elementBytes,
+        Map<String, String> attributes,
         long timestampMsSinceEpoch,
         long requestTimeMsSinceEpoch,
         String ackId,
         String recordId) {
       this.elementBytes = elementBytes;
+      this.attributes = attributes;
       this.timestampMsSinceEpoch = timestampMsSinceEpoch;
       this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch;
       this.ackId = ackId;
@@ -404,8 +413,8 @@ public abstract class PubsubClient implements Closeable {
     }
 
     public IncomingMessage withRequestTime(long requestTimeMsSinceEpoch) {
-      return new IncomingMessage(elementBytes, timestampMsSinceEpoch, requestTimeMsSinceEpoch,
-                                 ackId, recordId);
+      return new IncomingMessage(elementBytes, attributes, timestampMsSinceEpoch,
+              requestTimeMsSinceEpoch, ackId, recordId);
     }
 
     @Override
@@ -429,12 +438,13 @@ public abstract class PubsubClient implements Closeable {
              && requestTimeMsSinceEpoch == that.requestTimeMsSinceEpoch
              && ackId.equals(that.ackId)
              && recordId.equals(that.recordId)
-             && Arrays.equals(elementBytes, that.elementBytes);
+             && Arrays.equals(elementBytes, that.elementBytes)
+              && Objects.equal(attributes, that.attributes);
     }
 
     @Override
     public int hashCode() {
-      return Objects.hashCode(Arrays.hashCode(elementBytes), timestampMsSinceEpoch,
+      return Objects.hashCode(Arrays.hashCode(elementBytes), attributes, timestampMsSinceEpoch,
                               requestTimeMsSinceEpoch,
                               ackId, recordId);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/f032facb/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
index 201877c..4a6ddac 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java
@@ -223,6 +223,10 @@ public class PubsubGrpcClient extends PubsubClient {
           PubsubMessage.newBuilder()
                        .setData(ByteString.copyFrom(outgoingMessage.elementBytes));
 
+      if (outgoingMessage.attributes != null) {
+        message.putAllAttributes(outgoingMessage.attributes);
+      }
+
       if (timestampLabel != null) {
         message.getMutableAttributes()
                .put(timestampLabel, String.valueOf(outgoingMessage.timestampMsSinceEpoch));
@@ -286,7 +290,7 @@ public class PubsubGrpcClient extends PubsubClient {
         recordId = pubsubMessage.getMessageId();
       }
 
-      incomingMessages.add(new IncomingMessage(elementBytes, timestampMsSinceEpoch,
+      incomingMessages.add(new IncomingMessage(elementBytes, attributes, timestampMsSinceEpoch,
                                                requestTimeMsSinceEpoch, ackId, recordId));
     }
     return incomingMessages;

http://git-wip-us.apache.org/repos/asf/beam/blob/f032facb/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java
index 215a136..6bc104f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java
@@ -138,6 +138,8 @@ public class PubsubJsonClient extends PubsubClient {
       Map<String, String> attributes = pubsubMessage.getAttributes();
       if ((timestampLabel != null || idLabel != null) && attributes == null) {
         attributes = new TreeMap<>();
+      }
+      if (attributes != null) {
         pubsubMessage.setAttributes(attributes);
       }
 
@@ -201,7 +203,7 @@ public class PubsubJsonClient extends PubsubClient {
         recordId = pubsubMessage.getMessageId();
       }
 
-      incomingMessages.add(new IncomingMessage(elementBytes, timestampMsSinceEpoch,
+      incomingMessages.add(new IncomingMessage(elementBytes, attributes, timestampMsSinceEpoch,
                                                requestTimeMsSinceEpoch, ackId, recordId));
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f032facb/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
index 45477fc..61479f9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.io.Closeable;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -39,7 +40,7 @@ import org.apache.beam.sdk.options.PubsubOptions;
  * testing {@link #publish}, {@link #pull}, {@link #acknowledge} and {@link #modifyAckDeadline}
  * methods. Relies on statics to mimic the Pubsub service, though we try to hide that.
  */
-public class PubsubTestClient extends PubsubClient {
+public class PubsubTestClient extends PubsubClient implements Serializable {
   /**
    * Mimic the state of the simulated Pubsub 'service'.
    *
@@ -113,7 +114,8 @@ public class PubsubTestClient extends PubsubClient {
   private static final State STATE = new State();
 
   /** Closing the factory will validate all expected messages were processed. */
-  public interface PubsubTestClientFactory extends PubsubClientFactory, Closeable {
+  public interface PubsubTestClientFactory
+          extends PubsubClientFactory, Closeable, Serializable {
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/f032facb/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
index f1bf788..e15562e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
@@ -25,6 +25,8 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 
 import java.util.Set;
+
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -48,19 +50,20 @@ public class PubsubIOTest {
   @Test
   public void testPubsubIOGetName() {
     assertEquals("PubsubIO.Read",
-        PubsubIO.Read.topic("projects/myproject/topics/mytopic").getName());
+        PubsubIO.<String>read().topic("projects/myproject/topics/mytopic").getName());
     assertEquals("PubsubIO.Write",
-        PubsubIO.Write.topic("projects/myproject/topics/mytopic").getName());
+        PubsubIO.<String>write().topic("projects/myproject/topics/mytopic").getName());
   }
 
   @Test
   public void testTopicValidationSuccess() throws Exception {
-    PubsubIO.Read.topic("projects/my-project/topics/abc");
-    PubsubIO.Read.topic("projects/my-project/topics/ABC");
-    PubsubIO.Read.topic("projects/my-project/topics/AbC-DeF");
-    PubsubIO.Read.topic("projects/my-project/topics/AbC-1234");
-    PubsubIO.Read.topic("projects/my-project/topics/AbC-1234-_.~%+-_.~%+-_.~%+-abc");
-    PubsubIO.Read.topic(new StringBuilder().append("projects/my-project/topics/A-really-long-one-")
+    PubsubIO.<String>read().topic("projects/my-project/topics/abc");
+    PubsubIO.<String>read().topic("projects/my-project/topics/ABC");
+    PubsubIO.<String>read().topic("projects/my-project/topics/AbC-DeF");
+    PubsubIO.<String>read().topic("projects/my-project/topics/AbC-1234");
+    PubsubIO.<String>read().topic("projects/my-project/topics/AbC-1234-_.~%+-_.~%+-_.~%+-abc");
+    PubsubIO.<String>read().topic(new StringBuilder()
+        .append("projects/my-project/topics/A-really-long-one-")
         .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
         .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
         .append("11111111111111111111111111111111111111111111111111111111111111111111111111")
@@ -70,13 +73,14 @@ public class PubsubIOTest {
   @Test
   public void testTopicValidationBadCharacter() throws Exception {
     thrown.expect(IllegalArgumentException.class);
-    PubsubIO.Read.topic("projects/my-project/topics/abc-*-abc");
+    PubsubIO.<String>read().topic("projects/my-project/topics/abc-*-abc");
   }
 
   @Test
   public void testTopicValidationTooLong() throws Exception {
     thrown.expect(IllegalArgumentException.class);
-    PubsubIO.Read.topic(new StringBuilder().append("projects/my-project/topics/A-really-long-one-")
+    PubsubIO.<String>read().topic(new StringBuilder().append
+        ("projects/my-project/topics/A-really-long-one-")
         .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
         .append("111111111111111111111111111111111111111111111111111111111111111111111111111111111")
         .append("1111111111111111111111111111111111111111111111111111111111111111111111111111")
@@ -88,7 +92,7 @@ public class PubsubIOTest {
     String topic = "projects/project/topics/topic";
     String subscription = "projects/project/subscriptions/subscription";
     Duration maxReadTime = Duration.standardMinutes(5);
-    PubsubIO.Read.Bound<String> read = PubsubIO.Read
+    PubsubIO.Read<String> read = PubsubIO.<String>read()
         .topic(StaticValueProvider.of(topic))
         .subscription(StaticValueProvider.of(subscription))
         .timestampLabel("myTimestamp")
@@ -109,7 +113,7 @@ public class PubsubIOTest {
   @Test
   public void testNullTopic() {
     String subscription = "projects/project/subscriptions/subscription";
-    PubsubIO.Read.Bound<String> read = PubsubIO.Read
+    PubsubIO.Read<String> read = PubsubIO.<String>read()
         .subscription(StaticValueProvider.of(subscription));
     assertNull(read.getTopic());
     assertNotNull(read.getSubscription());
@@ -119,7 +123,7 @@ public class PubsubIOTest {
   @Test
   public void testNullSubscription() {
     String topic = "projects/project/topics/topic";
-    PubsubIO.Read.Bound<String> read = PubsubIO.Read
+    PubsubIO.Read<String> read = PubsubIO.<String>read()
         .topic(StaticValueProvider.of(topic));
     assertNotNull(read.getTopic());
     assertNull(read.getSubscription());
@@ -130,9 +134,10 @@ public class PubsubIOTest {
   @Category(RunnableOnService.class)
   public void testPrimitiveReadDisplayData() {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    PubsubIO.Read.Bound<String> read =
-        PubsubIO.Read.subscription("projects/project/subscriptions/subscription")
-            .maxNumRecords(1);
+    PubsubIO.Read<String> read =
+        PubsubIO.<String>read().subscription("projects/project/subscriptions/subscription")
+            .maxNumRecords(1)
+            .withCoder(StringUtf8Coder.of());
 
     Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
     assertThat("PubsubIO.Read should include the subscription in its primitive display data",
@@ -142,7 +147,7 @@ public class PubsubIOTest {
   @Test
   public void testWriteDisplayData() {
     String topic = "projects/project/topics/topic";
-    PubsubIO.Write.Bound<?> write = PubsubIO.Write
+    PubsubIO.Write<?> write = PubsubIO.<String>write()
         .topic(topic)
         .timestampLabel("myTimestamp")
         .idLabel("myId");
@@ -158,7 +163,7 @@ public class PubsubIOTest {
   @Category(RunnableOnService.class)
   public void testPrimitiveWriteDisplayData() {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    PubsubIO.Write.Bound<?> write = PubsubIO.Write.topic("projects/project/topics/topic");
+    PubsubIO.Write<?> write = PubsubIO.<String>write().topic("projects/project/topics/topic");
 
     Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
     assertThat("PubsubIO.Write should include the topic in its primitive display data",


Mime
View raw message