beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [1/3] beam git commit: Use a ThreadLocal for Marshaller/Unmarshaller in JAXBCoder
Date Mon, 23 Jan 2017 21:02:10 GMT
Repository: beam
Updated Branches:
  refs/heads/master 9db5f746a -> daed01a69


Use a ThreadLocal for Marshaller/Unmarshaller in JAXBCoder

This allows each thread to reuse its own thread-unsafe marshaller and
unmarshaller when encoding and decoding elements, while the coder itself
remains thread-safe.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cf0b990b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cf0b990b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cf0b990b

Branch: refs/heads/master
Commit: cf0b990b0336f46b4d4775c93e86bd2310d622b5
Parents: e1ee05e
Author: Kai Jiang <jiangkai@gmail.com>
Authored: Thu Jan 19 05:06:13 2017 -0800
Committer: Thomas Groh <tgroh@google.com>
Committed: Mon Jan 23 11:46:48 2017 -0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/coders/JAXBCoder.java   | 36 +++++++++++++++-----
 1 file changed, 27 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/cf0b990b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
index 0a4f9cc..ea636fc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/JAXBCoder.java
@@ -30,6 +30,7 @@ import javax.xml.bind.JAXBException;
 import javax.xml.bind.Marshaller;
 import javax.xml.bind.Unmarshaller;
 import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.util.EmptyOnDeserializationThreadLocal;
 import org.apache.beam.sdk.util.Structs;
 import org.apache.beam.sdk.util.VarInt;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -45,6 +46,8 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
   private final Class<T> jaxbClass;
   private final TypeDescriptor<T> typeDescriptor;
   private transient volatile JAXBContext jaxbContext;
+  private final EmptyOnDeserializationThreadLocal<Marshaller> jaxbMarshaller;
+  private final EmptyOnDeserializationThreadLocal<Unmarshaller> jaxbUnmarshaller;
 
   public Class<T> getJAXBClass() {
     return jaxbClass;
@@ -53,6 +56,28 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
   private JAXBCoder(Class<T> jaxbClass) {
     this.jaxbClass = jaxbClass;
     this.typeDescriptor = TypeDescriptor.of(jaxbClass);
+    this.jaxbMarshaller = new EmptyOnDeserializationThreadLocal<Marshaller>() {
+      @Override
+      protected Marshaller initialValue() {
+        try {
+          JAXBContext jaxbContext = getContext();
+          return jaxbContext.createMarshaller();
+        } catch (JAXBException e) {
+          throw new RuntimeException("Error when creating marshaller from JAXB Context.", e);
+        }
+      }
+    };
+    this.jaxbUnmarshaller = new EmptyOnDeserializationThreadLocal<Unmarshaller>() {
+      @Override
+      protected Unmarshaller initialValue() {
+        try {
+          JAXBContext jaxbContext = getContext();
+          return jaxbContext.createUnmarshaller();
+        } catch (Exception e) {
+          throw new RuntimeException("Error when creating unmarshaller from JAXB Context.", e);
+        }
+      }
+    };
   }
 
   /**
@@ -68,9 +93,6 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
   public void encode(T value, OutputStream outStream, Context context)
       throws CoderException, IOException {
     try {
-      JAXBContext jaxbContext = getContext();
-      // TODO: Consider caching in a ThreadLocal if this impacts performance
-      Marshaller jaxbMarshaller = jaxbContext.createMarshaller();
       if (!context.isWholeStream) {
         try {
           long size = getEncodedElementByteSize(value, Context.OUTER);
@@ -83,7 +105,7 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
         }
       }
 
-      jaxbMarshaller.marshal(value, new CloseIgnoringOutputStream(outStream));
+      jaxbMarshaller.get().marshal(value, new CloseIgnoringOutputStream(outStream));
     } catch (JAXBException e) {
       throw new CoderException(e);
     }
@@ -92,17 +114,13 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
   @Override
   public T decode(InputStream inStream, Context context) throws CoderException, IOException {
     try {
-      JAXBContext jaxbContext = getContext();
-      // TODO: Consider caching in a ThreadLocal if this impacts performance
-      Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();
-
       InputStream stream = inStream;
       if (!context.isWholeStream) {
         long limit = VarInt.decodeLong(inStream);
         stream = ByteStreams.limit(inStream, limit);
       }
       @SuppressWarnings("unchecked")
-      T obj = (T) jaxbUnmarshaller.unmarshal(new CloseIgnoringInputStream(stream));
+      T obj = (T) jaxbUnmarshaller.get().unmarshal(new CloseIgnoringInputStream(stream));
       return obj;
     } catch (JAXBException e) {
       throw new CoderException(e);


Mime
View raw message