beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] incubator-beam git commit: Fix JAXBCoder in the nested context
Date Wed, 06 Apr 2016 22:39:11 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 064573cba -> cdfe50932


Fix JAXBCoder in the nested context

JAXBCoder in the nested context is broken with any coder that writes
content after the output of the XML stream, as decode will read the
entire remaining stream and fail.

In the nested context, prepend a long representing the size of the XML
while encoding, and limit the size of the returned stream while
decoding.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2fe7d543
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2fe7d543
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2fe7d543

Branch: refs/heads/master
Commit: 2fe7d5438aafd35414ca21090acefde007c8df8a
Parents: 064573c
Author: Thomas Groh <tgroh@google.com>
Authored: Mon Apr 4 13:54:05 2016 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Wed Apr 6 15:38:55 2016 -0700

----------------------------------------------------------------------
 .../cloud/dataflow/sdk/coders/JAXBCoder.java    | 56 +++++++++++++++----
 .../dataflow/sdk/coders/JAXBCoderTest.java      | 58 +++++++++++++++++++-
 2 files changed, 101 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2fe7d543/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java
index f683b3e..6e2833e 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/coders/JAXBCoder.java
@@ -19,6 +19,8 @@ package com.google.cloud.dataflow.sdk.coders;
 
 import com.google.cloud.dataflow.sdk.util.CloudObject;
 import com.google.cloud.dataflow.sdk.util.Structs;
+import com.google.cloud.dataflow.sdk.util.VarInt;
+import com.google.common.io.ByteStreams;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -71,13 +73,19 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
         JAXBContext jaxbContext = JAXBContext.newInstance(jaxbClass);
         jaxbMarshaller = jaxbContext.createMarshaller();
       }
-
-      jaxbMarshaller.marshal(value, new FilterOutputStream(outStream) {
-        // JAXB closes the underyling stream so we must filter out those calls.
-        @Override
-        public void close() throws IOException {
+      if (!context.isWholeStream) {
+        try {
+          long size = getEncodedElementByteSize(value, Context.OUTER);
+          // record the number of bytes the XML consists of so when reading we only read the encoded
+          // value
+          VarInt.encode(size, outStream);
+        } catch (Exception e) {
+          throw new CoderException(
+              "An Exception occured while trying to get the size of an encoded representation", e);
         }
-      });
+      }
+
+      jaxbMarshaller.marshal(value, new CloseIgnoringOutputStream(outStream));
     } catch (JAXBException e) {
       throw new CoderException(e);
     }
@@ -91,13 +99,13 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
         jaxbUnmarshaller = jaxbContext.createUnmarshaller();
       }
 
+      InputStream stream = inStream;
+      if (!context.isWholeStream) {
+        long limit = VarInt.decodeLong(inStream);
+        stream = ByteStreams.limit(inStream, limit);
+      }
       @SuppressWarnings("unchecked")
-      T obj = (T) jaxbUnmarshaller.unmarshal(new FilterInputStream(inStream) {
-        // JAXB closes the underyling stream so we must filter out those calls.
-        @Override
-        public void close() throws IOException {
-        }
-      });
+      T obj = (T) jaxbUnmarshaller.unmarshal(new CloseIgnoringInputStream(stream));
       return obj;
     } catch (JAXBException e) {
       throw new CoderException(e);
@@ -109,6 +117,30 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
     return getJAXBClass().getName();
   }
 
+  private static class CloseIgnoringInputStream extends FilterInputStream {
+
+    protected CloseIgnoringInputStream(InputStream in) {
+      super(in);
+    }
+
+    @Override
+    public void close() {
+      // Do nothing. JAXB closes the underlying stream so we must filter out those calls.
+    }
+  }
+
+  private static class CloseIgnoringOutputStream extends FilterOutputStream {
+
+    protected CloseIgnoringOutputStream(OutputStream out) {
+      super(out);
+    }
+
+    @Override
+    public void close() throws IOException {
+      // JAXB closes the underlying stream so we must filter out those calls.
+    }
+  }
+
   ////////////////////////////////////////////////////////////////////////////////////
   // JSON Serialization details below
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2fe7d543/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/coders/JAXBCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/coders/JAXBCoderTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/coders/JAXBCoderTest.java
index ae09190..26c1198 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/coders/JAXBCoderTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/coders/JAXBCoderTest.java
@@ -19,11 +19,18 @@ package com.google.cloud.dataflow.sdk.coders;
 
 import com.google.cloud.dataflow.sdk.testing.CoderProperties;
 import com.google.cloud.dataflow.sdk.util.CoderUtils;
+import com.google.common.collect.ImmutableList;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
 import javax.xml.bind.annotation.XmlRootElement;
 
 /** Unit tests for {@link JAXBCoder}. */
@@ -79,7 +86,7 @@ public class JAXBCoderTest {
   }
 
   @Test
-  public void testEncodeDecode() throws Exception {
+  public void testEncodeDecodeOuter() throws Exception {
     JAXBCoder<TestType> coder = JAXBCoder.of(TestType.class);
 
     byte[] encoded = CoderUtils.encodeToByteArray(coder, new TestType("abc", 9999));
@@ -87,6 +94,55 @@ public class JAXBCoderTest {
   }
 
   @Test
+  public void testEncodeDecodeNested() throws Exception {
+    JAXBCoder<TestType> jaxbCoder = JAXBCoder.of(TestType.class);
+    TestCoder nesting = new TestCoder(jaxbCoder);
+
+    byte[] encoded = CoderUtils.encodeToByteArray(nesting, new TestType("abc", 9999));
+    Assert.assertEquals(
+        new TestType("abc", 9999), CoderUtils.decodeFromByteArray(nesting, encoded));
+  }
+
+  /**
+   * A coder that surrounds the value with two values, to demonstrate nesting.
+   */
+  private static class TestCoder extends StandardCoder<TestType> {
+    private final JAXBCoder<TestType> jaxbCoder;
+    public TestCoder(JAXBCoder<TestType> jaxbCoder) {
+      this.jaxbCoder = jaxbCoder;
+    }
+
+    @Override
+    public void encode(TestType value, OutputStream outStream, Context context)
+        throws CoderException, IOException {
+      Context subContext = context.nested();
+      VarIntCoder.of().encode(3, outStream, subContext);
+      jaxbCoder.encode(value, outStream, subContext);
+      VarLongCoder.of().encode(22L, outStream, subContext);
+    }
+
+    @Override
+    public TestType decode(InputStream inStream, Context context)
+        throws CoderException, IOException {
+      Context subContext = context.nested();
+      VarIntCoder.of().decode(inStream, subContext);
+      TestType result = jaxbCoder.decode(inStream, subContext);
+      VarLongCoder.of().decode(inStream, subContext);
+      return result;
+    }
+
+    @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+      return ImmutableList.of(jaxbCoder);
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      jaxbCoder.verifyDeterministic();
+    }
+  }
+
+  @Test
   public void testEncodable() throws Exception {
     CoderProperties.coderSerializable(JAXBCoder.of(TestType.class));
   }


Mime
View raw message