beam-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-3744) Support full PubsubMessages
Date Thu, 26 Jul 2018 22:14:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-3744?focusedWorklogId=127934&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127934 ]

ASF GitHub Bot logged work on BEAM-3744:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Jul/18 22:13
            Start Date: 26/Jul/18 22:13
    Worklog Time Spent: 10m 
      Work Description: charlesccychen closed pull request #5952: [BEAM-3744] Python PubSub API Fixes and Tests
URL: https://github.com/apache/beam/pull/5952
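
For context: this PR replaces the deprecated string-oriented transforms
(ReadStringsFromPubSub, WriteStringsToPubSub) with byte- and
message-oriented ones (ReadFromPubSub, WriteToPubSub). A minimal
migration sketch, not part of the PR itself; the project, topic, and
subscription names are placeholders:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import StandardOptions

    options = PipelineOptions()
    options.view_as(StandardOptions).streaming = True
    with beam.Pipeline(options=options) as p:
      # New API: elements are byte strings (message data only).
      data = p | beam.io.ReadFromPubSub(
          subscription='projects/my-project/subscriptions/my-subscription')
      # Decoding to text is now the caller's responsibility.
      lines = data | 'decode' >> beam.Map(lambda b: b.decode('utf-8'))
      # Encode explicitly before writing; with_attributes=False means the
      # elements are raw message data.
      _ = (lines
           | 'encode' >> beam.Map(lambda s: s.encode('utf-8'))
           | beam.io.WriteToPubSub('projects/my-project/topics/my-topic',
                                   with_attributes=False))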
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/sdks/python/apache_beam/examples/complete/game/game_stats.py b/sdks/python/apache_beam/examples/complete/game/game_stats.py
index a78b91bf043..57664a15589 100644
--- a/sdks/python/apache_beam/examples/complete/game/game_stats.py
+++ b/sdks/python/apache_beam/examples/complete/game/game_stats.py
@@ -298,10 +298,10 @@ def run(argv=None):
     # Read game events from Pub/Sub using custom timestamps, which
     # are extracted from the data elements, and parse the data.
     if args.subscription:
-      scores = p | 'ReadPubSub' >> beam.io.ReadStringsFromPubSub(
+      scores = p | 'ReadPubSub' >> beam.io.ReadFromPubSub(
           subscription=args.subscription)
     else:
-      scores = p | 'ReadPubSub' >> beam.io.ReadStringsFromPubSub(
+      scores = p | 'ReadPubSub' >> beam.io.ReadFromPubSub(
           topic=args.topic)
     raw_events = (
         scores
diff --git a/sdks/python/apache_beam/examples/complete/game/leader_board.py b/sdks/python/apache_beam/examples/complete/game/leader_board.py
index ca2d46c294a..b114b1321b7 100644
--- a/sdks/python/apache_beam/examples/complete/game/leader_board.py
+++ b/sdks/python/apache_beam/examples/complete/game/leader_board.py
@@ -306,10 +306,10 @@ def run(argv=None):
 
     # Read from PubSub into a PCollection.
     if args.subscription:
-      scores = p | 'ReadPubSub' >> beam.io.ReadStringsFromPubSub(
+      scores = p | 'ReadPubSub' >> beam.io.ReadFromPubSub(
           subscription=args.subscription)
     else:
-      scores = p | 'ReadPubSub' >> beam.io.ReadStringsFromPubSub(
+      scores = p | 'ReadPubSub' >> beam.io.ReadFromPubSub(
           topic=args.topic)
 
     events = (
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount.py b/sdks/python/apache_beam/examples/streaming_wordcount.py
index 8bc02fa9129..db3b97cda18 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount.py
@@ -23,7 +23,7 @@
 import argparse
 import logging
 
-from past.builtins import unicode
+import six
 
 import apache_beam as beam
 import apache_beam.transforms.window as window
@@ -60,10 +60,10 @@ def run(argv=None):
 
   # Read from PubSub into a PCollection.
   if known_args.input_subscription:
-    lines = p | beam.io.ReadStringsFromPubSub(
+    lines = p | beam.io.ReadFromPubSub(
         subscription=known_args.input_subscription)
   else:
-    lines = p | beam.io.ReadStringsFromPubSub(topic=known_args.input_topic)
+    lines = p | beam.io.ReadFromPubSub(topic=known_args.input_topic)
 
   # Count the occurrences of each word.
   def count_ones(word_ones):
@@ -72,7 +72,7 @@ def count_ones(word_ones):
 
   counts = (lines
             | 'split' >> (beam.ParDo(WordExtractingDoFn())
-                          .with_output_types(unicode))
+                          .with_output_types(six.text_type))
             | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
             | beam.WindowInto(window.FixedWindows(15, 0))
             | 'group' >> beam.GroupByKey()
@@ -87,7 +87,7 @@ def format_result(word_count):
 
   # Write to PubSub.
   # pylint: disable=expression-not-assigned
-  output | beam.io.WriteStringsToPubSub(known_args.output_topic)
+  output | beam.io.WriteToPubSub(known_args.output_topic)
 
   result = p.run()
   result.wait_until_finish()
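
Note on the import change above: six.text_type resolves to unicode on
Python 2 and str on Python 3, so the output type hint stays portable. A
tiny illustrative sketch, not from the PR:

    import six
    import apache_beam as beam

    with beam.Pipeline() as p:
      words = (p
               | beam.Create([b'foo', b'bar'])
               | 'decode' >> (beam.Map(lambda b: b.decode('utf-8'))
                              .with_output_types(six.text_type)))
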
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount_debugging.py b/sdks/python/apache_beam/examples/streaming_wordcount_debugging.py
index 3f0d91213e3..edaedb5bfc8 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount_debugging.py
@@ -105,10 +105,10 @@ def run(argv=None):
 
   # Read from PubSub into a PCollection.
   if known_args.input_subscription:
-    lines = p | beam.io.ReadStringsFromPubSub(
+    lines = p | beam.io.ReadFromPubSub(
         subscription=known_args.input_subscription)
   else:
-    lines = p | beam.io.ReadStringsFromPubSub(topic=known_args.input_topic)
+    lines = p | beam.io.ReadFromPubSub(topic=known_args.input_topic)
 
   # Count the occurrences of each word.
   def count_ones(word_ones):
diff --git a/sdks/python/apache_beam/examples/windowed_wordcount.py b/sdks/python/apache_beam/examples/windowed_wordcount.py
index 1ea7abda64f..5eb05c02ff0 100644
--- a/sdks/python/apache_beam/examples/windowed_wordcount.py
+++ b/sdks/python/apache_beam/examples/windowed_wordcount.py
@@ -68,7 +68,7 @@ def run(argv=None):
   with beam.Pipeline(argv=pipeline_args) as p:
 
     # Read the text from PubSub messages.
-    lines = p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
+    lines = p | beam.io.ReadFromPubSub(known_args.input_topic)
 
     # Get the number of appearances of a word.
     def count_ones(word_ones):
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index a57bef8f395..a8e8d4de81a 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -20,14 +20,6 @@
 pipelines, during remote execution.
 
 This API is currently under development and is subject to change.
-
-Description of common arguments used in this module:
-  topic: Cloud Pub/Sub topic in the form "projects/<project>/topics/<topic>".
-    If provided, subscription must be None.
-  subscription: Existing Cloud Pub/Sub subscription to use in the
-    form "projects/<project>/subscriptions/<subscription>". If not specified,
-    a temporary subscription will be created from the specified topic. If
-    provided, topic must be None.
 """
 
 from __future__ import absolute_import
@@ -35,6 +27,7 @@
 import re
 from builtins import object
 
+import six
 from past.builtins import basestring
 
 from apache_beam import coders
@@ -44,6 +37,7 @@
 from apache_beam.transforms import Map
 from apache_beam.transforms import PTransform
 from apache_beam.transforms.display import DisplayDataItem
+from apache_beam.utils.annotations import deprecated
 
 try:
   from google.cloud.proto.pubsub.v1 import pubsub_pb2
@@ -51,40 +45,52 @@
   pubsub_pb2 = None
 
 __all__ = ['PubsubMessage', 'ReadFromPubSub', 'ReadStringsFromPubSub',
-           'WriteStringsToPubSub']
+           'WriteStringsToPubSub', 'WriteToPubSub']
 
 
 class PubsubMessage(object):
-  """Represents a message from Cloud Pub/Sub.
+  """Represents a Cloud Pub/Sub message.
+
+  Message payload includes the data and attributes fields. For the payload to be
+  valid, at least one of its fields must be non-empty.
 
   This interface is experimental. No backwards compatibility guarantees.
 
   Attributes:
-    payload: (str) Message payload, as a byte string.
-    attributes: (dict) Map of string to string.
+    data: (six.binary_type) Message data. May be None.
+    attributes: (dict) Key-value map of str to str, containing both user-defined
+      and service generated attributes (such as id_label and
+      timestamp_attribute). May be None.
   """
 
-  def __init__(self, payload, attributes):
-    """Constructs a message.
-
-    Beam users should not directly construct ``PubsubMessages``.
-    """
-    self.payload = payload
+  def __init__(self, data, attributes):
+    if data is None and not attributes:
+      raise ValueError('Either data (%r) or attributes (%r) must be set.'
+                       % (data, attributes))
+    self.data = data
     self.attributes = attributes
 
+  def __hash__(self):
+    return hash((self.data, frozenset(self.attributes.items())))
+
   def __eq__(self, other):
     return isinstance(other, PubsubMessage) and (
-        self.payload == other.payload and
+        self.data == other.data and
         self.attributes == other.attributes)
 
   def __repr__(self):
-    return 'PubsubMessage(%s, %s)' % (self.payload, self.attributes)
+    return 'PubsubMessage(%s, %s)' % (self.data, self.attributes)
 
   @staticmethod
-  def _from_proto(proto_msg):
+  def _from_proto_str(proto_msg):
     """Construct from serialized form of ``PubsubMessage``.
 
-    https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage
+    Args:
+      proto_msg: String containing a serialized protobuf of type
+      https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage
+
+    Returns:
+      A new PubsubMessage object.
     """
     msg = pubsub_pb2.PubsubMessage()
     msg.ParseFromString(proto_msg)
@@ -92,6 +98,23 @@ def _from_proto(proto_msg):
     attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
     return PubsubMessage(msg.data, attributes)
 
+  def _to_proto_str(self):
+    """Get serialized form of ``PubsubMessage``.
+
+    Returns:
+      A str containing a serialized protobuf of type
+      https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.PubsubMessage
+      containing the payload of this object.
+    """
+    msg = pubsub_pb2.PubsubMessage()
+    msg.data = self.data
+    for key, value in self.attributes.items():
+      msg.attributes[key] = value
+    return msg.SerializeToString()
+
   @staticmethod
   def _from_message(msg):
     """Construct from ``google.cloud.pubsub.message.Message``.
@@ -108,16 +131,27 @@ class ReadFromPubSub(PTransform):
  # Implementation note: This ``PTransform`` is overridden by DirectRunner.
 
   def __init__(self, topic=None, subscription=None, id_label=None,
-               timestamp_attribute=None):
+               with_attributes=False, timestamp_attribute=None):
     """Initializes ``ReadFromPubSub``.
 
     Args:
+      topic: Cloud Pub/Sub topic in the form
+        "projects/<project>/topics/<topic>". If provided, subscription must be
+        None.
+      subscription: Existing Cloud Pub/Sub subscription to use in the
+        form "projects/<project>/subscriptions/<subscription>". If not
+        specified, a temporary subscription will be created from the specified
+        topic. If provided, topic must be None.
       id_label: The attribute on incoming Pub/Sub messages to use as a unique
         record identifier. When specified, the value of this attribute (which
         can be any string that uniquely identifies the record) will be used for
         deduplication of messages. If not provided, we cannot guarantee
         that no duplicate data will be delivered on the Pub/Sub stream. In this
         case, deduplication of the stream will be strictly best effort.
+      with_attributes:
+        True - output elements will be :class:`~PubsubMessage` objects.
+        False - output elements will be of type ``six.binary_type`` (message
+        data only).
       timestamp_attribute: Message value to use as element timestamp. If None,
         uses message publishing time as the timestamp.
 
@@ -131,8 +165,7 @@ def __init__(self, topic=None, subscription=None, id_label=None,
           units smaller than milliseconds) may be ignored.
     """
     super(ReadFromPubSub, self).__init__()
-    # TODO(BEAM-4536): Add with_attributes to kwargs once fixed.
-    self.with_attributes = False
+    self.with_attributes = with_attributes
     self._source = _PubSubSource(
         topic=topic,
         subscription=subscription,
@@ -142,11 +175,10 @@ def __init__(self, topic=None, subscription=None, id_label=None,
 
   def expand(self, pvalue):
     pcoll = pvalue.pipeline | Read(self._source)
+    pcoll.element_type = six.binary_type
     if self.with_attributes:
-      pcoll = pcoll | Map(PubsubMessage._from_proto)
+      pcoll = pcoll | Map(PubsubMessage._from_proto_str)
       pcoll.element_type = PubsubMessage
-    else:
-      pcoll.element_type = bytes
     return pcoll
 
   def to_runner_api_parameter(self, context):
@@ -155,45 +187,97 @@ def to_runner_api_parameter(self, context):
     return self.to_runner_api_pickled(context)
 
 
-class ReadStringsFromPubSub(PTransform):
-  """A ``PTransform`` for reading utf-8 string payloads from Cloud Pub/Sub.
+@deprecated(since='2.6.0', extra_message='Use ReadFromPubSub instead.')
+def ReadStringsFromPubSub(topic=None, subscription=None, id_label=None):
+  return _ReadStringsFromPubSub(topic, subscription, id_label)
 
-  Outputs elements of type ``unicode``, decoded from UTF-8.
 
-  This class is deprecated.
-  """
+class _ReadStringsFromPubSub(PTransform):
+  """This class is deprecated. Use ``ReadFromPubSub`` instead."""
 
   def __init__(self, topic=None, subscription=None, id_label=None):
-    super(ReadStringsFromPubSub, self).__init__()
+    super(_ReadStringsFromPubSub, self).__init__()
     self.topic = topic
     self.subscription = subscription
     self.id_label = id_label
 
   def expand(self, pvalue):
     p = (pvalue.pipeline
-         | ReadFromPubSub(self.topic, self.subscription, self.id_label)
+         | ReadFromPubSub(self.topic, self.subscription, self.id_label,
+                          with_attributes=False)
          | 'DecodeString' >> Map(lambda b: b.decode('utf-8')))
     p.element_type = basestring
     return p
 
 
-class WriteStringsToPubSub(PTransform):
-  """A ``PTransform`` for writing utf-8 string payloads to Cloud Pub/Sub."""
+@deprecated(since='2.6.0', extra_message='Use WriteToPubSub instead.')
+def WriteStringsToPubSub(topic):
+  return _WriteStringsToPubSub(topic)
+
+
+class _WriteStringsToPubSub(PTransform):
+  """This class is deprecated. Use ``WriteToPubSub`` instead."""
 
   def __init__(self, topic):
-    """Initializes ``WriteStringsToPubSub``.
+    """Initializes ``_WriteStringsToPubSub``.
 
     Attributes:
       topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>".
     """
-    super(WriteStringsToPubSub, self).__init__()
-    self._sink = _PubSubPayloadSink(topic)
+    super(_WriteStringsToPubSub, self).__init__()
+    self._sink = _PubSubSink(topic, id_label=None, with_attributes=False,
+                             timestamp_attribute=None)
 
   def expand(self, pcoll):
     pcoll = pcoll | 'EncodeString' >> Map(lambda s: s.encode('utf-8'))
     pcoll.element_type = bytes
     return pcoll | Write(self._sink)
 
+
+class WriteToPubSub(PTransform):
+  """A ``PTransform`` for writing messages to Cloud Pub/Sub."""
+  # Implementation note: This ``PTransform`` is overridden by DirectRunner.
+
+  def __init__(self, topic, with_attributes, id_label=None,
+               timestamp_attribute=None):
+    """Initializes ``WriteToPubSub``.
+
+    Args:
+      topic: Cloud Pub/Sub topic in the form
+        "projects/<project>/topics/<topic>".
+      with_attributes:
+        True - input elements will be :class:`~PubsubMessage` objects.
+        False - input elements will be of type ``six.binary_type`` (message
+        data only).
+      id_label: If set, will set an attribute for each Cloud Pub/Sub message
+        with the given name and a unique value. This attribute can then be used
+        in a ReadFromPubSub PTransform to deduplicate messages.
+      timestamp_attribute: If set, will set an attribute for each Cloud Pub/Sub
+        message with the given name and the message's publish time as the value.
+    """
+    super(WriteToPubSub, self).__init__()
+    self.with_attributes = with_attributes
+    self.id_label = id_label
+    self.timestamp_attribute = timestamp_attribute
+    self._sink = _PubSubSink(topic, id_label, with_attributes,
+                             timestamp_attribute)
+
+  @staticmethod
+  def to_proto_str(element):
+    if not isinstance(element, PubsubMessage):
+      raise TypeError('Unexpected element. Type: %s (expected: PubsubMessage), '
+                      'value: %r' % (type(element), element))
+    return element._to_proto_str()
+
+  def expand(self, pcoll):
+    if self.with_attributes:
+      pcoll = pcoll | 'ToProtobuf' >> Map(self.to_proto_str)
+
+    # Without attributes, message data is written as-is. With attributes,
+    # message data + attributes are passed as a serialized protobuf string (see
+    # ``PubsubMessage._to_proto_str`` for exact protobuf message type).
+    pcoll.element_type = six.binary_type
+    return pcoll | Write(self._sink)
+
   def to_runner_api_parameter(self, context):
     # Required as this is identified by type in PTransformOverrides.
     # TODO(BEAM-3812): Use an actual URN here.
@@ -230,19 +314,17 @@ def parse_subscription(full_subscription):
 
 
 class _PubSubSource(dataflow_io.NativeSource):
-  """Source for the payload of a message as bytes from a Cloud Pub/Sub topic.
+  """Source for a Cloud Pub/Sub topic or subscription.
 
   This ``NativeSource`` is overridden by a native Pubsub implementation.
 
   Attributes:
-    with_attributes: If False, will fetch just message payload. Otherwise,
+    with_attributes: If False, will fetch just message data. Otherwise,
       fetches ``PubsubMessage`` protobufs.
   """
 
   def __init__(self, topic=None, subscription=None, id_label=None,
                with_attributes=False, timestamp_attribute=None):
-    # We are using this coder explicitly for portability reasons of PubsubIO
-    # across implementations in languages.
     self.coder = coders.BytesCoder()
     self.full_topic = topic
     self.full_subscription = subscription
@@ -277,7 +359,14 @@ def display_data(self):
                             label='Pubsub Topic').drop_if_none(),
             'subscription':
             DisplayDataItem(self.full_subscription,
-                            label='Pubsub Subscription').drop_if_none()}
+                            label='Pubsub Subscription').drop_if_none(),
+            'with_attributes':
+            DisplayDataItem(self.with_attributes,
+                            label='With Attributes').drop_if_none(),
+            'timestamp_attribute':
+            DisplayDataItem(self.timestamp_attribute,
+                            label='Timestamp Attribute').drop_if_none(),
+           }
 
   def reader(self):
     raise NotImplementedError
@@ -286,14 +375,18 @@ def is_bounded(self):
     return False
 
 
-class _PubSubPayloadSink(dataflow_io.NativeSink):
-  """Sink for the payload of a message as bytes to a Cloud Pub/Sub topic."""
+class _PubSubSink(dataflow_io.NativeSink):
+  """Sink for a Cloud Pub/Sub topic.
 
-  def __init__(self, topic):
-    # we are using this coder explicitly for portability reasons of PubsubIO
-    # across implementations in languages.
+  This ``NativeSink`` is overridden by a native Pubsub implementation.
+  """
+
+  def __init__(self, topic, id_label, with_attributes, timestamp_attribute):
     self.coder = coders.BytesCoder()
     self.full_topic = topic
+    self.id_label = id_label
+    self.with_attributes = with_attributes
+    self.timestamp_attribute = timestamp_attribute
 
     self.project, self.topic_name = parse_topic(topic)
 
@@ -303,8 +396,14 @@ def format(self):
     return 'pubsub'
 
   def display_data(self):
-    return {'topic': DisplayDataItem(self.full_topic, label='Pubsub Topic')}
+    return {
+        'topic': DisplayDataItem(self.full_topic, label='Pubsub Topic'),
+        'id_label': DisplayDataItem(self.id_label, label='ID Label Attribute'),
+        'with_attributes': DisplayDataItem(
+            self.with_attributes, label='With Attributes').drop_if_none(),
+        'timestamp_attribute': DisplayDataItem(
+            self.timestamp_attribute, label='Timestamp Attribute'),
+    }
 
   def writer(self):
-    raise NotImplementedError(
-        'PubSubPayloadSink is not supported in local execution.')
+    raise NotImplementedError
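
Taken together, the new with_attributes flag lets a pipeline carry full
Pub/Sub messages instead of bare payloads. A usage sketch of the API
added above; the topic names and the 'processed' attribute are
placeholders:

    import apache_beam as beam
    from apache_beam.io.gcp.pubsub import PubsubMessage
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import StandardOptions

    def tag(msg):
      # msg is a PubsubMessage with .data (bytes) and .attributes (dict).
      attributes = dict(msg.attributes)
      attributes['processed'] = 'yes'
      return PubsubMessage(msg.data, attributes)

    options = PipelineOptions()
    options.view_as(StandardOptions).streaming = True
    with beam.Pipeline(options=options) as p:
      _ = (p
           | beam.io.ReadFromPubSub('projects/my-project/topics/in',
                                    with_attributes=True)
           | beam.Map(tag)
           | beam.io.WriteToPubSub('projects/my-project/topics/out',
                                   with_attributes=True))
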
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
new file mode 100644
index 00000000000..d287ef28373
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
@@ -0,0 +1,144 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""
+Integration test for Google Cloud Pub/Sub.
+"""
+
+import logging
+import unittest
+import uuid
+
+from hamcrest.core.core.allof import all_of
+from nose.plugins.attrib import attr
+
+from apache_beam.io.gcp import pubsub_it_pipeline
+from apache_beam.io.gcp.pubsub import PubsubMessage
+from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing import test_utils
+from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
+
+INPUT_TOPIC = 'psit_topic_input'
+OUTPUT_TOPIC = 'psit_topic_output'
+INPUT_SUB = 'psit_subscription_input'
+OUTPUT_SUB = 'psit_subscription_output'
+
+TEST_PIPELINE_DURATION_MS = 30 * 1000
+# Takes into account Dataflow pipelines startup time.
+MESSAGE_MATCHER_TIMEOUT_S = 5 * 60
+
+
+class PubSubIntegrationTest(unittest.TestCase):
+
+  ID_LABEL = 'id'
+  TIMESTAMP_ATTRIBUTE = 'timestamp'
+  INPUT_MESSAGES = [
+      # Use ID_LABEL attribute to deduplicate messages with the same ID.
+      PubsubMessage('data001', {ID_LABEL: 'foo'}),
+      PubsubMessage('data001', {ID_LABEL: 'foo'}),
+      PubsubMessage('data001', {ID_LABEL: 'foo'}),
+      # For those elements that have the TIMESTAMP_ATTRIBUTE attribute, the IT
+      # pipeline writes back the timestamp of each element (as reported by
+      # Beam), as a TIMESTAMP_ATTRIBUTE + '_out' attribute.
+      PubsubMessage('data002', {
+          TIMESTAMP_ATTRIBUTE: '2018-07-11T02:02:50.149000Z',
+      }),
+  ]
+  EXPECTED_OUTPUT_MESSAGES = [
+      PubsubMessage('data001-seen', {'processed': 'IT'}),
+      PubsubMessage('data002-seen', {
+          TIMESTAMP_ATTRIBUTE + '_out': '2018-07-11T02:02:50.149000Z',
+          'processed': 'IT',
+      }),
+  ]
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.project = self.test_pipeline.get_option('project')
+    self.uuid = str(uuid.uuid4())
+
+    # Set up PubSub environment.
+    from google.cloud import pubsub
+    self.pubsub_client = pubsub.Client(project=self.project)
+    self.input_topic = self.pubsub_client.topic(INPUT_TOPIC + self.uuid)
+    self.output_topic = self.pubsub_client.topic(OUTPUT_TOPIC + self.uuid)
+    self.input_sub = self.input_topic.subscription(INPUT_SUB + self.uuid)
+    self.output_sub = self.output_topic.subscription(OUTPUT_SUB + self.uuid)
+
+    self.input_topic.create()
+    self.output_topic.create()
+    test_utils.wait_for_topics_created([self.input_topic, self.output_topic])
+    self.input_sub.create()
+    self.output_sub.create()
+
+  def tearDown(self):
+    test_utils.cleanup_subscriptions([self.input_sub, self.output_sub])
+    test_utils.cleanup_topics([self.input_topic, self.output_topic])
+
+  def _test_streaming(self, with_attributes):
+    """Runs IT pipeline with message verifier.
+
+    Args:
+      with_attributes: False - Reads and writes message data only.
+        True - Reads and writes message data and attributes. Also verifies
+        id_label and timestamp_attribute features.
+    """
+    # Build expected dataset.
+    # Set extra options on the pipeline for test purposes.
+    state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
+    expected_messages = self.EXPECTED_OUTPUT_MESSAGES
+    if not with_attributes:
+      expected_messages = [pubsub_msg.data for pubsub_msg in expected_messages]
+    pubsub_msg_verifier = PubSubMessageMatcher(
+        self.project,
+        OUTPUT_SUB + self.uuid,
+        expected_messages,
+        timeout=MESSAGE_MATCHER_TIMEOUT_S,
+        with_attributes=with_attributes,
+        strip_attributes=[self.ID_LABEL, self.TIMESTAMP_ATTRIBUTE])
+    extra_opts = {'input_subscription': self.input_sub.full_name,
+                  'output_topic': self.output_topic.full_name,
+                  'wait_until_finish_duration': TEST_PIPELINE_DURATION_MS,
+                  'on_success_matcher': all_of(state_verifier,
+                                               pubsub_msg_verifier)}
+
+    # Generate input data and inject to PubSub.
+    test_utils.wait_for_subscriptions_created([self.input_sub])
+    for msg in self.INPUT_MESSAGES:
+      self.input_topic.publish(msg.data, **msg.attributes)
+
+    # Get pipeline options from the command argument --test-pipeline-options,
+    # and start the pipeline job by calling the pipeline main function.
+    pubsub_it_pipeline.run_pipeline(
+        argv=self.test_pipeline.get_full_options_as_args(**extra_opts),
+        with_attributes=with_attributes,
+        id_label=self.ID_LABEL,
+        timestamp_attribute=self.TIMESTAMP_ATTRIBUTE)
+
+  @attr('IT')
+  def test_streaming_data_only(self):
+    self._test_streaming(with_attributes=False)
+
+  @attr('IT')
+  def test_streaming_with_attributes(self):
+    self._test_streaming(with_attributes=True)
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.DEBUG)
+  unittest.main()
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_it_pipeline.py b/sdks/python/apache_beam/io/gcp/pubsub_it_pipeline.py
new file mode 100644
index 00000000000..407ed8830fa
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/pubsub_it_pipeline.py
@@ -0,0 +1,79 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""
+Test pipeline for use by pubsub_integration_test.
+"""
+
+import argparse
+
+import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+
+
+def run_pipeline(argv, with_attributes, id_label, timestamp_attribute):
+  """Build and run the pipeline."""
+
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--output_topic', required=True,
+      help=('Output PubSub topic of the form '
+            '"projects/<PROJECT>/topics/<TOPIC>".'))
+  parser.add_argument(
+      '--input_subscription', required=True,
+      help=('Input PubSub subscription of the form '
+            '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>".'))
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  # We use the save_main_session option because one or more DoFns in this
+  # workflow rely on global context (e.g., a module imported at module level).
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+  pipeline_options.view_as(StandardOptions).streaming = True
+  p = beam.Pipeline(options=pipeline_options)
+
+  # Read from PubSub into a PCollection.
+  messages = p | beam.io.ReadFromPubSub(
+      subscription=known_args.input_subscription,
+      id_label=id_label,
+      with_attributes=with_attributes,
+      timestamp_attribute=timestamp_attribute)
+
+  def add_attribute(msg, timestamp=beam.DoFn.TimestampParam):
+    msg.data += '-seen'
+    msg.attributes['processed'] = 'IT'
+    if timestamp_attribute in msg.attributes:
+      msg.attributes[timestamp_attribute + '_out'] = timestamp.to_rfc3339()
+    return msg
+
+  def modify_data(data):
+    return data + '-seen'
+
+  if with_attributes:
+    output = messages | 'add_attribute' >> beam.Map(add_attribute)
+  else:
+    output = messages | 'modify_data' >> beam.Map(modify_data)
+
+  # Write to PubSub.
+  _ = output | beam.io.WriteToPubSub(known_args.output_topic,
+                                     id_label=id_label,
+                                     with_attributes=with_attributes,
+                                     timestamp_attribute=timestamp_attribute)
+
+  result = p.run()
+  result.wait_until_finish()
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index 01cb0c072ef..c1dd05e874e 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -35,7 +35,8 @@
 from apache_beam.io.gcp.pubsub import ReadFromPubSub
 from apache_beam.io.gcp.pubsub import ReadStringsFromPubSub
 from apache_beam.io.gcp.pubsub import WriteStringsToPubSub
-from apache_beam.io.gcp.pubsub import _PubSubPayloadSink
+from apache_beam.io.gcp.pubsub import WriteToPubSub
+from apache_beam.io.gcp.pubsub import _PubSubSink
 from apache_beam.io.gcp.pubsub import _PubSubSource
 from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.runners.direct import transform_evaluator
@@ -47,6 +48,7 @@
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
 from apache_beam.transforms import window
+from apache_beam.transforms.core import Create
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
 from apache_beam.utils import timestamp
@@ -60,6 +62,53 @@
 # pylint: enable=wrong-import-order, wrong-import-position
 
 
+class TestPubsubMessage(unittest.TestCase):
+
+  def test_payload_valid(self):
+    _ = PubsubMessage('', None)
+    _ = PubsubMessage('data', None)
+    _ = PubsubMessage(None, {'k': 'v'})
+
+  def test_payload_invalid(self):
+    with self.assertRaisesRegexp(ValueError, r'data.*attributes.*must be set'):
+      _ = PubsubMessage(None, None)
+    with self.assertRaisesRegexp(ValueError, r'data.*attributes.*must be set'):
+      _ = PubsubMessage(None, {})
+
+  @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
+  def test_proto_conversion(self):
+    data = 'data'
+    attributes = {'k1': 'v1', 'k2': 'v2'}
+    m = PubsubMessage(data, attributes)
+    m_converted = PubsubMessage._from_proto_str(m._to_proto_str())
+    self.assertEqual(m_converted.data, data)
+    self.assertEqual(m_converted.attributes, attributes)
+
+  def test_eq(self):
+    a = PubsubMessage('abc', {1: 2, 3: 4})
+    b = PubsubMessage('abc', {1: 2, 3: 4})
+    c = PubsubMessage('abc', {1: 2})
+    self.assertTrue(a == b)
+    self.assertTrue(a != c)
+    self.assertTrue(b != c)
+
+  def test_hash(self):
+    a = PubsubMessage('abc', {1: 2, 3: 4})
+    b = PubsubMessage('abc', {1: 2, 3: 4})
+    c = PubsubMessage('abc', {1: 2})
+    self.assertTrue(hash(a) == hash(b))
+    self.assertTrue(hash(a) != hash(c))
+    self.assertTrue(hash(b) != hash(c))
+
+  def test_repr(self):
+    a = PubsubMessage('abc', {1: 2, 3: 4})
+    b = PubsubMessage('abc', {1: 2, 3: 4})
+    c = PubsubMessage('abc', {1: 2})
+    self.assertTrue(repr(a) == repr(b))
+    self.assertTrue(repr(a) != repr(c))
+    self.assertTrue(repr(b) != repr(c))
+
+
 @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
 class TestReadFromPubSubOverride(unittest.TestCase):
 
@@ -68,7 +117,7 @@ def test_expand_with_topic(self):
     p.options.view_as(StandardOptions).streaming = True
     pcoll = (p
              | ReadFromPubSub('projects/fakeprj/topics/a_topic',
-                              None, 'a_label',
+                              None, 'a_label', with_attributes=False,
                               timestamp_attribute=None)
              | beam.Map(lambda x: x))
     self.assertEqual(str, pcoll.element_type)
@@ -92,7 +141,7 @@ def test_expand_with_subscription(self):
     pcoll = (p
              | ReadFromPubSub(
                  None, 'projects/fakeprj/subscriptions/a_subscription',
-                 'a_label', timestamp_attribute=None)
+                 'a_label', with_attributes=False, timestamp_attribute=None)
              | beam.Map(lambda x: x))
     self.assertEqual(str, pcoll.element_type)
 
@@ -112,17 +161,16 @@ def test_expand_with_subscription(self):
   def test_expand_with_no_topic_or_subscription(self):
     with self.assertRaisesRegexp(
         ValueError, "Either a topic or subscription must be provided."):
-      ReadFromPubSub(None, None, 'a_label',
+      ReadFromPubSub(None, None, 'a_label', with_attributes=False,
                      timestamp_attribute=None)
 
   def test_expand_with_both_topic_and_subscription(self):
     with self.assertRaisesRegexp(
         ValueError, "Only one of topic or subscription should be provided."):
       ReadFromPubSub('a_topic', 'a_subscription', 'a_label',
-                     timestamp_attribute=None)
+                     with_attributes=False, timestamp_attribute=None)
 
-  # TODO(BEAM-4536): Reenable test when bug is fixed.
-  def _test_expand_with_other_options(self):
+  def test_expand_with_other_options(self):
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
     pcoll = (p
@@ -147,12 +195,12 @@ def _test_expand_with_other_options(self):
 
 
 @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
-class TestWriteStringsToPubSub(unittest.TestCase):
-  def test_expand(self):
+class TestWriteStringsToPubSubOverride(unittest.TestCase):
+  def test_expand_deprecated(self):
     p = TestPipeline()
     p.options.view_as(StandardOptions).streaming = True
     pcoll = (p
-             | ReadStringsFromPubSub('projects/fakeprj/topics/baz')
+             | ReadFromPubSub('projects/fakeprj/topics/baz')
              | WriteStringsToPubSub('projects/fakeprj/topics/a_topic')
              | beam.Map(lambda x: x))
 
@@ -160,12 +208,36 @@ def test_expand(self):
     overrides = _get_transform_overrides(p.options)
     p.replace_all(overrides)
 
-    # Note that the direct output of ReadStringsFromPubSub will be replaced
+    # Note that the direct output of ReadFromPubSub will be replaced
+    # by a PTransformOverride, so we use a no-op Map.
+    write_transform = pcoll.producer.inputs[0].producer.transform
+
+    # Ensure that the properties passed through correctly
+    self.assertEqual('a_topic', write_transform.dofn.topic_name)
+
+  def test_expand(self):
+    p = TestPipeline()
+    p.options.view_as(StandardOptions).streaming = True
+    pcoll = (p
+             | ReadFromPubSub('projects/fakeprj/topics/baz')
+             | WriteToPubSub('projects/fakeprj/topics/a_topic',
+                             with_attributes=True)
+             | beam.Map(lambda x: x))
+
+    # Apply the necessary PTransformOverrides.
+    overrides = _get_transform_overrides(p.options)
+    p.replace_all(overrides)
+
+    # Note that the direct output of ReadFromPubSub will be replaced
     # by a PTransformOverride, so we use a no-op Map.
     write_transform = pcoll.producer.inputs[0].producer.transform
 
     # Ensure that the properties passed through correctly
     self.assertEqual('a_topic', write_transform.dofn.topic_name)
+    self.assertEqual(True, write_transform.dofn.with_attributes)
+    # TODO(BEAM-4275): These properties aren't supported yet in direct runner.
+    self.assertEqual(None, write_transform.dofn.id_label)
+    self.assertEqual(None, write_transform.dofn.timestamp_attribute)
 
 
 @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
@@ -179,7 +251,9 @@ def test_display_data_topic(self):
     expected_items = [
         DisplayDataItemMatcher(
             'topic', 'projects/fakeprj/topics/a_topic'),
-        DisplayDataItemMatcher('id_label', 'a_label')]
+        DisplayDataItemMatcher('id_label', 'a_label'),
+        DisplayDataItemMatcher('with_attributes', False),
+    ]
 
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
@@ -192,7 +266,9 @@ def test_display_data_subscription(self):
     expected_items = [
         DisplayDataItemMatcher(
             'subscription', 'projects/fakeprj/subscriptions/a_subscription'),
-        DisplayDataItemMatcher('id_label', 'a_label')]
+        DisplayDataItemMatcher('id_label', 'a_label'),
+        DisplayDataItemMatcher('with_attributes', False),
+    ]
 
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
@@ -200,7 +276,9 @@ def test_display_data_no_subscription(self):
     source = _PubSubSource('projects/fakeprj/topics/a_topic')
     dd = DisplayData.create_from(source)
     expected_items = [
-        DisplayDataItemMatcher('topic', 'projects/fakeprj/topics/a_topic')]
+        DisplayDataItemMatcher('topic', 'projects/fakeprj/topics/a_topic'),
+        DisplayDataItemMatcher('with_attributes', False),
+    ]
 
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
@@ -208,10 +286,16 @@ def test_display_data_no_subscription(self):
 @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
 class TestPubSubSink(unittest.TestCase):
   def test_display_data(self):
-    sink = _PubSubPayloadSink('projects/fakeprj/topics/a_topic')
+    sink = _PubSubSink('projects/fakeprj/topics/a_topic',
+                       id_label='id', with_attributes=False,
+                       timestamp_attribute='time')
     dd = DisplayData.create_from(sink)
     expected_items = [
-        DisplayDataItemMatcher('topic', 'projects/fakeprj/topics/a_topic')]
+        DisplayDataItemMatcher('topic', 'projects/fakeprj/topics/a_topic'),
+        DisplayDataItemMatcher('id_label', 'id'),
+        DisplayDataItemMatcher('with_attributes', False),
+        DisplayDataItemMatcher('timestamp_attribute', 'time'),
+    ]
 
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
@@ -251,6 +335,11 @@ def __init__(self, name, client):
   def subscription(self, name):
     return FakePubsubSubscription(name, self.name, self.client)
 
+  def batch(self):
+    if self.client.batch is None:
+      self.client.batch = FakeBatch(self.client)
+    return self.client.batch
+
 
 class FakePubsubSubscription(object):
 
@@ -269,7 +358,7 @@ def __init__(self, sub, **unused_kwargs):
     self.sub = sub
 
   def __enter__(self):
-    messages = self.sub.client.messages
+    messages = self.sub.client.messages_read
     self.ack_id_to_msg = dict(zip(range(len(messages)), messages))
     return self.ack_id_to_msg
 
@@ -277,19 +366,56 @@ def __exit__(self, exc_type, exc_val, exc_tb):
     pass
 
 
+class FakeBatch(object):
+  """Context manager that accepts Pubsub client writes via publish().
+
+  Verifies writes on exit.
+  """
+
+  def __init__(self, client):
+    self.client = client
+    self.published = []
+
+  def __enter__(self):
+    return self
+
+  def __exit__(self, exc_type, exc_val, exc_tb):
+    if exc_type is not None:
+      return  # Exception will be raised.
+    hc.assert_that(self.published,
+                   hc.only_contains(*self.client.messages_write))
+
+  def publish(self, message, **attrs):
+    self.published.append([message, attrs])
+
+
 class FakePubsubClient(object):
 
-  def __init__(self, messages, project=None, **unused_kwargs):
-    self.messages = messages
+  def __init__(self, messages_read=None, messages_write=None, project=None,
+               **unused_kwargs):
+    """Creates a Pubsub client fake.
+
+    Args:
+      messages_read: List of PubsubMessage objects to return.
+      messages_write: List of [data, attributes] pairs, corresponding to
+        messages expected to be written to the client.
+      project: Name of GCP project.
+    """
+    self.messages_read = messages_read
+    self.messages_write = messages_write
     self.project = project
+    self.batch = None
 
   def topic(self, name):
     return FakePubsubTopic(name, self)
 
 
-def create_client_message(payload, message_id, attributes, publish_time):
-  """Returns a message as it would be returned from Cloud Pub/Sub client."""
-  msg = pubsub.message.Message(payload, message_id, attributes)
+def create_client_message(data, message_id, attributes, publish_time):
+  """Returns a message as it would be returned from Cloud Pub/Sub client.
+
+  This is what the reader sees.
+  """
+  msg = pubsub.message.Message(data, message_id, attributes)
   msg._service_timestamp = publish_time
   return msg
 
@@ -297,20 +423,20 @@ def create_client_message(payload, message_id, attributes, publish_time):
 @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
 class TestReadFromPubSub(unittest.TestCase):
 
-  # TODO(BEAM-4536): Reenable test when bug is fixed.
   @mock.patch('google.cloud.pubsub')
-  def _test_read_messages_success(self, mock_pubsub):
-    payload = 'payload'
+  def test_read_messages_success(self, mock_pubsub):
+    data = 'data'
     message_id = 'message_id'
     publish_time = '2018-03-12T13:37:01.234567Z'
     attributes = {'key': 'value'}
-    data = [create_client_message(
-        payload, message_id, attributes, publish_time)]
-    expected_data = [TestWindowedValue(PubsubMessage(payload, attributes),
-                                       timestamp.Timestamp(1520861821.234567),
-                                       [window.GlobalWindow()])]
-
-    mock_pubsub.Client = functools.partial(FakePubsubClient, data)
+    payloads = [create_client_message(
+        data, message_id, attributes, publish_time)]
+    expected_elements = [
+        TestWindowedValue(PubsubMessage(data, attributes),
+                          timestamp.Timestamp(1520861821.234567),
+                          [window.GlobalWindow()])]
+
+    mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
     mock_pubsub.subscription.AutoAck = FakeAutoAck
 
     p = TestPipeline()
@@ -318,19 +444,18 @@ def _test_read_messages_success(self, mock_pubsub):
     pcoll = (p
              | ReadFromPubSub('projects/fakeprj/topics/a_topic',
                               None, 'a_label', with_attributes=True))
-    assert_that(pcoll, equal_to(expected_data), reify_windows=True)
+    assert_that(pcoll, equal_to(expected_elements), reify_windows=True)
     p.run()
 
   @mock.patch('google.cloud.pubsub')
   def test_read_strings_success(self, mock_pubsub):
-    payload = u'🤷 ¯\\_(ツ)_/¯'
-    payload_encoded = payload.encode('utf-8')
+    data = u'🤷 ¯\\_(ツ)_/¯'
+    data_encoded = data.encode('utf-8')
     publish_time = '2018-03-12T13:37:01.234567Z'
-    data = [create_client_message(
-        payload_encoded, None, None, publish_time)]
-    expected_data = [payload]
+    payloads = [create_client_message(data_encoded, None, None, publish_time)]
+    expected_elements = [data]
 
-    mock_pubsub.Client = functools.partial(FakePubsubClient, data)
+    mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
     mock_pubsub.subscription.AutoAck = FakeAutoAck
 
     p = TestPipeline()
@@ -338,18 +463,17 @@ def test_read_strings_success(self, mock_pubsub):
     pcoll = (p
              | ReadStringsFromPubSub('projects/fakeprj/topics/a_topic',
                                      None, 'a_label'))
-    assert_that(pcoll, equal_to(expected_data))
+    assert_that(pcoll, equal_to(expected_elements))
     p.run()
 
   @mock.patch('google.cloud.pubsub')
-  def test_read_payload_success(self, mock_pubsub):
-    payload_encoded = u'🤷 ¯\\_(ツ)_/¯'.encode('utf-8')
+  def test_read_data_success(self, mock_pubsub):
+    data_encoded = u'🤷 ¯\\_(ツ)_/¯'.encode('utf-8')
     publish_time = '2018-03-12T13:37:01.234567Z'
-    data = [create_client_message(
-        payload_encoded, None, None, publish_time)]
-    expected_data = [payload_encoded]
+    payloads = [create_client_message(data_encoded, None, None, publish_time)]
+    expected_elements = [data_encoded]
 
-    mock_pubsub.Client = functools.partial(FakePubsubClient, data)
+    mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
     mock_pubsub.subscription.AutoAck = FakeAutoAck
 
     p = TestPipeline()
@@ -357,26 +481,25 @@ def test_read_payload_success(self, mock_pubsub):
     pcoll = (p
              | ReadFromPubSub('projects/fakeprj/topics/a_topic',
                               None, 'a_label'))
-    assert_that(pcoll, equal_to(expected_data))
+    assert_that(pcoll, equal_to(expected_elements))
     p.run()
 
-  # TODO(BEAM-4536): Reenable test when bug is fixed.
   @mock.patch('google.cloud.pubsub')
-  def _test_read_messages_timestamp_attribute_milli_success(self, mock_pubsub):
-    payload = 'payload'
+  def test_read_messages_timestamp_attribute_milli_success(self, mock_pubsub):
+    data = 'data'
     message_id = 'message_id'
     attributes = {'time': '1337'}
     publish_time = '2018-03-12T13:37:01.234567Z'
-    data = [create_client_message(
-        payload, message_id, attributes, publish_time)]
-    expected_data = [
+    payloads = [
+        create_client_message(data, message_id, attributes, publish_time)]
+    expected_elements = [
         TestWindowedValue(
-            PubsubMessage(payload, attributes),
+            PubsubMessage(data, attributes),
             timestamp.Timestamp(micros=int(attributes['time']) * 1000),
             [window.GlobalWindow()]),
     ]
 
-    mock_pubsub.Client = functools.partial(FakePubsubClient, data)
+    mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
     mock_pubsub.subscription.AutoAck = FakeAutoAck
 
     p = TestPipeline()
@@ -385,27 +508,25 @@ def _test_read_messages_timestamp_attribute_milli_success(self, mock_pubsub):
              | ReadFromPubSub(
                  'projects/fakeprj/topics/a_topic', None, 'a_label',
                  with_attributes=True, timestamp_attribute='time'))
-    assert_that(pcoll, equal_to(expected_data), reify_windows=True)
+    assert_that(pcoll, equal_to(expected_elements), reify_windows=True)
     p.run()
 
-  # TODO(BEAM-4536): Reenable test when bug is fixed.
   @mock.patch('google.cloud.pubsub')
-  def _test_read_messages_timestamp_attribute_rfc3339_success(self,
-                                                              mock_pubsub):
-    payload = 'payload'
+  def test_read_messages_timestamp_attribute_rfc3339_success(self, mock_pubsub):
+    data = 'data'
     message_id = 'message_id'
     attributes = {'time': '2018-03-12T13:37:01.234567Z'}
     publish_time = '2018-03-12T13:37:01.234567Z'
-    data = [create_client_message(
-        payload, message_id, attributes, publish_time)]
-    expected_data = [
+    payloads = [
+        create_client_message(data, message_id, attributes, publish_time)]
+    expected_elements = [
         TestWindowedValue(
-            PubsubMessage(payload, attributes),
+            PubsubMessage(data, attributes),
             timestamp.Timestamp.from_rfc3339(attributes['time']),
             [window.GlobalWindow()]),
     ]
 
-    mock_pubsub.Client = functools.partial(FakePubsubClient, data)
+    mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
     mock_pubsub.subscription.AutoAck = FakeAutoAck
 
     p = TestPipeline()
@@ -414,20 +535,19 @@ def _test_read_messages_timestamp_attribute_rfc3339_success(self,
              | ReadFromPubSub(
                  'projects/fakeprj/topics/a_topic', None, 'a_label',
                  with_attributes=True, timestamp_attribute='time'))
-    assert_that(pcoll, equal_to(expected_data), reify_windows=True)
+    assert_that(pcoll, equal_to(expected_elements), reify_windows=True)
     p.run()
 
-  # TODO(BEAM-4536): Reenable test when bug is fixed.
   @mock.patch('google.cloud.pubsub')
-  def _test_read_messages_timestamp_attribute_fail_missing(self, mock_pubsub):
-    payload = 'payload'
+  def test_read_messages_timestamp_attribute_fail_missing(self, mock_pubsub):
+    data = 'data'
     message_id = 'message_id'
     attributes = {'time': '1337'}
     publish_time = '2018-03-12T13:37:01.234567Z'
-    data = [create_client_message(
-        payload, message_id, attributes, publish_time)]
+    payloads = [
+        create_client_message(data, message_id, attributes, publish_time)]
 
-    mock_pubsub.Client = functools.partial(FakePubsubClient, data)
+    mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
     mock_pubsub.subscription.AutoAck = FakeAutoAck
 
     p = TestPipeline()
@@ -439,17 +559,16 @@ def _test_read_messages_timestamp_attribute_fail_missing(self, mock_pubsub):
     with self.assertRaisesRegexp(KeyError, r'Timestamp.*nonexistent'):
       p.run()
 
-  # TODO(BEAM-4536): Reenable test when bug is fixed.
   @mock.patch('google.cloud.pubsub')
-  def _test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub):
-    payload = 'payload'
+  def test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub):
+    data = 'data'
     message_id = 'message_id'
     attributes = {'time': '1337 unparseable'}
     publish_time = '2018-03-12T13:37:01.234567Z'
-    data = [create_client_message(
-        payload, message_id, attributes, publish_time)]
+    payloads = [
+        create_client_message(data, message_id, attributes, publish_time)]
 
-    mock_pubsub.Client = functools.partial(FakePubsubClient, data)
+    mock_pubsub.Client = functools.partial(FakePubsubClient, payloads)
     mock_pubsub.subscription.AutoAck = FakeAutoAck
 
     p = TestPipeline()
@@ -462,6 +581,79 @@ def _test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub):
       p.run()
 
 
+@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
+class TestWriteToPubSub(unittest.TestCase):
+
+  @mock.patch('google.cloud.pubsub')
+  def test_write_messages_success(self, mock_pubsub):
+    data = 'data'
+    payloads = [data]
+    expected_payloads = [[data, {}]]
+
+    mock_pubsub.Client = functools.partial(FakePubsubClient,
+                                           messages_write=expected_payloads)
+
+    p = TestPipeline()
+    p.options.view_as(StandardOptions).streaming = True
+    _ = (p
+         | Create(payloads)
+         | WriteToPubSub('projects/fakeprj/topics/a_topic',
+                         with_attributes=False))
+    p.run()
+
+  @mock.patch('google.cloud.pubsub')
+  def test_write_messages_deprecated(self, mock_pubsub):
+    data = 'data'
+    payloads = [data]
+    expected_payloads = [[data, {}]]
+
+    mock_pubsub.Client = functools.partial(FakePubsubClient,
+                                           messages_write=expected_payloads)
+
+    p = TestPipeline()
+    p.options.view_as(StandardOptions).streaming = True
+    _ = (p
+         | Create(payloads)
+         | WriteStringsToPubSub('projects/fakeprj/topics/a_topic'))
+    p.run()
+
+  @mock.patch('google.cloud.pubsub')
+  def test_write_messages_with_attributes_success(self, mock_pubsub):
+    data = 'data'
+    attributes = {'key': 'value'}
+    payloads = [PubsubMessage(data, attributes)]
+    expected_payloads = [[data, attributes]]
+
+    mock_pubsub.Client = functools.partial(FakePubsubClient,
+                                           messages_write=expected_payloads)
+
+    p = TestPipeline()
+    p.options.view_as(StandardOptions).streaming = True
+    _ = (p
+         | Create(payloads)
+         | WriteToPubSub('projects/fakeprj/topics/a_topic',
+                         with_attributes=True))
+    p.run()
+
+  @mock.patch('google.cloud.pubsub')
+  def test_write_messages_with_attributes_error(self, mock_pubsub):
+    data = 'data'
+    # Sending raw data when WriteToPubSub expects a PubsubMessage object.
+    payloads = [data]
+
+    mock_pubsub.Client = functools.partial(FakePubsubClient)
+
+    p = TestPipeline()
+    p.options.view_as(StandardOptions).streaming = True
+    _ = (p
+         | Create(payloads)
+         | WriteToPubSub('projects/fakeprj/topics/a_topic',
+                         with_attributes=True))
+    with self.assertRaisesRegexp(AttributeError,
+                                 r'str.*has no attribute.*data'):
+      p.run()
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()
diff --git a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
index da906c6c6fe..9890dcd92e1 100644
--- a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher.py
@@ -25,6 +25,8 @@
 
 from hamcrest.core.base_matcher import BaseMatcher
 
+from apache_beam.io.gcp.pubsub import PubsubMessage
+
 __all__ = ['PubSubMessageMatcher']
 
 
@@ -47,15 +49,22 @@ class PubSubMessageMatcher(BaseMatcher):
   subscription until all expected messages are shown or timeout.
   """
 
-  def __init__(self, project, sub_name, expected_msg, timeout=DEFAULT_TIMEOUT):
+  def __init__(self, project, sub_name, expected_msg, timeout=DEFAULT_TIMEOUT,
+               with_attributes=False, strip_attributes=None):
     """Initialize PubSubMessageMatcher object.
 
     Args:
       project: A name string of project.
       sub_name: A name string of subscription which is attached to output.
       expected_msg: A string list that contains expected message data pulled
-        from the subscription.
+        from the subscription. See also: with_attributes.
       timeout: Timeout in seconds to wait for all expected messages appears.
+      with_attributes: Whether expected_msg is a list of
+        ``PubsubMessage`` objects.
+      strip_attributes: List of strings. If with_attributes==True, strip the
+        attributes keyed by these values from incoming messages.
+        If a key is missing, will add an attribute with an error message as
+        value to prevent a successful match.
     """
     if pubsub is None:
       raise ImportError(
@@ -72,6 +81,8 @@ def __init__(self, project, sub_name, expected_msg, timeout=DEFAULT_TIMEOUT):
     self.expected_msg = expected_msg
     self.timeout = timeout
     self.messages = None
+    self.with_attributes = with_attributes
+    self.strip_attributes = strip_attributes
 
   def _matches(self, _):
     if self.messages is None:
@@ -91,7 +102,20 @@ def _wait_for_messages(self, subscription, expected_num, timeout):
     while time.time() - start_time <= timeout:
       pulled = subscription.pull(max_messages=MAX_MESSAGES_IN_ONE_PULL)
       for ack_id, message in pulled:
-        total_messages.append(message.data)
+        if not self.with_attributes:
+          total_messages.append(message.data)
+          continue
+
+        msg = PubsubMessage._from_message(message)
+        if self.strip_attributes:
+          for attr in self.strip_attributes:
+            try:
+              del msg.attributes[attr]
+            except KeyError:
+              msg.attributes[attr] = ('PubSubMessageMatcher error: '
+                                      'expected attribute not found.')
+        total_messages.append(msg)
+
         subscription.acknowledge([ack_id])
       if len(total_messages) >= expected_num:
         return total_messages
@@ -108,7 +132,13 @@ def describe_to(self, description):
   def describe_mismatch(self, _, mismatch_description):
     c_expected = Counter(self.expected_msg)
     c_actual = Counter(self.messages)
-    diff = (c_expected | c_actual) - (c_expected & c_actual)
     mismatch_description.append_text(
-        "Got %d messages. Diffs: %s." %
-        (len(self.messages), list(diff.elements())))
+        "Got %d messages. "
+        "Diffs (item, count):\n"
+        "  Expected but not in actual: %s\n"
+        "  Unexpected: %s" % (
+            len(self.messages), (c_expected - c_actual).items(),
+            (c_actual - c_expected).items()))
+    if self.with_attributes and self.strip_attributes:
+      mismatch_description.append_text(
+          '\n  Stripped attributes: %r' % self.strip_attributes)
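
The reworked describe_mismatch reports the two one-sided multiset differences
instead of a single symmetric diff. A standalone sketch of the underlying
Counter arithmetic:

    from collections import Counter

    c_expected = Counter(['a', 'a', 'b'])
    c_actual = Counter(['a', 'c'])

    # Counter subtraction keeps only positive counts, so each side of the
    # report lists exactly what the other side is missing.
    print(c_expected - c_actual)  # Counter({'a': 1, 'b': 1}): expected, not received
    print(c_actual - c_expected)  # Counter({'c': 1}): received, not expected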
diff --git a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py
index 6bb780cb714..83f21a2a589 100644
--- a/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py
+++ b/sdks/python/apache_beam/io/gcp/tests/pubsub_matcher_test.py
@@ -23,6 +23,7 @@
 import mock
 from hamcrest import assert_that as hc_assert_that
 
+from apache_beam.io.gcp.pubsub import PubsubMessage
 from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher
 
 # Protect against environments where pubsub library is not available.
@@ -39,14 +40,17 @@ class PubSubMatcherTest(unittest.TestCase):
 
   def setUp(self):
     self.mock_presult = mock.MagicMock()
-    self.pubsub_matcher = PubSubMessageMatcher('mock_project',
-                                               'mock_sub_name',
-                                               ['mock_expected_msg'])
+
+  def init_matcher(self, with_attributes=False, strip_attributes=None):
+    self.pubsub_matcher = PubSubMessageMatcher(
+        'mock_project', 'mock_sub_name', ['mock_expected_msg'],
+        with_attributes=with_attributes, strip_attributes=strip_attributes)
 
   @mock.patch('time.sleep', return_value=None)
   @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
               'PubSubMessageMatcher._get_subscription')
  def test_message_matcher_success(self, mock_get_sub, unused_mock):
+    self.init_matcher()
     self.pubsub_matcher.expected_msg = ['a', 'b']
     mock_sub = mock_get_sub.return_value
     mock_sub.pull.side_effect = [
@@ -56,10 +60,71 @@ def test_message_matcher_success(self, mock_get_sub, unsued_mock):
     hc_assert_that(self.mock_presult, self.pubsub_matcher)
     self.assertEqual(mock_sub.pull.call_count, 2)
 
+  @mock.patch('time.sleep', return_value=None)
+  @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
+              'PubSubMessageMatcher._get_subscription')
+  def test_message_matcher_attributes_success(self, mock_get_sub, unused_mock):
+    self.init_matcher(with_attributes=True)
+    self.pubsub_matcher.expected_msg = [PubsubMessage('a', {'k': 'v'})]
+    mock_sub = mock_get_sub.return_value
+    msg_a = pubsub.message.Message(b'a', 'unused_id')
+    msg_a.attributes['k'] = 'v'
+    mock_sub.pull.side_effect = [[(1, msg_a)]]
+    hc_assert_that(self.mock_presult, self.pubsub_matcher)
+    self.assertEqual(mock_sub.pull.call_count, 1)
+
+  @mock.patch('time.sleep', return_value=None)
+  @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
+              'PubSubMessageMatcher._get_subscription')
+  def test_message_matcher_attributes_fail(self, mock_get_sub, unused_mock):
+    self.init_matcher(with_attributes=True)
+    self.pubsub_matcher.expected_msg = [PubsubMessage('a', {})]
+    mock_sub = mock_get_sub.return_value
+    msg_a = pubsub.message.Message(b'a', 'unused_id')
+    msg_a.attributes['k'] = 'v'  # Unexpected.
+    mock_sub.pull.side_effect = [[(1, msg_a)]]
+    with self.assertRaisesRegexp(AssertionError, r'Unexpected'):
+      hc_assert_that(self.mock_presult, self.pubsub_matcher)
+    self.assertEqual(mock_sub.pull.call_count, 1)
+
+  @mock.patch('time.sleep', return_value=None)
+  @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
+              'PubSubMessageMatcher._get_subscription')
+  def test_message_matcher_strip_success(self, mock_get_sub, unused_mock):
+    self.init_matcher(with_attributes=True,
+                      strip_attributes=['id', 'timestamp'])
+    self.pubsub_matcher.expected_msg = [PubsubMessage('a', {'k': 'v'})]
+    mock_sub = mock_get_sub.return_value
+    msg_a = pubsub.message.Message(b'a', 'unused_id')
+    msg_a.attributes['id'] = 'foo'
+    msg_a.attributes['timestamp'] = 'bar'
+    msg_a.attributes['k'] = 'v'
+    mock_sub.pull.side_effect = [[(1, msg_a)]]
+    hc_assert_that(self.mock_presult, self.pubsub_matcher)
+    self.assertEqual(mock_sub.pull.call_count, 1)
+
+  @mock.patch('time.sleep', return_value=None)
+  @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
+              'PubSubMessageMatcher._get_subscription')
+  def test_message_matcher_strip_fail(self, mock_get_sub, unused_mock):
+    self.init_matcher(with_attributes=True,
+                      strip_attributes=['id', 'timestamp'])
+    self.pubsub_matcher.expected_msg = [PubsubMessage('a', {'k': 'v'})]
+    mock_sub = mock_get_sub.return_value
+    # msg_a is missing attribute 'timestamp'.
+    msg_a = pubsub.message.Message(b'a', 'unused_id')
+    msg_a.attributes['id'] = 'foo'
+    msg_a.attributes['k'] = 'v'
+    mock_sub.pull.side_effect = [[(1, msg_a)]]
+    with self.assertRaisesRegexp(AssertionError, r'Stripped attributes'):
+      hc_assert_that(self.mock_presult, self.pubsub_matcher)
+    self.assertEqual(mock_sub.pull.call_count, 1)
+
   @mock.patch('time.sleep', return_value=None)
   @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
               'PubSubMessageMatcher._get_subscription')
   def test_message_matcher_mismatch(self, mock_get_sub, unused_mock):
+    self.init_matcher()
     self.pubsub_matcher.expected_msg = ['a']
     mock_sub = mock_get_sub.return_value
     mock_sub.pull.return_value = [
@@ -77,16 +142,14 @@ def test_message_matcher_mismatch(self, mock_get_sub, unused_mock):
   @mock.patch('time.sleep', return_value=None)
   @mock.patch('apache_beam.io.gcp.tests.pubsub_matcher.'
               'PubSubMessageMatcher._get_subscription')
-  def test_message_metcher_timeout(self, mock_get_sub, unused_mock):
+  def test_message_matcher_timeout(self, mock_get_sub, unused_mock):
+    self.init_matcher()
     mock_sub = mock_get_sub.return_value
     mock_sub.return_value.full_name.return_value = 'mock_sub'
     self.pubsub_matcher.timeout = 0.1
-    with self.assertRaises(AssertionError) as error:
+    with self.assertRaisesRegexp(AssertionError, r'Expected 1.*\n.*Got 0'):
       hc_assert_that(self.mock_presult, self.pubsub_matcher)
     self.assertTrue(mock_sub.pull.called)
-    self.assertEqual(
-        '\nExpected: Expected %d messages.\n     but: Got %d messages. Diffs: '
-        '%s.\n' % (1, 0, ['mock_expected_msg']), str(error.exception.args[0]))
 
 
 if __name__ == '__main__':
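
Outside of these mocked unit tests, an integration test would point the matcher
at a real subscription. A sketch under assumed placeholder names (the project,
subscription, and pipeline result are illustrative; pulling requires real
Pub/Sub resources):

    from hamcrest import assert_that as hc_assert_that
    from apache_beam.io.gcp.pubsub import PubsubMessage
    from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher

    expected = [PubsubMessage('payload', {'k': 'v'})]
    matcher = PubSubMessageMatcher(
        'my-project', 'my-output-sub', expected,
        with_attributes=True,
        # Drop runner-injected attributes before comparing.
        strip_attributes=['id', 'timestamp'])

    def verify(pipeline_result):
      # Pulls from the subscription until the expected messages arrive
      # or the timeout expires, then compares as multisets.
      hc_assert_that(pipeline_result, matcher)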
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 098d47e880a..87982a1c9f1 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -841,7 +841,7 @@ def run_Read(self, transform_node):
                           transform.source.id_label)
       if transform.source.with_attributes:
         # Setting this property signals Dataflow runner to return full
-        # PubsubMessages instead of just the payload.
+        # PubsubMessages instead of just the data part of the payload.
         step.add_property(PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN, '')
       if transform.source.timestamp_attribute is not None:
         step.add_property(PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE,
@@ -925,9 +925,19 @@ def run__NativeWrite(self, transform_node):
       standard_options = (
           transform_node.inputs[0].pipeline.options.view_as(StandardOptions))
       if not standard_options.streaming:
-        raise ValueError('PubSubPayloadSink is currently available for use '
+        raise ValueError('Cloud Pub/Sub is currently available for use '
                          'only in streaming pipelines.')
       step.add_property(PropertyNames.PUBSUB_TOPIC, transform.sink.full_topic)
+      if transform.sink.id_label:
+        step.add_property(PropertyNames.PUBSUB_ID_LABEL,
+                          transform.sink.id_label)
+      if transform.sink.with_attributes:
+        # Setting this property signals the Dataflow runner that the
+        # PCollection
+        # contains PubsubMessage objects instead of just raw data.
+        step.add_property(PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN, '')
+      if transform.sink.timestamp_attribute is not None:
+        step.add_property(PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE,
+                          transform.sink.timestamp_attribute)
     else:
       raise ValueError(
           'Sink %r has unexpected format %s.' % (
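
At the user-facing level, these new step properties correspond to the
WriteToPubSub arguments added in this change; a sketch with a placeholder
topic path:

    from apache_beam.io.gcp.pubsub import WriteToPubSub

    write = WriteToPubSub(
        'projects/my-project/topics/my-topic',
        with_attributes=True,         # elements are PubsubMessage objects
        id_label='message_id',        # attribute used for deduplication
        timestamp_attribute='ts')     # attribute carrying the event timestamp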
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index bea54ffe1aa..0ca2f0d2fcc 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -44,6 +44,7 @@
 from apache_beam.runners.runner import PipelineState
 from apache_beam.transforms.core import CombinePerKey
 from apache_beam.transforms.core import CombineValuesDoFn
+from apache_beam.transforms.core import DoFn
 from apache_beam.transforms.core import ParDo
 from apache_beam.transforms.core import _GroupAlsoByWindow
 from apache_beam.transforms.core import _GroupAlsoByWindowDoFn
@@ -173,22 +174,6 @@ def from_runner_api_parameter(payload, context):
         context.windowing_strategies.get_by_id(payload.value))
 
 
-class _DirectReadFromPubSub(PTransform):
-  def __init__(self, source):
-    self._source = source
-
-  def _infer_output_coder(self, unused_input_type=None,
-                          unused_input_coder=None):
-    return coders.BytesCoder()
-
-  def get_windowing(self, inputs):
-    return beam.Windowing(beam.window.GlobalWindows())
-
-  def expand(self, pvalue):
-    # This is handled as a native transform.
-    return PCollection(self.pipeline)
-
-
 def _get_transform_overrides(pipeline_options):
   # A list of PTransformOverride objects to be applied before running a pipeline
   # using DirectRunner.
@@ -259,8 +244,67 @@ def get_replacement_transform(self, transform):
   return overrides
 
 
+class _DirectReadFromPubSub(PTransform):
+  def __init__(self, source):
+    self._source = source
+
+  def _infer_output_coder(self, unused_input_type=None,
+                          unused_input_coder=None):
+    return coders.BytesCoder()
+
+  def get_windowing(self, inputs):
+    return beam.Windowing(beam.window.GlobalWindows())
+
+  def expand(self, pvalue):
+    # This is handled as a native transform.
+    return PCollection(self.pipeline)
+
+
+class _DirectWriteToPubSubFn(DoFn):
+  _topic = None
+
+  def __init__(self, sink):
+    self.project = sink.project
+    self.topic_name = sink.topic_name
+    self.id_label = sink.id_label
+    self.timestamp_attribute = sink.timestamp_attribute
+    self.with_attributes = sink.with_attributes
+
+    # TODO(BEAM-4275): Add support for id_label and timestamp_attribute.
+    if sink.id_label:
+      raise NotImplementedError('id_label is not supported in Direct Runner')
+    if sink.timestamp_attribute:
+      raise NotImplementedError('timestamp_attribute is not supported in Direct'
+                                ' Runner')
+
+  def start_bundle(self):
+    from google.cloud import pubsub
+
+    if self._topic is None:
+      self._topic = pubsub.Client(project=self.project).topic(
+          self.topic_name)
+    self._buffer = []
+
+  def process(self, elem):
+    self._buffer.append(elem)
+    if len(self._buffer) >= 100:
+      self._flush()
+
+  def finish_bundle(self):
+    self._flush()
+
+  def _flush(self):
+    if self._buffer:
+      with self._topic.batch() as batch:
+        for elem in self._buffer:
+          if self.with_attributes:
+            batch.publish(elem.data, **elem.attributes)
+          else:
+            batch.publish(elem)
+      self._buffer = []
+
+
 def _get_pubsub_transform_overrides(pipeline_options):
-  from google.cloud import pubsub
   from apache_beam.io.gcp import pubsub as beam_pubsub
   from apache_beam.pipeline import PTransformOverride
 
@@ -275,49 +319,19 @@ def get_replacement_transform(self, transform):
                         '(use the --streaming flag).')
       return _DirectReadFromPubSub(transform._source)
 
-  class WriteStringsToPubSubOverride(PTransformOverride):
+  class WriteToPubSubOverride(PTransformOverride):
     def matches(self, applied_ptransform):
-      return isinstance(applied_ptransform.transform,
-                        beam_pubsub.WriteStringsToPubSub)
+      return isinstance(
+          applied_ptransform.transform,
+          (beam_pubsub.WriteToPubSub, beam_pubsub._WriteStringsToPubSub))
 
     def get_replacement_transform(self, transform):
       if not pipeline_options.view_as(StandardOptions).streaming:
         raise Exception('PubSub I/O is only available in streaming mode '
                         '(use the --streaming flag).')
+      return beam.ParDo(_DirectWriteToPubSubFn(transform._sink))
 
-      class _DirectWriteToPubSub(beam.DoFn):
-        _topic = None
-
-        def __init__(self, project, topic_name):
-          self.project = project
-          self.topic_name = topic_name
-
-        def start_bundle(self):
-          if self._topic is None:
-            self._topic = pubsub.Client(project=self.project).topic(
-                self.topic_name)
-          self._buffer = []
-
-        def process(self, elem):
-          self._buffer.append(elem.encode('utf-8'))
-          if len(self._buffer) >= 100:
-            self._flush()
-
-        def finish_bundle(self):
-          self._flush()
-
-        def _flush(self):
-          if self._buffer:
-            with self._topic.batch() as batch:
-              for datum in self._buffer:
-                batch.publish(datum)
-            self._buffer = []
-
-      project = transform._sink.project
-      topic_name = transform._sink.topic_name
-      return beam.ParDo(_DirectWriteToPubSub(project, topic_name))
-
-  return [ReadFromPubSubOverride(), WriteStringsToPubSubOverride()]
+  return [ReadFromPubSubOverride(), WriteToPubSubOverride()]
 
 
 class BundleBasedDirectRunner(PipelineRunner):
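
_DirectWriteToPubSubFn above follows a common buffer-and-flush DoFn pattern:
accumulate elements in process(), flush on a size threshold, and flush again
in finish_bundle() for the partial tail. A generic standalone sketch of that
pattern (write_batch is a stand-in for the topic.batch() publishing):

    import apache_beam as beam

    def write_batch(batch):
      # Stand-in for publishing via a Pub/Sub topic batch.
      print('writing %d elements' % len(batch))

    class BufferedWriteFn(beam.DoFn):
      """Buffers elements and writes them in batches of up to 100."""

      def start_bundle(self):
        self._buffer = []

      def process(self, elem):
        self._buffer.append(elem)
        if len(self._buffer) >= 100:
          self._flush()

      def finish_bundle(self):
        # A bundle may end with a partially filled buffer.
        self._flush()

      def _flush(self):
        if self._buffer:
          write_batch(self._buffer)
          self._buffer = []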
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 38381fa5fd2..308c11769be 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -460,7 +460,7 @@ def finish_bundle(self):
         if self.source.with_attributes:
           element = message
         else:
-          element = message.payload
+          element = message.data
         bundle.output(
             GlobalWindows.windowed_value(element, timestamp=timestamp))
       bundles = [bundle]
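
With this rename, the element type depends only on with_attributes: raw
message data without it, full PubsubMessage objects with it. A read-side
sketch with a placeholder topic:

    import apache_beam as beam
    from apache_beam.io.gcp.pubsub import ReadFromPubSub
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import StandardOptions

    options = PipelineOptions()
    options.view_as(StandardOptions).streaming = True
    topic = 'projects/my-project/topics/my-topic'
    with beam.Pipeline(options=options) as p:
      data = p | 'ReadData' >> ReadFromPubSub(topic)  # message data only
      msgs = p | 'ReadMsgs' >> ReadFromPubSub(
          topic, with_attributes=True)  # PubsubMessage with .data/.attributes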
diff --git a/sdks/python/apache_beam/utils/annotations.py b/sdks/python/apache_beam/utils/annotations.py
index be1ded71f50..7ec7150d174 100644
--- a/sdks/python/apache_beam/utils/annotations.py
+++ b/sdks/python/apache_beam/utils/annotations.py
@@ -108,7 +108,7 @@ def inner(*args, **kwargs):
         message += ' since %s' % since
       message += '. Use %s instead.' % current if current else '.'
       if extra_message:
-        message += '. ' + extra_message
+        message += ' ' + extra_message
       warnings.warn(message, warning_type, stacklevel=2)
       return fnc(*args, **kwargs)
     return inner
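
The '. ' prefix doubled the period, since the preceding branch already closes
the sentence. A standalone sketch of the message assembly (names and version
numbers are illustrative):

    def build_message(name, since=None, current=None, extra_message=None):
      message = '%s is deprecated' % name
      if since:
        message += ' since %s' % since
      message += '. Use %s instead.' % current if current else '.'
      if extra_message:
        message += ' ' + extra_message  # fixed: no leading '. '
      return message

    print(build_message('ReadStringsFromPubSub', since='2.x.0',
                        current='ReadFromPubSub'))
    # ReadStringsFromPubSub is deprecated since 2.x.0. Use ReadFromPubSub instead.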


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 127934)
    Time Spent: 10.5h  (was: 10h 20m)

> Support full PubsubMessages
> ---------------------------
>
>                 Key: BEAM-3744
>                 URL: https://issues.apache.org/jira/browse/BEAM-3744
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Udi Meiri
>            Assignee: Udi Meiri
>            Priority: Critical
>          Time Spent: 10.5h
>  Remaining Estimate: 0h
>
> Tracking changes to Pubsub support in Python SDK.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
