beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [2/3] beam git commit: Added a new method on Coder which returns a TypeDescriptor.
Date Thu, 05 Jan 2017 04:18:42 GMT
Added a new method on Coder which returns a TypeDescriptor.

The new method allows returning type information about the data being
encoded and decoded by a Coder.
Added a default implementation to StandardCoder which returns the
TypeDescriptor for Object to ease the transition and avoid breaking
implementations relying on StandardCoder or AtomicCoder.

This will break classes that implement the Coder interface directly: they
must now provide an implementation of getEncodedTypeDescriptor().


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7b98fa08
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7b98fa08
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7b98fa08

Branch: refs/heads/master
Commit: 7b98fa08d14e8121e8885f00a9a9a878b73f81a6
Parents: 1e2f90c
Author: Jeremie Lenfant-Engelmann <jeremiele@google.com>
Authored: Wed Dec 7 11:29:29 2016 -0800
Committer: Kenneth Knowles <klk@google.com>
Committed: Wed Jan 4 20:04:14 2017 -0800

----------------------------------------------------------------------
 .../streaming/io/UnboundedFlinkSink.java        |  6 +++
 .../beam/sdk/annotations/Experimental.java      |  5 ++-
 .../org/apache/beam/sdk/coders/AvroCoder.java   |  7 ++++
 .../beam/sdk/coders/BigEndianIntegerCoder.java  |  7 ++++
 .../beam/sdk/coders/BigEndianLongCoder.java     |  7 ++++
 .../apache/beam/sdk/coders/ByteArrayCoder.java  |  7 ++++
 .../org/apache/beam/sdk/coders/ByteCoder.java   |  7 ++++
 .../apache/beam/sdk/coders/ByteStringCoder.java |  8 ++++
 .../java/org/apache/beam/sdk/coders/Coder.java  |  7 ++++
 .../apache/beam/sdk/coders/CollectionCoder.java |  8 ++++
 .../apache/beam/sdk/coders/DelegateCoder.java   | 29 +++++++++++++-
 .../org/apache/beam/sdk/coders/DoubleCoder.java |  7 ++++
 .../apache/beam/sdk/coders/DurationCoder.java   |  8 ++++
 .../apache/beam/sdk/coders/InstantCoder.java    |  7 ++++
 .../apache/beam/sdk/coders/IterableCoder.java   |  8 ++++
 .../org/apache/beam/sdk/coders/JAXBCoder.java   |  8 ++++
 .../org/apache/beam/sdk/coders/KvCoder.java     |  9 +++++
 .../org/apache/beam/sdk/coders/ListCoder.java   |  7 ++++
 .../org/apache/beam/sdk/coders/MapCoder.java    |  9 +++++
 .../apache/beam/sdk/coders/NullableCoder.java   |  6 +++
 .../beam/sdk/coders/SerializableCoder.java      | 13 +++++--
 .../org/apache/beam/sdk/coders/SetCoder.java    |  8 ++++
 .../apache/beam/sdk/coders/StandardCoder.java   |  8 ++++
 .../beam/sdk/coders/StringDelegateCoder.java    | 16 ++++++--
 .../apache/beam/sdk/coders/StringUtf8Coder.java |  7 ++++
 .../beam/sdk/coders/TableRowJsonCoder.java      |  7 ++++
 .../beam/sdk/coders/TextualIntegerCoder.java    |  8 ++++
 .../org/apache/beam/sdk/coders/VarIntCoder.java | 10 ++++-
 .../apache/beam/sdk/coders/VarLongCoder.java    |  7 ++++
 .../org/apache/beam/sdk/coders/VoidCoder.java   |  7 ++++
 .../org/apache/beam/sdk/testing/TestStream.java |  8 ++++
 .../beam/sdk/values/TimestampedValue.java       |  6 +++
 .../apache/beam/sdk/coders/AvroCoderTest.java   |  7 ++++
 .../sdk/coders/BigEndianIntegerCoderTest.java   |  9 +++++
 .../beam/sdk/coders/BigEndianLongCoderTest.java |  9 +++++
 .../beam/sdk/coders/ByteArrayCoderTest.java     |  6 +++
 .../apache/beam/sdk/coders/ByteCoderTest.java   |  9 +++++
 .../beam/sdk/coders/ByteStringCoderTest.java    |  8 ++++
 .../beam/sdk/coders/CoderRegistryTest.java      |  6 +++
 .../org/apache/beam/sdk/coders/CoderTest.java   |  8 ++++
 .../beam/sdk/coders/CollectionCoderTest.java    | 10 +++++
 .../beam/sdk/coders/DefaultCoderTest.java       |  4 +-
 .../beam/sdk/coders/DelegateCoderTest.java      | 35 ++++++++++++++++-
 .../apache/beam/sdk/coders/DoubleCoderTest.java |  9 +++++
 .../beam/sdk/coders/DurationCoderTest.java      | 10 +++++
 .../beam/sdk/coders/InstantCoderTest.java       |  9 +++++
 .../beam/sdk/coders/IterableCoderTest.java      |  9 +++++
 .../apache/beam/sdk/coders/JAXBCoderTest.java   | 10 +++++
 .../org/apache/beam/sdk/coders/KvCoderTest.java | 10 +++++
 .../apache/beam/sdk/coders/ListCoderTest.java   |  8 ++++
 .../apache/beam/sdk/coders/MapCoderTest.java    | 10 +++++
 .../beam/sdk/coders/NullableCoderTest.java      |  6 +++
 .../beam/sdk/coders/SerializableCoderTest.java  |  9 +++++
 .../apache/beam/sdk/coders/SetCoderTest.java    | 10 +++++
 .../beam/sdk/coders/StandardCoderTest.java      | 40 ++++++++++++++++++++
 .../sdk/coders/StringDelegateCoderTest.java     | 11 ++++++
 .../beam/sdk/coders/StringUtf8CoderTest.java    |  9 +++++
 .../beam/sdk/coders/TableRowJsonCoderTest.java  |  9 +++++
 .../sdk/coders/TextualIntegerCoderTest.java     |  9 +++++
 .../apache/beam/sdk/coders/VarIntCoderTest.java |  9 +++++
 .../beam/sdk/coders/VarLongCoderTest.java       |  9 +++++
 .../apache/beam/sdk/coders/VoidCoderTest.java   | 40 ++++++++++++++++++++
 62 files changed, 605 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
index 5b01796..301d841 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 
 /**
@@ -128,6 +129,11 @@ public class UnboundedFlinkSink<T> extends Sink<T> {
           public Collection<String> getAllowedEncodings() {
             return null;
           }
+
+          @Override
+          public TypeDescriptor<Object> getEncodedTypeDescriptor() {
+            return TypeDescriptor.of(Object.class);
+          }
         };
       }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
index 2659659..f720599 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
@@ -88,6 +88,9 @@ public @interface Experimental {
     METRICS,
 
     /** Experimental runner APIs. Should not be used by pipeline authors. */
-    CORE_RUNNERS_ONLY
+    CORE_RUNNERS_ONLY,
+
+    /** Experimental feature related to making the encoded element type available from a Coder. */
+    CODER_TYPE_ENCODING
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
index eee0906..2c88c9a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
@@ -167,6 +167,7 @@ public class AvroCoder<T> extends StandardCoder<T> {
 
   private final Class<T> type;
   private final SerializableSchemaSupplier schemaSupplier;
+  private final TypeDescriptor<T> typeDescriptor;
 
   private final List<String> nonDeterministicReasons;
 
@@ -239,6 +240,7 @@ public class AvroCoder<T> extends StandardCoder<T> {
   protected AvroCoder(Class<T> type, Schema schema) {
     this.type = type;
     this.schemaSupplier = new SerializableSchemaSupplier(schema);
+    typeDescriptor = TypeDescriptor.of(type);
     nonDeterministicReasons = new AvroDeterminismChecker().check(TypeDescriptor.of(type), schema);
 
     // Decoder and Encoder start off null for each thread. They are allocated and potentially
@@ -384,6 +386,11 @@ public class AvroCoder<T> extends StandardCoder<T> {
     return schemaSupplier.get();
   }
 
+  @Override
+  public TypeDescriptor<T> getEncodedTypeDescriptor() {
+    return typeDescriptor;
+  }
+
   /**
    * Helper class encapsulating the various pieces of state maintained by the
    * recursive walk used for checking if the encoding will be deterministic.

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
index ac8db12..2922416 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.UTFDataFormatException;
+import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
  * A {@link BigEndianIntegerCoder} encodes {@link Integer Integers} in 4 bytes, big-endian.
@@ -39,6 +40,7 @@ public class BigEndianIntegerCoder extends AtomicCoder<Integer> {
   /////////////////////////////////////////////////////////////////////////////
 
   private static final BigEndianIntegerCoder INSTANCE = new BigEndianIntegerCoder();
+  private static final TypeDescriptor<Integer> TYPE_DESCRIPTOR = new TypeDescriptor<Integer>() {};
 
   private BigEndianIntegerCoder() {}
 
@@ -83,6 +85,11 @@ public class BigEndianIntegerCoder extends AtomicCoder<Integer> {
     return true;
   }
 
+  @Override
+  public TypeDescriptor<Integer> getEncodedTypeDescriptor() {
+    return TYPE_DESCRIPTOR;
+  }
+
   /**
    * {@inheritDoc}
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
index e005324..26aadde 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.UTFDataFormatException;
+import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
  * A {@link BigEndianLongCoder} encodes {@link Long}s in 8 bytes, big-endian.
@@ -39,6 +40,7 @@ public class BigEndianLongCoder extends AtomicCoder<Long> {
   /////////////////////////////////////////////////////////////////////////////
 
   private static final BigEndianLongCoder INSTANCE = new BigEndianLongCoder();
+  private static final TypeDescriptor<Long> TYPE_DESCRIPTOR = new TypeDescriptor<Long>() {};
 
   private BigEndianLongCoder() {}
 
@@ -83,6 +85,11 @@ public class BigEndianLongCoder extends AtomicCoder<Long> {
     return true;
   }
 
+  @Override
+  public TypeDescriptor<Long> getEncodedTypeDescriptor() {
+    return TYPE_DESCRIPTOR;
+  }
+
   /**
    * {@inheritDoc}
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
index 65e24da..a9449c6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
@@ -25,6 +25,7 @@ import java.io.OutputStream;
 import org.apache.beam.sdk.util.ExposedByteArrayOutputStream;
 import org.apache.beam.sdk.util.StreamUtils;
 import org.apache.beam.sdk.util.VarInt;
+import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
  * A {@link Coder} for {@code byte[]}.
@@ -48,6 +49,7 @@ public class ByteArrayCoder extends AtomicCoder<byte[]> {
   /////////////////////////////////////////////////////////////////////////////
 
   private static final ByteArrayCoder INSTANCE = new ByteArrayCoder();
+  private static final TypeDescriptor<byte[]> TYPE_DESCRIPTOR = new TypeDescriptor<byte[]>() {};
 
   private ByteArrayCoder() {}
 
@@ -123,6 +125,11 @@ public class ByteArrayCoder extends AtomicCoder<byte[]> {
   }
 
   @Override
+  public TypeDescriptor<byte[]> getEncodedTypeDescriptor() {
+    return TYPE_DESCRIPTOR;
+  }
+
+  @Override
   protected long getEncodedElementByteSize(byte[] value, Context context)
       throws Exception {
     if (value == null) {

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
index c912b35..0eda58d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.UTFDataFormatException;
+import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
  * A {@link ByteCoder} encodes {@link Byte} values in 1 byte using Java serialization.
@@ -37,6 +38,7 @@ public class ByteCoder extends AtomicCoder<Byte> {
   /////////////////////////////////////////////////////////////////////////////
 
   private static final ByteCoder INSTANCE = new ByteCoder();
+  private static final TypeDescriptor<Byte> TYPE_DESCRIPTOR = new TypeDescriptor<Byte>() {};
 
   private ByteCoder() {}
 
@@ -95,6 +97,11 @@ public class ByteCoder extends AtomicCoder<Byte> {
     return true;
   }
 
+  @Override
+  public TypeDescriptor<Byte> getEncodedTypeDescriptor() {
+    return TYPE_DESCRIPTOR;
+  }
+
   /**
    * {@inheritDoc}
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteStringCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteStringCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteStringCoder.java
index c70b9db..1e3634c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteStringCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteStringCoder.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import org.apache.beam.sdk.util.VarInt;
+import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
  * A {@link Coder} for {@link ByteString} objects based on their encoded Protocol Buffer form.
@@ -41,6 +42,8 @@ public class ByteStringCoder extends AtomicCoder<ByteString> {
   /***************************/
 
   private static final ByteStringCoder INSTANCE = new ByteStringCoder();
+  private static final TypeDescriptor<ByteString> TYPE_DESCRIPTOR =
+      new TypeDescriptor<ByteString>() {};
 
   private ByteStringCoder() {}
 
@@ -102,4 +105,9 @@ public class ByteStringCoder extends AtomicCoder<ByteString> {
   public boolean isRegisterByteSizeObserverCheap(ByteString value, Context context) {
     return true;
   }
+
+  @Override
+  public TypeDescriptor<ByteString> getEncodedTypeDescriptor() {
+    return TYPE_DESCRIPTOR;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
index 93e65d3..39efaf2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
  * A {@link Coder Coder&lt;T&gt;} defines how to encode and decode values of type {@code T} into
@@ -254,6 +255,12 @@ public interface Coder<T> extends Serializable {
   Collection<String> getAllowedEncodings();
 
   /**
+   * Returns the {@link TypeDescriptor} for the type encoded.
+   */
+  @Experimental(Kind.CODER_TYPE_ENCODING)
+  TypeDescriptor<T> getEncodedTypeDescriptor();
+
+  /**
    * Exception thrown by {@link Coder#verifyDeterministic()} if the encoding is
    * not deterministic, including details of why the encoding is not deterministic.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java
index bf05253..3585f3e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CollectionCoder.java
@@ -24,6 +24,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import java.util.Collection;
 import java.util.List;
 import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeParameter;
 
 /**
  * A {@link CollectionCoder} encodes {@link Collection Collections} in the format
@@ -69,4 +71,10 @@ public class CollectionCoder<T> extends IterableLikeCoder<T, Collection<T>> {
   protected CollectionCoder(Coder<T> elemCoder) {
     super(elemCoder, "Collection");
   }
+
+  @Override
+  public TypeDescriptor<Collection<T>> getEncodedTypeDescriptor() {
+    return new TypeDescriptor<Collection<T>>() {}.where(
+        new TypeParameter<T>() {}, getElemCoder().getEncodedTypeDescriptor());
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java
index a5bdedf..1762243 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java
@@ -26,6 +26,8 @@ import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
  * A {@code DelegateCoder<T, IntermediateT>} wraps a {@link Coder} for {@code IntermediateT} and
@@ -55,7 +57,15 @@ public final class DelegateCoder<T, IntermediateT> extends CustomCoder<T> {
   public static <T, IntermediateT> DelegateCoder<T, IntermediateT> of(Coder<IntermediateT> coder,
       CodingFunction<T, IntermediateT> toFn,
       CodingFunction<IntermediateT, T> fromFn) {
-    return new DelegateCoder<T, IntermediateT>(coder, toFn, fromFn);
+    return of(coder, toFn, fromFn, null);
+  }
+
+  public static <T, IntermediateT> DelegateCoder<T, IntermediateT> of(
+      Coder<IntermediateT> coder,
+      CodingFunction<T, IntermediateT> toFn,
+      CodingFunction<IntermediateT, T> fromFn,
+      @Nullable TypeDescriptor<T> typeDescriptor) {
+    return new DelegateCoder<T, IntermediateT>(coder, toFn, fromFn, typeDescriptor);
   }
 
   @Override
@@ -154,6 +164,14 @@ public final class DelegateCoder<T, IntermediateT> extends CustomCoder<T> {
     return allowedEncodings;
   }
 
+  @Override
+  public TypeDescriptor<T> getEncodedTypeDescriptor() {
+    if (typeDescriptor == null) {
+      return super.getEncodedTypeDescriptor();
+    }
+    return typeDescriptor;
+  }
+
   private String delegateEncodingId(Class<?> delegateClass, String encodingId) {
     return String.format("%s:%s", delegateClass.getName(), encodingId);
   }
@@ -176,11 +194,18 @@ public final class DelegateCoder<T, IntermediateT> extends CustomCoder<T> {
   private final CodingFunction<T, IntermediateT> toFn;
   private final CodingFunction<IntermediateT, T> fromFn;
 
+  // null unless the user explicitly provides a TypeDescriptor.
+  // If null, then the machinery from the superclass (StandardCoder) will be used
+  // to try to deduce a good type descriptor.
+  @Nullable private final TypeDescriptor<T> typeDescriptor;
+
   protected DelegateCoder(Coder<IntermediateT> coder,
       CodingFunction<T, IntermediateT> toFn,
-      CodingFunction<IntermediateT, T> fromFn) {
+      CodingFunction<IntermediateT, T> fromFn,
+      @Nullable TypeDescriptor<T> typeDescriptor) {
     this.coder = coder;
     this.fromFn = fromFn;
     this.toFn = toFn;
+    this.typeDescriptor = typeDescriptor;
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
index 4e56914..771c851 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.UTFDataFormatException;
+import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
  * A {@link DoubleCoder} encodes {@link Double} values in 8 bytes using Java serialization.
@@ -39,6 +40,7 @@ public class DoubleCoder extends AtomicCoder<Double> {
   /////////////////////////////////////////////////////////////////////////////
 
   private static final DoubleCoder INSTANCE = new DoubleCoder();
+  private static final TypeDescriptor<Double> TYPE_DESCRIPTOR = new TypeDescriptor<Double>() {};
 
   private DoubleCoder() {}
 
@@ -97,6 +99,11 @@ public class DoubleCoder extends AtomicCoder<Double> {
     return true;
   }
 
+  @Override
+  public TypeDescriptor<Double> getEncodedTypeDescriptor() {
+    return TYPE_DESCRIPTOR;
+  }
+
   /**
    * {@inheritDoc}
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
index a2458f2..c6f0b18 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.joda.time.Duration;
 import org.joda.time.ReadableDuration;
 
@@ -39,6 +40,8 @@ public class DurationCoder extends AtomicCoder<ReadableDuration> {
   /////////////////////////////////////////////////////////////////////////////
 
   private static final DurationCoder INSTANCE = new DurationCoder();
+  private static final TypeDescriptor<ReadableDuration> TYPE_DESCRIPTOR =
+      new TypeDescriptor<ReadableDuration>() {};
 
   private final VarLongCoder longCoder = VarLongCoder.of();
 
@@ -92,4 +95,9 @@ public class DurationCoder extends AtomicCoder<ReadableDuration> {
       ReadableDuration value, ElementByteSizeObserver observer, Context context) throws Exception {
     longCoder.registerByteSizeObserver(toLong(value), observer, context);
   }
+
+  @Override
+  public TypeDescriptor<ReadableDuration> getEncodedTypeDescriptor() {
+    return TYPE_DESCRIPTOR;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
index 36446aa..325a7db 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.joda.time.Instant;
 
 /**
@@ -39,6 +40,7 @@ public class InstantCoder extends AtomicCoder<Instant> {
   /////////////////////////////////////////////////////////////////////////////
 
   private static final InstantCoder INSTANCE = new InstantCoder();
+  private static final TypeDescriptor<Instant> TYPE_DESCRIPTOR = new TypeDescriptor<Instant>() {};
 
   private final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
 
@@ -110,4 +112,9 @@ public class InstantCoder extends AtomicCoder<Instant> {
     longCoder.registerByteSizeObserver(
         ORDER_PRESERVING_CONVERTER.convert(value), observer, context);
   }
+
+  @Override
+  public TypeDescriptor<Instant> getEncodedTypeDescriptor() {
+    return TYPE_DESCRIPTOR;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
index a1f6fa3..273a896 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
@@ -25,6 +25,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import java.util.List;
 import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeParameter;
 
 /**
  * An {@link IterableCoder} encodes any {@link Iterable} in the format
@@ -73,4 +75,10 @@ public class IterableCoder<T> extends IterableLikeCoder<T, Iterable<T>> {
     addBoolean(result, PropertyNames.IS_STREAM_LIKE, true);
     return result;
   }
+
+  @Override
+  public TypeDescriptor<Iterable<T>> getEncodedTypeDescriptor() {
+    return new TypeDescriptor<Iterable<T>>() {}.where(
+        new TypeParameter<T>() {}, getElemCoder().getEncodedTypeDescriptor());
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
index 7afd225..0a4f9cc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
@@ -32,6 +32,7 @@ import javax.xml.bind.Unmarshaller;
 import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.Structs;
 import org.apache.beam.sdk.util.VarInt;
+import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
  * A coder for JAXB annotated objects. This coder uses JAXB marshalling/unmarshalling mechanisms
@@ -42,6 +43,7 @@ import org.apache.beam.sdk.util.VarInt;
 public class JAXBCoder<T> extends AtomicCoder<T> {
 
   private final Class<T> jaxbClass;
+  private final TypeDescriptor<T> typeDescriptor;
   private transient volatile JAXBContext jaxbContext;
 
   public Class<T> getJAXBClass() {
@@ -50,6 +52,7 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
 
   private JAXBCoder(Class<T> jaxbClass) {
     this.jaxbClass = jaxbClass;
+    this.typeDescriptor = TypeDescriptor.of(jaxbClass);
   }
 
   /**
@@ -122,6 +125,11 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
     return getJAXBClass().getName();
   }
 
+  @Override
+  public TypeDescriptor<T> getEncodedTypeDescriptor() {
+    return typeDescriptor;
+  }
+
   private static class CloseIgnoringInputStream extends FilterInputStream {
 
     protected CloseIgnoringInputStream(InputStream in) {

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
index c9d05fc..3c61bf6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
@@ -31,6 +31,8 @@ import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeParameter;
 
 /**
  * A {@code KvCoder} encodes {@link KV}s.
@@ -151,4 +153,11 @@ public class KvCoder<K, V> extends StandardCoder<KV<K, V>> {
     keyCoder.registerByteSizeObserver(kv.getKey(), observer, context.nested());
     valueCoder.registerByteSizeObserver(kv.getValue(), observer, context);
   }
+
+  @Override
+  public TypeDescriptor<KV<K, V>> getEncodedTypeDescriptor() {
+    return new TypeDescriptor<KV<K, V>>() {}.where(
+            new TypeParameter<K>() {}, keyCoder.getEncodedTypeDescriptor())
+        .where(new TypeParameter<V>() {}, valueCoder.getEncodedTypeDescriptor());
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
index 7878501..6f7a0be 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java
@@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import java.util.List;
 import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeParameter;
 
 /**
  * A {@link Coder} for {@link List}, using the format of {@link IterableLikeCoder}.
@@ -73,4 +75,9 @@ public class ListCoder<T> extends IterableLikeCoder<T, List<T>> {
         "ListCoder.elemCoder must be deterministic", getElemCoder());
   }
 
+  @Override
+  public TypeDescriptor<List<T>> getEncodedTypeDescriptor() {
+    return new TypeDescriptor<List<T>>(getClass()) {}.where(
+        new TypeParameter<T>() {}, getElemCoder().getEncodedTypeDescriptor());
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
index 94099be..7918528 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/MapCoder.java
@@ -35,6 +35,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeParameter;
 
 /**
  * A {@link Coder} for {@link Map Maps} that encodes them according to provided
@@ -185,4 +187,11 @@ public class MapCoder<K, V> extends StandardCoder<Map<K, V>> {
     keyCoder.registerByteSizeObserver(entry.getKey(), observer, context.nested());
     valueCoder.registerByteSizeObserver(entry.getValue(), observer, context);
   }
+
+  @Override
+  public TypeDescriptor<Map<K, V>> getEncodedTypeDescriptor() {
+    return new TypeDescriptor<Map<K, V>>() {}.where(
+            new TypeParameter<K>() {}, keyCoder.getEncodedTypeDescriptor())
+        .where(new TypeParameter<V>() {}, valueCoder.getEncodedTypeDescriptor());
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
index 8a7a1cb..d1e1370 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
@@ -30,6 +30,7 @@ import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
  * A {@link NullableCoder} encodes nullable values of type {@code T} using a nested
@@ -183,4 +184,9 @@ public class NullableCoder<T> extends StandardCoder<T> {
     }
     return valueCoder.isRegisterByteSizeObserverCheap(value, context);
   }
+
+  @Override
+  public TypeDescriptor<T> getEncodedTypeDescriptor() {
+    return valueCoder.getEncodedTypeDescriptor();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
index de7cea8..49f5b8d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
@@ -55,7 +55,7 @@ public class SerializableCoder<T extends Serializable> extends AtomicCoder<T> {
   public static <T extends Serializable> SerializableCoder<T> of(TypeDescriptor<T> type) {
     @SuppressWarnings("unchecked")
     Class<T> clazz = (Class<T>) type.getRawType();
-    return of(clazz);
+    return new SerializableCoder<>(clazz, type);
   }
 
   /**
@@ -63,7 +63,7 @@ public class SerializableCoder<T extends Serializable> extends AtomicCoder<T> {
    * @param <T> the element type
    */
   public static <T extends Serializable> SerializableCoder<T> of(Class<T> clazz) {
-    return new SerializableCoder<>(clazz);
+    return new SerializableCoder<>(clazz, TypeDescriptor.of(clazz));
   }
 
   @JsonCreator
@@ -104,9 +104,11 @@ public class SerializableCoder<T extends Serializable> extends AtomicCoder<T> {
 
 
   private final Class<T> type;
+  private final TypeDescriptor<T> typeDescriptor;
 
-  protected SerializableCoder(Class<T> type) {
+  protected SerializableCoder(Class<T> type, TypeDescriptor<T> typeDescriptor) {
     this.type = type;
+    this.typeDescriptor = typeDescriptor;
   }
 
   public Class<T> getRecordType() {
@@ -173,6 +175,11 @@ public class SerializableCoder<T extends Serializable> extends AtomicCoder<T> {
     return type.hashCode();
   }
 
+  @Override
+  public TypeDescriptor<T> getEncodedTypeDescriptor() {
+    return typeDescriptor;
+  }
+
   // This coder inherits isRegisterByteSizeObserverCheap,
   // getEncodedElementByteSize and registerByteSizeObserver
   // from StandardCoder. Looks like we cannot do much better

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
index a8fd1cc..68ef3dc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SetCoder.java
@@ -25,6 +25,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeParameter;
 
 /**
  * A {@link SetCoder} encodes any {@link Set} using the format of {@link IterableLikeCoder}. The
@@ -64,6 +66,12 @@ public class SetCoder<T> extends IterableLikeCoder<T, Set<T>> {
         "Ordering of elements in a set may be non-deterministic.");
   }
 
+  @Override
+  public TypeDescriptor<Set<T>> getEncodedTypeDescriptor() {
+    return new TypeDescriptor<Set<T>>() {}.where(
+        new TypeParameter<T>() {}, getElemCoder().getEncodedTypeDescriptor());
+  }
+
   /**
    * Returns the first element in this set if it is non-empty,
    * otherwise returns {@code null}.

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
index e9a1bd3..cfa7206 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
@@ -34,6 +34,7 @@ import java.util.List;
 import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
  * An abstract base class to implement a {@link Coder} that defines equality, hashing, and printing
@@ -255,4 +256,11 @@ public abstract class StandardCoder<T> implements Coder<T> {
       }
     }
   }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public TypeDescriptor<T> getEncodedTypeDescriptor() {
+    return (TypeDescriptor<T>)
+        TypeDescriptor.of(getClass()).resolveType(new TypeDescriptor<T>() {}.getType());
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
index 80bcae3..ad7e28c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java
@@ -24,6 +24,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.util.Collection;
 import org.apache.beam.sdk.coders.DelegateCoder.CodingFunction;
 import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
+import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
  * A {@link Coder} that wraps a {@code Coder<String>}
@@ -49,7 +50,11 @@ import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
  */
 public final class StringDelegateCoder<T> extends CustomCoder<T> {
   public static <T> StringDelegateCoder<T> of(Class<T> clazz) {
-    return new StringDelegateCoder<T>(clazz);
+    return StringDelegateCoder.<T>of(clazz, TypeDescriptor.of(clazz));
+  }
+
+  public static <T> StringDelegateCoder<T> of(Class<T> clazz, TypeDescriptor<T> typeDescriptor) {
+    return new StringDelegateCoder<T>(clazz, typeDescriptor);
   }
 
   @Override
@@ -60,7 +65,7 @@ public final class StringDelegateCoder<T> extends CustomCoder<T> {
   private final DelegateCoder<T, String> delegateCoder;
   private final Class<T> clazz;
 
-  protected StringDelegateCoder(final Class<T> clazz) {
+  protected StringDelegateCoder(final Class<T> clazz, TypeDescriptor<T> typeDescriptor) {
     delegateCoder = DelegateCoder.of(StringUtf8Coder.of(),
       new CodingFunction<T, String>() {
         @Override
@@ -77,7 +82,7 @@ public final class StringDelegateCoder<T> extends CustomCoder<T> {
             InvocationTargetException {
           return clazz.getConstructor(String.class).newInstance(input);
         }
-      });
+      }, typeDescriptor);
 
     this.clazz = clazz;
   }
@@ -129,5 +134,10 @@ public final class StringDelegateCoder<T> extends CustomCoder<T> {
   public Collection<String> getAllowedEncodings() {
     return delegateCoder.getAllowedEncodings();
   }
+
+  @Override
+  public TypeDescriptor<T> getEncodedTypeDescriptor() {
+    return delegateCoder.getEncodedTypeDescriptor();
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
index e01dfd8..b651824 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
@@ -32,6 +32,7 @@ import java.nio.charset.StandardCharsets;
 import org.apache.beam.sdk.util.ExposedByteArrayOutputStream;
 import org.apache.beam.sdk.util.StreamUtils;
 import org.apache.beam.sdk.util.VarInt;
+import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
  * A {@link Coder} that encodes {@link String Strings} in UTF-8 encoding.
@@ -48,6 +49,7 @@ public class StringUtf8Coder extends AtomicCoder<String> {
   /////////////////////////////////////////////////////////////////////////////
 
   private static final StringUtf8Coder INSTANCE = new StringUtf8Coder();
+  private static final TypeDescriptor<String> TYPE_DESCRIPTOR = new TypeDescriptor<String>() {};
 
   private static void writeString(String value, DataOutputStream dos)
       throws IOException {
@@ -113,6 +115,11 @@ public class StringUtf8Coder extends AtomicCoder<String> {
     return true;
   }
 
+  @Override
+  public TypeDescriptor<String> getEncodedTypeDescriptor() {
+    return TYPE_DESCRIPTOR;
+  }
+
   /**
    * {@inheritDoc}
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TableRowJsonCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TableRowJsonCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TableRowJsonCoder.java
index a2562f2..5c0929c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TableRowJsonCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TableRowJsonCoder.java
@@ -24,6 +24,7 @@ import com.google.api.services.bigquery.model.TableRow;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
  * A {@link Coder} that encodes BigQuery {@link TableRow} objects in their native JSON format.
@@ -64,6 +65,7 @@ public class TableRowJsonCoder extends AtomicCoder<TableRow> {
       new ObjectMapper().disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
 
   private static final TableRowJsonCoder INSTANCE = new TableRowJsonCoder();
+  private static final TypeDescriptor<TableRow> TYPE_DESCRIPTOR = new TypeDescriptor<TableRow>() {};
 
   private TableRowJsonCoder() { }
 
@@ -78,4 +80,9 @@ public class TableRowJsonCoder extends AtomicCoder<TableRow> {
     throw new NonDeterministicException(this,
         "TableCell can hold arbitrary instances, which may be non-deterministic.");
   }
+
+  @Override
+  public TypeDescriptor<TableRow> getEncodedTypeDescriptor() {
+    return TYPE_DESCRIPTOR;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
index 6258b21..1b79e90 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
  * A {@link Coder} that encodes {@code Integer Integers} as the ASCII bytes of
@@ -35,6 +36,8 @@ public class TextualIntegerCoder extends AtomicCoder<Integer> {
 
   /////////////////////////////////////////////////////////////////////////////
 
+  private static final TypeDescriptor<Integer> TYPE_DESCRIPTOR = new TypeDescriptor<Integer>() {};
+
   protected TextualIntegerCoder() {}
 
   @Override
@@ -59,6 +62,11 @@ public class TextualIntegerCoder extends AtomicCoder<Integer> {
   }
 
   @Override
+  public TypeDescriptor<Integer> getEncodedTypeDescriptor() {
+    return TYPE_DESCRIPTOR;
+  }
+
+  @Override
   protected long getEncodedElementByteSize(Integer value, Context context) throws Exception {
     if (value == null) {
       throw new CoderException("cannot encode a null Integer");

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
index baf3be8..ec9d8f4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
@@ -24,6 +24,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.UTFDataFormatException;
 import org.apache.beam.sdk.util.VarInt;
+import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
  * A {@link Coder} that encodes {@link Integer Integers} using between 1 and 5 bytes. Negative
@@ -39,8 +40,8 @@ public class VarIntCoder extends AtomicCoder<Integer> {
 
   /////////////////////////////////////////////////////////////////////////////
 
-  private static final VarIntCoder INSTANCE =
-      new VarIntCoder();
+  private static final VarIntCoder INSTANCE = new VarIntCoder();
+  private static final TypeDescriptor<Integer> TYPE_DESCRIPTOR = new TypeDescriptor<Integer>() {};
 
   private VarIntCoder() {}
 
@@ -86,6 +87,11 @@ public class VarIntCoder extends AtomicCoder<Integer> {
   }
 
   @Override
+  public TypeDescriptor<Integer> getEncodedTypeDescriptor() {
+    return TYPE_DESCRIPTOR;
+  }
+
+  @Override
   protected long getEncodedElementByteSize(Integer value, Context context)
       throws Exception {
     if (value == null) {

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
index ee3c501..3f1334d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
@@ -24,6 +24,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.UTFDataFormatException;
 import org.apache.beam.sdk.util.VarInt;
+import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
  * A {@link Coder} that encodes {@link Long Longs} using between 1 and 10 bytes. Negative
@@ -40,6 +41,7 @@ public class VarLongCoder extends AtomicCoder<Long> {
   /////////////////////////////////////////////////////////////////////////////
 
   private static final VarLongCoder INSTANCE = new VarLongCoder();
+  private static final TypeDescriptor<Long> TYPE_DESCRIPTOR = new TypeDescriptor<Long>() {};
 
   private VarLongCoder() {}
 
@@ -85,6 +87,11 @@ public class VarLongCoder extends AtomicCoder<Long> {
   }
 
   @Override
+  public TypeDescriptor<Long> getEncodedTypeDescriptor() {
+    return TYPE_DESCRIPTOR;
+  }
+
+  @Override
   protected long getEncodedElementByteSize(Long value, Context context)
       throws Exception {
     if (value == null) {

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
index 6bd8a05..318485d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.coders;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import java.io.InputStream;
 import java.io.OutputStream;
+import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
  * A {@link Coder} for {@link Void}. Uses zero bytes per {@link Void}.
@@ -34,6 +35,7 @@ public class VoidCoder extends AtomicCoder<Void> {
   /////////////////////////////////////////////////////////////////////////////
 
   private static final VoidCoder INSTANCE = new VoidCoder();
+  private static final TypeDescriptor<Void> TYPE_DESCRIPTOR = new TypeDescriptor<Void>() {};
 
   private VoidCoder() {}
 
@@ -69,6 +71,11 @@ public class VoidCoder extends AtomicCoder<Void> {
   }
 
   @Override
+  public TypeDescriptor<Void> getEncodedTypeDescriptor() {
+    return TYPE_DESCRIPTOR;
+  }
+
+  @Override
   protected long getEncodedElementByteSize(Void value, Context context)
       throws Exception {
     return 0;

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
index da93cdc..392cad7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
@@ -46,6 +46,8 @@ import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeParameter;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.joda.time.ReadableDuration;
@@ -364,5 +366,11 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
       DURATION_CODER.verifyDeterministic();
       INSTANT_CODER.verifyDeterministic();
     }
+
+    @Override
+    public TypeDescriptor<Event<T>> getEncodedTypeDescriptor() {
+      return new TypeDescriptor<Event<T>>() {}.where(
+          new TypeParameter<T>() {}, valueCoder.getEncodedTypeDescriptor());
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
index 7f3bbd3..c0c3df3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
@@ -151,6 +151,12 @@ public class TimestampedValue<V> {
     public static <T> List<Object> getInstanceComponents(TimestampedValue<T> exampleValue) {
       return Arrays.<Object>asList(exampleValue.getValue());
     }
+
+    @Override
+    public TypeDescriptor<TimestampedValue<T>> getEncodedTypeDescriptor() {
+      return new TypeDescriptor<TimestampedValue<T>>() {}.where(
+          new TypeParameter<T>() {}, valueCoder.getEncodedTypeDescriptor());
+    }
   }
 
   /////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
index 60dc07a..2cd047b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
@@ -67,6 +67,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
@@ -846,6 +847,12 @@ public class AvroCoderTest {
         reasonField(SomeGeneric.class, "foo", "erasure"));
   }
 
+  @Test
+  public void testEncodedTypeDescriptor() throws Exception {
+    AvroCoder<Pojo> coder = AvroCoder.of(Pojo.class);
+    assertThat(coder.getEncodedTypeDescriptor(), equalTo(TypeDescriptor.of(Pojo.class)));
+  }
+
   private static class SomeGeneric<T> {
     @SuppressWarnings("unused")
     private T foo;

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigEndianIntegerCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigEndianIntegerCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigEndianIntegerCoderTest.java
index 091fda2..1946f5a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigEndianIntegerCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigEndianIntegerCoderTest.java
@@ -17,10 +17,14 @@
  */
 package org.apache.beam.sdk.coders;
 
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -86,4 +90,9 @@ public class BigEndianIntegerCoderTest {
 
     CoderUtils.encodeToBase64(TEST_CODER, null);
   }
+
+  @Test
+  public void testEncodedTypeDescriptor() throws Exception {
+    assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(TypeDescriptor.of(Integer.class)));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigEndianLongCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigEndianLongCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigEndianLongCoderTest.java
index 3a74e30..1654967 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigEndianLongCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigEndianLongCoderTest.java
@@ -17,10 +17,14 @@
  */
 package org.apache.beam.sdk.coders;
 
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -90,4 +94,9 @@ public class BigEndianLongCoderTest {
 
     CoderUtils.encodeToBase64(TEST_CODER, null);
   }
+
+  @Test
+  public void testEncodedTypeDescriptor() throws Exception {
+    assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(TypeDescriptor.of(Long.class)));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ByteArrayCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ByteArrayCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ByteArrayCoderTest.java
index f80a409..89487d7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ByteArrayCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ByteArrayCoderTest.java
@@ -27,6 +27,7 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -139,4 +140,9 @@ public class ByteArrayCoderTest {
 
     CoderUtils.encodeToBase64(TEST_CODER, null);
   }
+
+  @Test
+  public void testEncodedTypeDescriptor() throws Exception {
+    assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(TypeDescriptor.of(byte[].class)));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ByteCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ByteCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ByteCoderTest.java
index b7673e2..a4ce64c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ByteCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ByteCoderTest.java
@@ -17,10 +17,14 @@
  */
 package org.apache.beam.sdk.coders;
 
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -87,4 +91,9 @@ public class ByteCoderTest {
 
     CoderUtils.encodeToBase64(TEST_CODER, null);
   }
+
+  @Test
+  public void testEncodedTypeDescriptor() throws Exception {
+    assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(TypeDescriptor.of(Byte.class)));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ByteStringCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ByteStringCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ByteStringCoderTest.java
index 1d4c062..ace1527 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ByteStringCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ByteStringCoderTest.java
@@ -17,7 +17,9 @@
  */
 package org.apache.beam.sdk.coders;
 
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.ImmutableList;
@@ -27,6 +29,7 @@ import java.util.List;
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -117,4 +120,9 @@ public class ByteStringCoderTest {
       }
     }
   }
+
+  @Test
+  public void testEncodedTypeDescriptor() throws Exception {
+    assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(TypeDescriptor.of(ByteString.class)));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
index 8c0e584..10177e7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
@@ -459,6 +459,7 @@ public class CoderRegistryTest {
   private static class MyValueCoder implements Coder<MyValue> {
 
     private static final MyValueCoder INSTANCE = new MyValueCoder();
+    private static final TypeDescriptor<MyValue> TYPE_DESCRIPTOR = TypeDescriptor.of(MyValue.class);
 
     public static MyValueCoder of() {
       return INSTANCE;
@@ -525,6 +526,11 @@ public class CoderRegistryTest {
     public Collection<String> getAllowedEncodings() {
       return Collections.singletonList(getEncodingId());
     }
+
+    @Override
+    public TypeDescriptor<MyValue> getEncodedTypeDescriptor() {
+      return TYPE_DESCRIPTOR;
+    }
   }
 
   private static class UnknownType { }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderTest.java
index ccbffdd..4550453 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.coders;
 
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertThat;
@@ -26,6 +27,7 @@ import static org.junit.Assert.assertThat;
 import java.util.Collections;
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -74,4 +76,10 @@ public class CoderTest {
     assertThat(exception.toString(), containsString("Problem"));
     assertThat(exception.toString(), containsString("is not deterministic"));
   }
+
+  @Test
+  public void testTypeIsPreserved() throws Exception {
+    assertThat(VoidCoder.of().getEncodedTypeDescriptor(),
+        equalTo(TypeDescriptor.of(Void.class)));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CollectionCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CollectionCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CollectionCoderTest.java
index faa7e1a..d052413 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CollectionCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CollectionCoderTest.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.sdk.coders;
 
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -26,6 +29,7 @@ import java.util.TreeSet;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -95,4 +99,12 @@ public class CollectionCoderTest {
   public void testCoderIsSerializableWithWellKnownCoderType() throws Exception {
     CoderProperties.coderSerializable(CollectionCoder.of(GlobalWindow.Coder.INSTANCE));
   }
+
+  /** Checks that TEST_CODER reports {@code Collection<Integer>} as its encoded type. */
+  @Test
+  public void testEncodedTypeDescriptor() throws Exception {
+    TypeDescriptor<Collection<Integer>> expectedTypeDescriptor =
+        new TypeDescriptor<Collection<Integer>>() {};
+    assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(expectedTypeDescriptor));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DefaultCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DefaultCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DefaultCoderTest.java
index a8496c9..59749ae 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DefaultCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DefaultCoderTest.java
@@ -79,7 +79,7 @@ public class DefaultCoderTest {
     }
 
     protected CustomSerializableCoder() {
-      super(CustomRecord.class);
+      super(CustomRecord.class, TypeDescriptor.of(CustomRecord.class));
     }
   }
 
@@ -93,7 +93,7 @@ public class DefaultCoderTest {
     }
 
     protected OldCustomSerializableCoder() {
-      super(OldCustomRecord.class);
+      super(OldCustomRecord.class, TypeDescriptor.of(OldCustomRecord.class));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java
index 9bb9d51..b95b76d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DelegateCoderTest.java
@@ -17,8 +17,10 @@
  */
 package org.apache.beam.sdk.coders;
 
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -32,6 +34,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -45,6 +48,9 @@ public class DelegateCoderTest implements Serializable {
       Collections.singleton(13),
       new HashSet<>(Arrays.asList(31, -5, 83)));
 
+  private static final TypeDescriptor<Set<Integer>> SET_INTEGER_TYPE_DESCRIPTOR =
+      new TypeDescriptor<Set<Integer>>() {};
+
   private static final DelegateCoder<Set<Integer>, List<Integer>> TEST_CODER = DelegateCoder.of(
       ListCoder.of(VarIntCoder.of()),
       new DelegateCoder.CodingFunction<Set<Integer>, List<Integer>>() {
@@ -78,7 +84,7 @@ public class DelegateCoderTest implements Serializable {
         public int hashCode() {
           return this.getClass().hashCode();
         }
-      });
+      }, SET_INTEGER_TYPE_DESCRIPTOR);
 
   @Test
   public void testDeterministic() throws Exception {
@@ -180,4 +186,31 @@ public class DelegateCoderTest implements Serializable {
     assertNotEquals(varIntCoder1, bigEndianIntegerCoder);
     assertNotEquals(varIntCoder1.hashCode(), bigEndianIntegerCoder.hashCode());
   }
+
+  @Test
+  public void testEncodedTypeDescriptorSimpleEncodedType() throws Exception {
+    assertThat(
+        DelegateCoder.of(
+            StringUtf8Coder.of(),
+            new DelegateCoder.CodingFunction<Integer, String>() {
+              @Override
+              public String apply(Integer input) {
+                return String.valueOf(input);
+              }
+            },
+            new DelegateCoder.CodingFunction<String, Integer>() {
+              @Override
+              public Integer apply(String input) {
+                return Integer.valueOf(input);
+              }
+            },
+            new TypeDescriptor<Integer>(){}).getEncodedTypeDescriptor(),
+        equalTo(TypeDescriptor.of(Integer.class)));
+  }
+
+  @Test
+  public void testEncodedTypeDescriptor() throws Exception {
+    TypeDescriptor<Set<Integer>> typeDescriptor = new TypeDescriptor<Set<Integer>>() {};
+    assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(typeDescriptor));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DoubleCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DoubleCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DoubleCoderTest.java
index f43af30..6e21abd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DoubleCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DoubleCoderTest.java
@@ -17,10 +17,14 @@
  */
 package org.apache.beam.sdk.coders;
 
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -92,4 +96,9 @@ public class DoubleCoderTest {
 
     CoderUtils.encodeToBase64(TEST_CODER, null);
   }
+
+  @Test
+  public void testEncodedTypeDescriptor() throws Exception {
+    assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(TypeDescriptor.of(Double.class)));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DurationCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DurationCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DurationCoderTest.java
index 4cb697e..52b7f40 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DurationCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/DurationCoderTest.java
@@ -17,11 +17,15 @@
  */
 package org.apache.beam.sdk.coders;
 
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
 import com.google.common.collect.Lists;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.joda.time.Duration;
 import org.joda.time.ReadableDuration;
 import org.junit.Rule;
@@ -82,4 +86,10 @@ public class DurationCoderTest {
 
     CoderUtils.encodeToBase64(TEST_CODER, null);
   }
+
+  @Test
+  public void testEncodedTypeDescriptor() throws Exception {
+    assertThat(
+        TEST_CODER.getEncodedTypeDescriptor(), equalTo(TypeDescriptor.of(ReadableDuration.class)));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/InstantCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/InstantCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/InstantCoderTest.java
index 16cb703..8b4bc8b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/InstantCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/InstantCoderTest.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.sdk.coders;
 
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
 import com.google.common.collect.Lists;
 import com.google.common.primitives.UnsignedBytes;
 import java.util.ArrayList;
@@ -25,6 +28,7 @@ import java.util.Collections;
 import java.util.List;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.joda.time.Instant;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -112,4 +116,9 @@ public class InstantCoderTest {
 
     CoderUtils.encodeToBase64(TEST_CODER, null);
   }
+
+  @Test
+  public void testEncodedTypeDescriptor() throws Exception {
+    assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(TypeDescriptor.of(Instant.class)));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
index e200efe..2eba05b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
@@ -17,8 +17,10 @@
  */
 package org.apache.beam.sdk.coders;
 
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import java.util.Arrays;
@@ -30,6 +32,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.Structs;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -121,4 +124,10 @@ public class IterableCoderTest {
 
     CoderUtils.encodeToBase64(TEST_CODER, null);
   }
+
+  @Test
+  public void testEncodedTypeDescriptor() throws Exception {
+    TypeDescriptor<Iterable<Integer>> typeDescriptor = new TypeDescriptor<Iterable<Integer>>() {};
+    assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(typeDescriptor));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/JAXBCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/JAXBCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/JAXBCoderTest.java
index c023278..55701bf 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/JAXBCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/JAXBCoderTest.java
@@ -17,7 +17,9 @@
  */
 package org.apache.beam.sdk.coders;
 
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.ImmutableList;
 import java.io.IOException;
@@ -32,6 +34,7 @@ import javax.xml.bind.annotation.XmlRootElement;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -210,4 +213,11 @@ public class JAXBCoderTest {
     CoderProperties.coderHasEncodingId(
         coder, TestType.class.getName());
   }
+
+  @Test
+  public void testEncodedTypeDescriptor() throws Exception {
+    assertThat(
+        JAXBCoder.of(TestType.class).getEncodedTypeDescriptor(),
+        equalTo(TypeDescriptor.of(TestType.class)));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
index 1422897..14c9b06 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
@@ -17,7 +17,9 @@
  */
 package org.apache.beam.sdk.coders;
 
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import java.util.Arrays;
@@ -29,6 +31,7 @@ import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.Structs;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -138,4 +141,11 @@ public class KvCoderTest {
 
     CoderUtils.encodeToBase64(TEST_CODER, null);
   }
+
+  @Test
+  public void testEncodedTypeDescriptor() throws Exception {
+    TypeDescriptor<KV<String, Integer>> typeDescriptor =
+        new TypeDescriptor<KV<String, Integer>>() {};
+    assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(typeDescriptor));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java
index 8d91343..87906e2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/ListCoderTest.java
@@ -17,8 +17,10 @@
  */
 package org.apache.beam.sdk.coders;
 
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -27,6 +29,7 @@ import java.util.List;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -136,4 +139,9 @@ public class ListCoderTest {
     CoderProperties.<List<Integer>>coderDecodeEncodeEqual(coder, list);
   }
 
+  @Test
+  public void testEncodedTypeDescriptor() throws Exception {
+    TypeDescriptor<List<Integer>> typeDescriptor = new TypeDescriptor<List<Integer>>() {};
+    assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(typeDescriptor));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
index 67366c8..2983ea4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/MapCoderTest.java
@@ -17,8 +17,10 @@
  */
 package org.apache.beam.sdk.coders;
 
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.ImmutableMap;
 import java.util.Arrays;
@@ -30,6 +32,7 @@ import java.util.TreeMap;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -109,4 +112,11 @@ public class MapCoderTest {
 
     CoderUtils.encodeToBase64(TEST_CODER, null);
   }
+
+  @Test
+  public void testEncodedTypeDescriptor() throws Exception {
+    TypeDescriptor<Map<Integer, String>> typeDescriptor =
+        new TypeDescriptor<Map<Integer, String>>() {};
+    assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(typeDescriptor));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
index 63066a4..51e7bad 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
@@ -36,6 +36,7 @@ import java.util.List;
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -171,6 +172,11 @@ public class NullableCoderTest {
     assertThat(NullableCoder.of(coder), theInstance(coder));
   }
 
+  @Test
+  public void testEncodedTypeDescriptor() throws Exception {
+    assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(TypeDescriptor.of(String.class)));
+  }
+
   private static class EntireStreamExpectingCoder extends DeterministicStandardCoder<String> {
     @Override
     public void encode(

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
index 296ddc9..f8c0001 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.coders;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -37,6 +38,7 @@ import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.Serializer;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -232,4 +234,11 @@ public class SerializableCoderTest implements Serializable {
         coder,
         String.format("%s:%s", MyRecord.class.getName(), MyRecord.serialVersionUID));
   }
+
+  @Test
+  public void testEncodedTypeDescriptor() throws Exception {
+    assertThat(
+        SerializableCoder.of(MyRecord.class).getEncodedTypeDescriptor(),
+        Matchers.equalTo(TypeDescriptor.of(MyRecord.class)));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b98fa08/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SetCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SetCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SetCoderTest.java
index e7f8d1d..7515553 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SetCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SetCoderTest.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.sdk.coders;
 
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -25,6 +28,7 @@ import java.util.TreeSet;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -88,4 +92,10 @@ public class SetCoderTest {
 
     CoderUtils.encodeToBase64(TEST_CODER, null);
   }
+
+  @Test
+  public void testEncodedTypeDescriptor() throws Exception {
+    TypeDescriptor<Set<Integer>> typeDescriptor = new TypeDescriptor<Set<Integer>>() {};
+    assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(typeDescriptor));
+  }
 }


Mime
View raw message