beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [1/2] beam git commit: [BEAM-2020] Convert all unknown Coders into CustomCoder CloudObjects
Date Tue, 02 May 2017 20:45:06 GMT
Repository: beam
Updated Branches:
  refs/heads/master 3bd8a0f9f -> 0ce01b63f


[BEAM-2020] Convert all unknown Coders into CustomCoder CloudObjects

This ensures that all coders will be serializable, even if there is no
registered coder translator.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0b523b68
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0b523b68
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0b523b68

Branch: refs/heads/master
Commit: 0b523b685cf5581f68b5318b7fa39550232625fe
Parents: 3bd8a0f
Author: Thomas Groh <tgroh@google.com>
Authored: Tue May 2 10:21:27 2017 -0700
Committer: Luke Cwik <lcwik@google.com>
Committed: Tue May 2 13:44:25 2017 -0700

----------------------------------------------------------------------
 .../dataflow/util/CloudObjectTranslators.java   | 11 ++++----
 .../runners/dataflow/util/CloudObjects.java     |  5 +---
 ...aultCoderCloudObjectTranslatorRegistrar.java |  2 +-
 .../runners/dataflow/util/CloudObjectsTest.java | 28 +++++++++++++++++++-
 4 files changed, 35 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0b523b68/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
index 7a95a9e..c27bee7 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjectTranslators.java
@@ -313,10 +313,11 @@ class CloudObjectTranslators {
 
   private static final String CODER_FIELD = "serialized_coder";
   private static final String TYPE_FIELD = "type";
-  public static CloudObjectTranslator<? extends CustomCoder> custom() {
-    return new CloudObjectTranslator<CustomCoder>() {
+  public static CloudObjectTranslator<Coder> javaSerialized() {
+    return new CloudObjectTranslator<Coder>() {
       @Override
-      public CloudObject toCloudObject(CustomCoder target) {
+      public CloudObject toCloudObject(Coder target) {
+        // CustomCoder is used as the "marker" for a java-serialized coder
         CloudObject cloudObject = CloudObject.forClass(CustomCoder.class);
         Structs.addString(cloudObject, TYPE_FIELD, target.getClass().getName());
         Structs.addString(
@@ -327,10 +328,10 @@ class CloudObjectTranslators {
       }
 
       @Override
-      public CustomCoder fromCloudObject(CloudObject cloudObject) {
+      public Coder fromCloudObject(CloudObject cloudObject) {
         String serializedCoder = Structs.getString(cloudObject, CODER_FIELD);
         String type = Structs.getString(cloudObject, TYPE_FIELD);
-        return (CustomCoder<?>)
+        return (Coder<?>)
             SerializableUtils.deserializeFromByteArray(
                 StringUtils.jsonStringToByteArray(serializedCoder), type);
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/0b523b68/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
index a55d10c..9383c48 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/CloudObjects.java
@@ -67,7 +67,7 @@ public class CloudObjects {
         (CloudObjectTranslator<Coder>) CODER_TRANSLATORS.get(coder.getClass());
     if (translator != null) {
       return translator.toCloudObject(coder);
-    } else if (coder instanceof CustomCoder) {
+    } else {
       CloudObjectTranslator customCoderTranslator = CODER_TRANSLATORS.get(CustomCoder.class);
       checkNotNull(
           customCoderTranslator,
@@ -77,9 +77,6 @@ public class CloudObjects {
           DefaultCoderCloudObjectTranslatorRegistrar.class.getSimpleName());
       return customCoderTranslator.toCloudObject(coder);
     }
-    throw new IllegalArgumentException(
-        String.format(
-            "Non-Custom %s with no registered %s", Coder.class, CloudObjectTranslator.class));
   }
 
   public static Coder<?> coderFromCloudObject(CloudObject cloudObject) {

http://git-wip-us.apache.org/repos/asf/beam/blob/0b523b68/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
index 29f047f..5cae13f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
@@ -67,7 +67,7 @@ public class DefaultCoderCloudObjectTranslatorRegistrar
           CloudObjectTranslators.windowedValue(),
           new AvroCoderCloudObjectTranslator(),
           new SerializableCoderCloudObjectTranslator(),
-          CloudObjectTranslators.custom());
+          CloudObjectTranslators.javaSerialized());
   @VisibleForTesting
   static final ImmutableSet<Class<? extends Coder>> KNOWN_ATOMIC_CODERS =
       ImmutableSet.<Class<? extends Coder>>of(

http://git-wip-us.apache.org/repos/asf/beam/blob/0b523b68/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
index a6a3f25..b670268 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/CloudObjectsTest.java
@@ -29,7 +29,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
@@ -40,6 +42,7 @@ import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.LengthPrefixCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -70,7 +73,7 @@ public class CloudObjectsTest {
           new DefaultCoderCloudObjectTranslatorRegistrar().classesToTranslators().keySet();
       Set<Class<? extends Coder>> testedClasses = new HashSet<>();
       for (Coder<?> tested : DefaultCoders.data()) {
-        if (tested instanceof ObjectCoder) {
+        if (tested instanceof ObjectCoder || tested instanceof ArbitraryCoder) {
           testedClasses.add(CustomCoder.class);
           assertThat(defaultCoderTranslators, hasItem(CustomCoder.class));
         } else {
@@ -103,6 +106,7 @@ public class CloudObjectsTest {
     public static Iterable<Coder<?>> data() {
       Builder<Coder<?>> dataBuilder =
           ImmutableList.<Coder<?>>builder()
+              .add(new ArbitraryCoder())
               .add(new ObjectCoder())
               .add(GlobalWindow.Coder.INSTANCE)
               .add(IntervalWindow.getCoder())
@@ -161,4 +165,26 @@ public class CloudObjectsTest {
       return getClass().hashCode();
     }
   }
+
+  /**
+   * A non-custom coder with no registered translator.
+   */
+  private static class ArbitraryCoder extends StructuredCoder<Record> {
+    @Override
+    public void encode(Record value, OutputStream outStream, Context context)
+        throws CoderException, IOException {}
+
+    @Override
+    public Record decode(InputStream inStream, Context context) throws CoderException, IOException {
+      return new Record();
+    }
+
+    @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+      return Collections.emptyList();
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {}
+  }
 }


Mime
View raw message