beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [3/3] beam git commit: Stop Extending AtomicCoder
Date Tue, 25 Apr 2017 01:03:26 GMT
Stop Extending AtomicCoder


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7ad9efc7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7ad9efc7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7ad9efc7

Branch: refs/heads/master
Commit: 7ad9efc7a678f042d67cd90536f21aca4736e4c3
Parents: 02e60a5
Author: Thomas Groh <tgroh@google.com>
Authored: Thu Apr 20 09:39:12 2017 -0700
Committer: Thomas Groh <tgroh@google.com>
Committed: Mon Apr 24 16:47:29 2017 -0700

----------------------------------------------------------------------
 .../direct/CloningBundleFactoryTest.java        | 10 ++---
 .../beam/runners/direct/DirectRunnerTest.java   |  5 +--
 .../UnboundedReadEvaluatorFactoryTest.java      |  4 +-
 .../runners/dataflow/internal/IsmFormat.java    | 41 +++++++++++++++-----
 .../runners/dataflow/util/RandomAccessData.java | 14 +++++--
 .../apache/beam/sdk/coders/BigDecimalCoder.java | 31 +++++++++------
 .../beam/sdk/coders/BigEndianIntegerCoder.java  | 12 ++++--
 .../beam/sdk/coders/BigEndianLongCoder.java     | 10 ++++-
 .../apache/beam/sdk/coders/BigIntegerCoder.java | 23 +++++++----
 .../apache/beam/sdk/coders/ByteArrayCoder.java  | 18 ++++++++-
 .../org/apache/beam/sdk/coders/ByteCoder.java   |  9 +++--
 .../apache/beam/sdk/coders/ByteStringCoder.java |  7 ++--
 .../org/apache/beam/sdk/coders/CustomCoder.java | 22 ++++++++++-
 .../org/apache/beam/sdk/coders/DoubleCoder.java |  9 +++--
 .../apache/beam/sdk/coders/DurationCoder.java   | 19 +++++----
 .../apache/beam/sdk/coders/InstantCoder.java    | 25 +++++++-----
 .../beam/sdk/coders/SerializableCoder.java      |  2 +-
 .../apache/beam/sdk/coders/StringUtf8Coder.java |  7 ++--
 .../beam/sdk/coders/TextualIntegerCoder.java    | 14 +++++--
 .../org/apache/beam/sdk/coders/VarIntCoder.java | 12 ++++--
 .../apache/beam/sdk/coders/VarLongCoder.java    | 22 +++++++++--
 .../org/apache/beam/sdk/coders/VoidCoder.java   |  5 ++-
 .../org/apache/beam/sdk/io/FileBasedSink.java   |  9 +----
 .../org/apache/beam/sdk/transforms/Mean.java    | 10 ++++-
 .../sdk/transforms/windowing/GlobalWindow.java  | 17 +++++++-
 .../transforms/windowing/IntervalWindow.java    | 20 ++++++++--
 .../beam/sdk/transforms/windowing/PaneInfo.java |  7 +++-
 .../org/apache/beam/sdk/util/BitSetCoder.java   | 13 +++----
 .../apache/beam/sdk/testing/PAssertTest.java    |  4 +-
 .../sdk/testing/SerializableMatchersTest.java   |  4 +-
 .../beam/sdk/testing/WindowSupplierTest.java    |  4 +-
 .../apache/beam/sdk/transforms/CreateTest.java  |  3 +-
 .../beam/sdk/transforms/GroupByKeyTest.java     |  9 +++--
 .../apache/beam/sdk/transforms/ParDoTest.java   |  6 ++-
 .../apache/beam/sdk/util/CoderUtilsTest.java    |  4 +-
 .../sdk/extensions/protobuf/ProtoCoder.java     |  4 +-
 .../io/gcp/bigquery/TableDestinationCoder.java  |  6 +--
 .../sdk/io/gcp/bigquery/TableRowInfoCoder.java  |  6 +--
 .../sdk/io/gcp/bigquery/TableRowJsonCoder.java  | 12 ++++--
 .../io/gcp/bigquery/WriteBundlesToFiles.java    |  7 ++--
 .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java  |  4 +-
 .../io/gcp/pubsub/PubsubUnboundedSource.java    |  4 +-
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 10 +++--
 .../beam/sdk/io/hbase/HBaseMutationCoder.java   |  5 +--
 .../beam/sdk/io/hbase/HBaseResultCoder.java     |  5 +--
 sdks/java/io/kafka/pom.xml                      |  5 ---
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   |  4 +-
 .../beam/sdk/io/kafka/KafkaRecordCoder.java     | 16 +-------
 .../beam/sdk/io/kinesis/KinesisRecordCoder.java |  4 +-
 sdks/java/io/xml/pom.xml                        |  5 ---
 .../org/apache/beam/sdk/io/xml/JAXBCoder.java   | 39 ++-----------------
 51 files changed, 342 insertions(+), 225 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
index 3d14a12..c6054b6 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
@@ -33,8 +33,8 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -175,7 +175,7 @@ public class CloningBundleFactoryTest {
   }
 
   static class Record {}
-  static class RecordNoEncodeCoder extends AtomicCoder<Record> {
+  static class RecordNoEncodeCoder extends CustomCoder<Record> {
 
     @Override
     public void encode(
@@ -194,7 +194,7 @@ public class CloningBundleFactoryTest {
     }
   }
 
-  static class RecordNoDecodeCoder extends AtomicCoder<Record> {
+  static class RecordNoDecodeCoder extends CustomCoder<Record> {
     @Override
     public void encode(
         Record value,
@@ -210,7 +210,7 @@ public class CloningBundleFactoryTest {
     }
   }
 
-  private static class RecordStructuralValueCoder extends AtomicCoder<Record> {
+  private static class RecordStructuralValueCoder extends CustomCoder<Record> {
     @Override
     public void encode(
         Record value,
@@ -242,7 +242,7 @@ public class CloningBundleFactoryTest {
   }
 
   private static class RecordNotConsistentWithEqualsStructuralValueCoder
-      extends AtomicCoder<Record> {
+      extends CustomCoder<Record> {
     @Override
     public void encode(
         Record value,

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 86d7f05..c55f84a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -44,9 +44,9 @@ import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineResult.State;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -530,7 +530,6 @@ public class DirectRunnerTest implements Serializable {
     p.run();
   }
 
-
   @Test
   public void testAggregatorNotPresentInGraph() throws AggregatorRetrievalException {
     Pipeline p = getPipeline();
@@ -560,7 +559,7 @@ public class DirectRunnerTest implements Serializable {
     }
   }
 
-  private static class LongNoDecodeCoder extends AtomicCoder<Long> {
+  private static class LongNoDecodeCoder extends CustomCoder<Long> {
     @Override
     public void encode(
         Long value, OutputStream outStream, Context context) throws IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 567ee98..521ba3f 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -48,10 +48,10 @@ import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.runners.direct.UnboundedReadDeduplicator.NeverDeduplicator;
 import org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory.UnboundedSourceShard;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.Read;
@@ -588,7 +588,7 @@ public class UnboundedReadEvaluatorFactoryTest {
       return finalized;
     }
 
-    public static class Coder extends AtomicCoder<TestCheckpointMark> {
+    public static class Coder extends CustomCoder<TestCheckpointMark> {
       @Override
       public void encode(
           TestCheckpointMark value,

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
index 33c27f8..97824dc 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
@@ -37,11 +37,11 @@ import java.util.ArrayList;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.util.RandomAccessData;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -628,17 +628,15 @@ public class IsmFormat {
    *   <li>indexOffset (variable length long encoding)</li>
    * </ul>
    */
-  public static class IsmShardCoder extends AtomicCoder<IsmShard> {
+  public static class IsmShardCoder extends CustomCoder<IsmShard> {
     private static final IsmShardCoder INSTANCE = new IsmShardCoder();
 
     /** Returns an IsmShardCoder. */
-    @JsonCreator
     public static IsmShardCoder of() {
       return INSTANCE;
     }
 
-    private IsmShardCoder() {
-    }
+    private IsmShardCoder() {}
 
     @Override
     public void encode(IsmShard value, OutputStream outStream, Coder.Context context)
@@ -661,9 +659,20 @@ public class IsmFormat {
     }
 
     @Override
+    public void verifyDeterministic() {
+      VarIntCoder.of().verifyDeterministic();
+      VarLongCoder.of().verifyDeterministic();
+    }
+
+    @Override
     public boolean consistentWithEquals() {
       return true;
     }
+
+    @Override
+    public String getEncodingId() {
+      return "";
+    }
   }
 
   /**
@@ -689,10 +698,9 @@ public class IsmFormat {
   }
 
   /** A {@link Coder} for {@link KeyPrefix}. */
-  public static final class KeyPrefixCoder extends AtomicCoder<KeyPrefix> {
+  public static final class KeyPrefixCoder extends CustomCoder<KeyPrefix> {
     private static final KeyPrefixCoder INSTANCE = new KeyPrefixCoder();
 
-    @JsonCreator
     public static KeyPrefixCoder of() {
       return INSTANCE;
     }
@@ -711,6 +719,9 @@ public class IsmFormat {
     }
 
     @Override
+    public void verifyDeterministic() {}
+
+    @Override
     public boolean consistentWithEquals() {
       return true;
     }
@@ -727,6 +738,11 @@ public class IsmFormat {
       return VarInt.getLength(value.getSharedKeySize())
           + VarInt.getLength(value.getUnsharedKeySize());
     }
+
+    @Override
+    public String getEncodingId() {
+      return "";
+    }
   }
 
   /**
@@ -759,10 +775,9 @@ public class IsmFormat {
   }
 
   /** A {@link Coder} for {@link Footer}. */
-  public static final class FooterCoder extends AtomicCoder<Footer> {
+  public static final class FooterCoder extends CustomCoder<Footer> {
     private static final FooterCoder INSTANCE = new FooterCoder();
 
-    @JsonCreator
     public static FooterCoder of() {
       return INSTANCE;
     }
@@ -791,6 +806,9 @@ public class IsmFormat {
     }
 
     @Override
+    public void verifyDeterministic() {}
+
+    @Override
     public boolean consistentWithEquals() {
       return true;
     }
@@ -805,5 +823,10 @@ public class IsmFormat {
         throws Exception {
       return Footer.FIXED_LENGTH;
     }
+
+    @Override
+    public String getEncodingId() {
+      return "";
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
index 11eec19..4b07ca2 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.dataflow.util;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import com.google.common.base.MoreObjects;
 import com.google.common.io.ByteStreams;
 import com.google.common.primitives.UnsignedBytes;
@@ -31,10 +30,10 @@ import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.Comparator;
 import javax.annotation.concurrent.NotThreadSafe;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.util.VarInt;
 
 /**
@@ -56,10 +55,9 @@ public class RandomAccessData {
    *
    * <p>This coder does not support encoding positive infinity.
    */
-  public static class RandomAccessDataCoder extends AtomicCoder<RandomAccessData> {
+  public static class RandomAccessDataCoder extends CustomCoder<RandomAccessData> {
     private static final RandomAccessDataCoder INSTANCE = new RandomAccessDataCoder();
 
-    @JsonCreator
     public static RandomAccessDataCoder of() {
       return INSTANCE;
     }
@@ -90,6 +88,9 @@ public class RandomAccessData {
     }
 
     @Override
+    public void verifyDeterministic() {}
+
+    @Override
     public boolean consistentWithEquals() {
       return true;
     }
@@ -112,6 +113,11 @@ public class RandomAccessData {
       }
       return size + value.size;
     }
+
+    @Override
+    public String getEncodingId() {
+      return "";
+    }
   }
 
   public static final UnsignedLexicographicalComparator UNSIGNED_LEXICOGRAPHICAL_COMPARATOR =

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
index 36c8265..90ace99 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.coders;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -33,9 +32,8 @@ import java.math.MathContext;
  * {@link BigInteger}, when scaled (with unlimited precision, aka {@link MathContext#UNLIMITED}),
  * yields the expected {@link BigDecimal}.
  */
-public class BigDecimalCoder extends AtomicCoder<BigDecimal> {
+public class BigDecimalCoder extends CustomCoder<BigDecimal> {
 
-  @JsonCreator
   public static BigDecimalCoder of() {
     return INSTANCE;
   }
@@ -44,8 +42,8 @@ public class BigDecimalCoder extends AtomicCoder<BigDecimal> {
 
   private static final BigDecimalCoder INSTANCE = new BigDecimalCoder();
 
-  private final VarIntCoder integerCoder = VarIntCoder.of();
-  private final BigIntegerCoder bigIntegerCoder = BigIntegerCoder.of();
+  private static final VarIntCoder VAR_INT_CODER = VarIntCoder.of();
+  private static final BigIntegerCoder BIG_INT_CODER = BigIntegerCoder.of();
 
   private BigDecimalCoder() {}
 
@@ -53,18 +51,24 @@ public class BigDecimalCoder extends AtomicCoder<BigDecimal> {
   public void encode(BigDecimal value, OutputStream outStream, Context context)
       throws IOException, CoderException {
     checkNotNull(value, String.format("cannot encode a null %s", BigDecimal.class.getSimpleName()));
-    integerCoder.encode(value.scale(), outStream, context.nested());
-    bigIntegerCoder.encode(value.unscaledValue(), outStream, context);
+    VAR_INT_CODER.encode(value.scale(), outStream, context.nested());
+    BIG_INT_CODER.encode(value.unscaledValue(), outStream, context);
   }
 
   @Override
   public BigDecimal decode(InputStream inStream, Context context)
       throws IOException, CoderException {
-    int scale = integerCoder.decode(inStream, context.nested());
-    BigInteger bigInteger = bigIntegerCoder.decode(inStream, context);
+    int scale = VAR_INT_CODER.decode(inStream, context.nested());
+    BigInteger bigInteger = BIG_INT_CODER.decode(inStream, context);
     return new BigDecimal(bigInteger, scale);
   }
 
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    VAR_INT_CODER.verifyDeterministic();
+    BIG_INT_CODER.verifyDeterministic();
+  }
+
   /**
    * {@inheritDoc}
    *
@@ -75,6 +79,11 @@ public class BigDecimalCoder extends AtomicCoder<BigDecimal> {
     return true;
   }
 
+  @Override
+  public String getEncodingId() {
+    return "";
+  }
+
   /**
    * {@inheritDoc}
    *
@@ -95,7 +104,7 @@ public class BigDecimalCoder extends AtomicCoder<BigDecimal> {
   @Override
   protected long getEncodedElementByteSize(BigDecimal value, Context context) throws Exception {
     checkNotNull(value, String.format("cannot encode a null %s", BigDecimal.class.getSimpleName()));
-    return integerCoder.getEncodedElementByteSize(value.scale(), context.nested())
-        + bigIntegerCoder.getEncodedElementByteSize(value.unscaledValue(), context);
+    return VAR_INT_CODER.getEncodedElementByteSize(value.scale(), context.nested())
+        + BIG_INT_CODER.getEncodedElementByteSize(value.unscaledValue(), context);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
index 2922416..8f45a56 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.coders;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
@@ -30,9 +29,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 /**
  * A {@link BigEndianIntegerCoder} encodes {@link Integer Integers} in 4 bytes, big-endian.
  */
-public class BigEndianIntegerCoder extends AtomicCoder<Integer> {
+public class BigEndianIntegerCoder extends CustomCoder<Integer> {
 
-  @JsonCreator
   public static BigEndianIntegerCoder of() {
     return INSTANCE;
   }
@@ -65,6 +63,9 @@ public class BigEndianIntegerCoder extends AtomicCoder<Integer> {
     }
   }
 
+  @Override
+  public void verifyDeterministic() {}
+
   /**
    * {@inheritDoc}
    *
@@ -75,6 +76,11 @@ public class BigEndianIntegerCoder extends AtomicCoder<Integer> {
     return true;
   }
 
+  @Override
+  public String getEncodingId() {
+    return "";
+  }
+
   /**
    * {@inheritDoc}
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
index 26aadde..2d47697 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 /**
  * A {@link BigEndianLongCoder} encodes {@link Long}s in 8 bytes, big-endian.
  */
-public class BigEndianLongCoder extends AtomicCoder<Long> {
+public class BigEndianLongCoder extends CustomCoder<Long> {
 
   @JsonCreator
   public static BigEndianLongCoder of() {
@@ -65,6 +65,9 @@ public class BigEndianLongCoder extends AtomicCoder<Long> {
     }
   }
 
+  @Override
+  public void verifyDeterministic() {}
+
   /**
    * {@inheritDoc}
    *
@@ -75,6 +78,11 @@ public class BigEndianLongCoder extends AtomicCoder<Long> {
     return true;
   }
 
+  @Override
+  public String getEncodingId() {
+    return "";
+  }
+
   /**
    * {@inheritDoc}
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
index daba983..40331b8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.coders;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -29,9 +28,8 @@ import java.math.BigInteger;
  * A {@link BigIntegerCoder} encodes a {@link BigInteger} as a byte array containing the big endian
  * two's-complement representation, encoded via {@link ByteArrayCoder}.
  */
-public class BigIntegerCoder extends AtomicCoder<BigInteger> {
+public class BigIntegerCoder extends CustomCoder<BigInteger> {
 
-  @JsonCreator
   public static BigIntegerCoder of() {
     return INSTANCE;
   }
@@ -39,22 +37,26 @@ public class BigIntegerCoder extends AtomicCoder<BigInteger> {
   /////////////////////////////////////////////////////////////////////////////
 
   private static final BigIntegerCoder INSTANCE = new BigIntegerCoder();
+  private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
 
   private BigIntegerCoder() {}
 
-  private final ByteArrayCoder byteArrayCoder = ByteArrayCoder.of();
-
   @Override
   public void encode(BigInteger value, OutputStream outStream, Context context)
       throws IOException, CoderException {
     checkNotNull(value, String.format("cannot encode a null %s", BigInteger.class.getSimpleName()));
-    byteArrayCoder.encode(value.toByteArray(), outStream, context);
+    BYTE_ARRAY_CODER.encode(value.toByteArray(), outStream, context);
   }
 
   @Override
   public BigInteger decode(InputStream inStream, Context context)
       throws IOException, CoderException {
-    return new BigInteger(byteArrayCoder.decode(inStream, context));
+    return new BigInteger(BYTE_ARRAY_CODER.decode(inStream, context));
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    BYTE_ARRAY_CODER.verifyDeterministic();
   }
 
   /**
@@ -67,6 +69,11 @@ public class BigIntegerCoder extends AtomicCoder<BigInteger> {
     return true;
   }
 
+  @Override
+  public String getEncodingId() {
+    return "";
+  }
+
   /**
    * {@inheritDoc}
    *
@@ -85,6 +92,6 @@ public class BigIntegerCoder extends AtomicCoder<BigInteger> {
   @Override
   protected long getEncodedElementByteSize(BigInteger value, Context context) throws Exception {
     checkNotNull(value, String.format("cannot encode a null %s", BigInteger.class.getSimpleName()));
-    return byteArrayCoder.getEncodedElementByteSize(value.toByteArray(), context);
+    return BYTE_ARRAY_CODER.getEncodedElementByteSize(value.toByteArray(), context);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
index a9449c6..dd34f28 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
@@ -22,6 +22,8 @@ import com.google.common.io.ByteStreams;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
 import org.apache.beam.sdk.util.ExposedByteArrayOutputStream;
 import org.apache.beam.sdk.util.StreamUtils;
 import org.apache.beam.sdk.util.VarInt;
@@ -38,13 +40,19 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  * encoded via a {@link VarIntCoder}.</li>
  * </ul>
  */
-public class ByteArrayCoder extends AtomicCoder<byte[]> {
+public class ByteArrayCoder extends StandardCoder<byte[]> {
 
   @JsonCreator
   public static ByteArrayCoder of() {
     return INSTANCE;
   }
 
+  /**
+   * Returns an empty list. {@link ByteArrayCoder} has no components.
+   */
+  public static <T> List<Object> getInstanceComponents(T ignored) {
+    return Collections.emptyList();
+  }
 
   /////////////////////////////////////////////////////////////////////////////
 
@@ -103,6 +111,14 @@ public class ByteArrayCoder extends AtomicCoder<byte[]> {
     }
   }
 
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return null;
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {}
+
   /**
    * {@inheritDoc}
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
index 0eda58d..2ef166b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.coders;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -28,9 +27,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 /**
  * A {@link ByteCoder} encodes {@link Byte} values in 1 byte using Java serialization.
  */
-public class ByteCoder extends AtomicCoder<Byte> {
+public class ByteCoder extends CustomCoder<Byte> {
 
-  @JsonCreator
   public static ByteCoder of() {
     return INSTANCE;
   }
@@ -87,6 +85,11 @@ public class ByteCoder extends AtomicCoder<Byte> {
     return true;
   }
 
+  @Override
+  public String getEncodingId() {
+    return "";
+  }
+
   /**
    * {@inheritDoc}
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteStringCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteStringCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteStringCoder.java
index 1e3634c..1b27b5b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteStringCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteStringCoder.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.coders;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import com.google.common.io.ByteStreams;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
@@ -32,9 +31,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  * <p>When this code is used in a nested {@link Coder.Context}, the serialized {@link ByteString}
  * objects are first delimited by their size.
  */
-public class ByteStringCoder extends AtomicCoder<ByteString> {
+public class ByteStringCoder extends CustomCoder<ByteString> {
 
-  @JsonCreator
   public static ByteStringCoder of() {
     return INSTANCE;
   }
@@ -84,6 +82,9 @@ public class ByteStringCoder extends AtomicCoder<ByteString> {
     return VarInt.getLength(size) + size;
   }
 
+  @Override
+  public void verifyDeterministic() {}
+
   /**
    * {@inheritDoc}
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
index 59d29de..fbf65df 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
@@ -22,6 +22,8 @@ import static org.apache.beam.sdk.util.Structs.addString;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
 import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.StringUtils;
@@ -39,7 +41,7 @@ import org.apache.beam.sdk.util.StringUtils;
  *
  * @param <T> the type of elements handled by this coder
  */
-public abstract class CustomCoder<T> extends AtomicCoder<T>
+public abstract class CustomCoder<T> extends StandardCoder<T>
     implements Serializable {
   @JsonCreator
   public static CustomCoder<?> of(
@@ -62,6 +64,24 @@ public abstract class CustomCoder<T> extends AtomicCoder<T>
   }
 
   /**
+   * {@inheritDoc}.
+   *
+   * <p>Returns an empty list. A {@link CustomCoder} has no default argument {@link Coder coders}.
+   */
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Returns an empty list. A {@link CustomCoder} by default will not have component coders that are
+   * used for inference.
+   */
+  public static <T> List<Object> getInstanceComponents(T exampleValue) {
+    return Collections.emptyList();
+  }
+
+  /**
    * {@inheritDoc}
    *
    * @return A thin {@link CloudObject} wrapping of the Java serialization of {@code this}.

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
index 771c851..8731e5a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.coders;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
@@ -30,9 +29,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 /**
  * A {@link DoubleCoder} encodes {@link Double} values in 8 bytes using Java serialization.
  */
-public class DoubleCoder extends AtomicCoder<Double> {
+public class DoubleCoder extends CustomCoder<Double> {
 
-  @JsonCreator
   public static DoubleCoder of() {
     return INSTANCE;
   }
@@ -89,6 +87,11 @@ public class DoubleCoder extends AtomicCoder<Double> {
     return true;
   }
 
+  @Override
+  public String getEncodingId() {
+    return "";
+  }
+
   /**
    * {@inheritDoc}
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
index c6f0b18..10a83ef 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.coders;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -30,9 +29,8 @@ import org.joda.time.ReadableDuration;
  * A {@link Coder} that encodes a joda {@link Duration} as a {@link Long} using the format of
  * {@link VarLongCoder}.
  */
-public class DurationCoder extends AtomicCoder<ReadableDuration> {
+public class DurationCoder extends CustomCoder<ReadableDuration> {
 
-  @JsonCreator
   public static DurationCoder of() {
     return INSTANCE;
   }
@@ -43,7 +41,7 @@ public class DurationCoder extends AtomicCoder<ReadableDuration> {
   private static final TypeDescriptor<ReadableDuration> TYPE_DESCRIPTOR =
       new TypeDescriptor<ReadableDuration>() {};
 
-  private final VarLongCoder longCoder = VarLongCoder.of();
+  private static final VarLongCoder LONG_CODER = VarLongCoder.of();
 
   private DurationCoder() {}
 
@@ -61,13 +59,18 @@ public class DurationCoder extends AtomicCoder<ReadableDuration> {
     if (value == null) {
       throw new CoderException("cannot encode a null ReadableDuration");
     }
-    longCoder.encode(toLong(value), outStream, context);
+    LONG_CODER.encode(toLong(value), outStream, context);
   }
 
   @Override
   public ReadableDuration decode(InputStream inStream, Context context)
       throws CoderException, IOException {
-      return fromLong(longCoder.decode(inStream, context));
+      return fromLong(LONG_CODER.decode(inStream, context));
+  }
+
+  @Override
+  public void verifyDeterministic() {
+    LONG_CODER.verifyDeterministic();
   }
 
   /**
@@ -87,13 +90,13 @@ public class DurationCoder extends AtomicCoder<ReadableDuration> {
    */
   @Override
   public boolean isRegisterByteSizeObserverCheap(ReadableDuration value, Context context) {
-    return longCoder.isRegisterByteSizeObserverCheap(toLong(value), context);
+    return LONG_CODER.isRegisterByteSizeObserverCheap(toLong(value), context);
   }
 
   @Override
   public void registerByteSizeObserver(
       ReadableDuration value, ElementByteSizeObserver observer, Context context) throws Exception {
-    longCoder.registerByteSizeObserver(toLong(value), observer, context);
+    LONG_CODER.registerByteSizeObserver(toLong(value), observer, context);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
index 325a7db..48b7275 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.coders;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import com.google.common.base.Converter;
 import java.io.IOException;
 import java.io.InputStream;
@@ -30,9 +29,7 @@ import org.joda.time.Instant;
  * A {@link Coder} for joda {@link Instant} that encodes it as a big endian {@link Long}
  * shifted such that lexicographic ordering of the bytes corresponds to chronological order.
  */
-public class InstantCoder extends AtomicCoder<Instant> {
-
-  @JsonCreator
+public class InstantCoder extends CustomCoder<Instant> {
   public static InstantCoder of() {
     return INSTANCE;
   }
@@ -42,7 +39,7 @@ public class InstantCoder extends AtomicCoder<Instant> {
   private static final InstantCoder INSTANCE = new InstantCoder();
   private static final TypeDescriptor<Instant> TYPE_DESCRIPTOR = new TypeDescriptor<Instant>() {};
 
-  private final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
+  private static final BigEndianLongCoder LONG_CODER = BigEndianLongCoder.of();
 
   private InstantCoder() {}
 
@@ -76,13 +73,18 @@ public class InstantCoder extends AtomicCoder<Instant> {
     if (value == null) {
       throw new CoderException("cannot encode a null Instant");
     }
-    longCoder.encode(ORDER_PRESERVING_CONVERTER.convert(value), outStream, context);
+    LONG_CODER.encode(ORDER_PRESERVING_CONVERTER.convert(value), outStream, context);
   }
 
   @Override
   public Instant decode(InputStream inStream, Context context)
       throws CoderException, IOException {
-    return ORDER_PRESERVING_CONVERTER.reverse().convert(longCoder.decode(inStream, context));
+    return ORDER_PRESERVING_CONVERTER.reverse().convert(LONG_CODER.decode(inStream, context));
+  }
+
+  @Override
+  public void verifyDeterministic() {
+    LONG_CODER.verifyDeterministic();
   }
 
   /**
@@ -95,6 +97,11 @@ public class InstantCoder extends AtomicCoder<Instant> {
     return true;
   }
 
+  @Override
+  public String getEncodingId() {
+    return "";
+  }
+
   /**
    * {@inheritDoc}
    *
@@ -102,14 +109,14 @@ public class InstantCoder extends AtomicCoder<Instant> {
    */
   @Override
   public boolean isRegisterByteSizeObserverCheap(Instant value, Context context) {
-    return longCoder.isRegisterByteSizeObserverCheap(
+    return LONG_CODER.isRegisterByteSizeObserverCheap(
         ORDER_PRESERVING_CONVERTER.convert(value), context);
   }
 
   @Override
   public void registerByteSizeObserver(
       Instant value, ElementByteSizeObserver observer, Context context) throws Exception {
-    longCoder.registerByteSizeObserver(
+    LONG_CODER.registerByteSizeObserver(
         ORDER_PRESERVING_CONVERTER.convert(value), observer, context);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
index 49f5b8d..1314a6c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
@@ -46,7 +46,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  *
  * @param <T> the type of elements handled by this coder
  */
-public class SerializableCoder<T extends Serializable> extends AtomicCoder<T> {
+public class SerializableCoder<T extends Serializable> extends CustomCoder<T> {
 
   /**
    * Returns a {@link SerializableCoder} instance for the provided element type.

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
index cd124ef..f0a0969 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.coders;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import com.google.common.base.Utf8;
 import com.google.common.io.ByteStreams;
 import com.google.common.io.CountingOutputStream;
@@ -39,9 +38,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  * If in a nested context, prefixes the string with an integer length field,
  * encoded via a {@link VarIntCoder}.
  */
-public class StringUtf8Coder extends AtomicCoder<String> {
+public class StringUtf8Coder extends CustomCoder<String> {
 
-  @JsonCreator
   public static StringUtf8Coder of() {
     return INSTANCE;
   }
@@ -105,6 +103,9 @@ public class StringUtf8Coder extends AtomicCoder<String> {
     }
   }
 
+  @Override
+  public void verifyDeterministic() {}
+
   /**
    * {@inheritDoc}
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
index 1b79e90..817817b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.coders;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -27,9 +26,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  * A {@link Coder} that encodes {@code Integer Integers} as the ASCII bytes of
  * their textual, decimal, representation.
  */
-public class TextualIntegerCoder extends AtomicCoder<Integer> {
+public class TextualIntegerCoder extends CustomCoder<Integer> {
 
-  @JsonCreator
   public static TextualIntegerCoder of() {
     return new TextualIntegerCoder();
   }
@@ -62,6 +60,16 @@ public class TextualIntegerCoder extends AtomicCoder<Integer> {
   }
 
   @Override
+  public void verifyDeterministic() {
+    StringUtf8Coder.of().verifyDeterministic();
+  }
+
+  @Override
+  public String getEncodingId() {
+    return "";
+  }
+
+  @Override
   public TypeDescriptor<Integer> getEncodedTypeDescriptor() {
     return TYPE_DESCRIPTOR;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
index ec9d8f4..9c654a9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.coders;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -31,9 +30,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  * numbers always take 5 bytes, so {@link BigEndianIntegerCoder} may be preferable for
  * integers that are known to often be large or negative.
  */
-public class VarIntCoder extends AtomicCoder<Integer> {
+public class VarIntCoder extends CustomCoder<Integer> {
 
-  @JsonCreator
   public static VarIntCoder of() {
     return INSTANCE;
   }
@@ -66,6 +64,9 @@ public class VarIntCoder extends AtomicCoder<Integer> {
     }
   }
 
+  @Override
+  public void verifyDeterministic() {}
+
   /**
    * {@inheritDoc}
    *
@@ -76,6 +77,11 @@ public class VarIntCoder extends AtomicCoder<Integer> {
     return true;
   }
 
+  @Override
+  public String getEncodingId() {
+    return "";
+  }
+
   /**
    * {@inheritDoc}
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
index 3f1334d..16474ba 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
@@ -17,12 +17,13 @@
  */
 package org.apache.beam.sdk.coders;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.UTFDataFormatException;
+import java.util.Collections;
+import java.util.List;
 import org.apache.beam.sdk.util.VarInt;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
@@ -31,13 +32,18 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  * numbers always take 10 bytes, so {@link BigEndianLongCoder} may be preferable for
  * longs that are known to often be large or negative.
  */
-public class VarLongCoder extends AtomicCoder<Long> {
-
-  @JsonCreator
+public class VarLongCoder extends StandardCoder<Long> {
   public static VarLongCoder of() {
     return INSTANCE;
   }
 
+  /**
+   * Returns an empty list. {@link VarLongCoder} has no components.
+   */
+  public static <T> List<Object> getInstanceComponents(T ignored) {
+    return Collections.emptyList();
+  }
+
   /////////////////////////////////////////////////////////////////////////////
 
   private static final VarLongCoder INSTANCE = new VarLongCoder();
@@ -66,6 +72,14 @@ public class VarLongCoder extends AtomicCoder<Long> {
     }
   }
 
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void verifyDeterministic() {}
+
   /**
    * {@inheritDoc}
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
index 318485d..a65fa5e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
@@ -25,7 +25,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 /**
  * A {@link Coder} for {@link Void}. Uses zero bytes per {@link Void}.
  */
-public class VoidCoder extends AtomicCoder<Void> {
+public class VoidCoder extends CustomCoder<Void> {
 
   @JsonCreator
   public static VoidCoder of() {
@@ -50,6 +50,9 @@ public class VoidCoder extends AtomicCoder<Void> {
     return null;
   }
 
+  @Override
+  public void verifyDeterministic() {}
+
   /**
    * {@inheritDoc}
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index d9682e7..6045148 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -22,11 +22,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Strings.isNullOrEmpty;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -41,12 +39,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.zip.GZIPOutputStream;
-
 import javax.annotation.Nullable;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context;
@@ -936,11 +932,10 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
   /**
    * A coder for FileResult objects.
    */
-  public static final class FileResultCoder extends AtomicCoder<FileResult> {
+  public static final class FileResultCoder extends CustomCoder<FileResult> {
     private static final FileResultCoder INSTANCE = new FileResultCoder();
     private final Coder<String> stringCoder = NullableCoder.of(StringUtf8Coder.of());
 
-    @JsonCreator
     public static FileResultCoder of() {
       return INSTANCE;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
index 5e7c003..a6808cf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
@@ -22,11 +22,11 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Objects;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.DoubleCoder;
 import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn.Accumulator;
 
@@ -181,7 +181,7 @@ public class Mean {
   }
 
   static class CountSumCoder<NumT extends Number>
-  extends AtomicCoder<CountSum<NumT>> {
+  extends CustomCoder<CountSum<NumT>> {
      private static final Coder<Long> LONG_CODER = BigEndianLongCoder.of();
      private static final Coder<Double> DOUBLE_CODER = DoubleCoder.of();
 
@@ -199,5 +199,11 @@ public class Mean {
            LONG_CODER.decode(inStream, context.nested()),
            DOUBLE_CODER.decode(inStream, context));
     }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+       LONG_CODER.verifyDeterministic();
+       DOUBLE_CODER.verifyDeterministic();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
index ad6a9fd..ffc8011 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
@@ -19,7 +19,9 @@ package org.apache.beam.sdk.transforms.windowing;
 
 import java.io.InputStream;
 import java.io.OutputStream;
-import org.apache.beam.sdk.coders.AtomicCoder;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.util.CloudObject;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -62,7 +64,7 @@ public class GlobalWindow extends BoundedWindow {
   /**
    * {@link Coder} for encoding and decoding {@code GlobalWindow}s.
    */
-  public static class Coder extends AtomicCoder<GlobalWindow> {
+  public static class Coder extends StandardCoder<GlobalWindow> {
     public static final Coder INSTANCE = new Coder();
 
     @Override
@@ -86,6 +88,17 @@ public class GlobalWindow extends BoundedWindow {
       return CloudObject.forClassName("kind:global_window");
     }
 
+    @Override
+    public final List<org.apache.beam.sdk.coders.Coder<?>> getCoderArguments() {
+      return Collections.emptyList();
+    }
+
+    /**
+     * Returns an empty list. The Global Window Coder has no components.
+     */
+    public static <T> List<Object> getInstanceComponents(T exampleValue) {
+      return Collections.emptyList();
+    }
     private Coder() {}
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
index eff4d99..aaa2e83 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
@@ -17,15 +17,16 @@
  */
 package org.apache.beam.sdk.transforms.windowing;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import org.apache.beam.sdk.coders.AtomicCoder;
+import java.util.Collections;
+import java.util.List;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.DurationCoder;
 import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.util.CloudObject;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -167,18 +168,24 @@ public class IntervalWindow extends BoundedWindow
   /**
    * Encodes an {@link IntervalWindow} as a pair of its upper bound and duration.
    */
-  public static class IntervalWindowCoder extends AtomicCoder<IntervalWindow> {
+  public static class IntervalWindowCoder extends StandardCoder<IntervalWindow> {
 
     private static final IntervalWindowCoder INSTANCE = new IntervalWindowCoder();
 
     private static final Coder<Instant> instantCoder = InstantCoder.of();
     private static final Coder<ReadableDuration> durationCoder = DurationCoder.of();
 
-    @JsonCreator
     public static IntervalWindowCoder of() {
       return INSTANCE;
     }
 
+    /**
+     * Returns an empty list. {@link IntervalWindowCoder} has no components.
+     */
+    public static <T> List<Object> getInstanceComponents(T value) {
+      return Collections.emptyList();
+    }
+
     @Override
     public void encode(IntervalWindow window, OutputStream outStream, Context context)
         throws IOException, CoderException {
@@ -206,6 +213,11 @@ public class IntervalWindow extends BoundedWindow
     }
 
     @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+      return Collections.emptyList();
+    }
+
+    @Override
     protected CloudObject initializeCloudObject() {
       return CloudObject.forClassName("kind:interval_window");
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
index c73eb39..158bb04 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
@@ -26,9 +26,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Objects;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.util.VarInt;
@@ -306,7 +306,7 @@ public final class PaneInfo {
   /**
    * A Coder for encoding PaneInfo instances.
    */
-  public static class PaneInfoCoder extends AtomicCoder<PaneInfo> {
+  public static class PaneInfoCoder extends CustomCoder<PaneInfo> {
     private enum Encoding {
       FIRST,
       ONE_INDEX,
@@ -383,5 +383,8 @@ public final class PaneInfo {
       }
       return new PaneInfo(base.isFirst, base.isLast, base.timing, index, onTimeIndex);
     }
+
+    @Override
+    public void verifyDeterministic() {}
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
index 72524bd..eda4e5f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
@@ -21,17 +21,17 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.BitSet;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 
 /**
  * Coder for the BitSet used to track child-trigger finished states.
  */
-public class BitSetCoder extends AtomicCoder<BitSet> {
+public class BitSetCoder extends CustomCoder<BitSet> {
 
   private static final BitSetCoder INSTANCE = new BitSetCoder();
-  private static final ByteArrayCoder byteArrayCoder = ByteArrayCoder.of();
+  private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
 
   private BitSetCoder() {}
 
@@ -42,19 +42,18 @@ public class BitSetCoder extends AtomicCoder<BitSet> {
   @Override
   public void encode(BitSet value, OutputStream outStream, Context context)
       throws CoderException, IOException {
-    byteArrayCoder.encodeAndOwn(value.toByteArray(), outStream, context);
+    BYTE_ARRAY_CODER.encodeAndOwn(value.toByteArray(), outStream, context);
   }
 
   @Override
   public BitSet decode(InputStream inStream, Context context)
       throws CoderException, IOException {
-    return BitSet.valueOf(byteArrayCoder.decode(inStream, context));
+    return BitSet.valueOf(BYTE_ARRAY_CODER.decode(inStream, context));
   }
 
   @Override
   public void verifyDeterministic() throws NonDeterministicException {
     verifyDeterministic(
-        "BitSetCoder requires its byteArrayCoder to be deterministic.",
-        byteArrayCoder);
+        "BitSetCoder requires its ByteArrayCoder to be deterministic.", BYTE_ARRAY_CODER);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
index 2ef892c..cfe7436 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
@@ -35,8 +35,8 @@ import java.io.Serializable;
 import java.util.Collections;
 import java.util.regex.Pattern;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.GenerateSequence;
@@ -88,7 +88,7 @@ public class PAssertTest implements Serializable {
     }
   }
 
-  private static class NotSerializableObjectCoder extends AtomicCoder<NotSerializableObject> {
+  private static class NotSerializableObjectCoder extends CustomCoder<NotSerializableObject> {
     private NotSerializableObjectCoder() { }
     private static final NotSerializableObjectCoder INSTANCE = new NotSerializableObjectCoder();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
index db5ff2e..ddc92d6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
@@ -29,8 +29,8 @@ import com.google.common.collect.ImmutableList;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.KV;
 import org.hamcrest.Matchers;
@@ -151,7 +151,7 @@ public class SerializableMatchersTest implements Serializable {
     }
   }
 
-  private static class NotSerializableClassCoder extends AtomicCoder<NotSerializableClass> {
+  private static class NotSerializableClassCoder extends CustomCoder<NotSerializableClass> {
     @Override
     public void encode(NotSerializableClass value, OutputStream outStream, Coder.Context context) {
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
index 1ab4c27..38a2fa2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
@@ -24,8 +24,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Collections;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.util.SerializableUtils;
@@ -72,7 +72,7 @@ public class WindowSupplierTest {
         Collections.<BoundedWindow>singleton(window));
   }
 
-  private static class FailingCoder extends AtomicCoder<BoundedWindow>  {
+  private static class FailingCoder extends CustomCoder<BoundedWindow>  {
     @Override
     public void encode(
         BoundedWindow value, OutputStream outStream, Context context)

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index 8a30476..89a1f33 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -36,7 +36,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
@@ -202,7 +201,7 @@ public class CreateTest {
       return myString.equals(((UnserializableRecord) o).myString);
     }
 
-    static class UnserializableRecordCoder extends AtomicCoder<UnserializableRecord> {
+    static class UnserializableRecordCoder extends CustomCoder<UnserializableRecord> {
       private final Coder<String> stringCoder = StringUtf8Coder.of();
 
       @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 3443847..939261f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -26,7 +26,6 @@ import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInA
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import java.io.DataInputStream;
@@ -39,9 +38,9 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -455,9 +454,8 @@ public class GroupByKeyTest {
   /**
    * Deterministic {@link Coder} for {@link BadEqualityKey}.
    */
-  static class DeterministicKeyCoder extends AtomicCoder<BadEqualityKey> {
+  static class DeterministicKeyCoder extends CustomCoder<BadEqualityKey> {
 
-    @JsonCreator
     public static DeterministicKeyCoder of() {
       return INSTANCE;
     }
@@ -480,6 +478,9 @@ public class GroupByKeyTest {
         throws IOException {
       return new BadEqualityKey(new DataInputStream(inStream).readLong());
     }
+
+    @Override
+    public void verifyDeterministic() {}
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 3424f86..1a976f2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -55,7 +55,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
@@ -979,7 +978,7 @@ public class ParDoTest implements Serializable {
 
   private static class TestDummy { }
 
-  private static class TestDummyCoder extends AtomicCoder<TestDummy> {
+  private static class TestDummyCoder extends CustomCoder<TestDummy> {
     private TestDummyCoder() { }
     private static final TestDummyCoder INSTANCE = new TestDummyCoder();
 
@@ -1015,6 +1014,9 @@ public class ParDoTest implements Serializable {
         throws Exception {
       observer.update(0L);
     }
+
+    @Override
+    public void verifyDeterministic() {}
   }
 
   private static class TaggedOutputDummyFn extends DoFn<Integer, Integer> {

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
index 4bd2f19..32c2af4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
@@ -25,11 +25,11 @@ import static org.mockito.Mockito.mock;
 
 import java.io.InputStream;
 import java.io.OutputStream;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -52,7 +52,7 @@ public class CoderUtilsTest {
   @Rule
   public transient ExpectedException expectedException = ExpectedException.none();
 
-  static class TestCoder extends AtomicCoder<Integer> {
+  static class TestCoder extends CustomCoder<Integer> {
     public static TestCoder of() {
       return new TestCoder();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
index 99a0838..ece3eca 100644
--- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
@@ -40,12 +40,12 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CoderProvider;
 import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.Structs;
 import org.apache.beam.sdk.values.PCollection;
@@ -110,7 +110,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  *
  * @param <T> the Protocol Buffers {@link Message} handled by this {@link Coder}.
  */
-public class ProtoCoder<T extends Message> extends AtomicCoder<T> {
+public class ProtoCoder<T extends Message> extends CustomCoder<T> {
 
   /**
    * A {@link CoderProvider} that returns a {@link ProtoCoder} with an empty

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
index 262a00d..c418804 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
@@ -18,20 +18,18 @@
 
 package org.apache.beam.sdk.io.gcp.bigquery;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 
 /** A coder for {@link TableDestination} objects. */
-public class TableDestinationCoder extends AtomicCoder<TableDestination> {
+public class TableDestinationCoder extends CustomCoder<TableDestination> {
   private static final TableDestinationCoder INSTANCE = new TableDestinationCoder();
   private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
 
-  @JsonCreator
   public static TableDestinationCoder of() {
     return INSTANCE;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
index 9ef947e..c3e48a4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
@@ -18,23 +18,21 @@
 
 package org.apache.beam.sdk.io.gcp.bigquery;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 
 /**
  * Defines a coder for {@link TableRowInfo} objects.
  */
 @VisibleForTesting
-class TableRowInfoCoder extends AtomicCoder<TableRowInfo> {
+class TableRowInfoCoder extends CustomCoder<TableRowInfo> {
   private static final TableRowInfoCoder INSTANCE = new TableRowInfoCoder();
 
-  @JsonCreator
   public static TableRowInfoCoder of() {
     return INSTANCE;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
index ce4b669..e5f8591 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
@@ -17,24 +17,22 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
 import com.google.api.services.bigquery.model.TableRow;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
  * A {@link Coder} that encodes BigQuery {@link TableRow} objects in their native JSON format.
  */
-public class TableRowJsonCoder extends AtomicCoder<TableRow> {
+public class TableRowJsonCoder extends CustomCoder<TableRow> {
 
-  @JsonCreator
   public static TableRowJsonCoder of() {
     return INSTANCE;
   }
@@ -88,4 +86,10 @@ public class TableRowJsonCoder extends AtomicCoder<TableRow> {
   public TypeDescriptor<TableRow> getEncodedTypeDescriptor() {
     return TYPE_DESCRIPTOR;
   }
+
+  @Override
+  public String getEncodingId() {
+    return "";
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
index a25cc90..d337476 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
@@ -26,8 +26,8 @@ import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.Map;
 import java.util.UUID;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -67,7 +67,7 @@ class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBund
   /**
    * a coder for the {@link Result} class.
    */
-  public static class ResultCoder extends AtomicCoder<Result> {
+  public static class ResultCoder extends CustomCoder<Result> {
     private static final ResultCoder INSTANCE = new ResultCoder();
     private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
     private static final VarLongCoder longCoder = VarLongCoder.of();
@@ -98,8 +98,7 @@ class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBund
     }
 
     @Override
-    public void verifyDeterministic() throws NonDeterministicException {
-    }
+    public void verifyDeterministic() {}
   }
 
   WriteBundlesToFiles(String tempFilePrefix) {

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index 3ce9224..f1dc1e8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -32,11 +32,11 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
@@ -103,7 +103,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
   /**
    * Coder for conveying outgoing messages between internal stages.
    */
-  private static class OutgoingMessageCoder extends AtomicCoder<OutgoingMessage> {
+  private static class OutgoingMessageCoder extends CustomCoder<OutgoingMessage> {
     private static final NullableCoder<String> RECORD_ID_CODER =
         NullableCoder.of(StringUtf8Coder.of());
     private static final NullableCoder<Map<String, String>> ATTRIBUTES_CODER =

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index 0389d4b..558b944 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -46,9 +46,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -359,7 +359,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
   }
 
   /** The coder for our checkpoints. */
-  private static class PubsubCheckpointCoder<T> extends AtomicCoder<PubsubCheckpoint<T>> {
+  private static class PubsubCheckpointCoder<T> extends CustomCoder<PubsubCheckpoint<T>> {
     private static final Coder<String> SUBSCRIPTION_PATH_CODER =
         NullableCoder.of(StringUtf8Coder.of());
     private static final Coder<List<String>> LIST_CODER = ListCoder.of(StringUtf8Coder.of());

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index e11dd74..e0a5fac 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -28,6 +28,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+
 import com.google.api.client.util.Data;
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobStatistics;
@@ -44,7 +45,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-
 import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
@@ -64,12 +64,11 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -590,7 +589,7 @@ public class BigQueryIOTest implements Serializable {
   /**
    * Coder for @link{PartitionedGlobalWindow}.
    */
-  private static class PartitionedGlobalWindowCoder extends AtomicCoder<PartitionedGlobalWindow> {
+  private static class PartitionedGlobalWindowCoder extends CustomCoder<PartitionedGlobalWindow> {
     @Override
     public void encode(PartitionedGlobalWindow window, OutputStream outStream, Context context)
         throws IOException, CoderException {
@@ -602,6 +601,9 @@ public class BigQueryIOTest implements Serializable {
         throws IOException, CoderException {
       return new PartitionedGlobalWindow(StringUtf8Coder.of().decode(inStream, context));
     }
+
+    @Override
+    public void verifyDeterministic() {}
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
index 228e0b4..35a8863 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
@@ -21,9 +21,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
@@ -35,7 +34,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.Mut
  * A {@link Coder} that serializes and deserializes the {@link Mutation} objects using {@link
  * ProtobufUtil}.
  */
-class HBaseMutationCoder extends AtomicCoder<Mutation> implements Serializable {
+class HBaseMutationCoder extends CustomCoder<Mutation> implements Serializable {
   private static final HBaseMutationCoder INSTANCE = new HBaseMutationCoder();
 
   private HBaseMutationCoder() {}


Mime
View raw message