crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-462: MemPipeline should verify that DoFns are Serializable
Date Mon, 11 Aug 2014 22:13:09 GMT
Repository: crunch
Updated Branches:
  refs/heads/apache-crunch-0.8 dd4a75675 -> ee7838408


CRUNCH-462: MemPipeline should verify that DoFns are Serializable


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ee783840
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ee783840
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ee783840

Branch: refs/heads/apache-crunch-0.8
Commit: ee7838408e032904a157d0f34e10a0eb189db14f
Parents: dd4a756
Author: Josh Wills <jwills@apache.org>
Authored: Mon Aug 11 11:41:02 2014 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Mon Aug 11 15:08:00 2014 -0700

----------------------------------------------------------------------
 .../apache/crunch/impl/mem/collect/MemCollection.java  | 13 +++++++++++++
 .../java/org/apache/crunch/impl/mem/CountersTest.java  |  3 ++-
 2 files changed, 15 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/ee783840/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
index becee88..eaaab59 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -25,6 +25,8 @@ import javassist.util.proxy.MethodFilter;
 import javassist.util.proxy.MethodHandler;
 import javassist.util.proxy.ProxyFactory;
 
+import org.apache.commons.lang.SerializationException;
+import org.apache.commons.lang.SerializationUtils;
 import org.apache.crunch.Aggregator;
 import org.apache.crunch.CachingOptions;
 import org.apache.crunch.DoFn;
@@ -101,6 +103,16 @@ public class MemCollection<S> implements PCollection<S> {
     return new MemCollection<S>(output, collections[0].getPType());
   }
 
+  private <S, T> DoFn<S, T> verifySerializable(String name, DoFn<S, T>
doFn) {
+    try {
+      return (DoFn<S, T>) SerializationUtils.deserialize(SerializationUtils.serialize(doFn));
+    } catch (SerializationException e) {
+      throw new IllegalStateException(
+          doFn.getClass().getSimpleName() + " named '" + name + "' cannot be serialized",
+          e);
+    }
+  }
+
   @Override
   public <T> PCollection<T> parallelDo(DoFn<S, T> doFn, PType<T>
type) {
     return parallelDo(null, doFn, type);
@@ -114,6 +126,7 @@ public class MemCollection<S> implements PCollection<S> {
   @Override
   public <T> PCollection<T> parallelDo(String name, DoFn<S, T> doFn, PType<T>
type,
       ParallelDoOptions options) {
+    doFn = verifySerializable(name, doFn);
     InMemoryEmitter<T> emitter = new InMemoryEmitter<T>();
     Configuration conf = getPipeline().getConfiguration();
     doFn.configure(conf);

http://git-wip-us.apache.org/repos/asf/crunch/blob/ee783840/crunch-core/src/test/java/org/apache/crunch/impl/mem/CountersTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mem/CountersTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mem/CountersTest.java
index 6b3d0fd..78acb2c 100644
--- a/crunch-core/src/test/java/org/apache/crunch/impl/mem/CountersTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mem/CountersTest.java
@@ -24,9 +24,10 @@ import org.apache.crunch.impl.mem.collect.MemCollection;
 import org.apache.crunch.types.writable.Writables;
 import org.junit.Test;
 
+import java.io.Serializable;
 import java.util.Arrays;
 
-public class CountersTest {
+public class CountersTest implements Serializable {
 
   @Test
   public void counterTest() throws Exception {


Mime
View raw message