beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [04/13] beam git commit: automated context removal or redirection
Date Tue, 09 May 2017 04:21:24 GMT
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CustomCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CustomCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CustomCoderTest.java
index dfd4ea2..13a7261 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CustomCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CustomCoderTest.java
@@ -48,13 +48,13 @@ public class CustomCoderTest {
     }
 
     @Override
-    public void encode(KV<String, Long> kv, OutputStream out, Context context)
+    public void encode(KV<String, Long> kv, OutputStream out)
             throws IOException {
       new DataOutputStream(out).writeLong(kv.getValue());
     }
 
     @Override
-    public KV<String, Long> decode(InputStream inStream, Context context)
+    public KV<String, Long> decode(InputStream inStream)
         throws IOException {
       return KV.of(key, new DataInputStream(inStream).readLong());
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
index d6d7de8..9fb0b82 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
@@ -167,6 +167,12 @@ public class NullableCoderTest {
 
   private static class EntireStreamExpectingCoder extends AtomicCoder<String> {
     @Override
+    public void encode(String value, OutputStream outStream)
+        throws IOException {
+      encode(value, outStream, Context.NESTED);
+    }
+
+    @Override
     public void encode(
         String value, OutputStream outStream, Context context) throws IOException {
       checkArgument(context.isWholeStream, "Expected to get entire stream");
@@ -174,6 +180,11 @@ public class NullableCoderTest {
     }
 
     @Override
+    public String decode(InputStream inStream) throws CoderException, IOException {
+      return decode(inStream, Context.NESTED);
+    }
+
+    @Override
     public String decode(InputStream inStream, Context context)
         throws CoderException, IOException {
       checkArgument(context.isWholeStream, "Expected to get entire stream");

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java
index af2c94e..7aa2080 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java
@@ -47,7 +47,7 @@ public class StructuredCoderTest {
     private static final long serialVersionUID = 0L;
 
     @Override
-    public void encode(@Nullable Boolean value, OutputStream outStream, Context context)
+    public void encode(@Nullable Boolean value, OutputStream outStream)
         throws CoderException, IOException {
       if (value == null) {
         outStream.write(2);
@@ -61,7 +61,7 @@ public class StructuredCoderTest {
     @Override
     @Nullable
     public Boolean decode(
-        InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
+        InputStream inStream)
         throws CoderException, IOException {
       int value = inStream.read();
       if (value == 0) {
@@ -110,7 +110,7 @@ public class StructuredCoderTest {
 
     @Override
     public void encode(
-        @Nullable ObjectIdentityBoolean value, OutputStream outStream, Context context)
+        @Nullable ObjectIdentityBoolean value, OutputStream outStream)
         throws CoderException, IOException {
       if (value == null) {
         outStream.write(2);
@@ -124,7 +124,7 @@ public class StructuredCoderTest {
     @Override
     @Nullable
     public ObjectIdentityBoolean decode(
-        InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
+        InputStream inStream)
         throws CoderException, IOException {
       int value = inStream.read();
       if (value == 0) {
@@ -213,13 +213,13 @@ public class StructuredCoderTest {
   private static class Foo<T> extends StructuredCoder<T> {
 
     @Override
-    public void encode(T value, OutputStream outStream, Coder.Context context)
+    public void encode(T value, OutputStream outStream)
         throws CoderException, IOException {
       throw new UnsupportedOperationException();
     }
 
     @Override
-    public T decode(InputStream inStream, Coder.Context context)
+    public T decode(InputStream inStream)
         throws CoderException, IOException {
       throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
index 83f348c..37db4ef 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
@@ -98,12 +98,12 @@ public class PAssertTest implements Serializable {
     }
 
     @Override
-    public void encode(NotSerializableObject value, OutputStream outStream, Context context)
+    public void encode(NotSerializableObject value, OutputStream outStream)
         throws CoderException, IOException {
     }
 
     @Override
-    public NotSerializableObject decode(InputStream inStream, Context context)
+    public NotSerializableObject decode(InputStream inStream)
         throws CoderException, IOException {
       return new NotSerializableObject();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
index db5ff2e..375be33 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
@@ -153,11 +153,11 @@ public class SerializableMatchersTest implements Serializable {
 
   private static class NotSerializableClassCoder extends AtomicCoder<NotSerializableClass> {
     @Override
-    public void encode(NotSerializableClass value, OutputStream outStream, Coder.Context context) {
+    public void encode(NotSerializableClass value, OutputStream outStream) {
     }
 
     @Override
-    public NotSerializableClass decode(InputStream inStream, Coder.Context context) {
+    public NotSerializableClass decode(InputStream inStream) {
       return new NotSerializableClass();
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
index 546683b..3939800 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
@@ -75,14 +75,14 @@ public class WindowSupplierTest {
   private static class FailingCoder extends AtomicCoder<BoundedWindow> {
     @Override
     public void encode(
-        BoundedWindow value, OutputStream outStream, Context context)
+        BoundedWindow value, OutputStream outStream)
         throws CoderException, IOException {
       throw new CoderException("Test Encode Exception");
     }
 
     @Override
     public BoundedWindow decode(
-        InputStream inStream, Context context) throws CoderException, IOException {
+        InputStream inStream) throws CoderException, IOException {
       throw new CoderException("Test Decode Exception");
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
index 8a4d563..33c652a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
@@ -336,12 +336,23 @@ public class  CombineFnsTest {
     private static final UserStringCoder INSTANCE = new UserStringCoder();
 
     @Override
+    public void encode(UserString value, OutputStream outStream)
+        throws CoderException, IOException {
+      encode(value, outStream, Context.NESTED);
+    }
+
+    @Override
     public void encode(UserString value, OutputStream outStream, Context context)
         throws CoderException, IOException {
       StringUtf8Coder.of().encode(value.strValue, outStream, context);
     }
 
     @Override
+    public UserString decode(InputStream inStream) throws CoderException, IOException {
+      return decode(inStream, Context.NESTED);
+    }
+
+    @Override
     public UserString decode(InputStream inStream, Context context)
         throws CoderException, IOException {
       return UserString.of(StringUtf8Coder.of().decode(inStream, context));

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index e4b016b..bd8aee4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -876,6 +876,12 @@ public class CombineTest implements Serializable {
      */
     private class CountSumCoder extends AtomicCoder<CountSum> {
       @Override
+      public void encode(CountSum value, OutputStream outStream)
+          throws CoderException, IOException {
+        encode(value, outStream, Context.NESTED);
+      }
+
+      @Override
       public void encode(CountSum value, OutputStream outStream,
           Context context) throws CoderException, IOException {
         LONG_CODER.encode(value.count, outStream);
@@ -883,6 +889,11 @@ public class CombineTest implements Serializable {
       }
 
       @Override
+      public CountSum decode(InputStream inStream) throws CoderException, IOException {
+        return decode(inStream, Coder.Context.NESTED);
+      }
+
+      @Override
       public CountSum decode(InputStream inStream, Coder.Context context)
           throws CoderException, IOException {
         long count = LONG_CODER.decode(inStream);
@@ -925,12 +936,23 @@ public class CombineTest implements Serializable {
       public static Coder<Accumulator> getCoder() {
         return new AtomicCoder<Accumulator>() {
           @Override
+          public void encode(Accumulator accumulator, OutputStream outStream)
+              throws CoderException, IOException {
+            encode(accumulator, outStream, Coder.Context.NESTED);
+          }
+
+          @Override
           public void encode(Accumulator accumulator, OutputStream outStream, Coder.Context context)
               throws CoderException, IOException {
             StringUtf8Coder.of().encode(accumulator.value, outStream, context);
           }
 
           @Override
+          public Accumulator decode(InputStream inStream) throws CoderException, IOException {
+            return decode(inStream, Coder.Context.NESTED);
+          }
+
+          @Override
           public Accumulator decode(InputStream inStream, Coder.Context context)
               throws CoderException, IOException {
             return new Accumulator(StringUtf8Coder.of().decode(inStream, context));

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index 7e8a1dd..a05d31c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -134,11 +134,11 @@ public class CreateTest {
 
   private static class RecordCoder extends AtomicCoder<Record> {
     @Override
-    public void encode(Record value, OutputStream outStream, Context context)
+    public void encode(Record value, OutputStream outStream)
         throws CoderException, IOException {}
 
     @Override
-    public Record decode(InputStream inStream, Context context) throws CoderException, IOException {
+    public Record decode(InputStream inStream) throws CoderException, IOException {
       return null;
     }
   }
@@ -207,15 +207,14 @@ public class CreateTest {
       @Override
       public void encode(
           UnserializableRecord value,
-          OutputStream outStream,
-          org.apache.beam.sdk.coders.Coder.Context context)
+          OutputStream outStream)
           throws CoderException, IOException {
         stringCoder.encode(value.myString, outStream);
       }
 
       @Override
       public UnserializableRecord decode(
-          InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
+          InputStream inStream)
           throws CoderException, IOException {
         return new UnserializableRecord(stringCoder.decode(inStream));
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index aba33eb..0cd885c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -469,13 +469,13 @@ public class GroupByKeyTest {
     private DeterministicKeyCoder() {}
 
     @Override
-    public void encode(BadEqualityKey value, OutputStream outStream, Context context)
+    public void encode(BadEqualityKey value, OutputStream outStream)
         throws IOException {
       new DataOutputStream(outStream).writeLong(value.key);
     }
 
     @Override
-    public BadEqualityKey decode(InputStream inStream, Context context)
+    public BadEqualityKey decode(InputStream inStream)
         throws IOException {
       return new BadEqualityKey(new DataInputStream(inStream).readLong());
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index d2cb980..3697211 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -986,12 +986,12 @@ public class ParDoTest implements Serializable {
     }
 
     @Override
-    public void encode(TestDummy value, OutputStream outStream, Context context)
+    public void encode(TestDummy value, OutputStream outStream)
         throws CoderException, IOException {
     }
 
     @Override
-    public TestDummy decode(InputStream inStream, Context context)
+    public TestDummy decode(InputStream inStream)
         throws CoderException, IOException {
       return new TestDummy();
     }
@@ -1090,12 +1090,23 @@ public class ParDoTest implements Serializable {
     }
 
     @Override
+    public void encode(MyInteger value, OutputStream outStream)
+        throws CoderException, IOException {
+      encode(value, outStream, Context.NESTED);
+    }
+
+    @Override
     public void encode(MyInteger value, OutputStream outStream, Context context)
         throws CoderException, IOException {
       delegate.encode(value.getValue(), outStream, context);
     }
 
     @Override
+    public MyInteger decode(InputStream inStream) throws CoderException, IOException {
+      return decode(inStream, Context.NESTED);
+    }
+
+    @Override
     public MyInteger decode(InputStream inStream, Context context) throws CoderException,
         IOException {
       return new MyInteger(delegate.decode(inStream, context));

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index 84f3d69..cdd03d9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -507,12 +507,23 @@ public class ViewTest implements Serializable {
 
   private static class NonDeterministicStringCoder extends AtomicCoder<String> {
     @Override
+    public void encode(String value, OutputStream outStream)
+        throws CoderException, IOException {
+      encode(value, outStream, Coder.Context.NESTED);
+    }
+
+    @Override
     public void encode(String value, OutputStream outStream, Coder.Context context)
         throws CoderException, IOException {
       StringUtf8Coder.of().encode(value, outStream, context);
     }
 
     @Override
+    public String decode(InputStream inStream) throws CoderException, IOException {
+      return decode(inStream, Coder.Context.NESTED);
+    }
+
+    @Override
     public String decode(InputStream inStream, Coder.Context context)
         throws CoderException, IOException {
       return StringUtf8Coder.of().decode(inStream, context);

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 489493a..a8cd35e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -277,10 +277,10 @@ public class DoFnInvokersTest {
     }
 
     @Override
-    public void encode(SomeRestriction value, OutputStream outStream, Context context) {}
+    public void encode(SomeRestriction value, OutputStream outStream) {}
 
     @Override
-    public SomeRestriction decode(InputStream inStream, Context context) {
+    public SomeRestriction decode(InputStream inStream) {
       return null;
     }
   }
@@ -400,10 +400,10 @@ public class DoFnInvokersTest {
 
     @Override
     public void encode(
-        RestrictionWithDefaultTracker value, OutputStream outStream, Context context) {}
+        RestrictionWithDefaultTracker value, OutputStream outStream) {}
 
     @Override
-    public RestrictionWithDefaultTracker decode(InputStream inStream, Context context) {
+    public RestrictionWithDefaultTracker decode(InputStream inStream) {
       return null;
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
index 7230a8b..f36e5e1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
@@ -50,12 +50,12 @@ public class CoderUtilsTest {
     }
 
     @Override
-    public void encode(Integer value, OutputStream outStream, Context context) {
+    public void encode(Integer value, OutputStream outStream) {
       throw new RuntimeException("not expecting to be called");
     }
 
     @Override
-    public Integer decode(InputStream inStream, Context context) {
+    public Integer decode(InputStream inStream) {
       throw new RuntimeException("not expecting to be called");
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
index 6ba1d4a..9a80730 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
@@ -89,12 +89,12 @@ public class SerializableUtilsTest {
     private final Object unserializableField = new Object();
 
     @Override
-    public void encode(Object value, OutputStream outStream, Context context)
+    public void encode(Object value, OutputStream outStream)
         throws CoderException, IOException {
     }
 
     @Override
-    public Object decode(InputStream inStream, Context context)
+    public Object decode(InputStream inStream)
         throws CoderException, IOException {
       return unserializableField;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
index 325c69d..73c7977 100644
--- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
@@ -49,6 +49,12 @@ public class ByteStringCoder extends AtomicCoder<ByteString> {
   private ByteStringCoder() {}
 
   @Override
+  public void encode(ByteString value, OutputStream outStream)
+      throws IOException, CoderException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(ByteString value, OutputStream outStream, Context context)
       throws IOException, CoderException {
     if (value == null) {
@@ -63,6 +69,11 @@ public class ByteStringCoder extends AtomicCoder<ByteString> {
   }
 
   @Override
+  public ByteString decode(InputStream inStream) throws IOException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public ByteString decode(InputStream inStream, Context context) throws IOException {
     if (context.isWholeStream) {
       return ByteString.readFrom(inStream);

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
index 968a2fa..f73bf2b 100644
--- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
@@ -168,6 +168,12 @@ public class ProtoCoder<T extends Message> extends CustomCoder<T> {
   }
 
   @Override
+  public void encode(T value, OutputStream outStream)
+      throws IOException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(T value, OutputStream outStream, Context context) throws IOException {
     if (value == null) {
       throw new CoderException("cannot encode a null " + protoMessageClass.getSimpleName());
@@ -180,6 +186,11 @@ public class ProtoCoder<T extends Message> extends CustomCoder<T> {
   }
 
   @Override
+  public T decode(InputStream inStream) throws IOException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public T decode(InputStream inStream, Context context) throws IOException {
     if (context.isWholeStream) {
       return getParser().parseFrom(inStream, getExtensionRegistry());

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
index 33b9f77..f034a03 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
@@ -38,7 +38,7 @@ public class TableDestinationCoder extends AtomicCoder<TableDestination> {
   }
 
   @Override
-  public void encode(TableDestination value, OutputStream outStream, Context context)
+  public void encode(TableDestination value, OutputStream outStream)
       throws IOException {
     if (value == null) {
       throw new CoderException("cannot encode a null value");

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
index 8ae75c5..c4707da 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
@@ -38,6 +38,12 @@ class TableRowInfoCoder extends AtomicCoder<TableRowInfo> {
   }
 
   @Override
+  public void encode(TableRowInfo value, OutputStream outStream)
+      throws IOException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(TableRowInfo value, OutputStream outStream, Context context)
       throws IOException {
     if (value == null) {
@@ -48,6 +54,11 @@ class TableRowInfoCoder extends AtomicCoder<TableRowInfo> {
   }
 
   @Override
+  public TableRowInfo decode(InputStream inStream) throws IOException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public TableRowInfo decode(InputStream inStream, Context context)
       throws IOException {
     return new TableRowInfo(

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
index cfec991..e4b6f1f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
@@ -38,6 +38,12 @@ public class TableRowJsonCoder extends AtomicCoder<TableRow> {
   }
 
   @Override
+  public void encode(TableRow value, OutputStream outStream)
+      throws IOException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(TableRow value, OutputStream outStream, Context context)
       throws IOException {
     String strValue = MAPPER.writeValueAsString(value);
@@ -45,6 +51,11 @@ public class TableRowJsonCoder extends AtomicCoder<TableRow> {
   }
 
   @Override
+  public TableRow decode(InputStream inStream) throws IOException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public TableRow decode(InputStream inStream, Context context)
       throws IOException {
     String strValue = StringUtf8Coder.of().decode(inStream, context);

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
index 9e83271..f014039 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
@@ -101,7 +101,7 @@ class WriteBundlesToFiles<DestinationT>
     }
 
     @Override
-    public void encode(Result<DestinationT> value, OutputStream outStream, Context context)
+    public void encode(Result<DestinationT> value, OutputStream outStream)
         throws IOException {
       if (value == null) {
         throw new CoderException("cannot encode a null value");
@@ -112,7 +112,7 @@ class WriteBundlesToFiles<DestinationT>
     }
 
     @Override
-    public Result<DestinationT> decode(InputStream inStream, Context context) throws IOException {
+    public Result<DestinationT> decode(InputStream inStream) throws IOException {
       String filename = stringCoder.decode(inStream);
       long fileByteSize = longCoder.decode(inStream);
       DestinationT destination = destinationCoder.decode(inStream);

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
index d120f72..5df2bcf 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
@@ -34,12 +34,23 @@ public class PubsubMessagePayloadOnlyCoder extends CustomCoder<PubsubMessage> {
   }
 
   @Override
+  public void encode(PubsubMessage value, OutputStream outStream)
+      throws IOException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(PubsubMessage value, OutputStream outStream, Context context)
       throws IOException {
     PAYLOAD_CODER.encode(value.getPayload(), outStream, context);
   }
 
   @Override
+  public PubsubMessage decode(InputStream inStream) throws IOException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public PubsubMessage decode(InputStream inStream, Context context) throws IOException {
     return new PubsubMessage(
         PAYLOAD_CODER.decode(inStream, context), ImmutableMap.<String, String>of());

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
index 5907c9e..bcf7656 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
@@ -45,6 +45,12 @@ public class PubsubMessageWithAttributesCoder extends CustomCoder<PubsubMessage>
     return new PubsubMessageWithAttributesCoder();
   }
 
+  @Override
+  public void encode(PubsubMessage value, OutputStream outStream)
+      throws IOException {
+    encode(value, outStream, Context.NESTED);
+  }
+
   public void encode(PubsubMessage value, OutputStream outStream, Context context)
       throws IOException {
     PAYLOAD_CODER.encode(value.getPayload(), outStream);
@@ -52,6 +58,11 @@ public class PubsubMessageWithAttributesCoder extends CustomCoder<PubsubMessage>
   }
 
   @Override
+  public PubsubMessage decode(InputStream inStream) throws IOException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public PubsubMessage decode(InputStream inStream, Context context) throws IOException {
     byte[] payload = PAYLOAD_CODER.decode(inStream);
     Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream, context);

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index ae320c7..ad38e28 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -108,7 +108,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
 
     @Override
     public void encode(
-        OutgoingMessage value, OutputStream outStream, Context context)
+        OutgoingMessage value, OutputStream outStream)
         throws CoderException, IOException {
       ByteArrayCoder.of().encode(value.elementBytes, outStream);
       ATTRIBUTES_CODER.encode(value.attributes, outStream);
@@ -118,7 +118,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
 
     @Override
     public OutgoingMessage decode(
-        InputStream inStream, Context context) throws CoderException, IOException {
+        InputStream inStream) throws CoderException, IOException {
       byte[] elementBytes = ByteArrayCoder.of().decode(inStream);
       Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream);
       long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream);

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index e53976e..db8c1b7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -369,6 +369,12 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
     private PubsubCheckpointCoder() {}
 
     @Override
+    public void encode(PubsubCheckpoint value, OutputStream outStream)
+        throws IOException {
+      encode(value, outStream, Context.NESTED);
+    }
+
+    @Override
     public void encode(PubsubCheckpoint value, OutputStream outStream, Context context)
         throws IOException {
       SUBSCRIPTION_PATH_CODER.encode(
@@ -379,6 +385,11 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
     }
 
     @Override
+    public PubsubCheckpoint decode(InputStream inStream) throws IOException {
+      return decode(inStream, Context.NESTED);
+    }
+
+    @Override
     public PubsubCheckpoint decode(InputStream inStream, Context context) throws IOException {
       String path = SUBSCRIPTION_PATH_CODER.decode(inStream);
       List<String> notYetReadIds = LIST_CODER.decode(inStream, context);

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index d60c721..70d5377 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -748,12 +748,23 @@ public class BigQueryIOTest implements Serializable {
    */
   private static class PartitionedGlobalWindowCoder extends AtomicCoder<PartitionedGlobalWindow> {
     @Override
+    public void encode(PartitionedGlobalWindow window, OutputStream outStream)
+        throws IOException, CoderException {
+      encode(window, outStream, Context.NESTED);
+    }
+
+    @Override
     public void encode(PartitionedGlobalWindow window, OutputStream outStream, Context context)
         throws IOException, CoderException {
       StringUtf8Coder.of().encode(window.value, outStream, context);
     }
 
     @Override
+    public PartitionedGlobalWindow decode(InputStream inStream) throws IOException, CoderException {
+      return decode(inStream, Context.NESTED);
+    }
+
+    @Override
     public PartitionedGlobalWindow decode(InputStream inStream, Context context)
         throws IOException, CoderException {
       return new PartitionedGlobalWindow(StringUtf8Coder.of().decode(inStream, context));

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
index 8fddfe0..8d2598a 100644
--- a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
+++ b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
@@ -68,13 +68,13 @@ public class WritableCoder<T extends Writable> extends CustomCoder<T> {
   }
 
   @Override
-  public void encode(T value, OutputStream outStream, Context context) throws IOException {
+  public void encode(T value, OutputStream outStream) throws IOException {
     value.write(new DataOutputStream(outStream));
   }
 
   @SuppressWarnings("unchecked")
   @Override
-  public T decode(InputStream inStream, Context context) throws IOException {
+  public T decode(InputStream inStream) throws IOException {
     try {
       if (type == NullWritable.class) {
         // NullWritable has no default constructor

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
index 7cc043c..501fe09 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
@@ -44,16 +44,14 @@ class HBaseMutationCoder extends AtomicCoder<Mutation> implements Serializable {
   }
 
   @Override
-  public void encode(Mutation mutation, OutputStream outStream,
-                     Coder.Context context) throws IOException {
+  public void encode(Mutation mutation, OutputStream outStream) throws IOException {
     MutationType type = getType(mutation);
     MutationProto proto = ProtobufUtil.toMutation(type, mutation);
     proto.writeDelimitedTo(outStream);
   }
 
   @Override
-  public Mutation decode(InputStream inStream,
-                         Coder.Context context) throws IOException {
+  public Mutation decode(InputStream inStream) throws IOException {
     return ProtobufUtil.toMutation(MutationProto.parseDelimitedFrom(inStream));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
index 24a5f7f..1d06635 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
@@ -41,13 +41,13 @@ class HBaseResultCoder extends AtomicCoder<Result> implements Serializable {
   }
 
   @Override
-  public void encode(Result value, OutputStream outputStream, Coder.Context context)
+  public void encode(Result value, OutputStream outputStream)
           throws IOException {
     ProtobufUtil.toResult(value).writeDelimitedTo(outputStream);
   }
 
   @Override
-  public Result decode(InputStream inputStream, Coder.Context context)
+  public Result decode(InputStream inputStream)
       throws IOException {
     return ProtobufUtil.toResult(ClientProtos.Result.parseDelimitedFrom(inputStream));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index ba84c2a..e21945f 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1597,13 +1597,13 @@ public class KafkaIO {
 
   private static class NullOnlyCoder<T> extends AtomicCoder<T> {
     @Override
-    public void encode(T value, OutputStream outStream, Context context) {
+    public void encode(T value, OutputStream outStream) {
       checkArgument(value == null, "Can only encode nulls");
       // Encode as no bytes.
     }
 
     @Override
-    public T decode(InputStream inStream, Context context) {
+    public T decode(InputStream inStream) {
       return null;
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
index d838a0d..1971060 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
@@ -50,6 +50,12 @@ public class KafkaRecordCoder<K, V> extends StructuredCoder<KafkaRecord<K, V>> {
   }
 
   @Override
+  public void encode(KafkaRecord<K, V> value, OutputStream outStream)
+      throws CoderException, IOException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(KafkaRecord<K, V> value, OutputStream outStream, Context context)
                          throws CoderException, IOException {
     Context nested = context.nested();
@@ -61,6 +67,11 @@ public class KafkaRecordCoder<K, V> extends StructuredCoder<KafkaRecord<K, V>> {
   }
 
   @Override
+  public KafkaRecord<K, V> decode(InputStream inStream) throws CoderException, IOException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public KafkaRecord<K, V> decode(InputStream inStream, Context context)
                                       throws CoderException, IOException {
     Context nested = context.nested();

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
index c6a0174..f233e27 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
@@ -43,7 +43,7 @@ class KinesisRecordCoder extends AtomicCoder<KinesisRecord> {
     }
 
     @Override
-    public void encode(KinesisRecord value, OutputStream outStream, Context context) throws
+    public void encode(KinesisRecord value, OutputStream outStream) throws
             IOException {
         BYTE_ARRAY_CODER.encode(value.getData().array(), outStream);
         STRING_CODER.encode(value.getSequenceNumber(), outStream);
@@ -56,7 +56,7 @@ class KinesisRecordCoder extends AtomicCoder<KinesisRecord> {
     }
 
     @Override
-    public KinesisRecord decode(InputStream inStream, Context context) throws IOException {
+    public KinesisRecord decode(InputStream inStream) throws IOException {
         ByteBuffer data = ByteBuffer.wrap(BYTE_ARRAY_CODER.decode(inStream));
         String sequenceNumber = STRING_CODER.decode(inStream);
         String partitionKey = STRING_CODER.decode(inStream);

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
index 5b2ec02..d4c0440 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
@@ -88,15 +88,8 @@ public class JAXBCoder<T> extends CustomCoder<T> {
   }
 
   @Override
-  public void encode(T value, OutputStream outStream) throws IOException {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    try {
-      jaxbMarshaller.get().marshal(value, baos);
-    } catch (JAXBException e) {
-      throw new CoderException(e);
-    }
-    VarInt.encode(baos.size(), outStream);
-    baos.writeTo(outStream);
+  public void encode(T value, OutputStream outStream) throws CoderException, IOException {
+    encode(value, outStream, Context.NESTED);
   }
 
   @Override
@@ -109,11 +102,23 @@ public class JAXBCoder<T> extends CustomCoder<T> {
         throw new CoderException(e);
       }
     } else {
-      encode(value, outStream);
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      try {
+        jaxbMarshaller.get().marshal(value, baos);
+      } catch (JAXBException e) {
+        throw new CoderException(e);
+      }
+      VarInt.encode(baos.size(), outStream);
+      baos.writeTo(outStream);
     }
   }
 
   @Override
+  public T decode(InputStream inStream) throws CoderException, IOException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public T decode(InputStream inStream, Context context) throws CoderException, IOException {
     try {
       if (!context.isWholeStream) {

http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
index 5386a61..c175e4a 100644
--- a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
+++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
@@ -178,18 +178,27 @@ public class JAXBCoderTest {
     }
 
     @Override
+    public void encode(TestType value, OutputStream outStream)
+        throws CoderException, IOException {
+      encode(value, outStream, Context.NESTED);
+    }
+
+    @Override
     public void encode(TestType value, OutputStream outStream, Context context)
         throws CoderException, IOException {
-      Context nestedContext = context.nested();
       VarIntCoder.of().encode(3, outStream);
       jaxbCoder.encode(value, outStream);
       VarLongCoder.of().encode(22L, outStream, context);
     }
 
     @Override
+    public TestType decode(InputStream inStream) throws CoderException, IOException {
+      return decode(inStream, Context.NESTED);
+    }
+
+    @Override
     public TestType decode(InputStream inStream, Context context)
         throws CoderException, IOException {
-      Context nestedContext = context.nested();
       VarIntCoder.of().decode(inStream);
       TestType result = jaxbCoder.decode(inStream);
       VarLongCoder.of().decode(inStream, context);


Mime
View raw message