crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-388: Fix memory and spark issues discovered testing w/Oryx
Date Mon, 05 May 2014 15:25:16 GMT
Repository: crunch
Updated Branches:
  refs/heads/master c88ce4718 -> 1d4448e64


CRUNCH-388: Fix memory and spark issues discovered testing w/Oryx


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/1d4448e6
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/1d4448e6
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/1d4448e6

Branch: refs/heads/master
Commit: 1d4448e64e79724f5973022da3da94655a38c04c
Parents: c88ce47
Author: Josh Wills <jwills@apache.org>
Authored: Sun May 4 22:32:11 2014 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Sun May 4 22:32:11 2014 -0700

----------------------------------------------------------------------
 .../apache/crunch/impl/mem/collect/MemCollection.java |  4 +++-
 .../org/apache/crunch/impl/spark/SparkPipeline.java   | 14 +++++++++++---
 .../org/apache/crunch/impl/spark/SparkRuntime.java    |  2 +-
 3 files changed, 15 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/1d4448e6/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
index 8e509bc..240de1c 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -297,7 +297,7 @@ public class MemCollection<S> implements PCollection<S> {
     }
 
     final Set<String> handledMethods = ImmutableSet.of("getConfiguration", "getCounter",
-                                                  "progress", "getTaskAttemptID");
+                                                  "progress", "getNumReduceTasks", "getTaskAttemptID");
     factory.setFilter(new MethodFilter() {
       @Override
       public boolean isHandled(Method m) {
@@ -315,6 +315,8 @@ public class MemCollection<S> implements PCollection<S> {
           return null;
         } else if ("getTaskAttemptID".equals(name)) {
           return taskAttemptId;
+        } else if ("getNumReduceTasks".equals(name)) {
+          return 1;
         } else if ("getCounter".equals(name)){ // getCounter
           if (args.length == 1) {
             return MemPipeline.getCounters().findCounter((Enum<?>) args[0]);

http://git-wip-us.apache.org/repos/asf/crunch/blob/1d4448e6/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
index 05e6e0c..1076c33 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
@@ -19,6 +19,8 @@ package org.apache.crunch.impl.spark;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.CachingOptions;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
@@ -41,6 +43,8 @@ import java.util.Map;
 
 public class SparkPipeline extends DistributedPipeline {
 
+  private static final Log LOG = LogFactory.getLog(SparkPipeline.class);
+
   private final String sparkConnect;
   private JavaSparkContext sparkContext;
   private Class<?> jarClass;
@@ -102,8 +106,7 @@ public class SparkPipeline extends DistributedPipeline {
       exec.waitUntilDone();
       return exec.getResult();
     } catch (Exception e) {
-      // TODO: How to handle this without changing signature?
-      // LOG.error("Exception running pipeline", e);
+      LOG.error("Exception running pipeline", e);
       return PipelineResult.EMPTY;
     }
   }
@@ -120,7 +123,12 @@ public class SparkPipeline extends DistributedPipeline {
     if (sparkContext == null) {
       this.sparkContext = new JavaSparkContext(sparkConnect, getName());
       if (jarClass != null) {
-        sparkContext.addJar(JavaSparkContext.jarOfClass(jarClass)[0]);
+        String[] jars = JavaSparkContext.jarOfClass(jarClass);
+        if (jars != null && jars.length > 0) {
+          for (String jar : jars) {
+            sparkContext.addJar(jar);
+          }
+        }
       }
     }
     SparkRuntime runtime = new SparkRuntime(this, sparkContext, getConfiguration(), outputTargets, toMaterialize,

http://git-wip-us.apache.org/repos/asf/crunch/blob/1d4448e6/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
index 2016c50..2cb2fb3 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
@@ -264,7 +264,7 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
                 throw new IllegalArgumentException("Spark execution cannot handle non-MapReduceTarget: " + t);
               }
             } catch (Exception et) {
-              et.printStackTrace();
+              LOG.error("Spark Exception", et);
               status.set(Status.FAILED);
               set(PipelineResult.EMPTY);
               doneSignal.countDown();


Mime
View raw message