crunch-commits mailing list archives

From: jwi...@apache.org
Subject: [1/9] Crunch on Spark
Date: Wed, 11 Dec 2013 20:47:47 GMT
Updated Branches:
  refs/heads/master 11e9b53e2 -> 6e6234138


http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/OutputConverterFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/OutputConverterFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/OutputConverterFunction.java
new file mode 100644
index 0000000..b1184d8
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/OutputConverterFunction.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.fn;
+
+import org.apache.crunch.types.Converter;
+import org.apache.spark.api.java.function.PairFunction;
+import scala.Tuple2;
+
+public class OutputConverterFunction<K, V, S> extends PairFunction<S, K, V> {
+  private Converter<K, V, S, ?> converter;
+
+  public OutputConverterFunction(Converter<K, V, S, ?> converter) {
+    this.converter = converter;
+  }
+
+  @Override
+  public Tuple2<K, V> call(S s) throws Exception {
+    return new Tuple2<K, V>(converter.outputKey(s), converter.outputValue(s));
+  }
+}
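
A minimal usage sketch (not part of this commit) of how a function like this would typically be applied before handing an RDD to a Hadoop output format. The RDD name, the concrete key/value types, and the converter variable are assumptions for illustration only:

    // Hypothetical wiring: "output" is a JavaRDD<String> of Crunch output records and
    // "converter" is a Converter<Text, Text, String, ?> taken from the target's PType.
    // The Spark 0.8-era JavaRDD.map(PairFunction) overload yields a JavaPairRDD.
    JavaPairRDD<Text, Text> keyed =
        output.map(new OutputConverterFunction<Text, Text, String>(converter));
    // "keyed" can then be written out with one of the pair-RDD Hadoop save actions.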

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java
new file mode 100644
index 0000000..b2d93a0
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapDoFn.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.fn;
+
+import com.google.common.collect.Iterables;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.spark.GuavaUtils;
+import org.apache.crunch.impl.spark.SparkRuntimeContext;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import scala.Tuple2;
+
+import java.util.Iterator;
+
+public class PairFlatMapDoFn<T, K, V> extends PairFlatMapFunction<Iterator<T>, K, V> {
+  private final DoFn<T, Pair<K, V>> fn;
+  private final SparkRuntimeContext ctxt;
+
+  public PairFlatMapDoFn(DoFn<T, Pair<K, V>> fn, SparkRuntimeContext ctxt) {
+    this.fn = fn;
+    this.ctxt = ctxt;
+  }
+
+  @Override
+  public Iterable<Tuple2<K, V>> call(Iterator<T> input) throws Exception {
+    ctxt.initialize(fn);
+    return Iterables.transform(
+        new CrunchIterable<T, Pair<K, V>>(fn, input),
+        GuavaUtils.<K, V>pair2tupleFunc());
+  }
+}
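
For context, a hedged sketch (not part of this commit) of how this wrapper would be applied once per partition; the RDD, DoFn, and SparkRuntimeContext variables are assumed to already exist:

    // Hypothetical wiring: "lines" is a JavaRDD<String>, "doFn" is a Crunch
    // DoFn<String, Pair<String, Long>>, and "ctxt" is an existing SparkRuntimeContext.
    JavaPairRDD<String, Long> pairs =
        lines.mapPartitions(new PairFlatMapDoFn<String, String, Long>(doFn, ctxt));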

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapPairDoFn.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapPairDoFn.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapPairDoFn.java
new file mode 100644
index 0000000..bc3e701
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairFlatMapPairDoFn.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.fn;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.spark.GuavaUtils;
+import org.apache.crunch.impl.spark.SparkRuntimeContext;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import scala.Tuple2;
+
+import java.util.Iterator;
+
+public class PairFlatMapPairDoFn<K, V, K2, V2> extends PairFlatMapFunction<Iterator<Tuple2<K, V>>, K2, V2> {
+  private final DoFn<Pair<K, V>, Pair<K2, V2>> fn;
+  private final SparkRuntimeContext ctxt;
+
+  public PairFlatMapPairDoFn(DoFn<Pair<K, V>, Pair<K2, V2>> fn, SparkRuntimeContext ctxt) {
+    this.fn = fn;
+    this.ctxt = ctxt;
+  }
+
+  @Override
+  public Iterable<Tuple2<K2, V2>> call(Iterator<Tuple2<K, V>> input) throws Exception {
+    ctxt.initialize(fn);
+    return Iterables.transform(
+        new CrunchIterable<Pair<K, V>, Pair<K2, V2>>(
+            fn,
+            Iterators.transform(input, GuavaUtils.<K, V>tuple2PairFunc())),
+        GuavaUtils.<K2, V2>pair2tupleFunc());
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapFunction.java
new file mode 100644
index 0000000..6db30f0
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapFunction.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.fn;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.spark.SparkRuntimeContext;
+import org.apache.spark.api.java.function.Function;
+import scala.Tuple2;
+
+public class PairMapFunction<K, V, S> extends Function<Tuple2<K, V>, S> {
+  private final MapFn<Pair<K, V>, S> fn;
+  private final SparkRuntimeContext ctxt;
+  private boolean initialized;
+
+  public PairMapFunction(MapFn<Pair<K, V>, S> fn, SparkRuntimeContext ctxt) {
+    this.fn = fn;
+    this.ctxt = ctxt;
+  }
+
+  @Override
+  public S call(Tuple2<K, V> kv) throws Exception {
+    if (!initialized) {
+      ctxt.initialize(fn);
+      initialized = true;
+    }
+    return fn.map(Pair.of(kv._1, kv._2));
+  }
+}
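
Again, a hedged wiring sketch (not part of this commit); the RDD, MapFn, and SparkRuntimeContext variables are assumptions for illustration:

    // Hypothetical wiring: "pairs" is a JavaPairRDD<Text, Text>, "mapFn" is a Crunch
    // MapFn<Pair<Text, Text>, String>, and "ctxt" is an existing SparkRuntimeContext.
    JavaRDD<String> values =
        pairs.map(new PairMapFunction<Text, Text, String>(mapFn, ctxt));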

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapIterableFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapIterableFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapIterableFunction.java
new file mode 100644
index 0000000..7bfe378
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PairMapIterableFunction.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.fn;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.spark.SparkRuntimeContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+import scala.Tuple2;
+
+import java.util.List;
+
+public class PairMapIterableFunction<K, V, S, T> extends PairFunction<Pair<K, List<V>>, S, Iterable<T>> {
+
+  private final MapFn<Pair<K, List<V>>, Pair<S, Iterable<T>>> fn;
+  private final SparkRuntimeContext runtimeContext;
+  private boolean initialized;
+
+  public PairMapIterableFunction(
+      MapFn<Pair<K, List<V>>, Pair<S, Iterable<T>>> fn,
+      SparkRuntimeContext runtimeContext) {
+    this.fn = fn;
+    this.runtimeContext = runtimeContext;
+  }
+
+  @Override
+  public Tuple2<S, Iterable<T>> call(Pair<K, List<V>> input) throws Exception {
+    if (!initialized) {
+      runtimeContext.initialize(fn);
+      initialized = true;
+    }
+    Pair<S, Iterable<T>> out = fn.map(input);
+    return new Tuple2<S, Iterable<T>>(out.first(), out.second());
+  }
+}
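
A corresponding hedged sketch (not part of this commit) for turning grouped Crunch pairs back into a Spark pair RDD; all variable names and concrete types are illustrative assumptions:

    // Hypothetical wiring: "grouped" is a JavaRDD<Pair<Text, List<Text>>>, "postFn" is a
    // Crunch MapFn<Pair<Text, List<Text>>, Pair<Text, Iterable<Text>>>, and "ctxt" is an
    // existing SparkRuntimeContext.
    JavaPairRDD<Text, Iterable<Text>> table =
        grouped.map(new PairMapIterableFunction<Text, Text, Text, Text>(postFn, ctxt));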

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PartitionedMapOutputFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PartitionedMapOutputFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PartitionedMapOutputFunction.java
new file mode 100644
index 0000000..a10b7f6
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/PartitionedMapOutputFunction.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.fn;
+
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.spark.ByteArray;
+import org.apache.crunch.impl.spark.IntByteArray;
+import org.apache.crunch.impl.spark.SparkRuntimeContext;
+import org.apache.crunch.impl.spark.serde.SerDe;
+import org.apache.crunch.types.PGroupedTableType;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+import scala.Tuple2;
+
+import java.io.IOException;
+
+public class PartitionedMapOutputFunction<K, V> extends PairFunction<Pair<K, V>, IntByteArray, byte[]> {
+
+  private final SerDe<K> keySerde;
+  private final SerDe<V> valueSerde;
+  private final PGroupedTableType<K, V> ptype;
+  private final Class<? extends Partitioner> partitionerClass;
+  private final int numPartitions;
+  private final SparkRuntimeContext runtimeContext;
+  private transient Partitioner partitioner;
+
+  public PartitionedMapOutputFunction(
+    SerDe<K> keySerde,
+    SerDe<V> valueSerde,
+    PGroupedTableType<K, V> ptype,
+    Class<? extends Partitioner> partitionerClass,
+    int numPartitions,
+    SparkRuntimeContext runtimeContext) {
+    this.keySerde = keySerde;
+    this.valueSerde = valueSerde;
+    this.ptype = ptype;
+    this.partitionerClass = partitionerClass;
+    this.numPartitions = numPartitions;
+    this.runtimeContext = runtimeContext;
+  }
+
+  @Override
+  public Tuple2<IntByteArray, byte[]> call(Pair<K, V> p) throws Exception {
+    int partition = getPartitioner().getPartition(p.first(), p.second(), numPartitions);
+    return new Tuple2<IntByteArray, byte[]>(
+        new IntByteArray(partition, keySerde.toBytes(p.first())),
+        valueSerde.toBytes(p.second()));
+  }
+
+  private Partitioner getPartitioner() {
+    if (partitioner == null) {
+      try {
+        ptype.initialize(runtimeContext.getConfiguration());
+        Job job = new Job(runtimeContext.getConfiguration());
+        ptype.configureShuffle(job, GroupingOptions.builder().partitionerClass(partitionerClass).build());
+        partitioner = ReflectionUtils.newInstance(partitionerClass, job.getConfiguration());
+      } catch (IOException e) {
+        throw new CrunchRuntimeException("Error configuring partitioner", e);
+      }
+    }
+    return partitioner;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java
new file mode 100644
index 0000000..35dd7dd
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceGroupingFunction.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.fn;
+
+import com.google.common.collect.Lists;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.impl.spark.ByteArray;
+import org.apache.crunch.impl.spark.SparkRuntimeContext;
+import org.apache.crunch.types.PGroupedTableType;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+public class ReduceGroupingFunction extends PairFlatMapFunction<Iterator<Tuple2<ByteArray, List<byte[]>>>,
+    ByteArray, List<byte[]>> {
+
+  private final GroupingOptions options;
+  private final PGroupedTableType ptype;
+  private final SparkRuntimeContext ctxt;
+  private transient RawComparator<?> cmp;
+
+  public ReduceGroupingFunction(GroupingOptions options,
+                                PGroupedTableType ptype,
+                                SparkRuntimeContext ctxt) {
+    this.options = options;
+    this.ptype = ptype;
+    this.ctxt = ctxt;
+  }
+
+  @Override
+  public Iterable<Tuple2<ByteArray, List<byte[]>>> call(
+      final Iterator<Tuple2<ByteArray, List<byte[]>>> iter) throws Exception {
+    return new Iterable<Tuple2<ByteArray, List<byte[]>>>() {
+      @Override
+      public Iterator<Tuple2<ByteArray, List<byte[]>>> iterator() {
+        return new GroupingIterator(iter, rawComparator());
+      }
+    };
+  }
+
+  private RawComparator<?> rawComparator() {
+    if (cmp == null) {
+      try {
+        Job job = new Job(ctxt.getConfiguration());
+        ptype.configureShuffle(job, options);
+        cmp = ReflectionUtils.newInstance(options.getGroupingComparatorClass(), job.getConfiguration());
+      } catch (IOException e) {
+        throw new CrunchRuntimeException("Error configuring grouping comparator", e);
+      }
+    }
+    return cmp;
+  }
+
+  private static class GroupingIterator implements Iterator<Tuple2<ByteArray, List<byte[]>>> {
+
+    private final Iterator<Tuple2<ByteArray, List<byte[]>>> iter;
+    private final RawComparator cmp;
+    private ByteArray key;
+    private List<byte[]> bytes = Lists.newArrayList();
+
+    public GroupingIterator(Iterator<Tuple2<ByteArray, List<byte[]>>> iter, RawComparator cmp) {
+      this.iter = iter;
+      this.cmp = cmp;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return iter.hasNext() || key != null;
+    }
+
+    @Override
+    public Tuple2<ByteArray, List<byte[]>> next() {
+      ByteArray nextKey = null;
+      List<byte[]> next = null;
+      while (iter.hasNext()) {
+        Tuple2<ByteArray, List<byte[]>> t = iter.next();
+        if (key == null) {
+          key = t._1;
+          bytes.addAll(t._2);
+        } else if (cmp.compare(key.value, 0, key.value.length, t._1.value, 0, t._1.value.length) == 0) {
+          bytes.addAll(t._2);
+        } else {
+          nextKey = t._1;
+          next = Lists.newArrayList(t._2);
+          break;
+        }
+      }
+      Tuple2<ByteArray, List<byte[]>> ret = new Tuple2<ByteArray, List<byte[]>>(key, bytes);
+      key = nextKey;
+      bytes = next;
+      return ret;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+}
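
The interesting piece here is GroupingIterator, which lazily collapses consecutive tuples whose keys compare equal under the job's grouping comparator into a single key paired with the concatenation of their value lists. An eager, simplified sketch of the same idea (String keys and equals() instead of raw byte comparisons; purely illustrative, not the committed class):

    static List<Pair<String, List<Integer>>> group(List<Pair<String, List<Integer>>> sorted) {
      List<Pair<String, List<Integer>>> out = Lists.newArrayList();
      String key = null;
      List<Integer> values = null;
      for (Pair<String, List<Integer>> p : sorted) {
        if (key != null && key.equals(p.first())) {
          values.addAll(p.second());          // same group: extend the current run
        } else {
          if (key != null) {
            out.add(Pair.of(key, values));    // emit the finished run
          }
          key = p.first();
          values = Lists.newArrayList(p.second());
        }
      }
      if (key != null) {
        out.add(Pair.of(key, values));        // emit the trailing run
      }
      return out;
    }
    // e.g. [("a",[1]), ("a",[2]), ("b",[3])]  ->  [("a",[1,2]), ("b",[3])]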

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceInputFunction.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceInputFunction.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceInputFunction.java
new file mode 100644
index 0000000..4ebdfaa
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/fn/ReduceInputFunction.java
@@ -0,0 +1,44 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.crunch.impl.spark.fn;
+
+import com.google.common.collect.Lists;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.spark.ByteArray;
+import org.apache.crunch.impl.spark.serde.SerDe;
+import org.apache.spark.api.java.function.Function;
+import scala.Tuple2;
+
+import java.util.List;
+
+public class ReduceInputFunction<K, V> extends Function<Tuple2<ByteArray, List<byte[]>>, Pair<K, List<V>>> {
+  private final SerDe<K> keySerDe;
+  private final SerDe<V> valueSerDe;
+
+  public ReduceInputFunction(SerDe<K> keySerDe, SerDe<V> valueSerDe) {
+    this.keySerDe = keySerDe;
+    this.valueSerDe = valueSerDe;
+  }
+
+  @Override
+  public Pair<K, List<V>> call(Tuple2<ByteArray, List<byte[]>> kv) throws Exception {
+    return Pair.of(keySerDe.fromBytes(kv._1.value), Lists.transform(kv._2, valueSerDe.fromBytesFunction()));
+  }
+}
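
A hedged wiring sketch (not part of this commit); the shuffled RDD and the SerDe instances are assumptions for illustration and would have to match the types used on the map side:

    // Hypothetical wiring: "shuffled" is a JavaPairRDD<ByteArray, List<byte[]>> produced by
    // the grouping stage, and keySerde/valueSerde are the SerDe<Text> instances used earlier.
    JavaRDD<Pair<Text, List<Text>>> grouped =
        shuffled.map(new ReduceInputFunction<Text, Text>(keySerde, valueSerde));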

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java
new file mode 100644
index 0000000..e6e08a0
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/AvroSerDe.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.serde;
+
+import com.google.common.base.Function;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+
+import javax.annotation.Nullable;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+public class AvroSerDe<T> implements SerDe<T> {
+
+  private AvroType<T> avroType;
+  private transient DatumWriter<T> writer;
+  private transient DatumReader<T> reader;
+
+  public AvroSerDe(AvroType<T> avroType) {
+    this.avroType = avroType;
+    if (avroType.hasReflect() && avroType.hasSpecific()) {
+      Avros.checkCombiningSpecificAndReflectionSchemas();
+    }
+  }
+
+  private DatumWriter<T> getWriter() {
+    if (writer == null) {
+      if (avroType.hasReflect()) {
+        writer = new ReflectDatumWriter<T>(avroType.getSchema());
+      } else if (avroType.hasSpecific()) {
+        writer = new SpecificDatumWriter<T>(avroType.getSchema());
+      } else {
+        writer = new GenericDatumWriter<T>(avroType.getSchema());
+      }
+    }
+    return writer;
+  }
+
+  private DatumReader<T> getReader() {
+    if (reader == null) {
+      if (avroType.hasReflect()) {
+        reader = new ReflectDatumReader<T>(avroType.getSchema());
+      } else if (avroType.hasSpecific()) {
+        reader = new SpecificDatumReader<T>(avroType.getSchema());
+      } else {
+        reader = new GenericDatumReader<T>(avroType.getSchema());
+      }
+    }
+    return reader;
+  }
+
+  @Override
+  public byte[] toBytes(T obj) throws Exception {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+    getWriter().write(obj, encoder);
+    encoder.flush();
+    out.close();
+    return out.toByteArray();
+  }
+
+  @Override
+  public T fromBytes(byte[] bytes) {
+    Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
+    try {
+      return getReader().read(null, decoder);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Function<byte[], T> fromBytesFunction() {
+    return new Function<byte[], T>() {
+      @Override
+      public T apply(@Nullable byte[] input) {
+        return fromBytes(input);
+      }
+    };
+  }
+}
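
A small round-trip sketch (not part of this commit), assuming Crunch's Avros.ints() is used to build the AvroType:

    // Hypothetical usage: serialize and deserialize a value through the Avro SerDe.
    AvroSerDe<Integer> serde = new AvroSerDe<Integer>(Avros.ints());
    byte[] bytes = serde.toBytes(42);        // toBytes is declared to throw Exception
    Integer copy = serde.fromBytes(bytes);   // copy.intValue() == 42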

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDe.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDe.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDe.java
new file mode 100644
index 0000000..d374a41
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/SerDe.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.serde;
+
+import com.google.common.base.Function;
+
+import java.io.Serializable;
+
+public interface SerDe<T> extends Serializable {
+  byte[] toBytes(T obj) throws Exception;
+
+  T fromBytes(byte[] bytes);
+
+  Function<byte[], T> fromBytesFunction();
+}
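
To illustrate the contract, a hypothetical minimal implementation (not part of this commit) that round-trips Strings as UTF-8 bytes:

    public class StringSerDe implements SerDe<String> {
      @Override
      public byte[] toBytes(String s) throws Exception {
        return s.getBytes("UTF-8");
      }

      @Override
      public String fromBytes(byte[] bytes) {
        try {
          return new String(bytes, "UTF-8");
        } catch (java.io.UnsupportedEncodingException e) {
          throw new RuntimeException(e);
        }
      }

      @Override
      public Function<byte[], String> fromBytesFunction() {
        return new Function<byte[], String>() {
          @Override
          public String apply(byte[] input) {
            return fromBytes(input);
          }
        };
      }
    }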

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/WritableSerDe.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/WritableSerDe.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/WritableSerDe.java
new file mode 100644
index 0000000..d90007d
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/serde/WritableSerDe.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.serde;
+
+import com.google.common.base.Function;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import javax.annotation.Nullable;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public class WritableSerDe implements SerDe<Writable> {
+
+  Class<? extends Writable> clazz;
+
+  public WritableSerDe(Class<? extends Writable> clazz) {
+    this.clazz = clazz;
+  }
+
+  @Override
+  public byte[] toBytes(Writable obj) throws Exception {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(baos);
+    obj.write(dos);
+    dos.close();
+    return baos.toByteArray();
+  }
+
+  @Override
+  public Writable fromBytes(byte[] bytes) {
+    Writable inst = ReflectionUtils.newInstance(clazz, null);
+    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+    DataInputStream dis = new DataInputStream(bais);
+    try {
+      inst.readFields(dis);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return inst;
+  }
+
+  @Override
+  public Function<byte[], Writable> fromBytesFunction() {
+    return new Function<byte[], Writable>() {
+      @Override
+      public Writable apply(@Nullable byte[] input) {
+        return fromBytes(input);
+      }
+    };
+  }
+}
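
And a matching round-trip sketch for the Writable-based SerDe (again hypothetical usage, not part of this commit):

    // Hypothetical usage with org.apache.hadoop.io.Text:
    WritableSerDe serde = new WritableSerDe(Text.class);
    byte[] bytes = serde.toBytes(new Text("hello"));   // toBytes is declared to throw Exception
    Text copy = (Text) serde.fromBytes(bytes);         // copy.toString() -> "hello"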

http://git-wip-us.apache.org/repos/asf/crunch/blob/6e623413/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7e288d3..4bacc9b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -51,6 +51,7 @@ under the License.
     <module>crunch-examples</module>
     <module>crunch-archetype</module>
     <module>crunch-scrunch</module>
+    <module>crunch-spark</module>
     <module>crunch-dist</module>
   </modules>
 
@@ -78,13 +79,14 @@ under the License.
     <jackson.version>1.8.8</jackson.version>
     <protobuf-java.version>2.4.0a</protobuf-java.version>
     <libthrift.version>0.8.0</libthrift.version>
-    <slf4j.version>1.4.3</slf4j.version>
+    <slf4j.version>1.6.1</slf4j.version>
     <log4j.version>1.2.15</log4j.version>
     <junit.version>4.10</junit.version>
     <hamcrest.version>1.1</hamcrest.version>
     <mockito.version>1.9.0</mockito.version>
-    <scala.version>2.9.2</scala.version>
-    <scalatest.version>1.7.2</scalatest.version>
+    <scala.version>2.9.3</scala.version>
+    <scalatest.version>1.9.1</scalatest.version>
+    <spark.version>0.8.0-incubating</spark.version>
     <pkg>org.apache.crunch</pkg>
   </properties>
 
@@ -137,6 +139,12 @@ under the License.
         <artifactId>crunch-scrunch</artifactId>
         <version>${project.version}</version>
       </dependency>
+
+      <dependency>
+        <groupId>org.apache.crunch</groupId>
+        <artifactId>crunch-spark</artifactId>
+        <version>${project.version}</version>
+      </dependency>
       
       <dependency>
         <groupId>org.apache.crunch</groupId>
@@ -336,6 +344,12 @@ under the License.
       </dependency>
 
       <dependency>
+        <groupId>org.apache.spark</groupId>
+        <artifactId>spark-core_${scala.version}</artifactId>
+        <version>${spark.version}</version>
+      </dependency>
+
+      <dependency>
         <groupId>org.scala-lang</groupId>
         <artifactId>scala-library</artifactId>
         <version>${scala.version}</version>
@@ -647,7 +661,7 @@ under the License.
         <plugin>
           <groupId>net.alchim31.maven</groupId>
           <artifactId>scala-maven-plugin</artifactId>
-          <version>3.1.5</version>
+          <version>3.1.6</version>
         </plugin>
         <plugin>
           <groupId>org.apache.maven.plugins</groupId>

