beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [3/3] beam git commit: Make Most StandardCoders CustomCoders
Date Tue, 25 Apr 2017 22:57:09 GMT
Make Most StandardCoders CustomCoders

Standard Coders have a defined serialization format and are understood
within the Runner API; Custom Coders are not. Move existing
"StandardCoders" to extend CustomCoder, and remove custom
cloud-object-related serialization logic where possible.

Still remaining: splitting the CustomCoder side of the class hierarchy
from the StandardCoder side of the hierarchy, moving IterableLikeCoder
to be a CustomCoder, and having IterableCoder forward to an internal
implementation (to ensure it remains a StandardCoder).


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cbcec7c0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cbcec7c0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cbcec7c0

Branch: refs/heads/master
Commit: cbcec7c0520f151f348604621cc251b6ed9b1616
Parents: 2e3c17a
Author: Thomas Groh <tgroh@google.com>
Authored: Fri Apr 21 17:58:51 2017 -0700
Committer: Thomas Groh <tgroh@google.com>
Committed: Tue Apr 25 15:31:33 2017 -0700

----------------------------------------------------------------------
 .../apex/translation/utils/ApexStreamTuple.java |   6 +-
 runners/core-construction-java/pom.xml          |   5 -
 .../beam/runners/core/construction/Coders.java  |   6 +
 .../UnboundedReadFromBoundedSource.java         |  16 +--
 .../runners/core/construction/CodersTest.java   |   2 +
 runners/core-java/pom.xml                       |   5 -
 .../beam/runners/core/KeyedWorkItemCoder.java   |  22 +---
 .../beam/runners/core/TimerInternals.java       |  18 +--
 .../streaming/SingletonKeyedWorkItemCoder.java  |  22 +---
 .../runners/dataflow/BatchViewOverrides.java    |  21 +---
 .../runners/dataflow/internal/IsmFormat.java    |  64 ++--------
 .../org/apache/beam/sdk/coders/AvroCoder.java   |  46 +++-----
 .../org/apache/beam/sdk/coders/CustomCoder.java |   3 +-
 .../org/apache/beam/sdk/coders/MapCoder.java    |  26 ++---
 .../apache/beam/sdk/coders/NullableCoder.java   |  27 +++--
 .../beam/sdk/coders/SerializableCoder.java      |  22 ----
 .../org/apache/beam/sdk/testing/TestStream.java | 116 -------------------
 .../org/apache/beam/sdk/transforms/Combine.java |  14 +--
 .../apache/beam/sdk/transforms/CombineFns.java  |  15 +--
 .../beam/sdk/transforms/join/CoGbkResult.java   |  27 +----
 .../beam/sdk/transforms/join/UnionCoder.java    |  14 +--
 .../apache/beam/sdk/util/ValueWithRecordId.java |  16 +--
 .../beam/sdk/values/TimestampedValue.java       |  22 +---
 .../beam/sdk/values/ValueInSingleWindow.java    |  23 +---
 .../apache/beam/sdk/coders/AvroCoderTest.java   |  15 ++-
 .../beam/sdk/coders/CoderRegistryTest.java      |   2 +-
 .../beam/sdk/coders/DelegateCoderTest.java      |   2 +-
 .../beam/sdk/coders/LengthPrefixCoderTest.java  |   7 --
 .../beam/sdk/coders/NullableCoderTest.java      |   2 +-
 .../beam/sdk/coders/SerializableCoderTest.java  |  11 +-
 .../apache/beam/sdk/testing/TestStreamTest.java |  24 ----
 .../beam/sdk/transforms/CombineFnsTest.java     |   4 +-
 .../beam/sdk/util/SerializableUtilsTest.java    |  56 +--------
 .../sdk/extensions/protobuf/ProtoCoder.java     |  14 ---
 sdks/java/io/google-cloud-platform/pom.xml      |   5 -
 .../sdk/io/gcp/bigquery/ShardedKeyCoder.java    |  17 +--
 sdks/java/io/hadoop-common/pom.xml              |   5 -
 .../beam/sdk/io/hadoop/WritableCoder.java       |  29 +----
 .../apache/beam/sdk/io/xml/JAXBCoderTest.java   |   4 +-
 39 files changed, 116 insertions(+), 639 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
index 79a4f1b..4ce351b 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.apex.translation.utils;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.datatorrent.api.Operator;
-
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -29,11 +28,10 @@ import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
-
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.coders.CustomCoder;
 
 /**
  * The common interface for all objects transmitted through streams.
@@ -151,7 +149,7 @@ public interface ApexStreamTuple<T> {
   /**
    * Coder for {@link ApexStreamTuple}.
    */
-  class ApexStreamTupleCoder<T> extends StandardCoder<ApexStreamTuple<T>> {
+  class ApexStreamTupleCoder<T> extends CustomCoder<ApexStreamTuple<T>> {
     private static final long serialVersionUID = 1L;
     final Coder<T> valueCoder;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/runners/core-construction-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml
index dfab3e2..854fdc1 100644
--- a/runners/core-construction-java/pom.xml
+++ b/runners/core-construction-java/pom.xml
@@ -71,11 +71,6 @@
 
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-annotations</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
index 043a010..6fe5dc9 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
 import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
@@ -65,6 +66,7 @@ public class Coders {
           .put(VarLongCoder.class, "urn:beam:coders:varint:0.1")
           .put(IntervalWindowCoder.class, "urn:beam:coders:interval_window:0.1")
           .put(IterableCoder.class, "urn:beam:coders:stream:0.1")
+          .put(LengthPrefixCoder.class, "urn:beam:coders:length_prefix:0.1")
           .put(GlobalWindow.Coder.class, "urn:beam:coders:global_window:0.1")
           .put(FullWindowedValueCoder.class, "urn:beam:coders:windowed_value:0.1")
           .build();
@@ -143,6 +145,10 @@ public class Coders {
         return VarLongCoder.of();
       case "urn:beam:coders:interval_window:0.1":
         return IntervalWindowCoder.of();
+      case "urn:beam:coders:length_prefix:0.1":
+        checkArgument(
+            coderComponents.size() == 1, "Expecting 1 component, got %s", coderComponents.size());
+        return LengthPrefixCoder.of(coderComponents.get(0));
       case "urn:beam:coders:stream:0.1":
         return IterableCoder.of(coderComponents);
       case "urn:beam:coders:global_window:0.1":

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index 6b99522..0ea13b8 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -20,8 +20,6 @@ package org.apache.beam.runners.core.construction;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
@@ -37,10 +35,10 @@ import java.util.NoSuchElementException;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.Read;
@@ -50,7 +48,6 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.NameUtils;
-import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
@@ -206,16 +203,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
     }
 
     @VisibleForTesting
-    static class CheckpointCoder<T> extends StandardCoder<Checkpoint<T>> {
-
-      @JsonCreator
-      public static CheckpointCoder<?> of(
-          @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-          List<Coder<?>> components) {
-        checkArgument(components.size() == 1,
-            "Expecting 1 components, got %s", components.size());
-        return new CheckpointCoder<>(components.get(0));
-      }
+    static class CheckpointCoder<T> extends CustomCoder<Checkpoint<T>> {
 
       // The coder for a list of residual elements and their timestamps
       private final Coder<List<TimestampedValue<T>>> elemsCoder;

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
index b2b9955..c9d32ee 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
@@ -38,6 +38,7 @@ import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -66,6 +67,7 @@ public class CodersTest {
           .add(VarLongCoder.of())
           .add(IntervalWindowCoder.of())
           .add(IterableCoder.of(ByteArrayCoder.of()))
+          .add(LengthPrefixCoder.of(IterableCoder.of(VarLongCoder.of())))
           .add(GlobalWindow.Coder.INSTANCE)
           .add(
               FullWindowedValueCoder.of(

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/runners/core-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml
index f066abf..5aa8a82 100644
--- a/runners/core-java/pom.xml
+++ b/runners/core-java/pom.xml
@@ -85,11 +85,6 @@
     <!-- build dependencies -->
 
     <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-annotations</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>com.google.auto.value</groupId>
       <artifactId>auto-value</artifactId>
       <scope>provided</scope>

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
index 7a144a6..fddf7fa 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
@@ -17,10 +17,6 @@
  */
 package org.apache.beam.runners.core;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
 import java.io.IOException;
 import java.io.InputStream;
@@ -30,17 +26,16 @@ import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.core.TimerInternals.TimerDataCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 
 /**
  * A {@link Coder} for {@link KeyedWorkItem KeyedWorkItems}.
  */
-public class KeyedWorkItemCoder<K, ElemT> extends StandardCoder<KeyedWorkItem<K, ElemT>> {
+public class KeyedWorkItemCoder<K, ElemT> extends CustomCoder<KeyedWorkItem<K, ElemT>> {
   /**
    * Create a new {@link KeyedWorkItemCoder} with the provided key coder, element coder, and window
    * coder.
@@ -50,19 +45,6 @@ public class KeyedWorkItemCoder<K, ElemT> extends StandardCoder<KeyedWorkItem<K,
     return new KeyedWorkItemCoder<>(keyCoder, elemCoder, windowCoder);
   }
 
-  @JsonCreator
-  public static <K, ElemT> KeyedWorkItemCoder<K, ElemT> of(
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
-    checkArgument(components.size() == 3, "Expecting 3 components, got %s", components.size());
-    @SuppressWarnings("unchecked")
-    Coder<K> keyCoder = (Coder<K>) components.get(0);
-    @SuppressWarnings("unchecked")
-    Coder<ElemT> elemCoder = (Coder<ElemT>) components.get(1);
-    @SuppressWarnings("unchecked")
-    Coder<? extends BoundedWindow> windowCoder = (Coder<? extends BoundedWindow>) components.get(2);
-    return new KeyedWorkItemCoder<>(keyCoder, elemCoder, windowCoder);
-  }
-
   private final Coder<K> keyCoder;
   private final Coder<ElemT> elemCoder;
   private final Coder<? extends BoundedWindow> windowCoder;

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
index 9c7bd57..375cdf9 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
@@ -17,10 +17,6 @@
  */
 package org.apache.beam.runners.core;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.auto.value.AutoValue;
 import com.google.common.collect.ComparisonChain;
 import java.io.IOException;
@@ -31,12 +27,11 @@ import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.InstantCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.joda.time.Instant;
 
@@ -229,7 +224,7 @@ public interface TimerInternals {
   /**
    * A {@link Coder} for {@link TimerData}.
    */
-  class TimerDataCoder extends StandardCoder<TimerData> {
+  class TimerDataCoder extends CustomCoder<TimerData> {
     private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
     private static final InstantCoder INSTANT_CODER = InstantCoder.of();
     private final Coder<? extends BoundedWindow> windowCoder;
@@ -238,15 +233,6 @@ public interface TimerInternals {
       return new TimerDataCoder(windowCoder);
     }
 
-    @SuppressWarnings("unchecked")
-    @JsonCreator
-    public static TimerDataCoder of(
-        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-        List<Coder<?>> components) {
-      checkArgument(components.size() == 1, "Expecting 1 components, got %s", components.size());
-      return of((Coder<? extends BoundedWindow>) components.get(0));
-    }
-
     private TimerDataCoder(Coder<? extends BoundedWindow> windowCoder) {
       this.windowCoder = windowCoder;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
index 9a52330..c73700f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
@@ -17,10 +17,6 @@
  */
 package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
 import java.io.IOException;
 import java.io.InputStream;
@@ -30,16 +26,15 @@ import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.WindowedValue;
 
 /**
  * Singleton keyed work item coder.
  */
 public class SingletonKeyedWorkItemCoder<K, ElemT>
-    extends StandardCoder<SingletonKeyedWorkItem<K, ElemT>> {
+    extends CustomCoder<SingletonKeyedWorkItem<K, ElemT>> {
   /**
    * Create a new {@link KeyedWorkItemCoder} with the provided key coder, element coder, and window
    * coder.
@@ -49,19 +44,6 @@ public class SingletonKeyedWorkItemCoder<K, ElemT>
     return new SingletonKeyedWorkItemCoder<>(keyCoder, elemCoder, windowCoder);
   }
 
-  @JsonCreator
-  public static <K, ElemT> SingletonKeyedWorkItemCoder<K, ElemT> of(
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
-    checkArgument(components.size() == 3, "Expecting 3 components, got %s", components.size());
-    @SuppressWarnings("unchecked")
-    Coder<K> keyCoder = (Coder<K>) components.get(0);
-    @SuppressWarnings("unchecked")
-    Coder<ElemT> elemCoder = (Coder<ElemT>) components.get(1);
-    @SuppressWarnings("unchecked")
-    Coder<? extends BoundedWindow> windowCoder = (Coder<? extends BoundedWindow>) components.get(2);
-    return new SingletonKeyedWorkItemCoder<>(keyCoder, elemCoder, windowCoder);
-  }
-
   private final Coder<K> keyCoder;
   private final Coder<ElemT> elemCoder;
   private final Coder<? extends BoundedWindow> windowCoder;

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index 1565fd1..64fe495 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -17,12 +17,9 @@
  */
 package org.apache.beam.runners.dataflow;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static org.apache.beam.sdk.util.WindowedValue.valueInEmptyWindows;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.collect.ForwardingMap;
@@ -53,11 +50,11 @@ import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -78,7 +75,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.PCollectionViews;
-import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
@@ -1339,7 +1335,7 @@ class BatchViewOverrides {
    * A {@link Coder} for {@link TransformedMap}s.
    */
   static class TransformedMapCoder<K, V1, V2>
-      extends StandardCoder<TransformedMap<K, V1, V2>> {
+      extends CustomCoder<TransformedMap<K, V1, V2>> {
     private final Coder<Function<V1, V2>> transformCoder;
     private final Coder<Map<K, V1>> originalMapCoder;
 
@@ -1354,19 +1350,6 @@ class BatchViewOverrides {
       return new TransformedMapCoder<>(transformCoder, originalMapCoder);
     }
 
-    @JsonCreator
-    public static <K, V1, V2> TransformedMapCoder<K, V1, V2> of(
-        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-            List<Coder<?>> components) {
-      checkArgument(components.size() == 2,
-          "Expecting 2 components, got " + components.size());
-      @SuppressWarnings("unchecked")
-      Coder<Function<V1, V2>> transformCoder = (Coder<Function<V1, V2>>) components.get(0);
-      @SuppressWarnings("unchecked")
-      Coder<Map<K, V1>> originalMapCoder = (Coder<Map<K, V1>>) components.get(1);
-      return of(transformCoder, originalMapCoder);
-    }
-
     @Override
     public void encode(TransformedMap<K, V1, V2> value, OutputStream outStream,
         Coder.Context context) throws CoderException, IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
index 97824dc..8b823bc 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
@@ -20,10 +20,7 @@ package org.apache.beam.runners.dataflow.internal;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
-import static org.apache.beam.sdk.util.Structs.addLong;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.auto.value.AutoValue;
 import com.google.common.collect.ImmutableList;
 import com.google.common.hash.HashFunction;
@@ -43,11 +40,8 @@ import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.VarInt;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -170,20 +164,20 @@ public class IsmFormat {
   /**
    * A {@link Coder} for {@link IsmRecord}s.
    *
-   * <p>Note that this coder standalone will not produce an Ism file. This coder can be used
-   * to materialize a {@link PCollection} of {@link IsmRecord}s. Only when this coder
-   * is combined with an {@code IsmSink} will one produce an Ism file.
+   * <p>Note that this coder standalone will not produce an Ism file. This coder can be used to
+   * materialize a {@link PCollection} of {@link IsmRecord}s. Only when this coder is combined with
+   * an {@code IsmSink} will one produce an Ism file.
    *
    * <p>The {@link IsmRecord} encoded format is:
+   *
    * <ul>
-   *   <li>encoded key component 1 using key component coder 1</li>
-   *   <li>...</li>
-   *   <li>encoded key component N using key component coder N</li>
-   *   <li>encoded value using value coder</li>
+   *   <li>encoded key component 1 using key component coder 1
+   *   <li>...
+   *   <li>encoded key component N using key component coder N
+   *   <li>encoded value using value coder
    * </ul>
    */
-  public static class IsmRecordCoder<V>
-      extends StandardCoder<IsmRecord<V>> {
+  public static class IsmRecordCoder<V> extends CustomCoder<IsmRecord<V>> {
     /** Returns an IsmRecordCoder with the specified key component coders, value coder. */
     public static <V> IsmRecordCoder<V> of(
         int numberOfShardKeyCoders,
@@ -202,24 +196,6 @@ public class IsmFormat {
           valueCoder);
     }
 
-    /**
-     * Returns an IsmRecordCoder with the specified coders. Note that this method is not meant
-     * to be called by users but used by Jackson when decoding this coder.
-     */
-    @JsonCreator
-    public static IsmRecordCoder<?> of(
-        @JsonProperty(PropertyNames.NUM_SHARD_CODERS) int numberOfShardCoders,
-        @JsonProperty(PropertyNames.NUM_METADATA_SHARD_CODERS) int numberOfMetadataShardCoders,
-        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
-      checkArgument(components.size() >= 2,
-          "Expecting at least 2 components, got " + components.size());
-      return of(
-          numberOfShardCoders,
-          numberOfMetadataShardCoders,
-          components.subList(0, components.size() - 1),
-          components.get(components.size() - 1));
-    }
-
     private final int numberOfShardKeyCoders;
     private final int numberOfMetadataShardKeyCoders;
     private final List<Coder<?>> keyComponentCoders;
@@ -379,14 +355,6 @@ public class IsmFormat {
     }
 
     @Override
-    protected CloudObject initializeCloudObject() {
-      CloudObject result = CloudObject.forClass(getClass());
-      addLong(result, PropertyNames.NUM_SHARD_CODERS, numberOfShardKeyCoders);
-      addLong(result, PropertyNames.NUM_METADATA_SHARD_CODERS, numberOfMetadataShardKeyCoders);
-      return result;
-    }
-
-    @Override
     public void verifyDeterministic() throws Coder.NonDeterministicException {
       verifyDeterministic("Key component coders expected to be deterministic.", keyComponentCoders);
       verifyDeterministic("Value coder expected to be deterministic.", valueCoder);
@@ -482,24 +450,12 @@ public class IsmFormat {
    * A coder for metadata key component. Can be used to wrap key component coder allowing for
    * the metadata key component to be used as a place holder instead of an actual key.
    */
-  public static class MetadataKeyCoder<K> extends StandardCoder<K> {
+  public static class MetadataKeyCoder<K> extends CustomCoder<K> {
     public static <K> MetadataKeyCoder<K> of(Coder<K> keyCoder) {
       checkNotNull(keyCoder);
       return new MetadataKeyCoder<>(keyCoder);
     }
 
-    /**
-     * Returns an IsmRecordCoder with the specified coders. Note that this method is not meant
-     * to be called by users but used by Jackson when decoding this coder.
-     */
-    @JsonCreator
-    public static MetadataKeyCoder<?> of(
-        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
-      checkArgument(components.size() == 1,
-          "Expecting one component, got " + components.size());
-      return of(components.get(0));
-    }
-
     private final Coder<K> keyCoder;
 
     private MetadataKeyCoder(Coder<K> keyCoder) {

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
index 9316224..91822bf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
@@ -17,10 +17,7 @@
  */
 package org.apache.beam.sdk.coders;
 
-import static org.apache.beam.sdk.util.Structs.addString;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
 import java.io.IOException;
 import java.io.InputStream;
@@ -57,7 +54,6 @@ import org.apache.avro.reflect.Union;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.util.ClassUtils;
 import org.apache.avro.util.Utf8;
-import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.EmptyOnDeserializationThreadLocal;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
@@ -102,7 +98,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  *
  * @param <T> the type of elements handled by this coder
  */
-public class AvroCoder<T> extends StandardCoder<T> {
+public class AvroCoder<T> extends CustomCoder<T> {
 
   /**
    * Returns an {@code AvroCoder} instance for the provided element type.
@@ -143,15 +139,6 @@ public class AvroCoder<T> extends StandardCoder<T> {
     return new AvroCoder<>(type, schema);
   }
 
-  @SuppressWarnings({"unchecked", "rawtypes"})
-  @JsonCreator
-  public static AvroCoder<?> of(
-      @JsonProperty("type") String classType,
-      @JsonProperty("schema") String schema) throws ClassNotFoundException {
-    Schema.Parser parser = new Schema.Parser();
-    return new AvroCoder(Class.forName(classType), parser.parse(schema));
-  }
-
   public static final CoderProvider PROVIDER = new CoderProvider() {
     @Override
     public <T> Coder<T> getCoder(TypeDescriptor<T> typeDescriptor) {
@@ -234,17 +221,20 @@ public class AvroCoder<T> extends StandardCoder<T> {
     // Reader and writer are allocated once per thread and are "final" for thread-local Coder
     // instance.
     this.reader = new EmptyOnDeserializationThreadLocal<DatumReader<T>>() {
+      private final AvroCoder<T> myCoder = AvroCoder.this;
       @Override
       public DatumReader<T> initialValue() {
-        return createDatumReader();
-      }
-    };
-    this.writer = new EmptyOnDeserializationThreadLocal<DatumWriter<T>>() {
-      @Override
-      public DatumWriter<T> initialValue() {
-        return createDatumWriter();
+        return myCoder.createDatumReader();
       }
     };
+    this.writer =
+        new EmptyOnDeserializationThreadLocal<DatumWriter<T>>() {
+          private final AvroCoder<T> myCoder = AvroCoder.this;
+          @Override
+          public DatumWriter<T> initialValue() {
+            return myCoder.createDatumWriter();
+          }
+        };
   }
 
   /**
@@ -311,14 +301,6 @@ public class AvroCoder<T> extends StandardCoder<T> {
     return null;
   }
 
-  @Override
-  protected CloudObject initializeCloudObject() {
-    CloudObject result = CloudObject.forClass(getClass());
-    addString(result, "type", type.getName());
-    addString(result, "schema", schemaSupplier.get().toString());
-    return result;
-  }
-
   /**
    * @throws NonDeterministicException when the type may not be deterministically
    * encoded using the given {@link Schema}, the {@code directBinaryEncoder}, and the
@@ -339,6 +321,7 @@ public class AvroCoder<T> extends StandardCoder<T> {
    */
   // TODO: once we can remove this deprecated function, inline in constructor.
   @Deprecated
+  @VisibleForTesting
   public DatumReader<T> createDatumReader() {
     if (type.equals(GenericRecord.class)) {
       return new GenericDatumReader<>(schemaSupplier.get());
@@ -352,8 +335,9 @@ public class AvroCoder<T> extends StandardCoder<T> {
    *
    * @deprecated For {@code AvroCoder} internal use only.
    */
-  // TODO: once we can remove this deprecated function, inline in constructor.
   @Deprecated
+  @VisibleForTesting
+  // TODO: once we can remove this deprecated function, inline in constructor.
   public DatumWriter<T> createDatumWriter() {
     if (type.equals(GenericRecord.class)) {
       return new GenericDatumWriter<>(schemaSupplier.get());

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
index fbf65df..2262e13 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
@@ -44,6 +44,7 @@ import org.apache.beam.sdk.util.StringUtils;
 public abstract class CustomCoder<T> extends StandardCoder<T>
     implements Serializable {
   @JsonCreator
+  @Deprecated
   public static CustomCoder<?> of(
       // N.B. typeId is a required parameter here, since a field named "@type"
       // is presented to the deserializer as an input.
@@ -87,7 +88,7 @@ public abstract class CustomCoder<T> extends StandardCoder<T>
    * @return A thin {@link CloudObject} wrapping of the Java serialization of {@code this}.
    */
   @Override
-  public CloudObject initializeCloudObject() {
+  public final CloudObject initializeCloudObject() {
     // N.B. We use the CustomCoder class, not the derived class, since during
     // deserialization we will be using the CustomCoder's static factory method
     // to construct an instance of the derived class.

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
index 7918528..74cd602 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
@@ -17,10 +17,6 @@
  */
 package org.apache.beam.sdk.coders;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.Maps;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -28,12 +24,12 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeParameter;
@@ -45,7 +41,7 @@ import org.apache.beam.sdk.values.TypeParameter;
  * @param <K> the type of the keys of the KVs being transcoded
  * @param <V> the type of the values of the KVs being transcoded
  */
-public class MapCoder<K, V> extends StandardCoder<Map<K, V>> {
+public class MapCoder<K, V> extends CustomCoder<Map<K, V>> {
   /**
    * Produces a MapCoder with the given keyCoder and valueCoder.
    */
@@ -55,14 +51,6 @@ public class MapCoder<K, V> extends StandardCoder<Map<K, V>> {
     return new MapCoder<>(keyCoder, valueCoder);
   }
 
-  @JsonCreator
-  public static MapCoder<?, ?> of(
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-      List<Coder<?>> components) {
-    checkArgument(components.size() == 2, "Expecting 2 components, got %s", components.size());
-    return of((Coder<?>) components.get(0), (Coder<?>) components.get(1));
-  }
-
   /**
    * Returns the key and value for an arbitrary element of this map,
    * if it is non-empty, otherwise returns {@code null}.
@@ -194,4 +182,14 @@ public class MapCoder<K, V> extends StandardCoder<Map<K, V>> {
             new TypeParameter<K>() {}, keyCoder.getEncodedTypeDescriptor())
         .where(new TypeParameter<V>() {}, valueCoder.getEncodedTypeDescriptor());
   }
+
+  @Override
+  public String getEncodingId() {
+    return "";
+  }
+
+  @Override
+  public Collection<String> getAllowedEncodings() {
+    return Collections.emptyList();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
index c92470a..dba2a8b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
@@ -17,18 +17,15 @@
  */
 package org.apache.beam.sdk.coders;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
@@ -40,7 +37,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  *
  * @param <T> the type of the values being transcoded
  */
-public class NullableCoder<T> extends StandardCoder<T> {
+public class NullableCoder<T> extends CustomCoder<T> {
   public static <T> NullableCoder<T> of(Coder<T> valueCoder) {
     if (valueCoder instanceof NullableCoder) {
       return (NullableCoder<T>) valueCoder;
@@ -48,14 +45,6 @@ public class NullableCoder<T> extends StandardCoder<T> {
     return new NullableCoder<>(valueCoder);
   }
 
-  @JsonCreator
-  public static NullableCoder<?> of(
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-      List<Coder<?>> components) {
-    checkArgument(components.size() == 1, "Expecting 1 components, got %s", components.size());
-    return of(components.get(0));
-  }
-
   /////////////////////////////////////////////////////////////////////////////
 
   private final Coder<T> valueCoder;
@@ -189,4 +178,14 @@ public class NullableCoder<T> extends StandardCoder<T> {
   public TypeDescriptor<T> getEncodedTypeDescriptor() {
     return valueCoder.getEncodedTypeDescriptor();
   }
+
+  @Override
+  public String getEncodingId() {
+    return "";
+  }
+
+  @Override
+  public Collection<String> getAllowedEncodings() {
+    return Collections.emptyList();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
index 1314a6c..d343af1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.sdk.coders;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
@@ -26,7 +24,6 @@ import java.io.ObjectOutputStream;
 import java.io.ObjectStreamClass;
 import java.io.OutputStream;
 import java.io.Serializable;
-import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
@@ -66,18 +63,6 @@ public class SerializableCoder<T extends Serializable> extends CustomCoder<T> {
     return new SerializableCoder<>(clazz, TypeDescriptor.of(clazz));
   }
 
-  @JsonCreator
-  @SuppressWarnings("unchecked")
-  public static SerializableCoder<?> of(@JsonProperty("type") String classType)
-      throws ClassNotFoundException {
-    Class<?> clazz = Class.forName(classType);
-    if (!Serializable.class.isAssignableFrom(clazz)) {
-      throw new ClassNotFoundException(
-          "Class " + classType + " does not implement Serializable");
-    }
-    return of((Class<? extends Serializable>) clazz);
-  }
-
   /**
    * A {@link CoderProvider} that constructs a {@link SerializableCoder}
    * for any class that implements serializable.
@@ -145,13 +130,6 @@ public class SerializableCoder<T extends Serializable> extends CustomCoder<T> {
         ObjectStreamClass.lookup(type).getSerialVersionUID());
   }
 
-  @Override
-  public CloudObject initializeCloudObject() {
-    CloudObject result = CloudObject.forClass(getClass());
-    result.put("type", type.getName());
-    return result;
-  }
-
   /**
    * {@inheritDoc}
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
index 6d8ad6a..a648767 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
@@ -21,38 +21,21 @@ package org.apache.beam.sdk.testing;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.auto.value.AutoValue;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
 import java.util.List;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.DurationCoder;
-import org.apache.beam.sdk.coders.InstantCoder;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.PropertyNames;
-import org.apache.beam.sdk.util.VarInt;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.sdk.values.TypeParameter;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
-import org.joda.time.ReadableDuration;
 
 /**
  * A testing input that generates an unbounded {@link PCollection} of elements, advancing the
@@ -267,13 +250,6 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
   }
 
   /**
-   * Returns a coder suitable for encoding {@link TestStream.Event}.
-   */
-  public Coder<Event<T>> getEventCoder() {
-    return EventCoder.of(coder);
-  }
-
-  /**
    * Returns the sequence of {@link Event Events} in this {@link TestStream}.
    *
    * <p>For use by {@link PipelineRunner} authors.
@@ -281,96 +257,4 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
   public List<Event<T>> getEvents() {
     return events;
   }
-
-  /**
-   * A {@link Coder} that encodes and decodes {@link TestStream.Event Events}.
-   *
-   * @param <T> the type of elements in {@link ElementEvent ElementEvents} encoded and decoded by
-   *            this {@link EventCoder}
-   */
-  @VisibleForTesting
-  static final class EventCoder<T> extends StandardCoder<Event<T>> {
-    private static final Coder<ReadableDuration> DURATION_CODER = DurationCoder.of();
-    private static final Coder<Instant> INSTANT_CODER = InstantCoder.of();
-    private final Coder<T> valueCoder;
-    private final Coder<Iterable<TimestampedValue<T>>> elementCoder;
-
-    public static <T> EventCoder<T> of(Coder<T> valueCoder) {
-      return new EventCoder<>(valueCoder);
-    }
-
-    @JsonCreator
-    public static <T> EventCoder<T> of(
-        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<? extends Coder<?>> components) {
-      checkArgument(
-          components.size() == 1,
-          "Was expecting exactly one component coder, got %s",
-          components.size());
-      return new EventCoder<>((Coder<T>) components.get(0));
-    }
-
-    private EventCoder(Coder<T> valueCoder) {
-      this.valueCoder = valueCoder;
-      this.elementCoder = IterableCoder.of(TimestampedValueCoder.of(valueCoder));
-    }
-
-    @Override
-    public void encode(
-        Event<T> value, OutputStream outStream, Context context)
-        throws IOException {
-      VarInt.encode(value.getType().ordinal(), outStream);
-      switch (value.getType()) {
-        case ELEMENT:
-          Iterable<TimestampedValue<T>> elems = ((ElementEvent<T>) value).getElements();
-          elementCoder.encode(elems, outStream, context);
-          break;
-        case WATERMARK:
-          Instant ts = ((WatermarkEvent<T>) value).getWatermark();
-          INSTANT_CODER.encode(ts, outStream, context);
-          break;
-        case PROCESSING_TIME:
-          Duration processingAdvance = ((ProcessingTimeEvent<T>) value).getProcessingTimeAdvance();
-          DURATION_CODER.encode(processingAdvance, outStream, context);
-          break;
-        default:
-          throw new AssertionError("Unreachable: Unsupported Event Type " + value.getType());
-      }
-    }
-
-    @Override
-    public Event<T> decode(
-        InputStream inStream, Context context) throws IOException {
-      EventType eventType = EventType.values()[VarInt.decodeInt(inStream)];
-      switch (eventType) {
-        case ELEMENT:
-          Iterable<TimestampedValue<T>> elements = elementCoder.decode(inStream, context);
-          return ElementEvent.add(elements);
-        case WATERMARK:
-          return WatermarkEvent.advanceTo(INSTANT_CODER.decode(inStream, context));
-        case PROCESSING_TIME:
-          return ProcessingTimeEvent.advanceBy(
-              DURATION_CODER.decode(inStream, context).toDuration());
-        default:
-          throw new AssertionError("Unreachable: Unsupported Event Type " + eventType);
-      }
-    }
-
-    @Override
-    public List<? extends Coder<?>> getCoderArguments() {
-      return Collections.singletonList(valueCoder);
-    }
-
-    @Override
-    public void verifyDeterministic() throws NonDeterministicException {
-      elementCoder.verifyDeterministic();
-      DURATION_CODER.verifyDeterministic();
-      INSTANT_CODER.verifyDeterministic();
-    }
-
-    @Override
-    public TypeDescriptor<Event<T>> getEncodedTypeDescriptor() {
-      return new TypeDescriptor<Event<T>>() {}.where(
-          new TypeParameter<T>() {}, valueCoder.getEncodedTypeDescriptor());
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index abeeef0..5ffaef8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -19,8 +19,6 @@ package org.apache.beam.sdk.transforms;
 
 import static com.google.common.base.Preconditions.checkState;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
@@ -41,7 +39,6 @@ import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.DelegateCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -64,7 +61,6 @@ import org.apache.beam.sdk.util.AppliedCombineFn;
 import org.apache.beam.sdk.util.NameUtils;
 import org.apache.beam.sdk.util.NameUtils.NameOverride;
 import org.apache.beam.sdk.util.PCollectionViews;
-import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
@@ -2278,7 +2274,7 @@ public class Combine {
       }
 
       private static class InputOrAccumCoder<InputT, AccumT>
-          extends StandardCoder<InputOrAccum<InputT, AccumT>> {
+          extends CustomCoder<InputOrAccum<InputT, AccumT>> {
 
         private final Coder<InputT> inputCoder;
         private final Coder<AccumT> accumCoder;
@@ -2288,14 +2284,6 @@ public class Combine {
           this.accumCoder = accumCoder;
         }
 
-        @JsonCreator
-        @SuppressWarnings({"rawtypes", "unchecked"})
-        public static <InputT, AccumT> InputOrAccumCoder<InputT, AccumT> of(
-            @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-            List<Coder<?>> elementCoders) {
-          return new InputOrAccumCoder(elementCoders.get(0), elementCoders.get(1));
-        }
-
         @Override
         public void encode(
             InputOrAccum<InputT, AccumT> value, OutputStream outStream, Coder.Context context)

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
index 79b2ab8..ca939c1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
@@ -19,8 +19,6 @@ package org.apache.beam.sdk.transforms;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -37,7 +35,7 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
 import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
@@ -48,7 +46,6 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.util.CombineFnUtil;
-import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.values.TupleTag;
 
 /**
@@ -973,7 +970,7 @@ public class CombineFns {
     }
   }
 
-  private static class ComposedAccumulatorCoder extends StandardCoder<Object[]> {
+  private static class ComposedAccumulatorCoder extends CustomCoder<Object[]> {
     private List<Coder<Object>> coders;
     private int codersCount;
 
@@ -982,14 +979,6 @@ public class CombineFns {
       this.codersCount  = coders.size();
     }
 
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    @JsonCreator
-    public static ComposedAccumulatorCoder of(
-        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-        List<Coder<?>> components) {
-      return new ComposedAccumulatorCoder((List) components);
-    }
-
     @Override
     public void encode(Object[] value, OutputStream outStream, Context context)
         throws CoderException, IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
index 9e0a011..83e178e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
@@ -17,11 +17,6 @@
  */
 package org.apache.beam.sdk.transforms.join;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.beam.sdk.util.Structs.addObject;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
@@ -35,10 +30,8 @@ import java.util.List;
 import java.util.Objects;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.common.Reiterator;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TupleTag;
@@ -211,7 +204,7 @@ public class CoGbkResult {
   /**
    * A {@link Coder} for {@link CoGbkResult}s.
    */
-  public static class CoGbkResultCoder extends StandardCoder<CoGbkResult> {
+  public static class CoGbkResultCoder extends CustomCoder<CoGbkResult> {
 
     private final CoGbkResultSchema schema;
     private final UnionCoder unionCoder;
@@ -225,15 +218,6 @@ public class CoGbkResult {
       return new CoGbkResultCoder(schema, unionCoder);
     }
 
-    @JsonCreator
-    public static CoGbkResultCoder of(
-        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-        List<Coder<?>> components,
-        @JsonProperty(PropertyNames.CO_GBK_RESULT_SCHEMA) CoGbkResultSchema schema) {
-      checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size());
-      return new CoGbkResultCoder(schema, (UnionCoder) components.get(0));
-    }
-
     private CoGbkResultCoder(
         CoGbkResultSchema tupleTags,
         UnionCoder unionCoder) {
@@ -247,13 +231,6 @@ public class CoGbkResult {
     }
 
     @Override
-    public CloudObject initializeCloudObject() {
-      CloudObject result = CloudObject.forClass(getClass());
-      addObject(result, PropertyNames.CO_GBK_RESULT_SCHEMA, schema.asCloudObject());
-      return result;
-    }
-
-    @Override
     @SuppressWarnings("unchecked")
     public void encode(
         CoGbkResult value,

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
index 72ac6e8..2beff57 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
@@ -17,23 +17,20 @@
  */
 package org.apache.beam.sdk.transforms.join;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.List;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.util.VarInt;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 
 /**
  * A UnionCoder encodes RawUnionValues.
  */
-public class UnionCoder extends StandardCoder<RawUnionValue> {
+public class UnionCoder extends CustomCoder<RawUnionValue> {
   // TODO: Think about how to integrate this with a schema object (i.e.
   // a tuple of tuple tags).
   /**
@@ -44,13 +41,6 @@ public class UnionCoder extends StandardCoder<RawUnionValue> {
     return new UnionCoder(elementCoders);
   }
 
-  @JsonCreator
-  public static UnionCoder jsonOf(
-      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-      List<Coder<?>> elements) {
-    return UnionCoder.of(elements);
-  }
-
   private int getIndexForEncoding(RawUnionValue union) {
     if (union == null) {
       throw new IllegalArgumentException("cannot encode a null tagged union");

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java
index abca598..9902aa7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java
@@ -17,10 +17,6 @@
  */
 package org.apache.beam.sdk.util;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.MoreObjects;
 import java.io.IOException;
 import java.io.InputStream;
@@ -30,7 +26,7 @@ import java.util.List;
 import java.util.Objects;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.transforms.DoFn;
 
 /**
@@ -85,19 +81,11 @@ public class ValueWithRecordId<ValueT> {
    * A {@link Coder} for {@code ValueWithRecordId}, using a wrapped value {@code Coder}.
    */
   public static class ValueWithRecordIdCoder<ValueT>
-      extends StandardCoder<ValueWithRecordId<ValueT>> {
+      extends CustomCoder<ValueWithRecordId<ValueT>> {
     public static <ValueT> ValueWithRecordIdCoder<ValueT> of(Coder<ValueT> valueCoder) {
       return new ValueWithRecordIdCoder<>(valueCoder);
     }
 
-    @JsonCreator
-    public static <ValueT> ValueWithRecordIdCoder<ValueT> of(
-         @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-        List<Coder<ValueT>> components) {
-      checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size());
-      return of(components.get(0));
-    }
-
     protected ValueWithRecordIdCoder(Coder<ValueT> valueCoder) {
       this.valueCoder = valueCoder;
       this.idCoder = ByteArrayCoder.of();

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
index c0c3df3..cde9a40 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
@@ -17,11 +17,8 @@
  */
 package org.apache.beam.sdk.values;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -29,10 +26,9 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.InstantCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.PropertyNames;
 import org.joda.time.Instant;
 
 /**
@@ -88,11 +84,8 @@ public class TimestampedValue<V> {
 
   /////////////////////////////////////////////////////////////////////////////
 
-  /**
-   * A {@link Coder} for {@link TimestampedValue}.
-   */
-  public static class TimestampedValueCoder<T>
-      extends StandardCoder<TimestampedValue<T>> {
+  /** A {@link Coder} for {@link TimestampedValue}. */
+  public static class TimestampedValueCoder<T> extends CustomCoder<TimestampedValue<T>> {
 
     private final Coder<T> valueCoder;
 
@@ -100,15 +93,6 @@ public class TimestampedValue<V> {
       return new TimestampedValueCoder<>(valueCoder);
     }
 
-    @JsonCreator
-    public static TimestampedValueCoder<?> of(
-        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-        List<Coder<?>> components) {
-      checkArgument(components.size() == 1,
-                    "Expecting 1 component, got " + components.size());
-      return of(components.get(0));
-    }
-
     @SuppressWarnings("unchecked")
     TimestampedValueCoder(Coder<T> valueCoder) {
       this.valueCoder = checkNotNull(valueCoder);

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
index c94190f..1fd356b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
@@ -17,10 +17,6 @@
  */
 package org.apache.beam.sdk.values;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.auto.value.AutoValue;
 import com.google.common.collect.ImmutableList;
 import java.io.IOException;
@@ -28,11 +24,10 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.List;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.InstantCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.PropertyNames;
 import org.joda.time.Instant;
 
 /**
@@ -61,7 +56,7 @@ public abstract class ValueInSingleWindow<T> {
   }
 
   /** A coder for {@link ValueInSingleWindow}. */
-  public static class Coder<T> extends StandardCoder<ValueInSingleWindow<T>> {
+  public static class Coder<T> extends CustomCoder<ValueInSingleWindow<T>> {
     private final org.apache.beam.sdk.coders.Coder<T> valueCoder;
     private final org.apache.beam.sdk.coders.Coder<BoundedWindow> windowCoder;
 
@@ -71,20 +66,6 @@ public abstract class ValueInSingleWindow<T> {
       return new Coder<>(valueCoder, windowCoder);
     }
 
-    @JsonCreator
-    public static <T> Coder<T> of(
-        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-            List<org.apache.beam.sdk.coders.Coder<?>> components) {
-      checkArgument(components.size() == 2, "Expecting 2 components, got %s", components.size());
-      @SuppressWarnings("unchecked")
-      org.apache.beam.sdk.coders.Coder<T> valueCoder =
-          (org.apache.beam.sdk.coders.Coder<T>) components.get(0);
-      @SuppressWarnings("unchecked")
-      org.apache.beam.sdk.coders.Coder<BoundedWindow> windowCoder =
-          (org.apache.beam.sdk.coders.Coder<BoundedWindow>) components.get(1);
-      return new Coder<>(valueCoder, windowCoder);
-    }
-
     @SuppressWarnings({"unchecked", "rawtypes"})
     Coder(
         org.apache.beam.sdk.coders.Coder<T> valueCoder,

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
index 2cd047b..2a78823 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
@@ -64,7 +64,6 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -150,10 +149,18 @@ public class AvroCoderTest {
   @Test
   public void testAvroCoderEncoding() throws Exception {
     AvroCoder<Pojo> coder = AvroCoder.of(Pojo.class);
-    CloudObject encoding = coder.asCloudObject();
+    CoderProperties.coderSerializable(coder);
+    AvroCoder<Pojo> copy = SerializableUtils.clone(coder);
+
+    Pojo pojo = new Pojo("foo", 3);
+    Pojo equalPojo = new Pojo("foo", 3);
+    Pojo otherPojo = new Pojo("bar", -19);
+    CoderProperties.coderConsistentWithEquals(coder, pojo, equalPojo);
+    CoderProperties.coderConsistentWithEquals(copy, pojo, equalPojo);
+    CoderProperties.coderConsistentWithEquals(coder, pojo, otherPojo);
+    CoderProperties.coderConsistentWithEquals(copy, pojo, otherPojo);
+
 
-    Assert.assertThat(encoding.keySet(),
-        Matchers.containsInAnyOrder("@type", "type", "schema", "encoding_id"));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
index 616e88e..6ec258e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
@@ -100,7 +100,7 @@ public class CoderRegistryTest {
   }
 
   @SuppressWarnings("rawtypes") // this class exists to fail a test because of its rawtypes
-  private class MyListCoder extends StandardCoder<List> {
+  private class MyListCoder extends CustomCoder<List> {
     @Override
     public void encode(List value, OutputStream outStream, Context context)
         throws CoderException, IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java
index b95b76d..2ae7dd5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java
@@ -109,7 +109,7 @@ public class DelegateCoderTest implements Serializable {
   private static final String TEST_ENCODING_ID = "test-encoding-id";
   private static final String TEST_ALLOWED_ENCODING = "test-allowed-encoding";
 
-  private static class TestAllowedEncodingsCoder extends StandardCoder<Integer> {
+  private static class TestAllowedEncodingsCoder extends CustomCoder<Integer> {
 
     @Override
     public void encode(Integer value, OutputStream outstream, Context context) {

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
index f7942d3..e4564df 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
@@ -26,7 +26,6 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.util.CloudObject;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -53,12 +52,6 @@ public class LengthPrefixCoderTest {
       "AA");
 
   @Test
-  public void testCloudObjectRepresentation() throws Exception {
-    CloudObject cloudObject = TEST_CODER.asCloudObject();
-    assertEquals("kind:length_prefix", cloudObject.getClassName());
-  }
-
-  @Test
   public void testCoderSerializable() throws Exception {
     CoderProperties.coderSerializable(TEST_CODER);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
index 052144e..b76c037 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
@@ -177,7 +177,7 @@ public class NullableCoderTest {
     assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(TypeDescriptor.of(String.class)));
   }
 
-  private static class EntireStreamExpectingCoder extends StandardCoder<String> {
+  private static class EntireStreamExpectingCoder extends CustomCoder<String> {
     @Override
     public void encode(
         String value, OutputStream outStream, Context context) throws IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
index f8c0001..ec4b74c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
@@ -34,9 +34,8 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.Serializer;
+import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.hamcrest.Matchers;
@@ -123,13 +122,11 @@ public class SerializableCoderTest implements Serializable {
   public void testSerializableCoderConstruction() throws Exception {
     SerializableCoder<MyRecord> coder = SerializableCoder.of(MyRecord.class);
     assertEquals(coder.getRecordType(), MyRecord.class);
+    CoderProperties.coderSerializable(coder);
 
-    CloudObject encoding = coder.asCloudObject();
-    Assert.assertThat(encoding.getClassName(),
-        Matchers.containsString(SerializableCoder.class.getSimpleName()));
 
-    Coder<?> decoded = Serializer.deserialize(encoding, Coder.class);
-    Assert.assertThat(decoded, Matchers.instanceOf(SerializableCoder.class));
+    SerializableCoder<?> decoded = SerializableUtils.clone(coder);
+    assertThat(decoded.getRecordType(), Matchers.<Object>equalTo(MyRecord.class));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
index 5cb7634..bef6aa0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
@@ -313,28 +313,4 @@ public class TestStreamTest implements Serializable {
     thrown.expect(IllegalArgumentException.class);
     stream.advanceWatermarkTo(BoundedWindow.TIMESTAMP_MAX_VALUE);
   }
-
-  @Test
-  public void testEncodeDecode() throws Exception {
-    TestStream.Event<Integer> elems =
-        TestStream.ElementEvent.add(
-            TimestampedValue.of(1, new Instant()),
-            TimestampedValue.of(-10, new Instant()),
-            TimestampedValue.of(Integer.MAX_VALUE, new Instant()));
-    TestStream.Event<Integer> wm = TestStream.WatermarkEvent.advanceTo(new Instant(100));
-    TestStream.Event<Integer> procTime =
-        TestStream.ProcessingTimeEvent.advanceBy(Duration.millis(90548));
-
-    TestStream.EventCoder<Integer> coder = TestStream.EventCoder.of(VarIntCoder.of());
-
-    CoderProperties.coderSerializable(coder);
-    CoderProperties.coderDecodeEncodeEqual(coder, elems);
-    CoderProperties.coderDecodeEncodeEqual(coder, wm);
-    CoderProperties.coderDecodeEncodeEqual(coder, procTime);
-  }
-
-  @Test
-  public void testCoderIsSerializableWithWellKnownCoderType() {
-    CoderProperties.coderSerializable(TestStream.EventCoder.of(GlobalWindow.Coder.INSTANCE));
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
index b107f3d..13c5f16 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
@@ -31,9 +31,9 @@ import java.util.List;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -345,7 +345,7 @@ public class  CombineFnsTest {
     }
   }
 
-  private static class UserStringCoder extends StandardCoder<UserString> {
+  private static class UserStringCoder extends CustomCoder<UserString> {
     public static UserStringCoder of() {
       return INSTANCE;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
index e22f30e..2d5baf2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
@@ -19,8 +19,6 @@ package org.apache.beam.sdk.util;
 
 import static org.junit.Assert.assertEquals;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
 import java.io.IOException;
 import java.io.InputStream;
@@ -29,7 +27,7 @@ import java.io.Serializable;
 import java.util.List;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -87,7 +85,7 @@ public class SerializableUtilsTest {
   }
 
   /** A {@link Coder} that is not serializable by Java. */
-  private static class UnserializableCoderByJava extends StandardCoder<Object> {
+  private static class UnserializableCoderByJava extends CustomCoder<Object> {
     private final Object unserializableField = new Object();
 
     @Override
@@ -116,54 +114,4 @@ public class SerializableUtilsTest {
     expectedException.expectMessage("unable to serialize");
     SerializableUtils.ensureSerializable(new UnserializableCoderByJava());
   }
-
-  /** A {@link Coder} that is not serializable by Jackson. */
-  private static class UnserializableCoderByJackson extends StandardCoder<Object> {
-    private final SerializableByJava unserializableField;
-
-    public UnserializableCoderByJackson(SerializableByJava unserializableField) {
-      this.unserializableField = unserializableField;
-    }
-
-    @JsonCreator
-    public static UnserializableCoderByJackson of(
-        @JsonProperty("unserializableField") SerializableByJava unserializableField) {
-      return new UnserializableCoderByJackson(unserializableField);
-    }
-
-    @Override
-    public CloudObject initializeCloudObject() {
-      CloudObject result = CloudObject.forClass(getClass());
-      result.put("unserializableField", unserializableField);
-      return result;
-    }
-
-    @Override
-    public void encode(Object value, OutputStream outStream, Context context)
-        throws CoderException, IOException {
-    }
-
-    @Override
-    public Object decode(InputStream inStream, Context context)
-        throws CoderException, IOException {
-      return unserializableField;
-    }
-
-    @Override
-    public List<? extends Coder<?>> getCoderArguments() {
-      return ImmutableList.of();
-    }
-
-    @Override
-    public void verifyDeterministic() throws NonDeterministicException {}
-  }
-
-  @Test
-  public void testEnsureSerializableWithUnserializableCoderByJackson() throws Exception {
-    expectedException.expect(RuntimeException.class);
-    expectedException.expectMessage("Unable to deserialize Coder:");
-    SerializableUtils.ensureSerializable(
-        new UnserializableCoderByJackson(new SerializableByJava("TestData", 5)));
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
index ece3eca..a1841a1 100644
--- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
@@ -46,8 +46,6 @@ import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderProvider;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.Structs;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
@@ -336,18 +334,6 @@ public class ProtoCoder<T extends Message> extends CustomCoder<T> {
     }
   }
 
-  @Override
-  public CloudObject initializeCloudObject() {
-    CloudObject result = CloudObject.forClass(getClass());
-    Structs.addString(result, PROTO_MESSAGE_CLASS, protoMessageClass.getName());
-    List<CloudObject> extensionHostClassNames = Lists.newArrayList();
-    for (String className : getSortedExtensionClasses()) {
-      extensionHostClassNames.add(CloudObject.forString(className));
-    }
-    Structs.addList(result, PROTO_EXTENSION_HOSTS, extensionHostClassNames);
-    return result;
-  }
-
   /** Get the memoized {@link Parser}, possibly initializing it lazily. */
   private Parser<T> getParser() {
     if (memoizedParser == null) {

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index 7fa3842..261d427 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -78,11 +78,6 @@
 
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-annotations</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/cbcec7c0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
index be4e71c..26b4b56 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
@@ -18,10 +18,6 @@
 
 package org.apache.beam.sdk.io.gcp.bigquery;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.io.InputStream;
@@ -29,9 +25,8 @@ import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.util.PropertyNames;
 
 
 /**
@@ -39,19 +34,11 @@ import org.apache.beam.sdk.util.PropertyNames;
  */
 @VisibleForTesting
 class ShardedKeyCoder<KeyT>
-    extends StandardCoder<ShardedKey<KeyT>> {
+    extends CustomCoder<ShardedKey<KeyT>> {
   public static <KeyT> ShardedKeyCoder<KeyT> of(Coder<KeyT> keyCoder) {
     return new ShardedKeyCoder<>(keyCoder);
   }
 
-  @JsonCreator
-  public static <KeyT> ShardedKeyCoder<KeyT> of(
-       @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-           List<Coder<KeyT>> components) {
-    checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size());
-    return of(components.get(0));
-  }
-
   protected ShardedKeyCoder(Coder<KeyT> keyCoder) {
     this.keyCoder = keyCoder;
     this.shardNumberCoder = VarIntCoder.of();


Mime
View raw message