beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [1/6] beam git commit: [BEAM-2166] Use contextless encode/decode by default.
Date Tue, 09 May 2017 17:47:12 GMT
Repository: beam
Updated Branches:
  refs/heads/release-2.0.0 5763c384d -> acb3f6a9c


http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
index d120f72..5df2bcf 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
@@ -34,12 +34,23 @@ public class PubsubMessagePayloadOnlyCoder extends CustomCoder<PubsubMessage>
{
   }
 
   @Override
+  public void encode(PubsubMessage value, OutputStream outStream)
+      throws IOException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(PubsubMessage value, OutputStream outStream, Context context)
       throws IOException {
     PAYLOAD_CODER.encode(value.getPayload(), outStream, context);
   }
 
   @Override
+  public PubsubMessage decode(InputStream inStream) throws IOException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public PubsubMessage decode(InputStream inStream, Context context) throws IOException {
     return new PubsubMessage(
         PAYLOAD_CODER.decode(inStream, context), ImmutableMap.<String, String>of());

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
index e061edc..bcf7656 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
@@ -45,15 +45,26 @@ public class PubsubMessageWithAttributesCoder extends CustomCoder<PubsubMessage>
     return new PubsubMessageWithAttributesCoder();
   }
 
+  @Override
+  public void encode(PubsubMessage value, OutputStream outStream)
+      throws IOException {
+    encode(value, outStream, Context.NESTED);
+  }
+
   public void encode(PubsubMessage value, OutputStream outStream, Context context)
       throws IOException {
-    PAYLOAD_CODER.encode(value.getPayload(), outStream, context.nested());
+    PAYLOAD_CODER.encode(value.getPayload(), outStream);
     ATTRIBUTES_CODER.encode(value.getAttributeMap(), outStream, context);
   }
 
   @Override
+  public PubsubMessage decode(InputStream inStream) throws IOException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public PubsubMessage decode(InputStream inStream, Context context) throws IOException {
-    byte[] payload = PAYLOAD_CODER.decode(inStream, context.nested());
+    byte[] payload = PAYLOAD_CODER.decode(inStream);
     Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream, context);
     return new PubsubMessage(payload, attributes);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index 9f04a6c..ad38e28 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -108,21 +108,21 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
 
     @Override
     public void encode(
-        OutgoingMessage value, OutputStream outStream, Context context)
+        OutgoingMessage value, OutputStream outStream)
         throws CoderException, IOException {
-      ByteArrayCoder.of().encode(value.elementBytes, outStream, context.nested());
-      ATTRIBUTES_CODER.encode(value.attributes, outStream, context.nested());
-      BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream, context.nested());
-      RECORD_ID_CODER.encode(value.recordId, outStream, context.nested());
+      ByteArrayCoder.of().encode(value.elementBytes, outStream);
+      ATTRIBUTES_CODER.encode(value.attributes, outStream);
+      BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream);
+      RECORD_ID_CODER.encode(value.recordId, outStream);
     }
 
     @Override
     public OutgoingMessage decode(
-        InputStream inStream, Context context) throws CoderException, IOException {
-      byte[] elementBytes = ByteArrayCoder.of().decode(inStream, context.nested());
-      Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream, context.nested());
-      long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream, context.nested());
-      @Nullable String recordId = RECORD_ID_CODER.decode(inStream, context.nested());
+        InputStream inStream) throws CoderException, IOException {
+      byte[] elementBytes = ByteArrayCoder.of().decode(inStream);
+      Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream);
+      long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream);
+      @Nullable String recordId = RECORD_ID_CODER.decode(inStream);
       return new OutgoingMessage(elementBytes, attributes, timestampMsSinceEpoch, recordId);
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index c16b8fb..e8fe701 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -369,19 +369,18 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
     private PubsubCheckpointCoder() {}
 
     @Override
-    public void encode(PubsubCheckpoint value, OutputStream outStream, Context context)
+    public void encode(PubsubCheckpoint value, OutputStream outStream)
         throws IOException {
       SUBSCRIPTION_PATH_CODER.encode(
           value.subscriptionPath,
-          outStream,
-          context.nested());
-      LIST_CODER.encode(value.notYetReadIds, outStream, context);
+          outStream);
+      LIST_CODER.encode(value.notYetReadIds, outStream);
     }
 
     @Override
-    public PubsubCheckpoint decode(InputStream inStream, Context context) throws IOException
{
-      String path = SUBSCRIPTION_PATH_CODER.decode(inStream, context.nested());
-      List<String> notYetReadIds = LIST_CODER.decode(inStream, context);
+    public PubsubCheckpoint decode(InputStream inStream) throws IOException {
+      String path = SUBSCRIPTION_PATH_CODER.decode(inStream);
+      List<String> notYetReadIds = LIST_CODER.decode(inStream);
       return new PubsubCheckpoint(path, null, null, notYetReadIds);
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index d60c721..70d5377 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -748,12 +748,23 @@ public class BigQueryIOTest implements Serializable {
    */
   private static class PartitionedGlobalWindowCoder extends AtomicCoder<PartitionedGlobalWindow>
{
     @Override
+    public void encode(PartitionedGlobalWindow window, OutputStream outStream)
+        throws IOException, CoderException {
+      encode(window, outStream, Context.NESTED);
+    }
+
+    @Override
     public void encode(PartitionedGlobalWindow window, OutputStream outStream, Context context)
         throws IOException, CoderException {
       StringUtf8Coder.of().encode(window.value, outStream, context);
     }
 
     @Override
+    public PartitionedGlobalWindow decode(InputStream inStream) throws IOException, CoderException
{
+      return decode(inStream, Context.NESTED);
+    }
+
+    @Override
     public PartitionedGlobalWindow decode(InputStream inStream, Context context)
         throws IOException, CoderException {
       return new PartitionedGlobalWindow(StringUtf8Coder.of().decode(inStream, context));

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
index 8fddfe0..8d2598a 100644
--- a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
+++ b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
@@ -68,13 +68,13 @@ public class WritableCoder<T extends Writable> extends CustomCoder<T>
{
   }
 
   @Override
-  public void encode(T value, OutputStream outStream, Context context) throws IOException
{
+  public void encode(T value, OutputStream outStream) throws IOException {
     value.write(new DataOutputStream(outStream));
   }
 
   @SuppressWarnings("unchecked")
   @Override
-  public T decode(InputStream inStream, Context context) throws IOException {
+  public T decode(InputStream inStream) throws IOException {
     try {
       if (type == NullWritable.class) {
         // NullWritable has no default constructor

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
index 7cc043c..501fe09 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
@@ -44,16 +44,14 @@ class HBaseMutationCoder extends AtomicCoder<Mutation> implements
Serializable {
   }
 
   @Override
-  public void encode(Mutation mutation, OutputStream outStream,
-                     Coder.Context context) throws IOException {
+  public void encode(Mutation mutation, OutputStream outStream) throws IOException {
     MutationType type = getType(mutation);
     MutationProto proto = ProtobufUtil.toMutation(type, mutation);
     proto.writeDelimitedTo(outStream);
   }
 
   @Override
-  public Mutation decode(InputStream inStream,
-                         Coder.Context context) throws IOException {
+  public Mutation decode(InputStream inStream) throws IOException {
     return ProtobufUtil.toMutation(MutationProto.parseDelimitedFrom(inStream));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
index 24a5f7f..1d06635 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
@@ -41,13 +41,13 @@ class HBaseResultCoder extends AtomicCoder<Result> implements Serializable
{
   }
 
   @Override
-  public void encode(Result value, OutputStream outputStream, Coder.Context context)
+  public void encode(Result value, OutputStream outputStream)
           throws IOException {
     ProtobufUtil.toResult(value).writeDelimitedTo(outputStream);
   }
 
   @Override
-  public Result decode(InputStream inputStream, Coder.Context context)
+  public Result decode(InputStream inputStream)
       throws IOException {
     return ProtobufUtil.toResult(ClientProtos.Result.parseDelimitedFrom(inputStream));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index ba84c2a..e21945f 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1597,13 +1597,13 @@ public class KafkaIO {
 
   private static class NullOnlyCoder<T> extends AtomicCoder<T> {
     @Override
-    public void encode(T value, OutputStream outStream, Context context) {
+    public void encode(T value, OutputStream outStream) {
       checkArgument(value == null, "Can only encode nulls");
       // Encode as no bytes.
     }
 
     @Override
-    public T decode(InputStream inStream, Context context) {
+    public T decode(InputStream inStream) {
       return null;
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
index d838a0d..1971060 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
@@ -50,6 +50,12 @@ public class KafkaRecordCoder<K, V> extends StructuredCoder<KafkaRecord<K,
V>> {
   }
 
   @Override
+  public void encode(KafkaRecord<K, V> value, OutputStream outStream)
+      throws CoderException, IOException {
+    encode(value, outStream, Context.NESTED);
+  }
+
+  @Override
   public void encode(KafkaRecord<K, V> value, OutputStream outStream, Context context)
                          throws CoderException, IOException {
     Context nested = context.nested();
@@ -61,6 +67,11 @@ public class KafkaRecordCoder<K, V> extends StructuredCoder<KafkaRecord<K,
V>> {
   }
 
   @Override
+  public KafkaRecord<K, V> decode(InputStream inStream) throws CoderException, IOException
{
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public KafkaRecord<K, V> decode(InputStream inStream, Context context)
                                       throws CoderException, IOException {
     Context nested = context.nested();

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
index 77fe127..f233e27 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
@@ -43,30 +43,28 @@ class KinesisRecordCoder extends AtomicCoder<KinesisRecord> {
     }
 
     @Override
-    public void encode(KinesisRecord value, OutputStream outStream, Context context) throws
+    public void encode(KinesisRecord value, OutputStream outStream) throws
             IOException {
-        Context nested = context.nested();
-        BYTE_ARRAY_CODER.encode(value.getData().array(), outStream, nested);
-        STRING_CODER.encode(value.getSequenceNumber(), outStream, nested);
-        STRING_CODER.encode(value.getPartitionKey(), outStream, nested);
-        INSTANT_CODER.encode(value.getApproximateArrivalTimestamp(), outStream, nested);
-        VAR_LONG_CODER.encode(value.getSubSequenceNumber(), outStream, nested);
-        INSTANT_CODER.encode(value.getReadTime(), outStream, nested);
-        STRING_CODER.encode(value.getStreamName(), outStream, nested);
-        STRING_CODER.encode(value.getShardId(), outStream, context);
+        BYTE_ARRAY_CODER.encode(value.getData().array(), outStream);
+        STRING_CODER.encode(value.getSequenceNumber(), outStream);
+        STRING_CODER.encode(value.getPartitionKey(), outStream);
+        INSTANT_CODER.encode(value.getApproximateArrivalTimestamp(), outStream);
+        VAR_LONG_CODER.encode(value.getSubSequenceNumber(), outStream);
+        INSTANT_CODER.encode(value.getReadTime(), outStream);
+        STRING_CODER.encode(value.getStreamName(), outStream);
+        STRING_CODER.encode(value.getShardId(), outStream);
     }
 
     @Override
-    public KinesisRecord decode(InputStream inStream, Context context) throws IOException
{
-        Context nested = context.nested();
-        ByteBuffer data = ByteBuffer.wrap(BYTE_ARRAY_CODER.decode(inStream, nested));
-        String sequenceNumber = STRING_CODER.decode(inStream, nested);
-        String partitionKey = STRING_CODER.decode(inStream, nested);
-        Instant approximateArrivalTimestamp = INSTANT_CODER.decode(inStream, nested);
-        long subSequenceNumber = VAR_LONG_CODER.decode(inStream, nested);
-        Instant readTimestamp = INSTANT_CODER.decode(inStream, nested);
-        String streamName = STRING_CODER.decode(inStream, nested);
-        String shardId = STRING_CODER.decode(inStream, context);
+    public KinesisRecord decode(InputStream inStream) throws IOException {
+        ByteBuffer data = ByteBuffer.wrap(BYTE_ARRAY_CODER.decode(inStream));
+        String sequenceNumber = STRING_CODER.decode(inStream);
+        String partitionKey = STRING_CODER.decode(inStream);
+        Instant approximateArrivalTimestamp = INSTANT_CODER.decode(inStream);
+        long subSequenceNumber = VAR_LONG_CODER.decode(inStream);
+        Instant readTimestamp = INSTANT_CODER.decode(inStream);
+        String streamName = STRING_CODER.decode(inStream);
+        String shardId = STRING_CODER.decode(inStream);
         return new KinesisRecord(data, sequenceNumber, subSequenceNumber, partitionKey,
                 approximateArrivalTimestamp, readTimestamp, streamName, shardId
         );

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
index 5b2ec02..d4c0440 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
@@ -88,15 +88,8 @@ public class JAXBCoder<T> extends CustomCoder<T> {
   }
 
   @Override
-  public void encode(T value, OutputStream outStream) throws IOException {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    try {
-      jaxbMarshaller.get().marshal(value, baos);
-    } catch (JAXBException e) {
-      throw new CoderException(e);
-    }
-    VarInt.encode(baos.size(), outStream);
-    baos.writeTo(outStream);
+  public void encode(T value, OutputStream outStream) throws CoderException, IOException
{
+    encode(value, outStream, Context.NESTED);
   }
 
   @Override
@@ -109,11 +102,23 @@ public class JAXBCoder<T> extends CustomCoder<T> {
         throw new CoderException(e);
       }
     } else {
-      encode(value, outStream);
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      try {
+        jaxbMarshaller.get().marshal(value, baos);
+      } catch (JAXBException e) {
+        throw new CoderException(e);
+      }
+      VarInt.encode(baos.size(), outStream);
+      baos.writeTo(outStream);
     }
   }
 
   @Override
+  public T decode(InputStream inStream) throws CoderException, IOException {
+    return decode(inStream, Context.NESTED);
+  }
+
+  @Override
   public T decode(InputStream inStream, Context context) throws CoderException, IOException
{
     try {
       if (!context.isWholeStream) {

http://git-wip-us.apache.org/repos/asf/beam/blob/09d1affd/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
index 2b4503a..c175e4a 100644
--- a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
+++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
@@ -178,20 +178,29 @@ public class JAXBCoderTest {
     }
 
     @Override
+    public void encode(TestType value, OutputStream outStream)
+        throws CoderException, IOException {
+      encode(value, outStream, Context.NESTED);
+    }
+
+    @Override
     public void encode(TestType value, OutputStream outStream, Context context)
         throws CoderException, IOException {
-      Context nestedContext = context.nested();
-      VarIntCoder.of().encode(3, outStream, nestedContext);
-      jaxbCoder.encode(value, outStream, nestedContext);
+      VarIntCoder.of().encode(3, outStream);
+      jaxbCoder.encode(value, outStream);
       VarLongCoder.of().encode(22L, outStream, context);
     }
 
     @Override
+    public TestType decode(InputStream inStream) throws CoderException, IOException {
+      return decode(inStream, Context.NESTED);
+    }
+
+    @Override
     public TestType decode(InputStream inStream, Context context)
         throws CoderException, IOException {
-      Context nestedContext = context.nested();
-      VarIntCoder.of().decode(inStream, nestedContext);
-      TestType result = jaxbCoder.decode(inStream, nestedContext);
+      VarIntCoder.of().decode(inStream);
+      TestType result = jaxbCoder.decode(inStream);
       VarLongCoder.of().decode(inStream, context);
       return result;
     }


Mime
View raw message