crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject crunch git commit: CRUNCH-538: Java lambdas for Crunch business logic.
Date Mon, 31 Aug 2015 17:12:10 GMT
Repository: crunch
Updated Branches:
  refs/heads/master 366da2ee0 -> 60b28b12b


CRUNCH-538: Java lambdas for Crunch business logic.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/60b28b12
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/60b28b12
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/60b28b12

Branch: refs/heads/master
Commit: 60b28b12b26348a2e17ae5f8023e8f4256cd03fc
Parents: 366da2e
Author: Josh Wills <jwills@apache.org>
Authored: Wed Jul 1 15:20:50 2015 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Thu Aug 27 15:05:42 2015 -0700

----------------------------------------------------------------------
 .../org/apache/crunch/MultiStagePlanningIT.java |   2 +-
 .../it/java/org/apache/crunch/PageRankIT.java   |   2 +-
 .../it/java/org/apache/crunch/WordCountIT.java  |  13 +-
 .../src/main/java/org/apache/crunch/IDoFn.java  |  49 ++++++
 .../main/java/org/apache/crunch/IFilterFn.java  |  27 ++++
 .../main/java/org/apache/crunch/IFlatMapFn.java |  28 ++++
 .../src/main/java/org/apache/crunch/IMapFn.java |  27 ++++
 .../java/org/apache/crunch/PCollection.java     |  96 ++++++++++++
 .../src/main/java/org/apache/crunch/PTable.java |  32 +++-
 .../java/org/apache/crunch/fn/IFnHelpers.java   | 149 +++++++++++++++++++
 .../impl/dist/collect/PCollectionImpl.java      |  89 +++++++++++
 .../crunch/impl/dist/collect/PTableBase.java    |  29 +++-
 .../crunch/impl/mem/collect/MemCollection.java  |  89 ++++++++++-
 .../impl/mem/collect/MemGroupedTable.java       |  11 --
 .../crunch/impl/mem/collect/MemTable.java       |  30 +++-
 15 files changed, 641 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/60b28b12/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java b/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java
index a7b7d48..38211a7 100644
--- a/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java
@@ -60,7 +60,7 @@ public class MultiStagePlanningIT implements Serializable {
 
         PTable<String, String> addressesTable = pipeline.readTextFile(addressesFile)
                 .parallelDo("Split addresses", new StringToPairMapFn(), tableOf(strings(), strings()))
-                .filter(new FilterFn<Pair<String, String>>() {
+                .filter(new IFilterFn<Pair<String, String>>() {
                     @Override
                     public boolean accept(Pair<String, String> input) {
                         // This is odd but it is the simpler way of simulating this would take longer than

http://git-wip-us.apache.org/repos/asf/crunch/blob/60b28b12/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java b/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java
index 701f78a..b30465d 100644
--- a/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java
@@ -130,7 +130,7 @@ public class PageRankIT {
     }, ptf.tableOf(ptf.strings(), ptf.floats()));
 
     return input.cogroup(outbound).mapValues(
-        new MapFn<Pair<Collection<PageRankData>, Collection<Float>>, PageRankData>() {
+        new IMapFn<Pair<Collection<PageRankData>, Collection<Float>>, PageRankData>() {
           @Override
           public PageRankData map(Pair<Collection<PageRankData>, Collection<Float>> input) {
             PageRankData prd = Iterables.getOnlyElement(input.first());

http://git-wip-us.apache.org/repos/asf/crunch/blob/60b28b12/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java b/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java
index cee6a90..c4e1d58 100644
--- a/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.crunch.fn.Aggregators;
@@ -51,15 +52,15 @@ public class WordCountIT {
   }
 
   public static PTable<String, Long> wordCount(PCollection<String> words, PTypeFamily typeFamily) {
-    return Aggregate.count(words.parallelDo(new DoFn<String, String>() {
-
+    return Aggregate.count(words.parallelDo(new IDoFn<String, String>() {
       @Override
-      public void process(String line, Emitter<String> emitter) {
-        for (String word : line.split("\\s+")) {
-          emitter.emit(word);
+      public void process(Context<String, String> context) {
+        List<String> words = Arrays.asList(context.element().split("\\s+"));
+        for (String word : words) {
           if ("and".equals(word)) {
-            increment(WordCountStats.ANDS);
+            context.increment(WordCountStats.ANDS);
           }
+          context.emit(word);
         }
       }
     }, typeFamily.strings()));

http://git-wip-us.apache.org/repos/asf/crunch/blob/60b28b12/crunch-core/src/main/java/org/apache/crunch/IDoFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/IDoFn.java b/crunch-core/src/main/java/org/apache/crunch/IDoFn.java
new file mode 100644
index 0000000..b393f43
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/IDoFn.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+import java.io.Serializable;
+
+/**
+ * A Java lambdas friendly version of the {@link DoFn} class.
+ */
+public interface IDoFn<S, T> extends Serializable {
+
+  void process(Context<S, T> context);
+
+  public interface Context<S, T> {
+    S element();
+
+    void emit(T t);
+
+    TaskInputOutputContext getContext();
+
+    Configuration getConfiguration();
+
+    void increment(String groupName, String counterName);
+
+    void increment(String groupName, String counterName, long value);
+
+    void increment(Enum<?> counterName);
+
+    void increment(Enum<?> counterName, long value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/60b28b12/crunch-core/src/main/java/org/apache/crunch/IFilterFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/IFilterFn.java b/crunch-core/src/main/java/org/apache/crunch/IFilterFn.java
new file mode 100644
index 0000000..bb8a03d
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/IFilterFn.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.io.Serializable;
+
+/**
+ * A Java lambdas friendly version of the {@link org.apache.crunch.FilterFn} class.
+ */
+public interface IFilterFn<S> extends Serializable {
+  boolean accept(S input);
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/60b28b12/crunch-core/src/main/java/org/apache/crunch/IFlatMapFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/IFlatMapFn.java b/crunch-core/src/main/java/org/apache/crunch/IFlatMapFn.java
new file mode 100644
index 0000000..a2b85c4
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/IFlatMapFn.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.io.Serializable;
+
+/**
+ * A Java lambdas friendly interface for writing business logic against {@code PCollection}s
+ * that take in a single input record and return 0 to N output records via an {@code Iterable}.
+ */
+public interface IFlatMapFn<S, T> extends Serializable {
+  Iterable<T> process(S input);
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/60b28b12/crunch-core/src/main/java/org/apache/crunch/IMapFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/IMapFn.java b/crunch-core/src/main/java/org/apache/crunch/IMapFn.java
new file mode 100644
index 0000000..3c06d9e
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/IMapFn.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import java.io.Serializable;
+
+/**
+ * A Java lambdas friendly version of the {@link org.apache.crunch.MapFn} class.
+ */
+public interface IMapFn<S, T> extends Serializable {
+  public T map(S input);
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/60b28b12/crunch-core/src/main/java/org/apache/crunch/PCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PCollection.java b/crunch-core/src/main/java/org/apache/crunch/PCollection.java
index a1d507a..5d072e6 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PCollection.java
@@ -133,6 +133,96 @@ public interface PCollection<S> {
       ParallelDoOptions options);
 
   /**
+   * Similar to other instances of {@code parallelDo}, but designed for Java lambdas.
+   */
+  <T> PCollection<T> parallelDo(IDoFn<S, T> fn, PType<T> type);
+
+  /**
+   * Similar to other instances of {@code parallelDo}, but designed for Java lambdas.
+   */
+  <K, V> PTable<K, V> parallelDo(IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type);
+
+  /**
+   * Similar to other instances of {@code parallelDo}, but designed for Java lambdas.
+   */
+  <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type);
+
+  /**
+   * Similar to other instances of {@code parallelDo}, but designed for Java lambdas.
+   */
+  <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type);
+
+  /**
+   * Similar to other instances of {@code parallelDo}, but designed for Java lambdas.
+   */
+  <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type, ParallelDoOptions options);
+
+  /**
+   * Similar to other instances of {@code parallelDo}, but designed for Java lambdas.
+   */
+  <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type, ParallelDoOptions options);
+
+  /**
+   * For each element of this {@code PCollection}, generate 0 to N output values using the
+   * given {@code IFlatMapFn}. Designed for Java lambdas.
+   */
+  <T> PCollection<T> flatMap(IFlatMapFn<S, T> fn, PType<T> type);
+
+  /**
+   * For each element of this {@code PCollection}, generate 0 to N output values using the
+   * given {@code IFlatMapFn}. Designed for Java lambdas.
+   */
+  <K, V> PTable<K, V> flatMap(IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type);
+
+  /**
+   * For each element of this {@code PCollection}, generate 0 to N output values using the
+   * given {@code IFlatMapFn}. Designed for Java lambdas.
+   */
+  <T> PCollection<T> flatMap(String name, IFlatMapFn<S, T> fn, PType<T> type);
+
+  /**
+   * For each element of this {@code PCollection}, generate 0 to N output values using the
+   * given {@code IFlatMapFn}. Designed for Java lambdas.
+   */
+  <K, V> PTable<K, V> flatMap(String name, IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type);
+
+  /**
+   * For each element of this {@code PCollection}, generate one output value using the
+   * given {@code IMapFn}. Designed for Java lambdas.
+   */
+  <T> PCollection<T> map(IMapFn<S, T> fn, PType<T> type);
+
+  /**
+   * For each element of this {@code PCollection}, generate one output value using the
+   * given {@code IMapFn}. Designed for Java lambdas.
+   */
+  <K, V> PTable<K, V> map(IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type);
+
+  /**
+   * For each element of this {@code PCollection}, generate one output value using the
+   * given {@code IMapFn}. Designed for Java lambdas.
+   */
+  <T> PCollection<T> map(String name, IMapFn<S, T> fn, PType<T> type);
+
+  /**
+   * For each element of this {@code PCollection}, generate one output value using the
+   * given {@code IMapFn}. Designed for Java lambdas.
+   */
+  <K, V> PTable<K, V> map(String name, IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type);
+
+  /**
+   * Filter elements of this {@code PCollection} using the given {@code IFilterFn}.
+   * Designed for Java lambdas.
+   */
+  PCollection<S> filter(IFilterFn<S> fn);
+
+  /**
+   * Filter elements of this {@code PCollection} using the given {@code IFilterFn}.
+   * Designed for Java lambdas.
+   */
+  PCollection<S> filter(String name, IFilterFn<S> fn);
+
+  /**
    * Write the contents of this {@code PCollection} to the given {@code Target},
    * using the storage format specified by the target.
    *
@@ -259,6 +349,12 @@ public interface PCollection<S> {
   <K> PTable<K, S> by(MapFn<S, K> extractKeyFn, PType<K> keyType);
 
   /**
+   * Apply the given {@code IMapFn} to each element of this instance in order to
+   * create a {@code PTable}. Designed for use with Java 8 lambdas.
+   */
+  <K> PTable<K, S> by(IMapFn<S, K> extractKeyFn, PType<K> keyType);
+
+  /**
    * Apply the given map function to each element of this instance in order to
    * create a {@code PTable}.
    *

http://git-wip-us.apache.org/repos/asf/crunch/blob/60b28b12/crunch-core/src/main/java/org/apache/crunch/PTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PTable.java b/crunch-core/src/main/java/org/apache/crunch/PTable.java
index 09fe9db..5609c3f 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PTable.java
@@ -107,16 +107,28 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> {
 
   /**
    * Returns a {@code PTable} that has the same keys as this instance, but
+   * uses the given function to map the values. Designed for Java lambdas.
+   */
+  <U> PTable<K, U> mapValues(IMapFn<V, U> mapFn, PType<U> ptype);
+
+  /**
+   * Returns a {@code PTable} that has the same keys as this instance, but
    * uses the given function to map the values.
    */
   <U> PTable<K, U> mapValues(String name, MapFn<V, U> mapFn, PType<U> ptype);
- 
+
   /**
    * Returns a {@code PTable} that has the same values as this instance, but
    * uses the given function to map the keys.
    */
   <K2> PTable<K2, V> mapKeys(MapFn<K, K2> mapFn, PType<K2> ptype);
-  
+
+  /**
+   * Returns a {@code PTable} that has the same values as this instance, but
+   * uses the given function to map the keys. Designed for Java lambdas.
+   */
+  <K2> PTable<K2, V> mapKeys(IMapFn<K, K2> mapFn, PType<K2> ptype);
+
   /**
    * Returns a {@code PTable} that has the same values as this instance, but
    * uses the given function to map the keys.
@@ -134,7 +146,13 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> {
    * {@code PTable}.
    */
   PTable<K, V> filter(FilterFn<Pair<K, V>> filterFn);
-  
+
+  /**
+   * Apply the given filter function to this instance and return the resulting
+   * {@code PTable}. Designed for Java lambdas.
+   */
+  PTable<K, V> filter(IFilterFn<Pair<K, V>> fn);
+
   /**
    * Apply the given filter function to this instance and return the resulting
    * {@code PTable}.
@@ -145,7 +163,13 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> {
    *          The {@code FilterFn} to apply
    */
   PTable<K, V> filter(String name, FilterFn<Pair<K, V>> filterFn);
-  
+
+  /**
+   * Apply the given filter function to this instance and return the resulting
+   * {@code PTable}. Designed for Java lambdas.
+   */
+  PTable<K, V> filter(String name, IFilterFn<Pair<K, V>> fn);
+
   /**
    * Returns a PTable made up of the pairs in this PTable with the largest value
    * field.

http://git-wip-us.apache.org/repos/asf/crunch/blob/60b28b12/crunch-core/src/main/java/org/apache/crunch/fn/IFnHelpers.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/IFnHelpers.java b/crunch-core/src/main/java/org/apache/crunch/fn/IFnHelpers.java
new file mode 100644
index 0000000..8560fab
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/IFnHelpers.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.FilterFn;
+import org.apache.crunch.IDoFn;
+import org.apache.crunch.IFilterFn;
+import org.apache.crunch.IFlatMapFn;
+import org.apache.crunch.IMapFn;
+import org.apache.crunch.MapFn;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+public class IFnHelpers {
+
+  public static <S, T> DoFn<S, T> wrap(final org.apache.crunch.IDoFn<S, T> fn) {
+    return new IDoFnWrapper(fn);
+  }
+
+  public static <S, T> DoFn<S, T> wrapFlatMap(final IFlatMapFn<S, T> fn) {
+    return new DoFn<S, T>() {
+      @Override
+      public void process(S input, Emitter<T> emitter) {
+        for (T t : fn.process(input)) {
+          emitter.emit(t);
+        }
+      }
+    };
+  }
+
+  public static <S, T> MapFn<S, T> wrapMap(final IMapFn<S, T> fn) {
+    return new MapFn<S, T>() {
+      @Override
+      public T map(S input) {
+        return fn.map(input);
+      }
+    };
+  }
+
+  public static <S> FilterFn<S> wrapFilter(final IFilterFn<S> fn) {
+    return new FilterFn<S>() {
+      @Override
+      public boolean accept(S input) {
+        return fn.accept(input);
+      }
+    };
+  }
+
+  static class IDoFnWrapper<S, T> extends DoFn<S, T> {
+
+    private final org.apache.crunch.IDoFn<S, T> fn;
+    private transient ContextImpl<S, T> ctxt;
+
+    public IDoFnWrapper(org.apache.crunch.IDoFn<S, T> fn) {
+      this.fn = fn;
+    }
+
+    @Override
+    public void initialize() {
+      super.initialize();
+      if (getContext() == null) {
+        this.ctxt = new ContextImpl<S, T>(getConfiguration());
+      } else {
+        this.ctxt = new ContextImpl<S, T>(getContext());
+      }
+    }
+
+    @Override
+    public void process(S input, Emitter<T> emitter) {
+      fn.process(ctxt.update(input, emitter));
+    }
+  }
+
+  static class ContextImpl<S, T> implements IDoFn.Context<S, T> {
+    private S element;
+    private Emitter<T> emitter;
+    private TaskInputOutputContext context;
+    private Configuration conf;
+
+    public ContextImpl(TaskInputOutputContext context) {
+      this.context = context;
+      this.conf = context.getConfiguration();
+    }
+
+    public ContextImpl(Configuration conf) {
+      this.context = null;
+      this.conf = conf;
+    }
+
+    public ContextImpl update(S element, Emitter<T> emitter) {
+      this.element = element;
+      this.emitter = emitter;
+      return this;
+    }
+
+    public S element() {
+      return element;
+    }
+
+    public void emit(T t) {
+      emitter.emit(t);
+    }
+
+    public TaskInputOutputContext getContext() {
+      return context;
+    }
+
+    public Configuration getConfiguration() {
+      return conf;
+    }
+
+    public void increment(String groupName, String counterName) {
+      increment(groupName, counterName, 1);
+    }
+
+    public void increment(String groupName, String counterName, long value) {
+      if (context != null) {
+        context.getCounter(groupName, counterName).increment(value);
+      }
+    }
+
+    public void increment(Enum<?> counterName) {
+      increment(counterName, 1);
+    }
+
+    public void increment(Enum<?> counterName, long value) {
+      if (context != null) {
+        context.getCounter(counterName).increment(value);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/60b28b12/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
index 315baf1..2a5e1f5 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
@@ -25,6 +25,10 @@ import org.apache.crunch.CachingOptions;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.FilterFn;
+import org.apache.crunch.IFilterFn;
+import org.apache.crunch.IFlatMapFn;
+import org.apache.crunch.IDoFn;
+import org.apache.crunch.IMapFn;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PObject;
@@ -36,6 +40,7 @@ import org.apache.crunch.ReadableData;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.Target;
 import org.apache.crunch.fn.ExtractKeyFn;
+import org.apache.crunch.fn.IFnHelpers;
 import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.impl.dist.DistributedPipeline;
 import org.apache.crunch.io.ReadableSource;
@@ -166,6 +171,85 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
     return pipeline.getFactory().createDoTable(name, getChainingCollection(), fn, type, options);
   }
 
+  @Override
+  public <T> PCollection<T> parallelDo(IDoFn<S, T> fn, PType<T> type) {
+    return parallelDo(IFnHelpers.wrap(fn), type);
+  }
+
+  @Override
+  public <K, V> PTable<K, V> parallelDo(IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
+    return parallelDo(IFnHelpers.wrap(fn), type);
+  }
+
+  @Override
+  public <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type) {
+    return parallelDo(name, IFnHelpers.wrap(fn), type);
+  }
+
+  @Override
+  public <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
+    return parallelDo(name, IFnHelpers.wrap(fn), type);
+  }
+
+  @Override
+  public <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type, ParallelDoOptions options) {
+    return parallelDo(name, IFnHelpers.wrap(fn), type, options);
+  }
+
+  @Override
+  public <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type, ParallelDoOptions options) {
+    return parallelDo(name, IFnHelpers.wrap(fn), type, options);
+  }
+
+  @Override
+  public <T> PCollection<T> flatMap(IFlatMapFn<S, T> fn, PType<T> type) {
+    return parallelDo(IFnHelpers.wrapFlatMap(fn), type);
+  }
+
+  @Override
+  public <K, V> PTable<K, V> flatMap(IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
+    return parallelDo(IFnHelpers.wrapFlatMap(fn), type);
+  }
+
+  @Override
+  public <T> PCollection<T> flatMap(String name, IFlatMapFn<S, T> fn, PType<T> type) {
+    return parallelDo(name, IFnHelpers.wrapFlatMap(fn), type);
+  }
+
+  @Override
+  public <K, V> PTable<K, V> flatMap(String name, IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
+    return parallelDo(name, IFnHelpers.wrapFlatMap(fn), type);
+  }
+
+  @Override
+  public <T> PCollection<T> map(IMapFn<S, T> fn, PType<T> type) {
+    return parallelDo(IFnHelpers.wrapMap(fn), type);
+  }
+
+  @Override
+  public <K, V> PTable<K, V> map(IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
+    return parallelDo(IFnHelpers.wrapMap(fn), type);
+  }
+
+  @Override
+  public <T> PCollection<T> map(String name, IMapFn<S, T> fn, PType<T> type) {
+    return parallelDo(name, IFnHelpers.wrapMap(fn), type);
+  }
+
+  @Override
+  public <K, V> PTable<K, V> map(String name, IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
+    return parallelDo(name, IFnHelpers.wrapMap(fn), type);
+  }
+
+  @Override
+  public PCollection<S> filter(IFilterFn<S> fn) {
+    return filter(IFnHelpers.wrapFilter(fn));
+  }
+
+  @Override
+  public PCollection<S> filter(String name, IFilterFn<S> fn) {
+    return filter(name, IFnHelpers.wrapFilter(fn));
+  }
 
   public PCollection<S> write(Target target) {
     if (materializedAt != null) {
@@ -271,6 +355,11 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
   }
 
   @Override
+  public <K> PTable<K, S> by(IMapFn<S, K> mapFn, PType<K> keyType) {
+    return parallelDo(new ExtractKeyFn<K, S>(IFnHelpers.wrapMap(mapFn)), getTypeFamily().tableOf(keyType, getPType()));
+  }
+
+  @Override
   public <K> PTable<K, S> by(String name, MapFn<S, K> mapFn, PType<K> keyType) {
     return parallelDo(name, new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType()));
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/60b28b12/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java
index e81773e..6bc3a41 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java
@@ -21,6 +21,8 @@ import com.google.common.collect.Lists;
 import org.apache.crunch.CachingOptions;
 import org.apache.crunch.FilterFn;
 import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.IFilterFn;
+import org.apache.crunch.IMapFn;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PObject;
@@ -29,6 +31,7 @@ import org.apache.crunch.Pair;
 import org.apache.crunch.ParallelDoOptions;
 import org.apache.crunch.TableSource;
 import org.apache.crunch.Target;
+import org.apache.crunch.fn.IFnHelpers;
 import org.apache.crunch.impl.dist.DistributedPipeline;
 import org.apache.crunch.lib.Aggregate;
 import org.apache.crunch.lib.Cogroup;
@@ -130,12 +133,27 @@ public abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> imple
   public PTable<K, V> filter(String name, FilterFn<Pair<K, V>> filterFn) {
     return parallelDo(name, filterFn, getPTableType());
   }
-  
+
+  @Override
+  public PTable<K, V> filter(IFilterFn<Pair<K, V>> filterFn) {
+    return parallelDo(IFnHelpers.wrapFilter(filterFn), getPTableType());
+  }
+
+  @Override
+  public PTable<K, V> filter(String name, IFilterFn<Pair<K, V>> filterFn) {
+    return parallelDo(name, IFnHelpers.wrapFilter(filterFn), getPTableType());
+  }
+
   @Override
   public <U> PTable<K, U> mapValues(MapFn<V, U> mapFn, PType<U> ptype) {
     return PTables.mapValues(this, mapFn, ptype);
   }
-  
+
+  @Override
+  public <U> PTable<K, U> mapValues(IMapFn<V, U> mapFn, PType<U> ptype) {
+    return PTables.mapValues(this, IFnHelpers.wrapMap(mapFn), ptype);
+  }
+
   @Override
   public <U> PTable<K, U> mapValues(String name, MapFn<V, U> mapFn, PType<U> ptype) {
     return PTables.mapValues(name, this, mapFn, ptype);
@@ -145,7 +163,12 @@ public abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> imple
   public <K2> PTable<K2, V> mapKeys(MapFn<K, K2> mapFn, PType<K2> ptype) {
     return PTables.mapKeys(this, mapFn, ptype);
   }
-  
+
+  @Override
+  public <K2> PTable<K2, V> mapKeys(IMapFn<K, K2> mapFn, PType<K2> ptype) {
+    return PTables.mapKeys(this, IFnHelpers.wrapMap(mapFn), ptype);
+  }
+
   @Override
   public <K2> PTable<K2, V> mapKeys(String name, MapFn<K, K2> mapFn, PType<K2> ptype) {
     return PTables.mapKeys(name, this, mapFn, ptype);

http://git-wip-us.apache.org/repos/asf/crunch/blob/60b28b12/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
index 55b7821..05bff3f 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -31,6 +31,10 @@ import org.apache.crunch.Aggregator;
 import org.apache.crunch.CachingOptions;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.FilterFn;
+import org.apache.crunch.IFilterFn;
+import org.apache.crunch.IFlatMapFn;
+import org.apache.crunch.IDoFn;
+import org.apache.crunch.IMapFn;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PObject;
@@ -42,6 +46,7 @@ import org.apache.crunch.ReadableData;
 import org.apache.crunch.PipelineCallable;
 import org.apache.crunch.Target;
 import org.apache.crunch.fn.ExtractKeyFn;
+import org.apache.crunch.fn.IFnHelpers;
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mem.emit.InMemoryEmitter;
 import org.apache.crunch.lib.Aggregate;
@@ -59,7 +64,6 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
 
 public class MemCollection<S> implements PCollection<S> {
 
@@ -159,6 +163,84 @@ public class MemCollection<S> implements PCollection<S> {
   }
 
   @Override
+  public <T> PCollection<T> parallelDo(IDoFn<S, T> fn, PType<T> type) {
+    return parallelDo(null, fn, type);
+  }
+
+  @Override
+  public <K, V> PTable<K, V> parallelDo(IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
+    return parallelDo(null, fn, type);
+  }
+
+  @Override
+  public <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type) {
+    return parallelDo(name, fn, type, null);
+  }
+
+  @Override
+  public <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
+    return parallelDo(name, fn, type, null);
+  }
+
+  @Override
+  public <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type, ParallelDoOptions options) {
+    return parallelDo(name, IFnHelpers.wrap(fn), type, options);
+  }
+
+  @Override
+  public <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type, ParallelDoOptions options) {
+    return parallelDo(name, IFnHelpers.wrap(fn), type, options);
+  }
+
+  @Override
+  public <T> PCollection<T> flatMap(IFlatMapFn<S, T> fn, PType<T> type) {
+    return parallelDo(IFnHelpers.wrapFlatMap(fn), type);
+  }
+
+  @Override
+  public <K, V> PTable<K, V> flatMap(IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
+    return parallelDo(IFnHelpers.wrapFlatMap(fn), type);
+  }
+
+  @Override
+  public <T> PCollection<T> flatMap(String name, IFlatMapFn<S, T> fn, PType<T> type) {
+    return parallelDo(name, IFnHelpers.wrapFlatMap(fn), type);
+  }
+
+  @Override
+  public <K, V> PTable<K, V> flatMap(String name, IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
+    return parallelDo(name, IFnHelpers.wrapFlatMap(fn), type);
+  }
+
+  @Override
+  public <T> PCollection<T> map(IMapFn<S, T> fn, PType<T> type) {
+    return parallelDo(IFnHelpers.wrapMap(fn), type);
+  }
+
+  @Override
+  public <K, V> PTable<K, V> map(IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
+    return parallelDo(IFnHelpers.wrapMap(fn), type);
+  }
+
+  @Override
+  public <T> PCollection<T> map(String name, IMapFn<S, T> fn, PType<T> type) {
+    return parallelDo(name, IFnHelpers.wrapMap(fn), type);
+  }
+
+  @Override
+  public <K, V> PTable<K, V> map(String name, IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
+    return parallelDo(name, IFnHelpers.wrapMap(fn), type);
+  }
+
+  public PCollection<S> filter(IFilterFn<S> fn) {
+    return filter(IFnHelpers.wrapFilter(fn));
+  }
+
+  public PCollection<S> filter(String name, IFilterFn<S> fn) {
+    return filter(name, IFnHelpers.wrapFilter(fn));
+  }
+
+  @Override
   public PCollection<S> write(Target target) {
     getPipeline().write(this, target);
     return this;
@@ -280,6 +362,11 @@ public class MemCollection<S> implements PCollection<S> {
   }
 
   @Override
+  public <K> PTable<K, S> by(IMapFn<S, K> mapFn, PType<K> keyType) {
+    return parallelDo(new ExtractKeyFn<K, S>(IFnHelpers.wrapMap(mapFn)), getTypeFamily().tableOf(keyType, getPType()));
+  }
+
+  @Override
   public <K> PTable<K, S> by(String name, MapFn<S, K> mapFn, PType<K> keyType) {
     return parallelDo(name, new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType()));
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/60b28b12/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
index 0e4516a..f3db972 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
@@ -17,11 +17,6 @@
  */
 package org.apache.crunch.impl.mem.collect;
 
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
 import org.apache.crunch.Aggregator;
 import org.apache.crunch.CombineFn;
 import org.apache.crunch.DoFn;
@@ -32,7 +27,6 @@ import org.apache.crunch.PCollection;
 import org.apache.crunch.PGroupedTable;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
-import org.apache.crunch.Pipeline;
 import org.apache.crunch.Target;
 import org.apache.crunch.fn.Aggregators;
 import org.apache.crunch.lib.PTables;
@@ -40,11 +34,6 @@ import org.apache.crunch.types.PGroupedTableType;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 
 class MemGroupedTable<K, V> extends MemCollection<Pair<K, Iterable<V>>> implements PGroupedTable<K, V> {
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/60b28b12/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
index 3f3bd77..03b5a70 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
@@ -18,13 +18,14 @@
 package org.apache.crunch.impl.mem.collect;
 
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.crunch.CachingOptions;
 import org.apache.crunch.FilterFn;
 import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.IFilterFn;
+import org.apache.crunch.IMapFn;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PGroupedTable;
@@ -32,6 +33,7 @@ import org.apache.crunch.PObject;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Target;
+import org.apache.crunch.fn.IFnHelpers;
 import org.apache.crunch.lib.Aggregate;
 import org.apache.crunch.lib.Cogroup;
 import org.apache.crunch.lib.Join;
@@ -41,8 +43,6 @@ import org.apache.crunch.materialize.pobject.MapPObject;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 
-import com.google.common.collect.Lists;
-
 public class MemTable<K, V> extends MemCollection<Pair<K, V>> implements PTable<K, V> {
 
   private PTableType<K, V> ptype;
@@ -138,10 +138,25 @@ public class MemTable<K, V> extends MemCollection<Pair<K, V>> implements PTable<
   }
 
   @Override
+  public PTable<K, V> filter(IFilterFn<Pair<K, V>> filterFn) {
+    return parallelDo(IFnHelpers.wrapFilter(filterFn), getPTableType());
+  }
+
+  @Override
+  public PTable<K, V> filter(String name, IFilterFn<Pair<K, V>> filterFn) {
+    return parallelDo(name, IFnHelpers.wrapFilter(filterFn), getPTableType());
+  }
+
+  @Override
   public <U> PTable<K, U> mapValues(MapFn<V, U> mapFn, PType<U> ptype) {
     return PTables.mapValues(this, mapFn, ptype);
   }
-  
+
+  @Override
+  public <U> PTable<K, U> mapValues(IMapFn<V, U> mapFn, PType<U> ptype) {
+    return PTables.mapValues(this, IFnHelpers.wrapMap(mapFn), ptype);
+  }
+
   @Override
   public <U> PTable<K, U> mapValues(String name, MapFn<V, U> mapFn, PType<U> ptype) {
     return PTables.mapValues(name, this, mapFn, ptype);
@@ -151,7 +166,12 @@ public class MemTable<K, V> extends MemCollection<Pair<K, V>> implements PTable<
   public <K2> PTable<K2, V> mapKeys(MapFn<K, K2> mapFn, PType<K2> ptype) {
     return PTables.mapKeys(this, mapFn, ptype);
   }
-  
+
+  @Override
+  public <K2> PTable<K2, V> mapKeys(IMapFn<K, K2> mapFn, PType<K2> ptype) {
+    return PTables.mapKeys(this, IFnHelpers.wrapMap(mapFn), ptype);
+  }
+
   @Override
   public <K2> PTable<K2, V> mapKeys(String name, MapFn<K, K2> mapFn, PType<K2> ptype) {
     return PTables.mapKeys(name, this, mapFn, ptype);


Mime
View raw message