beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject [43/50] [abbrv] beam git commit: Moves PubsubMessage to upper level and renames payload
Date Thu, 04 May 2017 07:17:49 GMT
Moves PubsubMessage to upper level and renames payload


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/210e216d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/210e216d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/210e216d

Branch: refs/heads/DSL_SQL
Commit: 210e216d95b14846cb51c94948a1c06157154de6
Parents: e57b501
Author: Eugene Kirpichov <kirpichov@google.com>
Authored: Wed May 3 18:17:34 2017 -0700
Committer: Eugene Kirpichov <kirpichov@google.com>
Committed: Wed May 3 19:18:46 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   | 22 +++----
 .../sdk/io/gcp/pubsub/PubsubCoderRegistrar.java |  3 +-
 .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 49 ++--------------
 .../beam/sdk/io/gcp/pubsub/PubsubMessage.java   | 61 ++++++++++++++++++++
 .../pubsub/PubsubMessagePayloadOnlyCoder.java   | 10 ++--
 .../PubsubMessageWithAttributesCoder.java       | 12 ++--
 .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java  | 12 ++--
 .../io/gcp/pubsub/PubsubUnboundedSource.java    | 16 ++---
 .../io/gcp/pubsub/PubsubUnboundedSinkTest.java  |  4 +-
 .../gcp/pubsub/PubsubUnboundedSourceTest.java   |  4 +-
 10 files changed, 107 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/210e216d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index f7455b3..9e5a2fb 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -89,7 +89,7 @@ import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
@@ -867,7 +867,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
    * instead defer to Windmill's implementation.
    */
   private static class StreamingPubsubIORead
-      extends PTransform<PBegin, PCollection<PubsubIO.PubsubMessage>> {
+      extends PTransform<PBegin, PCollection<PubsubMessage>> {
     private final PubsubUnboundedSource transform;
 
     /**
@@ -883,8 +883,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PCollection<PubsubIO.PubsubMessage> expand(PBegin input) {
-      return PCollection.<PubsubIO.PubsubMessage>createPrimitiveOutputInternal(
+    public PCollection<PubsubMessage> expand(PBegin input) {
+      return PCollection.<PubsubMessage>createPrimitiveOutputInternal(
           input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
           .setCoder(new PubsubMessageWithAttributesCoder());
     }
@@ -956,9 +956,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   }
 
   private static class IdentityMessageFn
-      extends SimpleFunction<PubsubIO.PubsubMessage, PubsubIO.PubsubMessage> {
+      extends SimpleFunction<PubsubMessage, PubsubMessage> {
     @Override
-    public PubsubIO.PubsubMessage apply(PubsubIO.PubsubMessage input) {
+    public PubsubMessage apply(PubsubMessage input) {
       return input;
     }
   }
@@ -968,7 +968,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
    * instead defer to Windmill's implementation.
    */
   private static class StreamingPubsubIOWrite
-      extends PTransform<PCollection<PubsubIO.PubsubMessage>, PDone> {
+      extends PTransform<PCollection<PubsubMessage>, PDone> {
     private final PubsubUnboundedSink transform;
 
     /**
@@ -984,7 +984,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PDone expand(PCollection<PubsubIO.PubsubMessage> input) {
+    public PDone expand(PCollection<PubsubMessage> input) {
       return PDone.in(input.getPipeline());
     }
 
@@ -1332,7 +1332,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
   private class StreamingPubsubIOWriteOverrideFactory
       implements PTransformOverrideFactory<
-          PCollection<PubsubIO.PubsubMessage>, PDone, PubsubUnboundedSink> {
+          PCollection<PubsubMessage>, PDone, PubsubUnboundedSink> {
     private final DataflowRunner runner;
 
     private StreamingPubsubIOWriteOverrideFactory(DataflowRunner runner) {
@@ -1340,9 +1340,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PTransformReplacement<PCollection<PubsubIO.PubsubMessage>, PDone>
+    public PTransformReplacement<PCollection<PubsubMessage>, PDone>
         getReplacementTransform(
-            AppliedPTransform<PCollection<PubsubIO.PubsubMessage>, PDone, PubsubUnboundedSink>
+            AppliedPTransform<PCollection<PubsubMessage>, PDone, PubsubUnboundedSink>
                 transform) {
       return PTransformReplacement.of(
           PTransformReplacements.getSingletonMainInput(transform),

http://git-wip-us.apache.org/repos/asf/beam/blob/210e216d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java
index 5944305..062f350 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java
@@ -30,7 +30,6 @@ public class PubsubCoderRegistrar implements CoderRegistrar {
   @Override
   public Map<Class<?>, CoderFactory> getCoderFactoriesToUseForClasses() {
     return ImmutableMap.<Class<?>, CoderFactory>of(
-        PubsubIO.PubsubMessage.class,
-        CoderFactories.forCoder(PubsubMessageWithAttributesCoder.of()));
+        PubsubMessage.class, CoderFactories.forCoder(PubsubMessageWithAttributesCoder.of()));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/210e216d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index 133839c..e023ad0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsub;
 
-import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.value.AutoValue;
@@ -156,44 +155,6 @@ public class PubsubIO {
   }
 
   /**
-   * Class representing a Pub/Sub message. Each message contains a single message payload and
-   * a map of attached attributes.
-   */
-  public static class PubsubMessage {
-
-    private byte[] message;
-    private Map<String, String> attributes;
-
-    public PubsubMessage(byte[] message, Map<String, String> attributes) {
-      this.message = message;
-      this.attributes = attributes;
-    }
-
-    /**
-     * Returns the main PubSub message.
-     */
-    public byte[] getMessage() {
-      return message;
-    }
-
-    /**
-     * Returns the given attribute value. If not such attribute exists, returns null.
-     */
-    @Nullable
-    public String getAttribute(String attribute) {
-      checkNotNull(attribute, "attribute");
-      return attributes.get(attribute);
-    }
-
-    /**
-     * Returns the full map of attributes. This is an unmodifiable map.
-     */
-    public Map<String, String> getAttributeMap() {
-      return attributes;
-    }
-  }
-
-  /**
    * Class representing a Cloud Pub/Sub Subscription.
    */
   public static class PubsubSubscription implements Serializable {
@@ -471,7 +432,7 @@ public class PubsubIO {
 
   /**
    * Returns A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. The
-   * messages will only contain a {@link PubsubMessage#getMessage() payload}, but no {@link
+   * messages will only contain a {@link PubsubMessage#getPayload() payload}, but no {@link
    * PubsubMessage#getAttributeMap() attributes}.
    */
   public static Read<PubsubMessage> readPubsubMessagesWithoutAttributes() {
@@ -484,7 +445,7 @@ public class PubsubIO {
 
   /**
    * Returns A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. The
-   * messages will contain both a {@link PubsubMessage#getMessage() payload} and {@link
+   * messages will contain both a {@link PubsubMessage#getPayload() payload} and {@link
    * PubsubMessage#getAttributeMap() attributes}.
    */
   public static Read<PubsubMessage> readPubsubMessagesWithAttributes() {
@@ -939,7 +900,7 @@ public class PubsubIO {
       public void processElement(ProcessContext c) throws IOException {
         byte[] payload;
         PubsubMessage message = getFormatFn().apply(c.element());
-        payload = message.getMessage();
+        payload = message.getPayload();
         Map<String, String> attributes = message.getAttributeMap();
         // NOTE: The record id is always null.
         output.add(new OutgoingMessage(payload, attributes, c.timestamp().getMillis(), null));
@@ -981,7 +942,7 @@ public class PubsubIO {
   private static class ParsePayloadAsUtf8 extends SimpleFunction<PubsubMessage, String> {
     @Override
     public String apply(PubsubMessage input) {
-      return new String(input.getMessage(), StandardCharsets.UTF_8);
+      return new String(input.getPayload(), StandardCharsets.UTF_8);
     }
   }
 
@@ -995,7 +956,7 @@ public class PubsubIO {
     @Override
     public T apply(PubsubMessage input) {
       try {
-        return CoderUtils.decodeFromByteArray(coder, input.getMessage());
+        return CoderUtils.decodeFromByteArray(coder, input.getPayload());
       } catch (CoderException e) {
         throw new RuntimeException("Could not decode Pubsub message", e);
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/210e216d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java
new file mode 100644
index 0000000..69f850a
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/**
+ * Class representing a Pub/Sub message. Each message contains a single message payload and
+ * a map of attached attributes.
+ */
+public class PubsubMessage {
+
+  private byte[] message;
+  private Map<String, String> attributes;
+
+  public PubsubMessage(byte[] payload, Map<String, String> attributes) {
+    this.message = payload;
+    this.attributes = attributes;
+  }
+
+  /**
+   * Returns the main PubSub message.
+   */
+  public byte[] getPayload() {
+    return message;
+  }
+
+  /**
+   * Returns the given attribute value. If not such attribute exists, returns null.
+   */
+  @Nullable
+  public String getAttribute(String attribute) {
+    checkNotNull(attribute, "attribute");
+    return attributes.get(attribute);
+  }
+
+  /**
+   * Returns the full map of attributes. This is an unmodifiable map.
+   */
+  public Map<String, String> getAttributeMap() {
+    return attributes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/210e216d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
index f0dae46..81c1a45 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
@@ -27,22 +27,22 @@ import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.util.StreamUtils;
 
 /** A coder for PubsubMessage treating the raw bytes being decoded as the message's payload. */
-public class PubsubMessagePayloadOnlyCoder extends CustomCoder<PubsubIO.PubsubMessage> {
+public class PubsubMessagePayloadOnlyCoder extends CustomCoder<PubsubMessage> {
   public static PubsubMessagePayloadOnlyCoder of() {
     return new PubsubMessagePayloadOnlyCoder();
   }
 
   @Override
-  public void encode(PubsubIO.PubsubMessage value, OutputStream outStream, Context context)
+  public void encode(PubsubMessage value, OutputStream outStream, Context context)
       throws IOException {
     checkState(context.isWholeStream, "Expected to only be used in a whole-stream context");
-    outStream.write(value.getMessage());
+    outStream.write(value.getPayload());
   }
 
   @Override
-  public PubsubIO.PubsubMessage decode(InputStream inStream, Context context) throws IOException {
+  public PubsubMessage decode(InputStream inStream, Context context) throws IOException {
     checkState(context.isWholeStream, "Expected to only be used in a whole-stream context");
-    return new PubsubIO.PubsubMessage(
+    return new PubsubMessage(
         StreamUtils.getBytes(inStream), ImmutableMap.<String, String>of());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/210e216d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
index be9493c..f70955d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
@@ -30,13 +30,13 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 /** A coder for PubsubMessage including attributes. */
-public class PubsubMessageWithAttributesCoder extends CustomCoder<PubsubIO.PubsubMessage> {
+public class PubsubMessageWithAttributesCoder extends CustomCoder<PubsubMessage> {
   private static final Coder<byte[]> PAYLOAD_CODER =
       NullableCoder.of(ByteArrayCoder.of());
   private static final Coder<Map<String, String>> ATTRIBUTES_CODER = MapCoder.of(
       StringUtf8Coder.of(), StringUtf8Coder.of());
 
-  public static Coder<PubsubIO.PubsubMessage> of(TypeDescriptor<PubsubIO.PubsubMessage> ignored) {
+  public static Coder<PubsubMessage> of(TypeDescriptor<PubsubMessage> ignored) {
     return of();
   }
 
@@ -44,19 +44,19 @@ public class PubsubMessageWithAttributesCoder extends CustomCoder<PubsubIO.Pubsu
     return new PubsubMessageWithAttributesCoder();
   }
 
-  public void encode(PubsubIO.PubsubMessage value, OutputStream outStream, Context context)
+  public void encode(PubsubMessage value, OutputStream outStream, Context context)
       throws IOException {
     PAYLOAD_CODER.encode(
-        value.getMessage(),
+        value.getPayload(),
         outStream,
         context.nested());
     ATTRIBUTES_CODER.encode(value.getAttributeMap(), outStream, context);
   }
 
   @Override
-  public PubsubIO.PubsubMessage decode(InputStream inStream, Context context) throws IOException {
+  public PubsubMessage decode(InputStream inStream, Context context) throws IOException {
     byte[] payload = PAYLOAD_CODER.decode(inStream, context.nested());
     Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream, context);
-    return new PubsubIO.PubsubMessage(payload, attributes);
+    return new PubsubMessage(payload, attributes);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/210e216d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index 67530ec..9d97e91 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -81,7 +81,7 @@ import org.joda.time.Duration;
  * to dedup messages.
  * </ul>
  */
-public class PubsubUnboundedSink extends PTransform<PCollection<PubsubIO.PubsubMessage>, PDone> {
+public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>, PDone> {
   /**
    * Default maximum number of messages per publish.
    */
@@ -154,7 +154,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubIO.PubsubM
   /**
    * Convert elements to messages and shard them.
    */
-  private static class ShardFn extends DoFn<PubsubIO.PubsubMessage, KV<Integer, OutgoingMessage>> {
+  private static class ShardFn extends DoFn<PubsubMessage, KV<Integer, OutgoingMessage>> {
     private final Counter elementCounter = Metrics.counter(ShardFn.class, "elements");
     private final int numShards;
     private final RecordIdMethod recordIdMethod;
@@ -167,8 +167,8 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubIO.PubsubM
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       elementCounter.inc();
-      PubsubIO.PubsubMessage message = c.element();
-      byte[] elementBytes = message.getMessage();
+      PubsubMessage message = c.element();
+      byte[] elementBytes = message.getPayload();
       Map<String, String> attributes = message.getAttributeMap();
 
       long timestampMsSinceEpoch = c.timestamp().getMillis();
@@ -427,11 +427,11 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubIO.PubsubM
   }
 
   @Override
-  public PDone expand(PCollection<PubsubIO.PubsubMessage> input) {
+  public PDone expand(PCollection<PubsubMessage> input) {
     input
         .apply(
             "PubsubUnboundedSink.Window",
-            Window.<PubsubIO.PubsubMessage>into(new GlobalWindows())
+            Window.<PubsubMessage>into(new GlobalWindows())
                 .triggering(
                     Repeatedly.forever(
                         AfterFirst.of(

http://git-wip-us.apache.org/repos/asf/beam/blob/210e216d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index d366949..e5be71b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -107,7 +107,7 @@ import org.slf4j.LoggerFactory;
  * {@link UnboundedSource.UnboundedReader} instances to execute concurrently and thus hide latency.
  * </ul>
  */
-public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<PubsubIO.PubsubMessage>> {
+public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<PubsubMessage>> {
   private static final Logger LOG = LoggerFactory.getLogger(PubsubUnboundedSource.class);
 
   /**
@@ -389,7 +389,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
    * but not yet consumed downstream and/or ACKed back to Pubsub.
    */
   @VisibleForTesting
-  static class PubsubReader extends UnboundedSource.UnboundedReader<PubsubIO.PubsubMessage> {
+  static class PubsubReader extends UnboundedSource.UnboundedReader<PubsubMessage> {
     /**
      * For access to topic and checkpointCoder.
      */
@@ -963,11 +963,11 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
     }
 
     @Override
-    public PubsubIO.PubsubMessage getCurrent() throws NoSuchElementException {
+    public PubsubMessage getCurrent() throws NoSuchElementException {
       if (current == null) {
         throw new NoSuchElementException();
       }
-      return new PubsubIO.PubsubMessage(current.elementBytes, current.attributes);
+      return new PubsubMessage(current.elementBytes, current.attributes);
     }
 
     @Override
@@ -1088,7 +1088,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
   // ================================================================================
 
   @VisibleForTesting
-  static class PubsubSource extends UnboundedSource<PubsubIO.PubsubMessage, PubsubCheckpoint> {
+  static class PubsubSource extends UnboundedSource<PubsubMessage, PubsubCheckpoint> {
     public final PubsubUnboundedSource outer;
     // The subscription to read from.
     @VisibleForTesting
@@ -1161,7 +1161,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
     }
 
     @Override
-    public Coder<PubsubIO.PubsubMessage> getDefaultOutputCoder() {
+    public Coder<PubsubMessage> getDefaultOutputCoder() {
       return new PubsubMessageWithAttributesCoder();
     }
 
@@ -1181,7 +1181,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
   // StatsFn
   // ================================================================================
 
-  private static class StatsFn extends DoFn<PubsubIO.PubsubMessage, PubsubIO.PubsubMessage> {
+  private static class StatsFn extends DoFn<PubsubMessage, PubsubMessage> {
     private final Counter elementCounter = SourceMetrics.elementsRead();
 
     private final PubsubClientFactory pubsubFactory;
@@ -1398,7 +1398,7 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
   }
 
   @Override
-  public PCollection<PubsubIO.PubsubMessage> expand(PBegin input) {
+  public PCollection<PubsubMessage> expand(PBegin input) {
     return input.getPipeline().begin()
                 .apply(Read.from(new PubsubSource(this)))
                 .apply("PubsubUnboundedSource.Stats",

http://git-wip-us.apache.org/repos/asf/beam/blob/210e216d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
index f2f40bb..cc3c85e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
@@ -60,7 +60,7 @@ public class PubsubUnboundedSinkTest implements Serializable {
   private static final String ID_ATTRIBUTE = "id";
   private static final int NUM_SHARDS = 10;
 
-  private static class Stamp extends DoFn<String, PubsubIO.PubsubMessage> {
+  private static class Stamp extends DoFn<String, PubsubMessage> {
     private final Map<String, String> attributes;
 
     private Stamp() {
@@ -74,7 +74,7 @@ public class PubsubUnboundedSinkTest implements Serializable {
     @ProcessElement
     public void processElement(ProcessContext c) {
       c.outputWithTimestamp(
-          new PubsubIO.PubsubMessage(
+          new PubsubMessage(
               c.element().getBytes(StandardCharsets.UTF_8), attributes),
           new Instant(TIMESTAMP));
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/210e216d/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
index 592dfa3..ee467da 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
@@ -114,8 +114,8 @@ public class PubsubUnboundedSourceTest {
     factory = null;
   }
 
-  private static String data(PubsubIO.PubsubMessage message) {
-    return new String(message.getMessage(), StandardCharsets.UTF_8);
+  private static String data(PubsubMessage message) {
+    return new String(message.getPayload(), StandardCharsets.UTF_8);
   }
 
   @Test


Mime
View raw message