beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [3/6] beam git commit: Reparent many Coders to Atomic or StructuredCoder
Date Fri, 05 May 2017 20:35:23 GMT
Reparent many Coders to Atomic or StructuredCoder

These coders either take no configuration or take configuration only in
terms of other Coders, so they are appropriate to reparent to AtomicCoder
or StructuredCoder.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/987c2cbc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/987c2cbc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/987c2cbc

Branch: refs/heads/master
Commit: 987c2cbca433f3e0ff12a637f2b0474e772c6beb
Parents: 63258c6
Author: Thomas Groh <tgroh@google.com>
Authored: Fri May 5 10:13:05 2017 -0700
Committer: Thomas Groh <tgroh@google.com>
Committed: Fri May 5 13:30:51 2017 -0700

----------------------------------------------------------------------
 .../apex/translation/utils/ApexStreamTuple.java |  5 ++-
 .../UnboundedReadFromBoundedSource.java         |  4 +-
 .../runners/core/construction/CodersTest.java   | 14 +------
 .../core/construction/PCollectionsTest.java     |  3 +-
 .../core/ElementAndRestrictionCoder.java        | 17 ++++++++-
 .../beam/runners/core/KeyedWorkItemCoder.java   |  4 +-
 .../beam/runners/core/TimerInternals.java       |  6 +--
 .../direct/CloningBundleFactoryTest.java        | 10 ++---
 .../beam/runners/direct/DirectRunnerTest.java   |  5 +--
 .../UnboundedReadEvaluatorFactoryTest.java      |  4 +-
 .../streaming/SingletonKeyedWorkItemCoder.java  |  4 +-
 .../runners/dataflow/BatchViewOverrides.java    |  8 ++--
 .../runners/dataflow/internal/IsmFormat.java    | 40 ++++++++++++++++----
 .../runners/dataflow/util/RandomAccessData.java |  4 +-
 .../org/apache/beam/sdk/coders/AvroCoder.java   | 19 ++++++++++
 .../apache/beam/sdk/coders/BigDecimalCoder.java |  2 +-
 .../beam/sdk/coders/BigEndianIntegerCoder.java  |  2 +-
 .../beam/sdk/coders/BigEndianLongCoder.java     |  4 +-
 .../apache/beam/sdk/coders/BigIntegerCoder.java |  4 +-
 .../org/apache/beam/sdk/coders/BitSetCoder.java |  4 +-
 .../apache/beam/sdk/coders/ByteArrayCoder.java  |  6 +--
 .../org/apache/beam/sdk/coders/ByteCoder.java   |  2 +-
 .../apache/beam/sdk/coders/CoderFactories.java  |  9 +++--
 .../org/apache/beam/sdk/coders/DoubleCoder.java |  2 +-
 .../apache/beam/sdk/coders/DurationCoder.java   |  2 +-
 .../apache/beam/sdk/coders/InstantCoder.java    |  2 +-
 .../org/apache/beam/sdk/coders/KvCoder.java     |  4 +-
 .../org/apache/beam/sdk/coders/ListCoder.java   |  3 +-
 .../org/apache/beam/sdk/coders/MapCoder.java    |  2 +-
 .../apache/beam/sdk/coders/NullableCoder.java   |  6 +--
 .../apache/beam/sdk/coders/StringUtf8Coder.java |  2 +-
 .../beam/sdk/coders/TextualIntegerCoder.java    |  2 +-
 .../org/apache/beam/sdk/coders/VarIntCoder.java |  2 +-
 .../org/apache/beam/sdk/coders/VoidCoder.java   |  4 +-
 .../org/apache/beam/sdk/io/FileBasedSink.java   |  4 +-
 .../sdk/transforms/ApproximateQuantiles.java    | 26 ++++++++++---
 .../org/apache/beam/sdk/transforms/Combine.java | 12 ++++--
 .../apache/beam/sdk/transforms/CombineFns.java  |  4 +-
 .../org/apache/beam/sdk/transforms/Count.java   |  4 +-
 .../org/apache/beam/sdk/transforms/Mean.java    |  7 ++--
 .../org/apache/beam/sdk/transforms/Top.java     | 23 ++++++++++-
 .../beam/sdk/transforms/join/CoGbkResult.java   |  2 +-
 .../beam/sdk/transforms/join/UnionCoder.java    |  7 ++--
 .../beam/sdk/transforms/windowing/PaneInfo.java | 10 ++++-
 .../org/apache/beam/sdk/util/BitSetCoder.java   |  9 +++--
 .../org/apache/beam/sdk/util/WindowedValue.java |  7 ++--
 .../beam/sdk/values/TimestampedValue.java       | 13 +++++--
 .../beam/sdk/values/ValueInSingleWindow.java    |  4 +-
 .../beam/sdk/values/ValueWithRecordId.java      |  4 +-
 .../beam/sdk/coders/CoderRegistryTest.java      |  4 +-
 .../beam/sdk/coders/DelegateCoderTest.java      | 25 ------------
 .../beam/sdk/coders/NullableCoderTest.java      |  2 +-
 .../beam/sdk/testing/CoderPropertiesTest.java   | 37 ++++++++++++++++--
 .../apache/beam/sdk/testing/PAssertTest.java    |  4 +-
 .../sdk/testing/SerializableMatchersTest.java   |  4 +-
 .../beam/sdk/testing/WindowSupplierTest.java    |  6 +--
 .../beam/sdk/transforms/CombineFnsTest.java     |  4 +-
 .../apache/beam/sdk/transforms/CombineTest.java |  6 +--
 .../apache/beam/sdk/transforms/CreateTest.java  |  6 +--
 .../beam/sdk/transforms/GroupByKeyTest.java     |  4 +-
 .../apache/beam/sdk/transforms/ParDoTest.java   |  6 +--
 .../apache/beam/sdk/transforms/ViewTest.java    |  4 +-
 .../transforms/reflect/DoFnInvokersTest.java    |  6 +--
 .../apache/beam/sdk/util/CoderUtilsTest.java    |  4 +-
 .../beam/sdk/util/SerializableUtilsTest.java    |  4 +-
 .../extensions/protobuf/ByteStringCoder.java    |  4 +-
 .../sdk/io/gcp/bigquery/ShardedKeyCoder.java    | 10 ++---
 .../io/gcp/bigquery/TableDestinationCoder.java  |  4 +-
 .../sdk/io/gcp/bigquery/TableRowInfoCoder.java  |  4 +-
 .../sdk/io/gcp/bigquery/TableRowJsonCoder.java  |  4 +-
 .../io/gcp/bigquery/WriteBundlesToFiles.java    | 11 +++++-
 .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java  |  4 +-
 .../io/gcp/pubsub/PubsubUnboundedSource.java    | 12 ++++--
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  4 +-
 .../beam/sdk/io/hadoop/WritableCoder.java       | 18 +++++++++
 .../beam/sdk/io/hbase/HBaseMutationCoder.java   |  4 +-
 .../beam/sdk/io/hbase/HBaseResultCoder.java     |  4 +-
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   |  6 +--
 .../beam/sdk/io/kafka/KafkaRecordCoder.java     |  4 +-
 .../beam/sdk/io/kinesis/KinesisRecordCoder.java |  4 +-
 .../org/apache/beam/sdk/io/xml/JAXBCoder.java   | 18 +++++++++
 .../apache/beam/sdk/io/xml/JAXBCoderTest.java   |  4 +-
 82 files changed, 369 insertions(+), 223 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
index 4ce351b..4aa6ee8 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStreamTuple.java
@@ -31,7 +31,7 @@ import java.util.Objects;
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 
 /**
  * The common interface for all objects transmitted through streams.
@@ -149,7 +149,7 @@ public interface ApexStreamTuple<T> {
   /**
    * Coder for {@link ApexStreamTuple}.
    */
-  class ApexStreamTupleCoder<T> extends CustomCoder<ApexStreamTuple<T>> {
+  class ApexStreamTupleCoder<T> extends StructuredCoder<ApexStreamTuple<T>> {
     private static final long serialVersionUID = 1L;
     final Coder<T> valueCoder;
 
@@ -194,6 +194,7 @@ public interface ApexStreamTuple<T> {
     @Override
     public void verifyDeterministic() throws NonDeterministicException {
       verifyDeterministic(
+          this,
           this.getClass().getSimpleName() + " requires a deterministic valueCoder",
           valueCoder);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index 0ea13b8..1424b8b 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -35,10 +35,10 @@ import java.util.NoSuchElementException;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.Read;
@@ -203,7 +203,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
     }
 
     @VisibleForTesting
-    static class CheckpointCoder<T> extends CustomCoder<Checkpoint<T>> {
+    static class CheckpointCoder<T> extends StructuredCoder<Checkpoint<T>> {
 
       // The coder for a list of residual elements and their timestamps
       private final Coder<List<TimestampedValue<T>>> elemsCoder;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
index 32a78fa..765723c 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
@@ -30,11 +30,11 @@ import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.HashSet;
 import java.util.Set;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.LengthPrefixCoder;
@@ -149,7 +149,7 @@ public class CodersTest {
 
     static class Record implements Serializable {}
 
-    private static class RecordCoder extends CustomCoder<Record> {
+    private static class RecordCoder extends AtomicCoder<Record> {
       @Override
       public void encode(Record value, OutputStream outStream, Context context)
           throws CoderException, IOException {}
@@ -159,16 +159,6 @@ public class CodersTest {
           throws CoderException, IOException {
         return new Record();
       }
-
-      @Override
-      public boolean equals(Object other) {
-        return other != null && getClass().equals(other.getClass());
-      }
-
-      @Override
-      public int hashCode() {
-        return getClass().hashCode();
-      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
index c177c58..2c45cbd 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
@@ -29,6 +29,7 @@ import java.io.OutputStream;
 import java.util.Collection;
 import java.util.Collections;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CustomCoder;
@@ -158,7 +159,7 @@ public class PCollectionsTest {
 
     @Override
     public Coder<BoundedWindow> windowCoder() {
-      return new CustomCoder<BoundedWindow>() {
+      return new AtomicCoder<BoundedWindow>() {
         @Override public void verifyDeterministic() {}
 
         @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
index 64c1e14..83c4e62 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
@@ -17,18 +17,20 @@
  */
 package org.apache.beam.runners.core;
 
+import com.google.common.collect.ImmutableList;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.List;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 
 /** A {@link Coder} for {@link ElementAndRestriction}. */
 @Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
 public class ElementAndRestrictionCoder<ElementT, RestrictionT>
-    extends CustomCoder<ElementAndRestriction<ElementT, RestrictionT>> {
+    extends StructuredCoder<ElementAndRestriction<ElementT, RestrictionT>> {
   private final Coder<ElementT> elementCoder;
   private final Coder<RestrictionT> restrictionCoder;
 
@@ -65,6 +67,17 @@ public class ElementAndRestrictionCoder<ElementT, RestrictionT>
     return ElementAndRestriction.of(key, value);
   }
 
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return ImmutableList.of(elementCoder, restrictionCoder);
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    elementCoder.verifyDeterministic();
+    restrictionCoder.verifyDeterministic();
+  }
+
   public Coder<ElementT> getElementCoder() {
     return elementCoder;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
index fddf7fa..e1872b5 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
@@ -26,8 +26,8 @@ import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.core.TimerInternals.TimerDataCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 /**
  * A {@link Coder} for {@link KeyedWorkItem KeyedWorkItems}.
  */
-public class KeyedWorkItemCoder<K, ElemT> extends CustomCoder<KeyedWorkItem<K, ElemT>> {
+public class KeyedWorkItemCoder<K, ElemT> extends StructuredCoder<KeyedWorkItem<K, ElemT>> {
   /**
    * Create a new {@link KeyedWorkItemCoder} with the provided key coder, element coder, and window
    * coder.

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
index 21fe430..888c11f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
@@ -27,9 +27,9 @@ import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -224,7 +224,7 @@ public interface TimerInternals {
   /**
    * A {@link Coder} for {@link TimerData}.
    */
-  class TimerDataCoder extends CustomCoder<TimerData> {
+  class TimerDataCoder extends StructuredCoder<TimerData> {
     private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
     private static final InstantCoder INSTANT_CODER = InstantCoder.of();
     private final Coder<? extends BoundedWindow> windowCoder;
@@ -266,7 +266,7 @@ public interface TimerInternals {
 
     @Override
     public void verifyDeterministic() throws NonDeterministicException {
-      verifyDeterministic("window coder must be deterministic", windowCoder);
+      verifyDeterministic(this, "window coder must be deterministic", windowCoder);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
index 7d037d1..33d171e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
@@ -31,8 +31,8 @@ import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -173,7 +173,7 @@ public class CloningBundleFactoryTest {
   }
 
   static class Record {}
-  static class RecordNoEncodeCoder extends CustomCoder<Record> {
+  static class RecordNoEncodeCoder extends AtomicCoder<Record> {
 
     @Override
     public void encode(
@@ -192,7 +192,7 @@ public class CloningBundleFactoryTest {
     }
   }
 
-  static class RecordNoDecodeCoder extends CustomCoder<Record> {
+  static class RecordNoDecodeCoder extends AtomicCoder<Record> {
     @Override
     public void encode(
         Record value,
@@ -208,7 +208,7 @@ public class CloningBundleFactoryTest {
     }
   }
 
-  private static class RecordStructuralValueCoder extends CustomCoder<Record> {
+  private static class RecordStructuralValueCoder extends AtomicCoder<Record> {
     @Override
     public void encode(
         Record value,
@@ -240,7 +240,7 @@ public class CloningBundleFactoryTest {
   }
 
   private static class RecordNotConsistentWithEqualsStructuralValueCoder
-      extends CustomCoder<Record> {
+      extends AtomicCoder<Record> {
     @Override
     public void encode(
         Record value,

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 428c6fc..0fe9585 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -44,9 +44,9 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
@@ -523,8 +523,7 @@ public class DirectRunnerTest implements Serializable {
     p.run();
   }
 
-  private static class LongNoDecodeCoder extends CustomCoder<Long> {
-
+  private static class LongNoDecodeCoder extends AtomicCoder<Long> {
     @Override
     public void encode(
         Long value, OutputStream outStream, Context context) throws IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index ceb078b..b9ba7f4 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -46,10 +46,10 @@ import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.UnboundedReadDeduplicator.NeverDeduplicator;
 import org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory.UnboundedSourceShard;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.Read;
@@ -586,7 +586,7 @@ public class UnboundedReadEvaluatorFactoryTest {
       return finalized;
     }
 
-    public static class Coder extends CustomCoder<TestCheckpointMark> {
+    public static class Coder extends AtomicCoder<TestCheckpointMark> {
       @Override
       public void encode(
           TestCheckpointMark value,

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
index c73700f..f218693 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
@@ -26,7 +26,7 @@ import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 
@@ -34,7 +34,7 @@ import org.apache.beam.sdk.util.WindowedValue;
  * Singleton keyed work item coder.
  */
 public class SingletonKeyedWorkItemCoder<K, ElemT>
-    extends CustomCoder<SingletonKeyedWorkItem<K, ElemT>> {
+    extends StructuredCoder<SingletonKeyedWorkItem<K, ElemT>> {
   /**
    * Create a new {@link KeyedWorkItemCoder} with the provided key coder, element coder, and window
    * coder.

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
index ef2bfed..ecd0365 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java
@@ -50,11 +50,11 @@ import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -1335,7 +1335,7 @@ class BatchViewOverrides {
    * A {@link Coder} for {@link TransformedMap}s.
    */
   static class TransformedMapCoder<K, V1, V2>
-      extends CustomCoder<TransformedMap<K, V1, V2>> {
+      extends StructuredCoder<TransformedMap<K, V1, V2>> {
     private final Coder<Function<V1, V2>> transformCoder;
     private final Coder<Map<K, V1>> originalMapCoder;
 
@@ -1373,8 +1373,8 @@ class BatchViewOverrides {
     @Override
     public void verifyDeterministic()
         throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
-      verifyDeterministic("Expected transform coder to be deterministic.", transformCoder);
-      verifyDeterministic("Expected map coder to be deterministic.", originalMapCoder);
+      verifyDeterministic(this, "Expected transform coder to be deterministic.", transformCoder);
+      verifyDeterministic(this, "Expected map coder to be deterministic.", originalMapCoder);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
index fbfe49a..aed514a 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
@@ -32,14 +32,17 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.util.RandomAccessData;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.util.VarInt;
@@ -356,8 +359,9 @@ public class IsmFormat {
 
     @Override
     public void verifyDeterministic() throws Coder.NonDeterministicException {
-      verifyDeterministic("Key component coders expected to be deterministic.", keyComponentCoders);
-      verifyDeterministic("Value coder expected to be deterministic.", valueCoder);
+      verifyDeterministic(
+          this, "Key component coders expected to be deterministic.", keyComponentCoders);
+      verifyDeterministic(this, "Value coder expected to be deterministic.", valueCoder);
     }
 
     @Override
@@ -393,6 +397,28 @@ public class IsmFormat {
       }
       return super.structuralValue(record);
     }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other == this) {
+        return true;
+      }
+      if (!(other instanceof IsmRecordCoder)) {
+        return false;
+      }
+      IsmRecordCoder<?> that = (IsmRecordCoder<?>) other;
+      return Objects.equals(this.numberOfShardKeyCoders, that.numberOfShardKeyCoders)
+          && Objects.equals(
+              this.numberOfMetadataShardKeyCoders, that.numberOfMetadataShardKeyCoders)
+          && Objects.equals(this.keyComponentCoders, that.keyComponentCoders)
+          && Objects.equals(this.valueCoder, that.valueCoder);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(
+          numberOfShardKeyCoders, numberOfMetadataShardKeyCoders, keyComponentCoders, valueCoder);
+    }
   }
 
   /**
@@ -450,7 +476,7 @@ public class IsmFormat {
    * A coder for metadata key component. Can be used to wrap key component coder allowing for
    * the metadata key component to be used as a place holder instead of an actual key.
    */
-  public static class MetadataKeyCoder<K> extends CustomCoder<K> {
+  public static class MetadataKeyCoder<K> extends StructuredCoder<K> {
     public static <K> MetadataKeyCoder<K> of(Coder<K> keyCoder) {
       checkNotNull(keyCoder);
       return new MetadataKeyCoder<>(keyCoder);
@@ -497,7 +523,7 @@ public class IsmFormat {
 
     @Override
     public void verifyDeterministic() throws NonDeterministicException {
-      verifyDeterministic("Expected key coder to be deterministic", keyCoder);
+      verifyDeterministic(this, "Expected key coder to be deterministic", keyCoder);
     }
   }
 
@@ -584,7 +610,7 @@ public class IsmFormat {
    *   <li>indexOffset (variable length long encoding)</li>
    * </ul>
    */
-  public static class IsmShardCoder extends CustomCoder<IsmShard> {
+  public static class IsmShardCoder extends AtomicCoder<IsmShard> {
     private static final IsmShardCoder INSTANCE = new IsmShardCoder();
 
     /** Returns an IsmShardCoder. */
@@ -649,7 +675,7 @@ public class IsmFormat {
   }
 
   /** A {@link Coder} for {@link KeyPrefix}. */
-  public static final class KeyPrefixCoder extends CustomCoder<KeyPrefix> {
+  public static final class KeyPrefixCoder extends AtomicCoder<KeyPrefix> {
     private static final KeyPrefixCoder INSTANCE = new KeyPrefixCoder();
 
     public static KeyPrefixCoder of() {
@@ -721,7 +747,7 @@ public class IsmFormat {
   }
 
   /** A {@link Coder} for {@link Footer}. */
-  public static final class FooterCoder extends CustomCoder<Footer> {
+  public static final class FooterCoder extends AtomicCoder<Footer> {
     private static final FooterCoder INSTANCE = new FooterCoder();
 
     public static FooterCoder of() {

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
index 66548e2..4e94515 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
@@ -30,10 +30,10 @@ import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.Comparator;
 import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.util.VarInt;
 
 /**
@@ -55,7 +55,7 @@ public class RandomAccessData {
    *
    * <p>This coder does not support encoding positive infinity.
    */
-  public static class RandomAccessDataCoder extends CustomCoder<RandomAccessData> {
+  public static class RandomAccessDataCoder extends AtomicCoder<RandomAccessData> {
     private static final RandomAccessDataCoder INSTANCE = new RandomAccessDataCoder();
 
     public static RandomAccessDataCoder of() {

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
index 1e01f1a..2aa2b44 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
@@ -29,6 +29,7 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
@@ -681,4 +682,22 @@ public class AvroCoder<T> extends CustomCoder<T> {
       throw new IllegalArgumentException("Unable to get field " + name + " from " + originalClazz);
     }
   }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == this) {
+      return true;
+    }
+    if (!(other instanceof AvroCoder)) {
+      return false;
+    }
+    AvroCoder<?> that = (AvroCoder<?>) other;
+    return Objects.equals(this.schemaSupplier.get(), that.schemaSupplier.get())
+        && Objects.equals(this.typeDescriptor, that.typeDescriptor);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(schemaSupplier.get(), typeDescriptor);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
index d628203..aadf085 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
@@ -32,7 +32,7 @@ import java.math.MathContext;
  * {@link BigInteger}, when scaled (with unlimited precision, aka {@link MathContext#UNLIMITED}),
  * yields the expected {@link BigDecimal}.
  */
-public class BigDecimalCoder extends CustomCoder<BigDecimal> {
+public class BigDecimalCoder extends AtomicCoder<BigDecimal> {
 
   public static BigDecimalCoder of() {
     return INSTANCE;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
index 81c5e94..c3c7a96 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 /**
  * A {@link BigEndianIntegerCoder} encodes {@link Integer Integers} in 4 bytes, big-endian.
  */
-public class BigEndianIntegerCoder extends CustomCoder<Integer> {
+public class BigEndianIntegerCoder extends AtomicCoder<Integer> {
 
   public static BigEndianIntegerCoder of() {
     return INSTANCE;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
index 173e910..5ef4878 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.coders;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
@@ -30,9 +29,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 /**
  * A {@link BigEndianLongCoder} encodes {@link Long}s in 8 bytes, big-endian.
  */
-public class BigEndianLongCoder extends CustomCoder<Long> {
+public class BigEndianLongCoder extends AtomicCoder<Long> {
 
-  @JsonCreator
   public static BigEndianLongCoder of() {
     return INSTANCE;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
index a739da7..6d14d17 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
@@ -28,7 +28,7 @@ import java.math.BigInteger;
  * A {@link BigIntegerCoder} encodes a {@link BigInteger} as a byte array containing the big endian
  * two's-complement representation, encoded via {@link ByteArrayCoder}.
  */
-public class BigIntegerCoder extends CustomCoder<BigInteger> {
+public class BigIntegerCoder extends AtomicCoder<BigInteger> {
 
   public static BigIntegerCoder of() {
     return INSTANCE;
@@ -55,7 +55,7 @@ public class BigIntegerCoder extends CustomCoder<BigInteger> {
   }
 
   @Override
-  public void verifyDeterministic() throws NonDeterministicException {
+  public void verifyDeterministic() {
     BYTE_ARRAY_CODER.verifyDeterministic();
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
index 5a4db24..f49776b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BitSetCoder.java
@@ -25,7 +25,7 @@ import java.util.BitSet;
 /**
  * Coder for {@link BitSet}.
  */
-public class BitSetCoder extends CustomCoder<BitSet> {
+public class BitSetCoder extends AtomicCoder<BitSet> {
   private static final BitSetCoder INSTANCE = new BitSetCoder();
   private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
 
@@ -53,7 +53,7 @@ public class BitSetCoder extends CustomCoder<BitSet> {
   @Override
   public void verifyDeterministic() throws NonDeterministicException {
     verifyDeterministic(
-        "BitSetCoder requires its ByteArrayCoder to be deterministic.", BYTE_ARRAY_CODER);
+        this, "BitSetCoder requires its ByteArrayCoder to be deterministic.", BYTE_ARRAY_CODER);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
index cba8d49..28cb627 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.coders;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import com.google.common.io.ByteStreams;
 import java.io.IOException;
 import java.io.InputStream;
@@ -40,9 +39,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  * encoded via a {@link VarIntCoder}.</li>
  * </ul>
  */
-public class ByteArrayCoder extends StructuredCoder<byte[]> {
+public class ByteArrayCoder extends AtomicCoder<byte[]> {
 
-  @JsonCreator
   public static ByteArrayCoder of() {
     return INSTANCE;
   }
@@ -117,7 +115,7 @@ public class ByteArrayCoder extends StructuredCoder<byte[]> {
   }
 
   @Override
-  public void verifyDeterministic() throws NonDeterministicException {}
+  public void verifyDeterministic() {}
 
   /**
    * {@inheritDoc}

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
index 1a1be64..6e4318e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 /**
  * A {@link ByteCoder} encodes {@link Byte} values in 1 byte using Java serialization.
  */
-public class ByteCoder extends CustomCoder<Byte> {
+public class ByteCoder extends AtomicCoder<Byte> {
 
   public static ByteCoder of() {
     return INSTANCE;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
index 2a1d792..4f05c95 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderFactories.java
@@ -239,11 +239,12 @@ public final class CoderFactories {
     }
 
     /**
-     * If {@code coderType} is a subclass of {@link Coder} for a specific
-     * type {@code T}, returns {@code T.class}. Otherwise, raises IllegalArgumentException.
+     * If {@code coderType} is a subclass of {@link Coder} for a specific type {@code T}, returns
+     * {@code T.class}. Otherwise, raises IllegalArgumentException.
      */
-    private <T> TypeDescriptor<T> getCodedType(TypeDescriptor<? extends Coder> coderType) {
-      TypeDescriptor<?> coderSupertype = coderType.getSupertype(Coder.class);
+    private <T, CoderT extends Coder> TypeDescriptor<T> getCodedType(
+        TypeDescriptor<CoderT> coderType) {
+      TypeDescriptor<? super CoderT> coderSupertype = coderType.getSupertype(Coder.class);
       ParameterizedType coderIface = (ParameterizedType) coderSupertype.getType();
       @SuppressWarnings("unchecked")
       TypeDescriptor<T> token =

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
index 06e7dae..12bc5e8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 /**
  * A {@link DoubleCoder} encodes {@link Double} values in 8 bytes using Java serialization.
  */
-public class DoubleCoder extends CustomCoder<Double> {
+public class DoubleCoder extends AtomicCoder<Double> {
 
   public static DoubleCoder of() {
     return INSTANCE;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
index 10a83ef..7b49d1f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
@@ -29,7 +29,7 @@ import org.joda.time.ReadableDuration;
  * A {@link Coder} that encodes a joda {@link Duration} as a {@link Long} using the format of
  * {@link VarLongCoder}.
  */
-public class DurationCoder extends CustomCoder<ReadableDuration> {
+public class DurationCoder extends AtomicCoder<ReadableDuration> {
 
   public static DurationCoder of() {
     return INSTANCE;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
index cfd1979..56ed12b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
@@ -29,7 +29,7 @@ import org.joda.time.Instant;
  * A {@link Coder} for joda {@link Instant} that encodes it as a big endian {@link Long}
  * shifted such that lexicographic ordering of the bytes corresponds to chronological order.
  */
-public class InstantCoder extends CustomCoder<Instant> {
+public class InstantCoder extends AtomicCoder<Instant> {
   public static InstantCoder of() {
     return INSTANCE;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
index 8a689f7..35b7449 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
@@ -89,8 +89,8 @@ public class KvCoder<K, V> extends StructuredCoder<KV<K, V>> {
 
   @Override
   public void verifyDeterministic() throws NonDeterministicException {
-    verifyDeterministic("Key coder must be deterministic", getKeyCoder());
-    verifyDeterministic("Value coder must be deterministic", getValueCoder());
+    verifyDeterministic(this, "Key coder must be deterministic", getKeyCoder());
+    verifyDeterministic(this, "Value coder must be deterministic", getValueCoder());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
index 32467d2..70bbf93 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
@@ -58,8 +58,7 @@ public class ListCoder<T> extends IterableLikeCoder<T, List<T>> {
    */
   @Override
   public void verifyDeterministic() throws NonDeterministicException {
-    verifyDeterministic(
-        "ListCoder.elemCoder must be deterministic", getElemCoder());
+    verifyDeterministic(this, "ListCoder.elemCoder must be deterministic", getElemCoder());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
index e2c4e28..da2bf50 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
@@ -40,7 +40,7 @@ import org.apache.beam.sdk.values.TypeParameter;
  * @param <K> the type of the keys of the KVs being transcoded
  * @param <V> the type of the values of the KVs being transcoded
  */
-public class MapCoder<K, V> extends CustomCoder<Map<K, V>> {
+public class MapCoder<K, V> extends StructuredCoder<Map<K, V>> {
   /**
    * Produces a MapCoder with the given keyCoder and valueCoder.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
index 747d91c..d1eea9a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  *
  * @param <T> the type of the values being transcoded
  */
-public class NullableCoder<T> extends CustomCoder<T> {
+public class NullableCoder<T> extends StructuredCoder<T> {
   public static <T> NullableCoder<T> of(Coder<T> valueCoder) {
     if (valueCoder instanceof NullableCoder) {
       return (NullableCoder<T>) valueCoder;
@@ -93,11 +93,11 @@ public class NullableCoder<T> extends CustomCoder<T> {
   /**
    * {@code NullableCoder} is deterministic if the nested {@code Coder} is.
    *
-   * {@inheritDoc}
+   * <p>{@inheritDoc}
    */
   @Override
   public void verifyDeterministic() throws NonDeterministicException {
-    verifyDeterministic("Value coder must be deterministic", valueCoder);
+    verifyDeterministic(this, "Value coder must be deterministic", valueCoder);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
index f0a0969..42931ca 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
@@ -38,7 +38,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  * If in a nested context, prefixes the string with an integer length field,
  * encoded via a {@link VarIntCoder}.
  */
-public class StringUtf8Coder extends CustomCoder<String> {
+public class StringUtf8Coder extends AtomicCoder<String> {
 
   public static StringUtf8Coder of() {
     return INSTANCE;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
index 91a46ea..9743c4c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  * A {@link Coder} that encodes {@code Integer Integers} as the ASCII bytes of
  * their textual, decimal, representation.
  */
-public class TextualIntegerCoder extends CustomCoder<Integer> {
+public class TextualIntegerCoder extends AtomicCoder<Integer> {
 
   public static TextualIntegerCoder of() {
     return new TextualIntegerCoder();

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
index fcc0335..30f9c09 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  * numbers always take 5 bytes, so {@link BigEndianIntegerCoder} may be preferable for
  * integers that are known to often be large or negative.
  */
-public class VarIntCoder extends CustomCoder<Integer> {
+public class VarIntCoder extends AtomicCoder<Integer> {
 
   public static VarIntCoder of() {
     return INSTANCE;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
index a65fa5e..829bd20 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.coders;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import java.io.InputStream;
 import java.io.OutputStream;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -25,9 +24,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 /**
  * A {@link Coder} for {@link Void}. Uses zero bytes per {@link Void}.
  */
-public class VoidCoder extends CustomCoder<Void> {
+public class VoidCoder extends AtomicCoder<Void> {
 
-  @JsonCreator
   public static VoidCoder of() {
     return INSTANCE;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 0daf5dc..20fab9b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -43,8 +43,8 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.GZIPOutputStream;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context;
@@ -938,7 +938,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
   /**
    * A coder for {@link FileResult} objects.
    */
-  public static final class FileResultCoder extends CustomCoder<FileResult> {
+  public static final class FileResultCoder extends AtomicCoder<FileResult> {
     private static final FileResultCoder INSTANCE = new FileResultCoder();
     private final NullableCoder<String> stringCoder = NullableCoder.of(StringUtf8Coder.of());
 

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
index 5432f09..b05f223 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ApproximateQuantiles.java
@@ -34,6 +34,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
 import java.util.PriorityQueue;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
@@ -762,13 +763,28 @@ public class ApproximateQuantiles {
     }
 
     @Override
+    public boolean equals(Object other) {
+      if (other == this) {
+        return true;
+      }
+      if (!(other instanceof QuantileStateCoder)) {
+        return false;
+      }
+      QuantileStateCoder<?, ?> that = (QuantileStateCoder<?, ?>) other;
+      return Objects.equals(this.elementCoder, that.elementCoder)
+          && Objects.equals(this.compareFn, that.compareFn);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(elementCoder, compareFn);
+    }
+
+    @Override
     public void verifyDeterministic() throws NonDeterministicException {
+      verifyDeterministic(this, "QuantileState.ElementCoder must be deterministic", elementCoder);
       verifyDeterministic(
-          "QuantileState.ElementCoder must be deterministic",
-          elementCoder);
-      verifyDeterministic(
-          "QuantileState.ElementListCoder must be deterministic",
-          elementListCoder);
+          this, "QuantileState.ElementListCoder must be deterministic", elementListCoder);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 666db3b..b9cdbd5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -27,6 +27,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -35,10 +36,10 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.DelegateCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -522,7 +523,7 @@ public class Combine {
   /**
    * A {@link Coder} for a {@link Holder}.
    */
-  private static class HolderCoder<V> extends CustomCoder<Holder<V>> {
+  private static class HolderCoder<V> extends StructuredCoder<Holder<V>> {
 
     private Coder<V> valueCoder;
 
@@ -552,6 +553,11 @@ public class Combine {
     }
 
     @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+      return Collections.singletonList(valueCoder);
+    }
+
+    @Override
     public void verifyDeterministic() throws NonDeterministicException {
       valueCoder.verifyDeterministic();
     }
@@ -1954,7 +1960,7 @@ public class Combine {
       }
 
       private static class InputOrAccumCoder<InputT, AccumT>
-          extends CustomCoder<InputOrAccum<InputT, AccumT>> {
+          extends StructuredCoder<InputOrAccum<InputT, AccumT>> {
 
         private final Coder<InputT> inputCoder;
         private final Coder<AccumT> accumCoder;

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
index cc02dcf..d4c97bc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
 import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
@@ -524,7 +524,7 @@ public class CombineFns {
     }
   }
 
-  private static class ComposedAccumulatorCoder extends CustomCoder<Object[]> {
+  private static class ComposedAccumulatorCoder extends StructuredCoder<Object[]> {
     private List<Coder<Object>> coders;
     private int codersCount;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
index 78a6cd1..753e14c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
@@ -23,10 +23,10 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.UTFDataFormatException;
 import java.util.Iterator;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.util.VarInt;
@@ -167,7 +167,7 @@ public class Count {
     @Override
     public Coder<long[]> getAccumulatorCoder(CoderRegistry registry,
                                              Coder<T> inputCoder) {
-      return new CustomCoder<long[]>() {
+      return new AtomicCoder<long[]>() {
         @Override
         public void encode(long[] value, OutputStream outStream, Context context)
             throws IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
index a6808cf..a309954 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
@@ -22,11 +22,11 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Objects;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.DoubleCoder;
 import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn.Accumulator;
 
@@ -180,9 +180,8 @@ public class Mean {
     }
   }
 
-  static class CountSumCoder<NumT extends Number>
-  extends CustomCoder<CountSum<NumT>> {
-     private static final Coder<Long> LONG_CODER = BigEndianLongCoder.of();
+  static class CountSumCoder<NumT extends Number> extends AtomicCoder<CountSum<NumT>> {
+    private static final Coder<Long> LONG_CODER = BigEndianLongCoder.of();
      private static final Coder<Double> DOUBLE_CODER = DoubleCoder.of();
 
      @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
index e42c0b2..9d5db74 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
@@ -27,6 +27,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Objects;
 import java.util.PriorityQueue;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
@@ -551,8 +552,7 @@ public class Top {
 
     @Override
     public void verifyDeterministic() throws NonDeterministicException {
-      verifyDeterministic(
-          "HeapCoder requires a deterministic list coder", listCoder);
+      verifyDeterministic(this, "HeapCoder requires a deterministic list coder", listCoder);
     }
 
     @Override
@@ -568,5 +568,24 @@ public class Top {
             throws Exception {
       listCoder.registerByteSizeObserver(value.asList(), observer, context);
     }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other == this) {
+        return true;
+      }
+      if (!(other instanceof BoundedHeapCoder)) {
+        return false;
+      }
+      BoundedHeapCoder<?, ?> that = (BoundedHeapCoder<?, ?>) other;
+      return Objects.equals(this.compareFn, that.compareFn)
+          && Objects.equals(this.listCoder, that.listCoder)
+          && this.maximumSize == that.maximumSize;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(compareFn, listCoder, maximumSize);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
index 02e1185..e9a3571 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
@@ -300,7 +300,7 @@ public class CoGbkResult {
     @Override
     public void verifyDeterministic() throws NonDeterministicException {
       verifyDeterministic(
-          "CoGbkResult requires the union coder to be deterministic", unionCoder);
+          this, "CoGbkResult requires the union coder to be deterministic", unionCoder);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
index f411cd1..4a2a286 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java
@@ -23,14 +23,14 @@ import java.io.OutputStream;
 import java.util.List;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.util.VarInt;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 
 /**
  * A UnionCoder encodes RawUnionValues.
  */
-public class UnionCoder extends CustomCoder<RawUnionValue> {
+public class UnionCoder extends StructuredCoder<RawUnionValue> {
   // TODO: Think about how to integrate this with a schema object (i.e.
   // a tuple of tuple tags).
   /**
@@ -134,7 +134,6 @@ public class UnionCoder extends CustomCoder<RawUnionValue> {
   @Override
   public void verifyDeterministic() throws NonDeterministicException {
     verifyDeterministic(
-        "UnionCoder is only deterministic if all element coders are",
-        elementCoders);
+        this, "UnionCoder is only deterministic if all element coders are", elementCoders);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
index faf3ca9..79ce2f5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
@@ -26,9 +26,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Objects;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.WindowedContext;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -307,7 +307,7 @@ public final class PaneInfo {
   /**
    * A Coder for encoding PaneInfo instances.
    */
-  public static class PaneInfoCoder extends CustomCoder<PaneInfo> {
+  public static class PaneInfoCoder extends AtomicCoder<PaneInfo> {
     private enum Encoding {
       FIRST,
       ONE_INDEX,
@@ -340,6 +340,12 @@ public final class PaneInfo {
 
     public static final PaneInfoCoder INSTANCE = new PaneInfoCoder();
 
+    public static PaneInfoCoder of() {
+      return INSTANCE;
+    }
+
+    private PaneInfoCoder() {}
+
     @Override
     public void encode(PaneInfo value, final OutputStream outStream, Coder.Context context)
         throws CoderException, IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
index b0e9b5c..b646bf6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
@@ -21,15 +21,17 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.BitSet;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
 
 /**
  * Coder for the BitSet used to track child-trigger finished states.
+ *
+ * @deprecated use {@link org.apache.beam.sdk.coders.BitSetCoder} instead
  */
 @Deprecated
-public class BitSetCoder extends CustomCoder<BitSet> {
+public class BitSetCoder extends AtomicCoder<BitSet> {
 
   private static final BitSetCoder INSTANCE = new BitSetCoder();
   private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
@@ -54,8 +56,7 @@ public class BitSetCoder extends CustomCoder<BitSet> {
 
   @Override
   public void verifyDeterministic() throws NonDeterministicException {
-    verifyDeterministic(
-        "BitSetCoder requires its ByteArrayCoder to be deterministic.", BYTE_ARRAY_CODER);
+    BYTE_ARRAY_CODER.verifyDeterministic();
   }
 }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index 13e499d..1e72550 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -663,11 +663,9 @@ public abstract class WindowedValue<T> {
     @Override
     public void verifyDeterministic() throws NonDeterministicException {
       verifyDeterministic(
-          "FullWindowedValueCoder requires a deterministic valueCoder",
-          valueCoder);
+          this, "FullWindowedValueCoder requires a deterministic valueCoder", valueCoder);
       verifyDeterministic(
-          "FullWindowedValueCoder requires a deterministic windowCoder",
-          windowCoder);
+          this, "FullWindowedValueCoder requires a deterministic windowCoder", windowCoder);
     }
 
     @Override
@@ -728,6 +726,7 @@ public abstract class WindowedValue<T> {
     @Override
     public void verifyDeterministic() throws NonDeterministicException {
       verifyDeterministic(
+          this,
           "ValueOnlyWindowedValueCoder requires a deterministic valueCoder",
           valueCoder);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
index cde9a40..a9f3929 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
@@ -23,11 +23,12 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.joda.time.Instant;
 
@@ -85,7 +86,7 @@ public class TimestampedValue<V> {
   /////////////////////////////////////////////////////////////////////////////
 
   /** A {@link Coder} for {@link TimestampedValue}. */
-  public static class TimestampedValueCoder<T> extends CustomCoder<TimestampedValue<T>> {
+  public static class TimestampedValueCoder<T> extends StructuredCoder<TimestampedValue<T>> {
 
     private final Coder<T> valueCoder;
 
@@ -119,8 +120,7 @@ public class TimestampedValue<V> {
     @Override
     public void verifyDeterministic() throws NonDeterministicException {
       verifyDeterministic(
-          "TimestampedValueCoder requires a deterministic valueCoder",
-          valueCoder);
+          this, "TimestampedValueCoder requires a deterministic valueCoder", valueCoder);
     }
 
     @Override
@@ -141,6 +141,11 @@ public class TimestampedValue<V> {
       return new TypeDescriptor<TimestampedValue<T>>() {}.where(
           new TypeParameter<T>() {}, valueCoder.getEncodedTypeDescriptor());
     }
+
+    @Override
+    public List<? extends Coder<?>> getComponents() {
+      return Collections.singletonList(valueCoder);
+    }
   }
 
   /////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
index 1fd356b..3ecbaa2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java
@@ -24,8 +24,8 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.List;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.joda.time.Instant;
@@ -56,7 +56,7 @@ public abstract class ValueInSingleWindow<T> {
   }
 
   /** A coder for {@link ValueInSingleWindow}. */
-  public static class Coder<T> extends CustomCoder<ValueInSingleWindow<T>> {
+  public static class Coder<T> extends StructuredCoder<ValueInSingleWindow<T>> {
     private final org.apache.beam.sdk.coders.Coder<T> valueCoder;
     private final org.apache.beam.sdk.coders.Coder<BoundedWindow> windowCoder;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
index 0d92f40..3f057e1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java
@@ -27,7 +27,7 @@ import java.util.Objects;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.transforms.DoFn;
 
 /**
@@ -85,7 +85,7 @@ public class ValueWithRecordId<ValueT> {
    * A {@link Coder} for {@code ValueWithRecordId}, using a wrapped value {@code Coder}.
    */
   public static class ValueWithRecordIdCoder<ValueT>
-      extends CustomCoder<ValueWithRecordId<ValueT>> {
+      extends StructuredCoder<ValueWithRecordId<ValueT>> {
     public static <ValueT> ValueWithRecordIdCoder<ValueT> of(Coder<ValueT> valueCoder) {
       return new ValueWithRecordIdCoder<>(valueCoder);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
index fe21a1c..5107355 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
@@ -99,7 +99,7 @@ public class CoderRegistryTest {
   }
 
   @SuppressWarnings("rawtypes") // this class exists to fail a test because of its rawtypes
-  private class MyListCoder extends CustomCoder<List> {
+  private class MyListCoder extends AtomicCoder<List> {
     @Override
     public void encode(List value, OutputStream outStream, Context context)
         throws CoderException, IOException {
@@ -441,7 +441,7 @@ public class CoderRegistryTest {
 
   private static class MyValue { }
 
-  private static class MyValueCoder extends CustomCoder<MyValue> {
+  private static class MyValueCoder extends AtomicCoder<MyValue> {
 
     private static final MyValueCoder INSTANCE = new MyValueCoder();
     private static final TypeDescriptor<MyValue> TYPE_DESCRIPTOR = TypeDescriptor.of(MyValue.class);

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java
index 5ff272f..8aeb22a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java
@@ -24,8 +24,6 @@ import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
@@ -108,29 +106,6 @@ public class DelegateCoderTest implements Serializable {
   private static final String TEST_ENCODING_ID = "test-encoding-id";
   private static final String TEST_ALLOWED_ENCODING = "test-allowed-encoding";
 
-  private static class TestAllowedEncodingsCoder extends CustomCoder<Integer> {
-
-    @Override
-    public void encode(Integer value, OutputStream outstream, Context context) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Integer decode(InputStream instream, Context context) {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void verifyDeterministic() {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public List<? extends Coder<?>> getCoderArguments() {
-      return Collections.emptyList();
-    }
-  }
-
   @Test
   public void testCoderEquals() throws Exception {
     DelegateCoder.CodingFunction<Integer, Integer> identityFn =

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
index 894d2d1..c0a4bed 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
@@ -169,7 +169,7 @@ public class NullableCoderTest {
     assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(TypeDescriptor.of(String.class)));
   }
 
-  private static class EntireStreamExpectingCoder extends CustomCoder<String> {
+  private static class EntireStreamExpectingCoder extends AtomicCoder<String> {
     @Override
     public void encode(
         String value, OutputStream outStream, Context context) throws IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/987c2cbc/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java
index f337f36..164d221 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/CoderPropertiesTest.java
@@ -24,6 +24,7 @@ import com.google.common.base.Strings;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
@@ -47,7 +48,7 @@ public class CoderPropertiesTest {
   }
 
   /** A coder that says it is not deterministic but actually is. */
-  public static class NonDeterministicCoder extends CustomCoder<String> {
+  public static class NonDeterministicCoder extends AtomicCoder<String> {
     @Override
     public void encode(String value, OutputStream outStream, Context context)
         throws CoderException, IOException {
@@ -59,17 +60,23 @@ public class CoderPropertiesTest {
         throws CoderException, IOException {
       return StringUtf8Coder.of().decode(inStream, context);
     }
+
+    public void verifyDeterministic() throws NonDeterministicException {
+      throw new NonDeterministicException(this, "Not Deterministic");
+    }
   }
 
   @Test
   public void testNonDeterministicCoder() throws Exception {
     try {
       CoderProperties.coderDeterministic(new NonDeterministicCoder(), "TestData", "TestData");
-      fail("Expected AssertionError");
     } catch (AssertionError error) {
       assertThat(error.getMessage(),
           CoreMatchers.containsString("Expected that the coder is deterministic"));
+      // success!
+      return;
     }
+    fail("Expected AssertionError");
   }
 
   @Test
@@ -84,7 +91,7 @@ public class CoderPropertiesTest {
   }
 
   /** A coder that is non-deterministic because it adds a string to the value. */
-  private static class BadDeterminsticCoder extends CustomCoder<String> {
+  private static class BadDeterminsticCoder extends AtomicCoder<String> {
     public BadDeterminsticCoder() {
     }
 
@@ -141,6 +148,17 @@ public class CoderPropertiesTest {
       String decodedValue = StringUtf8Coder.of().decode(inStream, context);
       return decodedValue.substring(0, decodedValue.length() - changedState);
     }
+
+    @Override
+    public boolean equals(Object other) {
+      return other instanceof StateChangingSerializingCoder
+          && ((StateChangingSerializingCoder) other).changedState == this.changedState;
+    }
+
+    @Override
+    public int hashCode() {
+      return changedState;
+    }
   }
 
   @Test
@@ -175,6 +193,17 @@ public class CoderPropertiesTest {
         throws CoderException, IOException {
       return StringUtf8Coder.of().decode(inStream, context);
     }
+
+    @Override
+    public boolean equals(Object other) {
+      return (other instanceof ForgetfulSerializingCoder)
+          && ((ForgetfulSerializingCoder) other).lostState == lostState;
+    }
+
+    @Override
+    public int hashCode() {
+      return lostState;
+    }
   }
 
   @Test
@@ -185,7 +214,7 @@ public class CoderPropertiesTest {
   }
 
   /** A coder which closes the underlying stream during encoding and decoding. */
-  public static class ClosingCoder extends CustomCoder<String> {
+  public static class ClosingCoder extends AtomicCoder<String> {
     @Override
     public void encode(String value, OutputStream outStream, Context context) throws IOException {
       outStream.close();


Mime
View raw message