crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-469: Fix CCE in crunch-spark InputTables
Date Mon, 22 Sep 2014 16:56:55 GMT
Repository: crunch
Updated Branches:
  refs/heads/master 0a6c8decb -> f12eab83e


CRUNCH-469: Fix CCE in crunch-spark InputTables


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/f12eab83
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/f12eab83
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/f12eab83

Branch: refs/heads/master
Commit: f12eab83e18caed24a168c5753902c50e2b4c1a5
Parents: 0a6c8de
Author: Josh Wills <jwills@apache.org>
Authored: Thu Sep 4 13:17:26 2014 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Thu Sep 4 13:17:26 2014 -0700

----------------------------------------------------------------------
 .../apache/crunch/impl/spark/GuavaUtils.java    |  2 +-
 .../crunch/impl/spark/collect/InputTable.java   |  4 +-
 .../crunch/impl/spark/fn/Tuple2MapFunction.java | 45 ++++++++++++++++++++
 3 files changed, 48 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/f12eab83/crunch-spark/src/main/java/org/apache/crunch/impl/spark/GuavaUtils.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/GuavaUtils.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/GuavaUtils.java
index 400ae7b..cb67473 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/GuavaUtils.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/GuavaUtils.java
@@ -28,7 +28,7 @@ public class GuavaUtils {
     return new Function<Tuple2<K, V>, Pair<K, V>>() {
       @Override
       public Pair<K, V> apply(@Nullable Tuple2<K, V> kv) {
-        return kv == null ? null : Pair.of(kv._1, kv._2);
+        return kv == null ? null : Pair.of(kv._1(), kv._2());
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/f12eab83/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java
index e83d912..a0f7189 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java
@@ -27,7 +27,7 @@ import org.apache.crunch.impl.mr.run.CrunchInputFormat;
 import org.apache.crunch.impl.spark.SparkCollection;
 import org.apache.crunch.impl.spark.SparkRuntime;
 import org.apache.crunch.impl.spark.fn.InputConverterFunction;
-import org.apache.crunch.impl.spark.fn.PairMapFunction;
+import org.apache.crunch.impl.spark.fn.Tuple2MapFunction;
 import org.apache.crunch.types.Converter;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -56,7 +56,7 @@ public class InputTable<K, V> extends BaseInputTable<K, V> implements SparkColle
       MapFn mapFn = converter.applyPTypeTransforms() ? source.getType().getInputMapFn() : IdentityFn.getInstance();
       return input
           .map(new InputConverterFunction(source.getConverter()))
-          .map(new PairMapFunction(mapFn, runtime.getRuntimeContext()));
+          .mapToPair(new Tuple2MapFunction(mapFn, runtime.getRuntimeContext()));
     } catch (IOException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/f12eab83/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/Tuple2MapFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/Tuple2MapFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/Tuple2MapFunction.java
new file mode 100644
index 0000000..4ed553d
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/Tuple2MapFunction.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.fn;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.spark.SparkRuntimeContext;
+import org.apache.spark.api.java.function.PairFunction;
+import scala.Tuple2;
+
+public class Tuple2MapFunction<K, V> implements PairFunction<Pair<K, V>, K, V> {
+  private final MapFn<Pair<K, V>, Pair<K, V>> fn;
+  private final SparkRuntimeContext ctxt;
+  private boolean initialized;
+
+  public Tuple2MapFunction(MapFn<Pair<K, V>, Pair<K, V>> fn, SparkRuntimeContext ctxt) {
+    this.fn = fn;
+    this.ctxt = ctxt;
+  }
+
+  @Override
+  public Tuple2<K, V> call(Pair<K, V> p) throws Exception {
+    if (!initialized) {
+      ctxt.initialize(fn, null);
+      initialized = true;
+    }
+    Pair<K, V> res = fn.map(p);
+    return new Tuple2<K, V>(res.first(), res.second());
+  }
+}


Mime
View raw message