crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-425: Implement the Converter#applyPTypeTransforms() logic for Crunch-on-Spark.
Date Sun, 24 Aug 2014 19:53:05 GMT
Repository: crunch
Updated Branches:
  refs/heads/master 7f54a0e22 -> 2d20e7772


CRUNCH-425: Implement the Converter#applyPTypeTransforms() logic for Crunch-on-Spark.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/2d20e777
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/2d20e777
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/2d20e777

Branch: refs/heads/master
Commit: 2d20e7772beee8f588431c6767b411895be94f6c
Parents: 7f54a0e
Author: Josh Wills <jwills@apache.org>
Authored: Sun Aug 24 10:06:47 2014 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Sun Aug 24 12:39:35 2014 -0700

----------------------------------------------------------------------
 .../it/java/org/apache/crunch/SkipPTypesIT.java | 116 +++++++++++++++++++
 .../apache/crunch/impl/spark/SparkRuntime.java  |   6 +-
 .../impl/spark/collect/InputCollection.java     |  11 +-
 .../crunch/impl/spark/collect/InputTable.java   |  18 ++-
 .../impl/spark/fn/InputConverterFunction.java   |   2 +-
 .../crunch/impl/spark/fn/PairMapFunction.java   |   2 +-
 6 files changed, 145 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/2d20e777/crunch-spark/src/it/java/org/apache/crunch/SkipPTypesIT.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/SkipPTypesIT.java b/crunch-spark/src/it/java/org/apache/crunch/SkipPTypesIT.java
new file mode 100644
index 0000000..609d975
--- /dev/null
+++ b/crunch-spark/src/it/java/org/apache/crunch/SkipPTypesIT.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import org.apache.crunch.impl.spark.SparkPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.io.seq.SeqFileTableSourceTarget;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class SkipPTypesIT {
+  @Rule
+  public TemporaryPath tempDir = new TemporaryPath();
+
+  PTableType<Text, LongWritable> ptt = Writables.tableOf(Writables.writables(Text.class),
+      Writables.writables(LongWritable.class));
+
+  @Test
+  public void testSkipPTypes() throws Exception {
+    String out = tempDir.getFileName("out");
+    SparkPipeline pipeline = new SparkPipeline("local", "skipptypes");
+    PCollection<String> shakes = pipeline.read(From.textFile(tempDir.copyResourceFileName("shakes.txt")));
+    PTable<String, Long> wcnt = shakes.count();
+    wcnt.write(new MySeqFileTableSourceTarget(out, ptt));
+    pipeline.run();
+
+    PTable<Text, LongWritable> wcntIn = pipeline.read(new MySeqFileTableSourceTarget(out, ptt));
+    assertEquals(new LongWritable(1L), wcntIn.materialize().iterator().next().second());
+    pipeline.done();
+  }
+
+  static class ToWritables extends MapFn<Pair<String, Long>, Pair<Text, LongWritable>> {
+    @Override
+    public Pair<Text, LongWritable> map(Pair<String, Long> input) {
+      return Pair.of(new Text(input.first()), new LongWritable(input.second()));
+    }
+  }
+  static class MySeqFileTableSourceTarget extends SeqFileTableSourceTarget {
+
+    public MySeqFileTableSourceTarget(String path, PTableType ptype) {
+      super(path, ptype);
+    }
+
+    @Override
+    public Converter getConverter() {
+      return new SkipPTypesConverter(getType().getConverter());
+    }
+  }
+
+  static class SkipPTypesConverter implements Converter {
+
+    private Converter delegate;
+
+    public SkipPTypesConverter(Converter delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    public Object convertInput(Object key, Object value) {
+      return delegate.convertInput(key, value);
+    }
+
+    @Override
+    public Object convertIterableInput(Object key, Iterable value) {
+      return delegate.convertIterableInput(key, value);
+    }
+
+    @Override
+    public Object outputKey(Object value) {
+      return delegate.outputKey(value);
+    }
+
+    @Override
+    public Object outputValue(Object value) {
+      return delegate.outputValue(value);
+    }
+
+    @Override
+    public Class getKeyClass() {
+      return delegate.getKeyClass();
+    }
+
+    @Override
+    public Class getValueClass() {
+      return delegate.getValueClass();
+    }
+
+    @Override
+    public boolean applyPTypeTransforms() {
+      return false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/2d20e777/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
index a9537e5..b5bbc8d 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
@@ -33,6 +33,7 @@ import org.apache.crunch.PipelineExecution;
 import org.apache.crunch.PipelineResult;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.Target;
+import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.impl.dist.collect.PCollectionImpl;
 import org.apache.crunch.impl.spark.fn.MapFunction;
 import org.apache.crunch.impl.spark.fn.OutputConverterFunction;
@@ -299,14 +300,15 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
           getRuntimeContext().setConf(sparkContext.broadcast(WritableUtils.toByteArray(conf)));
           if (t instanceof MapReduceTarget) { //TODO: check this earlier
             Converter c = t.getConverter(ptype);
+            IdentityFn ident = IdentityFn.getInstance();
             JavaPairRDD<?, ?> outRDD;
             if (rdd instanceof JavaRDD) {
               outRDD = ((JavaRDD) rdd)
-                  .map(new MapFunction(ptype.getOutputMapFn(), ctxt))
+                  .map(new MapFunction(c.applyPTypeTransforms() ? ptype.getOutputMapFn() : ident, ctxt))
                   .map(new OutputConverterFunction(c));
             } else {
               outRDD = ((JavaPairRDD) rdd)
-                  .map(new PairMapFunction(ptype.getOutputMapFn(), ctxt))
+                  .map(new PairMapFunction(c.applyPTypeTransforms() ? ptype.getOutputMapFn() : ident, ctxt))
                   .map(new OutputConverterFunction(c));
             }
             try {

http://git-wip-us.apache.org/repos/asf/crunch/blob/2d20e777/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java
index 237c8de..8273236 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputCollection.java
@@ -17,8 +17,10 @@
  */
 package org.apache.crunch.impl.spark.collect;
 
+import org.apache.crunch.MapFn;
 import org.apache.crunch.ParallelDoOptions;
 import org.apache.crunch.Source;
+import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.impl.dist.DistributedPipeline;
 import org.apache.crunch.impl.dist.collect.BaseInputCollection;
 import org.apache.crunch.impl.mr.run.CrunchInputFormat;
@@ -26,6 +28,7 @@ import org.apache.crunch.impl.spark.SparkCollection;
 import org.apache.crunch.impl.spark.SparkRuntime;
 import org.apache.crunch.impl.spark.fn.InputConverterFunction;
 import org.apache.crunch.impl.spark.fn.MapFunction;
+import org.apache.crunch.types.Converter;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -44,15 +47,17 @@ public class InputCollection<S> extends BaseInputCollection<S> implements SparkC
       Job job = new Job(runtime.getConfiguration());
       FileInputFormat.addInputPaths(job, "/tmp"); //placeholder
       source.configureSource(job, -1);
+      Converter converter = source.getConverter();
       JavaPairRDD<?, ?> input = runtime.getSparkContext().newAPIHadoopRDD(
           job.getConfiguration(),
           CrunchInputFormat.class,
-          source.getConverter().getKeyClass(),
-          source.getConverter().getValueClass());
+          converter.getKeyClass(),
+          converter.getValueClass());
       input.rdd().setName(source.toString());
+      MapFn mapFn = converter.applyPTypeTransforms() ? source.getType().getInputMapFn() : IdentityFn.getInstance();
       return input
           .map(new InputConverterFunction(source.getConverter()))
-          .map(new MapFunction(source.getType().getInputMapFn(), runtime.getRuntimeContext()));
+          .map(new MapFunction(mapFn, runtime.getRuntimeContext()));
     } catch (IOException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/2d20e777/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java
index 606ffce..e83d912 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/InputTable.java
@@ -17,14 +17,20 @@
  */
 package org.apache.crunch.impl.spark.collect;
 
+import org.apache.crunch.MapFn;
 import org.apache.crunch.ParallelDoOptions;
 import org.apache.crunch.TableSource;
+import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.impl.dist.DistributedPipeline;
 import org.apache.crunch.impl.dist.collect.BaseInputTable;
 import org.apache.crunch.impl.mr.run.CrunchInputFormat;
 import org.apache.crunch.impl.spark.SparkCollection;
 import org.apache.crunch.impl.spark.SparkRuntime;
+import org.apache.crunch.impl.spark.fn.InputConverterFunction;
+import org.apache.crunch.impl.spark.fn.PairMapFunction;
+import org.apache.crunch.types.Converter;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDDLike;
 
 import java.io.IOException;
@@ -40,11 +46,17 @@ public class InputTable<K, V> extends BaseInputTable<K, V> implements SparkColle
     try {
       Job job = new Job(runtime.getConfiguration());
       source.configureSource(job, -1); // TODO: a custom input format for crunch-spark
-      return runtime.getSparkContext().newAPIHadoopRDD(
+      Converter converter = source.getConverter();
+      JavaPairRDD<?, ?> input = runtime.getSparkContext().newAPIHadoopRDD(
           job.getConfiguration(),
           CrunchInputFormat.class,
-          source.getConverter().getKeyClass(),
-          source.getConverter().getValueClass());
+          converter.getKeyClass(),
+          converter.getValueClass());
+      input.rdd().setName(source.toString());
+      MapFn mapFn = converter.applyPTypeTransforms() ? source.getType().getInputMapFn() : IdentityFn.getInstance();
+      return input
+          .map(new InputConverterFunction(source.getConverter()))
+          .map(new PairMapFunction(mapFn, runtime.getRuntimeContext()));
     } catch (IOException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/2d20e777/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/InputConverterFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/InputConverterFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/InputConverterFunction.java
index 52869a4..36745c1 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/InputConverterFunction.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/InputConverterFunction.java
@@ -30,6 +30,6 @@ public class InputConverterFunction<K, V, S> extends Function<Tuple2<K, V>, S> {
 
   @Override
   public S call(Tuple2<K, V> kv) throws Exception {
-    return converter.convertInput(kv._1, kv._2);
+    return converter.convertInput(kv._1(), kv._2());
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/2d20e777/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapFunction.java
index 6db30f0..673bbab 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapFunction.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapFunction.java
@@ -39,6 +39,6 @@ public class PairMapFunction<K, V, S> extends Function<Tuple2<K, V>, S> {
       ctxt.initialize(fn);
       initialized = true;
     }
-    return fn.map(Pair.of(kv._1, kv._2));
+    return fn.map(Pair.of(kv._1(), kv._2()));
   }
 }


Mime
View raw message