beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [2/3] beam git commit: Provides a default coder for PubsubMessage
Date Thu, 04 May 2017 02:19:28 GMT
Provides a default coder for PubsubMessage


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e57b5013
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e57b5013
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e57b5013

Branch: refs/heads/master
Commit: e57b5013bc2d36602d24b8aba98ba1d28ec04933
Parents: 5b0a868
Author: Eugene Kirpichov <kirpichov@google.com>
Authored: Wed May 3 18:16:47 2017 -0700
Committer: Eugene Kirpichov <kirpichov@google.com>
Committed: Wed May 3 19:18:46 2017 -0700

----------------------------------------------------------------------
 .../sdk/io/gcp/pubsub/PubsubCoderRegistrar.java | 36 ++++++++++++++++++++
 .../PubsubMessageWithAttributesCoder.java       |  5 +++
 .../io/gcp/pubsub/PubsubUnboundedSinkTest.java  |  3 --
 3 files changed, 41 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e57b5013/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java
new file mode 100644
index 0000000..5944305
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubCoderRegistrar.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.beam.sdk.coders.CoderFactories;
+import org.apache.beam.sdk.coders.CoderFactory;
+import org.apache.beam.sdk.coders.CoderRegistrar;
+
+/** A {@link CoderRegistrar} for standard types used with {@link PubsubIO}. */
+@AutoService(CoderRegistrar.class)
+public class PubsubCoderRegistrar implements CoderRegistrar {
+  @Override
+  public Map<Class<?>, CoderFactory> getCoderFactoriesToUseForClasses() {
+    return ImmutableMap.<Class<?>, CoderFactory>of(
+        PubsubIO.PubsubMessage.class,
+        CoderFactories.forCoder(PubsubMessageWithAttributesCoder.of()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/e57b5013/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
index 27f0f02..be9493c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.values.TypeDescriptor;
 
 /** A coder for PubsubMessage including attributes. */
 public class PubsubMessageWithAttributesCoder extends CustomCoder<PubsubIO.PubsubMessage> {
@@ -35,6 +36,10 @@ public class PubsubMessageWithAttributesCoder extends CustomCoder<PubsubIO.Pubsu
   private static final Coder<Map<String, String>> ATTRIBUTES_CODER = MapCoder.of(
       StringUtf8Coder.of(), StringUtf8Coder.of());
 
+  public static Coder<PubsubIO.PubsubMessage> of(TypeDescriptor<PubsubIO.PubsubMessage> ignored) {
+    return of();
+  }
+
   public static PubsubMessageWithAttributesCoder of() {
     return new PubsubMessageWithAttributesCoder();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/e57b5013/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
index 11e7d83..f2f40bb 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java
@@ -115,7 +115,6 @@ public class PubsubUnboundedSinkTest implements Serializable {
               RecordIdMethod.DETERMINISTIC);
       p.apply(Create.of(ImmutableList.of(DATA)))
        .apply(ParDo.of(new Stamp(ATTRIBUTES)))
-       .setCoder(PubsubMessageWithAttributesCoder.of())
        .apply(sink);
       p.run();
     }
@@ -145,7 +144,6 @@ public class PubsubUnboundedSinkTest implements Serializable {
               Duration.standardSeconds(2), RecordIdMethod.DETERMINISTIC);
       p.apply(Create.of(data))
        .apply(ParDo.of(new Stamp()))
-       .setCoder(PubsubMessagePayloadOnlyCoder.of())
        .apply(sink);
       p.run();
     }
@@ -182,7 +180,6 @@ public class PubsubUnboundedSinkTest implements Serializable {
               RecordIdMethod.DETERMINISTIC);
       p.apply(Create.of(data))
        .apply(ParDo.of(new Stamp()))
-       .setCoder(PubsubMessagePayloadOnlyCoder.of())
        .apply(sink);
       p.run();
     }


Mime
View raw message