crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomwh...@apache.org
Subject crunch git commit: CRUNCH-571: Scrunch functions fail serialization check in the REPL
Date Mon, 19 Oct 2015 09:36:33 GMT
Repository: crunch
Updated Branches:
  refs/heads/master b4da23b26 -> 387068d4e


CRUNCH-571: Scrunch functions fail serialization check in the REPL


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/387068d4
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/387068d4
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/387068d4

Branch: refs/heads/master
Commit: 387068d4e68b4b22116af8392418353effd1bb5b
Parents: b4da23b
Author: Tom White <tomwhite@apache.org>
Authored: Mon Oct 19 10:21:36 2015 +0100
Committer: Tom White <tomwhite@apache.org>
Committed: Mon Oct 19 10:21:36 2015 +0100

----------------------------------------------------------------------
 .../crunch/impl/mem/collect/MemCollection.java  | 54 +++++++++++++++++++-
 1 file changed, 53 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/387068d4/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
index 05bff3f..e5f04d5 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -17,6 +17,11 @@
  */
 package org.apache.crunch.impl.mem.collect;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectStreamClass;
 import java.lang.reflect.Method;
 import java.util.Collection;
 import java.util.Set;
@@ -103,7 +108,7 @@ public class MemCollection<S> implements PCollection<S> {
 
   private <S, T> DoFn<S, T> verifySerializable(String name, DoFn<S, T>
doFn) {
     try {
-      return (DoFn<S, T>) SerializationUtils.deserialize(SerializationUtils.serialize(doFn));
+      return (DoFn<S, T>) deserialize(SerializationUtils.serialize(doFn));
     } catch (SerializationException e) {
       throw new IllegalStateException(
           doFn.getClass().getSimpleName() + " named '" + name + "' cannot be serialized",
@@ -111,6 +116,53 @@ public class MemCollection<S> implements PCollection<S> {
     }
   }
 
+  // Use a custom deserialize implementation (not SerializationUtils) so we can fall back
+  // to using the thread context classloader, which is needed when running Scrunch in
+  // the Scala REPL
+  private static Object deserialize(InputStream inputStream) {
+    if (inputStream == null) {
+      throw new IllegalArgumentException("The InputStream must not be null");
+    }
+    ObjectInputStream in = null;
+    try {
+      // stream closed in the finally
+      in = new ObjectInputStream(inputStream) {
+        @Override
+        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException,
+            ClassNotFoundException {
+          try {
+            return super.resolveClass(desc);
+          } catch (ClassNotFoundException e) {
+            ClassLoader cl = Thread.currentThread().getContextClassLoader();
+            return Class.forName(desc.getName(), false, cl);
+          }
+        }
+      };
+      return in.readObject();
+
+    } catch (ClassNotFoundException ex) {
+      throw new SerializationException(ex);
+    } catch (IOException ex) {
+      throw new SerializationException(ex);
+    } finally {
+      try {
+        if (in != null) {
+          in.close();
+        }
+      } catch (IOException ex) {
+        // ignore close exception
+      }
+    }
+  }
+
+  private static Object deserialize(byte[] objectData) {
+    if (objectData == null) {
+      throw new IllegalArgumentException("The byte[] must not be null");
+    }
+    ByteArrayInputStream bais = new ByteArrayInputStream(objectData);
+    return deserialize(bais);
+  }
+
   @Override
   public <T> PCollection<T> parallelDo(DoFn<S, T> doFn, PType<T>
type) {
     return parallelDo(null, doFn, type);


Mime
View raw message