crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject [2/2] crunch git commit: Java 8 lambda support for Apache Crunch.
Date Tue, 12 Jan 2016 23:05:21 GMT
Java 8 lambda support for Apache Crunch.

Remove lambda support from crunch-core, and instead implement a new module called crunch-lambda.
This will allow full use of Java 8 features in implementing support for lambda expressions and
method references, without requiring a dependency on Java 8 for crunch-core. Pthings are wrapped
into analogous Lthings which can be operated on with an API inspired both by the existing Crunch
API and the Java 8 streams API.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/7d7af4ef
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/7d7af4ef
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/7d7af4ef

Branch: refs/heads/master
Commit: 7d7af4ef43b122cc03cee721b9106d174d71d435
Parents: f8920d3
Author: David Whiting <davw@apache.org>
Authored: Sat Jan 2 11:28:34 2016 +0100
Committer: David Whiting <davw@apache.org>
Committed: Sun Jan 10 21:29:49 2016 +0100

----------------------------------------------------------------------
 .../org/apache/crunch/MultiStagePlanningIT.java |   2 +-
 .../it/java/org/apache/crunch/PageRankIT.java   |   2 +-
 .../it/java/org/apache/crunch/WordCountIT.java  |  12 +-
 .../src/main/java/org/apache/crunch/IDoFn.java  |  49 ----
 .../main/java/org/apache/crunch/IFilterFn.java  |  27 --
 .../main/java/org/apache/crunch/IFlatMapFn.java |  28 ---
 .../src/main/java/org/apache/crunch/IMapFn.java |  27 --
 .../java/org/apache/crunch/PCollection.java     |  96 +-------
 .../java/org/apache/crunch/PGroupedTable.java   |  11 -
 .../src/main/java/org/apache/crunch/PTable.java |  24 --
 .../java/org/apache/crunch/fn/IFnHelpers.java   | 149 -----------
 .../impl/dist/collect/BaseGroupedTable.java     |   9 -
 .../impl/dist/collect/PCollectionImpl.java      |  92 +------
 .../crunch/impl/dist/collect/PTableBase.java    |  22 --
 .../crunch/impl/mem/collect/MemCollection.java  |  89 -------
 .../impl/mem/collect/MemGroupedTable.java       |   7 -
 .../crunch/impl/mem/collect/MemTable.java       |  23 --
 crunch-dist/pom.xml                             |   4 +
 crunch-lambda/pom.xml                           |  67 +++++
 .../org/apache/crunch/lambda/LAggregator.java   |  57 +++++
 .../org/apache/crunch/lambda/LCollection.java   | 244 +++++++++++++++++++
 .../crunch/lambda/LCollectionFactory.java       |  44 ++++
 .../crunch/lambda/LCollectionFactoryImpl.java   |  70 ++++++
 .../java/org/apache/crunch/lambda/LDoFn.java    |  31 +++
 .../org/apache/crunch/lambda/LDoFnContext.java  |  52 ++++
 .../org/apache/crunch/lambda/LDoFnWrapper.java  | 106 ++++++++
 .../org/apache/crunch/lambda/LGroupedTable.java | 162 ++++++++++++
 .../java/org/apache/crunch/lambda/LTable.java   | 188 ++++++++++++++
 .../java/org/apache/crunch/lambda/Lambda.java   |  59 +++++
 .../apache/crunch/lambda/fn/SBiConsumer.java    |  28 +++
 .../apache/crunch/lambda/fn/SBiFunction.java    |  28 +++
 .../crunch/lambda/fn/SBinaryOperator.java       |  28 +++
 .../org/apache/crunch/lambda/fn/SConsumer.java  |  28 +++
 .../org/apache/crunch/lambda/fn/SFunction.java  |  28 +++
 .../org/apache/crunch/lambda/fn/SPredicate.java |  28 +++
 .../org/apache/crunch/lambda/fn/SSupplier.java  |  28 +++
 .../apache/crunch/lambda/fn/package-info.java   |  22 ++
 .../org/apache/crunch/lambda/package-info.java  |  30 +++
 .../apache/crunch/lambda/LCollectionTest.java   | 128 ++++++++++
 .../apache/crunch/lambda/LGroupedTableTest.java | 103 ++++++++
 .../org/apache/crunch/lambda/LTableTest.java    |  94 +++++++
 .../org/apache/crunch/lambda/TestCommon.java    |  34 +++
 .../org/apache/crunch/lambda/TypedRecord.java   |  52 ++++
 pom.xml                                         |  17 ++
 44 files changed, 1771 insertions(+), 658 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java b/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java
index 38211a7..a7b7d48 100644
--- a/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/MultiStagePlanningIT.java
@@ -60,7 +60,7 @@ public class MultiStagePlanningIT implements Serializable {
 
         PTable<String, String> addressesTable = pipeline.readTextFile(addressesFile)
                 .parallelDo("Split addresses", new StringToPairMapFn(), tableOf(strings(), strings()))
-                .filter(new IFilterFn<Pair<String, String>>() {
+                .filter(new FilterFn<Pair<String, String>>() {
                     @Override
                     public boolean accept(Pair<String, String> input) {
                         // This is odd but it is the simpler way of simulating this would take longer than

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java b/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java
index b30465d..701f78a 100644
--- a/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/PageRankIT.java
@@ -130,7 +130,7 @@ public class PageRankIT {
     }, ptf.tableOf(ptf.strings(), ptf.floats()));
 
     return input.cogroup(outbound).mapValues(
-        new IMapFn<Pair<Collection<PageRankData>, Collection<Float>>, PageRankData>() {
+        new MapFn<Pair<Collection<PageRankData>, Collection<Float>>, PageRankData>() {
           @Override
           public PageRankData map(Pair<Collection<PageRankData>, Collection<Float>> input) {
             PageRankData prd = Iterables.getOnlyElement(input.first());

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java b/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java
index 4c77c41..e0bd719 100644
--- a/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/WordCountIT.java
@@ -39,6 +39,7 @@ import org.apache.crunch.types.PTypes;
 import org.apache.crunch.types.avro.AvroTypeFamily;
 import org.apache.crunch.types.writable.WritableTypeFamily;
 import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -55,17 +56,18 @@ public class WordCountIT {
   }
 
   public static PTable<String, Long> wordCount(PCollection<String> words, PTypeFamily typeFamily) {
-    return Aggregate.count(words.parallelDo(new IDoFn<String, String>() {
+    return Aggregate.count(words.parallelDo(new DoFn<String, String>() {
       @Override
-      public void process(Context<String, String> context) {
-        List<String> words = Arrays.asList(context.element().split("\\s+"));
+      public void process(String input, Emitter<String> emitter) {
+        List<String> words = Arrays.asList(input.split("\\s+"));
         for (String word : words) {
           if ("and".equals(word)) {
-            context.increment(WordCountStats.ANDS);
+            increment(WordCountStats.ANDS);
           }
-          context.emit(word);
+          emitter.emit(word);
         }
       }
+
     }, typeFamily.strings()));
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/IDoFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/IDoFn.java b/crunch-core/src/main/java/org/apache/crunch/IDoFn.java
deleted file mode 100644
index b393f43..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/IDoFn.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-import java.io.Serializable;
-
-/**
- * A Java lambdas friendly version of the {@link DoFn} class.
- */
-public interface IDoFn<S, T> extends Serializable {
-
-  void process(Context<S, T> context);
-
-  public interface Context<S, T> {
-    S element();
-
-    void emit(T t);
-
-    TaskInputOutputContext getContext();
-
-    Configuration getConfiguration();
-
-    void increment(String groupName, String counterName);
-
-    void increment(String groupName, String counterName, long value);
-
-    void increment(Enum<?> counterName);
-
-    void increment(Enum<?> counterName, long value);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/IFilterFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/IFilterFn.java b/crunch-core/src/main/java/org/apache/crunch/IFilterFn.java
deleted file mode 100644
index bb8a03d..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/IFilterFn.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.io.Serializable;
-
-/**
- * A Java lambdas friendly version of the {@link org.apache.crunch.FilterFn} class.
- */
-public interface IFilterFn<S> extends Serializable {
-  boolean accept(S input);
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/IFlatMapFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/IFlatMapFn.java b/crunch-core/src/main/java/org/apache/crunch/IFlatMapFn.java
deleted file mode 100644
index a2b85c4..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/IFlatMapFn.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.io.Serializable;
-
-/**
- * A Java lambdas friendly interface for writing business logic against {@code PCollection}s
- * that take in a single input record and return 0 to N output records via an {@code Iterable}.
- */
-public interface IFlatMapFn<S, T> extends Serializable {
-  Iterable<T> process(S input);
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/IMapFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/IMapFn.java b/crunch-core/src/main/java/org/apache/crunch/IMapFn.java
deleted file mode 100644
index 3c06d9e..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/IMapFn.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import java.io.Serializable;
-
-/**
- * A Java lambdas friendly version of the {@link org.apache.crunch.MapFn} class.
- */
-public interface IMapFn<S, T> extends Serializable {
-  public T map(S input);
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/PCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PCollection.java b/crunch-core/src/main/java/org/apache/crunch/PCollection.java
index 5d072e6..8043349 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PCollection.java
@@ -132,95 +132,6 @@ public interface PCollection<S> {
   <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> doFn, PTableType<K, V> type,
       ParallelDoOptions options);
 
-  /**
-   * Similar to other instances of {@code parallelDo}, but designed for Java lambdas.
-   */
-  <T> PCollection<T> parallelDo(IDoFn<S, T> fn, PType<T> type);
-
-  /**
-   * Similar to other instances of {@code parallelDo}, but designed for Java lambdas.
-   */
-  <K, V> PTable<K, V> parallelDo(IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type);
-
-  /**
-   * Similar to other instances of {@code parallelDo}, but designed for Java lambdas.
-   */
-  <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type);
-
-  /**
-   * Similar to other instances of {@code parallelDo}, but designed for Java lambdas.
-   */
-  <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type);
-
-  /**
-   * Similar to other instances of {@code parallelDo}, but designed for Java lambdas.
-   */
-  <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type, ParallelDoOptions options);
-
-  /**
-   * Similar to other instances of {@code parallelDo}, but designed for Java lambdas.
-   */
-  <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type, ParallelDoOptions options);
-
-  /**
-   * For each element of this {@code PCollection}, generate 0 to N output values using the
-   * given {@code IFlatMapFn}. Designed for Java lambdas.
-   */
-  <T> PCollection<T> flatMap(IFlatMapFn<S, T> fn, PType<T> type);
-
-  /**
-   * For each element of this {@code PCollection}, generate 0 to N output values using the
-   * given {@code IFlatMapFn}. Designed for Java lambdas.
-   */
-  <K, V> PTable<K, V> flatMap(IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type);
-
-  /**
-   * For each element of this {@code PCollection}, generate 0 to N output values using the
-   * given {@code IFlatMapFn}. Designed for Java lambdas.
-   */
-  <T> PCollection<T> flatMap(String name, IFlatMapFn<S, T> fn, PType<T> type);
-
-  /**
-   * For each element of this {@code PCollection}, generate 0 to N output values using the
-   * given {@code IFlatMapFn}. Designed for Java lambdas.
-   */
-  <K, V> PTable<K, V> flatMap(String name, IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type);
-
-  /**
-   * For each element of this {@code PCollection}, generate one output value using the
-   * given {@code IMapFn}. Designed for Java lambdas.
-   */
-  <T> PCollection<T> map(IMapFn<S, T> fn, PType<T> type);
-
-  /**
-   * For each element of this {@code PCollection}, generate one output value using the
-   * given {@code IMapFn}. Designed for Java lambdas.
-   */
-  <K, V> PTable<K, V> map(IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type);
-
-  /**
-   * For each element of this {@code PCollection}, generate one output value using the
-   * given {@code IMapFn}. Designed for Java lambdas.
-   */
-  <T> PCollection<T> map(String name, IMapFn<S, T> fn, PType<T> type);
-
-  /**
-   * For each element of this {@code PCollection}, generate one output value using the
-   * given {@code IMapFn}. Designed for Java lambdas.
-   */
-  <K, V> PTable<K, V> map(String name, IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type);
-
-  /**
-   * Filter elements of this {@code PCollection} using the given {@code IFilterFn}.
-   * Designed for Java lambdas.
-   */
-  PCollection<S> filter(IFilterFn<S> fn);
-
-  /**
-   * Filter elements of this {@code PCollection} using the given {@code IFilterFn}.
-   * Designed for Java lambdas.
-   */
-  PCollection<S> filter(String name, IFilterFn<S> fn);
 
   /**
    * Write the contents of this {@code PCollection} to the given {@code Target},
@@ -349,12 +260,6 @@ public interface PCollection<S> {
   <K> PTable<K, S> by(MapFn<S, K> extractKeyFn, PType<K> keyType);
 
   /**
-   * Apply the given {@code IMapFn} to each element of this instance in order to
-   * create a {@code PTable}. Designed for use with Java 8 lambdas.
-   */
-  <K> PTable<K, S> by(IMapFn<S, K> extractKeyFn, PType<K> keyType);
-
-  /**
    * Apply the given map function to each element of this instance in order to
    * create a {@code PTable}.
    *
@@ -385,4 +290,5 @@ public interface PCollection<S> {
    * Returns a {@code PCollection} that contains the result of aggregating all values in this instance.
    */
   PCollection<S> aggregate(Aggregator<S> aggregator);
+  
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
index 6ac86de..756855c 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
@@ -79,17 +79,6 @@ public interface PGroupedTable<K, V> extends PCollection<Pair<K, Iterable<V>>> {
    */
   <U> PTable<K, U> mapValues(MapFn<Iterable<V>, U> mapFn, PType<U> ptype);
 
-
-  /**
-   * Maps the {@code Iterable<V>} elements of each record to a new type. Just like
-   * any {@code parallelDo} operation on a {@code PGroupedTable}, this may only be
-   * called once. Designed for Java lambdas
-   * @param mapFn The mapping function (can be lambda/method ref)
-   * @param ptype The serialization infromation for the returned data
-   * @return A new {@code PTable} instance
-   */
-  <U> PTable<K, U> mapValues(IMapFn<Iterable<V>, U> mapFn, PType<U> ptype);
-  
   /**
    * Maps the {@code Iterable<V>} elements of each record to a new type. Just like
    * any {@code parallelDo} operation on a {@code PGroupedTable}, this may only be

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/PTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PTable.java b/crunch-core/src/main/java/org/apache/crunch/PTable.java
index 5609c3f..74cade8 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PTable.java
@@ -107,12 +107,6 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> {
 
   /**
    * Returns a {@code PTable} that has the same keys as this instance, but
-   * uses the given function to map the values. Designed for Java lambdas.
-   */
-  <U> PTable<K, U> mapValues(IMapFn<V, U> mapFn, PType<U> ptype);
-
-  /**
-   * Returns a {@code PTable} that has the same keys as this instance, but
    * uses the given function to map the values.
    */
   <U> PTable<K, U> mapValues(String name, MapFn<V, U> mapFn, PType<U> ptype);
@@ -125,12 +119,6 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> {
 
   /**
    * Returns a {@code PTable} that has the same values as this instance, but
-   * uses the given function to map the keys. Designed for Java lambdas.
-   */
-  <K2> PTable<K2, V> mapKeys(IMapFn<K, K2> mapFn, PType<K2> ptype);
-
-  /**
-   * Returns a {@code PTable} that has the same values as this instance, but
    * uses the given function to map the keys.
    */
   <K2> PTable<K2, V> mapKeys(String name, MapFn<K, K2> mapFn, PType<K2> ptype);
@@ -149,12 +137,6 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> {
 
   /**
    * Apply the given filter function to this instance and return the resulting
-   * {@code PTable}. Designed for Java lambdas.
-   */
-  PTable<K, V> filter(IFilterFn<Pair<K, V>> fn);
-
-  /**
-   * Apply the given filter function to this instance and return the resulting
    * {@code PTable}.
    *
    * @param name
@@ -165,12 +147,6 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> {
   PTable<K, V> filter(String name, FilterFn<Pair<K, V>> filterFn);
 
   /**
-   * Apply the given filter function to this instance and return the resulting
-   * {@code PTable}. Designed for Java lambdas.
-   */
-  PTable<K, V> filter(String name, IFilterFn<Pair<K, V>> fn);
-
-  /**
    * Returns a PTable made up of the pairs in this PTable with the largest value
    * field.
    *

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/fn/IFnHelpers.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/IFnHelpers.java b/crunch-core/src/main/java/org/apache/crunch/fn/IFnHelpers.java
deleted file mode 100644
index 8560fab..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/fn/IFnHelpers.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.fn;
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
-import org.apache.crunch.FilterFn;
-import org.apache.crunch.IDoFn;
-import org.apache.crunch.IFilterFn;
-import org.apache.crunch.IFlatMapFn;
-import org.apache.crunch.IMapFn;
-import org.apache.crunch.MapFn;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-
-public class IFnHelpers {
-
-  public static <S, T> DoFn<S, T> wrap(final org.apache.crunch.IDoFn<S, T> fn) {
-    return new IDoFnWrapper(fn);
-  }
-
-  public static <S, T> DoFn<S, T> wrapFlatMap(final IFlatMapFn<S, T> fn) {
-    return new DoFn<S, T>() {
-      @Override
-      public void process(S input, Emitter<T> emitter) {
-        for (T t : fn.process(input)) {
-          emitter.emit(t);
-        }
-      }
-    };
-  }
-
-  public static <S, T> MapFn<S, T> wrapMap(final IMapFn<S, T> fn) {
-    return new MapFn<S, T>() {
-      @Override
-      public T map(S input) {
-        return fn.map(input);
-      }
-    };
-  }
-
-  public static <S> FilterFn<S> wrapFilter(final IFilterFn<S> fn) {
-    return new FilterFn<S>() {
-      @Override
-      public boolean accept(S input) {
-        return fn.accept(input);
-      }
-    };
-  }
-
-  static class IDoFnWrapper<S, T> extends DoFn<S, T> {
-
-    private final org.apache.crunch.IDoFn<S, T> fn;
-    private transient ContextImpl<S, T> ctxt;
-
-    public IDoFnWrapper(org.apache.crunch.IDoFn<S, T> fn) {
-      this.fn = fn;
-    }
-
-    @Override
-    public void initialize() {
-      super.initialize();
-      if (getContext() == null) {
-        this.ctxt = new ContextImpl<S, T>(getConfiguration());
-      } else {
-        this.ctxt = new ContextImpl<S, T>(getContext());
-      }
-    }
-
-    @Override
-    public void process(S input, Emitter<T> emitter) {
-      fn.process(ctxt.update(input, emitter));
-    }
-  }
-
-  static class ContextImpl<S, T> implements IDoFn.Context<S, T> {
-    private S element;
-    private Emitter<T> emitter;
-    private TaskInputOutputContext context;
-    private Configuration conf;
-
-    public ContextImpl(TaskInputOutputContext context) {
-      this.context = context;
-      this.conf = context.getConfiguration();
-    }
-
-    public ContextImpl(Configuration conf) {
-      this.context = null;
-      this.conf = conf;
-    }
-
-    public ContextImpl update(S element, Emitter<T> emitter) {
-      this.element = element;
-      this.emitter = emitter;
-      return this;
-    }
-
-    public S element() {
-      return element;
-    }
-
-    public void emit(T t) {
-      emitter.emit(t);
-    }
-
-    public TaskInputOutputContext getContext() {
-      return context;
-    }
-
-    public Configuration getConfiguration() {
-      return conf;
-    }
-
-    public void increment(String groupName, String counterName) {
-      increment(groupName, counterName, 1);
-    }
-
-    public void increment(String groupName, String counterName, long value) {
-      if (context != null) {
-        context.getCounter(groupName, counterName).increment(value);
-      }
-    }
-
-    public void increment(Enum<?> counterName) {
-      increment(counterName, 1);
-    }
-
-    public void increment(Enum<?> counterName, long value) {
-      if (context != null) {
-        context.getCounter(counterName).increment(value);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java
index eb2d829..7bfacdf 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java
@@ -25,16 +25,13 @@ import org.apache.crunch.CombineFn;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.GroupingOptions;
-import org.apache.crunch.IMapFn;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PGroupedTable;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.ReadableData;
-import org.apache.crunch.SourceTarget;
 import org.apache.crunch.Target;
 import org.apache.crunch.fn.Aggregators;
-import org.apache.crunch.fn.IFnHelpers;
 import org.apache.crunch.lib.PTables;
 import org.apache.crunch.types.PGroupedTableType;
 import org.apache.crunch.types.PType;
@@ -121,12 +118,6 @@ public class BaseGroupedTable<K, V> extends PCollectionImpl<Pair<K, Iterable<V>>
   }
 
   @Override
-  public <U> PTable<K, U> mapValues(IMapFn<Iterable<V>, U> mapFn, PType<U> ptype) {
-    return PTables.mapValues(this, IFnHelpers.wrapMap(mapFn), ptype);
-  }
-
-
-  @Override
   public <U> PTable<K, U> mapValues(String name, MapFn<Iterable<V>, U> mapFn, PType<U> ptype) {
     return PTables.mapValues(name, this, mapFn, ptype);
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
index 2a5e1f5..7650ff5 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
@@ -25,10 +25,6 @@ import org.apache.crunch.CachingOptions;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.FilterFn;
-import org.apache.crunch.IFilterFn;
-import org.apache.crunch.IFlatMapFn;
-import org.apache.crunch.IDoFn;
-import org.apache.crunch.IMapFn;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PObject;
@@ -40,7 +36,6 @@ import org.apache.crunch.ReadableData;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.Target;
 import org.apache.crunch.fn.ExtractKeyFn;
-import org.apache.crunch.fn.IFnHelpers;
 import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.impl.dist.DistributedPipeline;
 import org.apache.crunch.io.ReadableSource;
@@ -145,7 +140,7 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
   }
 
   @Override
-  public <T> PCollection<T> parallelDo(String name, DoFn<S, T> fn, PType<T> type) {
+  public <T> PCollection<T>  parallelDo(String name, DoFn<S, T> fn, PType<T> type) {
     return parallelDo(name, fn, type, ParallelDoOptions.builder().build());
   }
 
@@ -171,86 +166,6 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
     return pipeline.getFactory().createDoTable(name, getChainingCollection(), fn, type, options);
   }
 
-  @Override
-  public <T> PCollection<T> parallelDo(IDoFn<S, T> fn, PType<T> type) {
-    return parallelDo(IFnHelpers.wrap(fn), type);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> parallelDo(IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
-    return parallelDo(IFnHelpers.wrap(fn), type);
-  }
-
-  @Override
-  public <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type) {
-    return parallelDo(name, IFnHelpers.wrap(fn), type);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
-    return parallelDo(name, IFnHelpers.wrap(fn), type);
-  }
-
-  @Override
-  public <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type, ParallelDoOptions options) {
-    return parallelDo(name, IFnHelpers.wrap(fn), type, options);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type, ParallelDoOptions options) {
-    return parallelDo(name, IFnHelpers.wrap(fn), type, options);
-  }
-
-  @Override
-  public <T> PCollection<T> flatMap(IFlatMapFn<S, T> fn, PType<T> type) {
-    return parallelDo(IFnHelpers.wrapFlatMap(fn), type);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> flatMap(IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
-    return parallelDo(IFnHelpers.wrapFlatMap(fn), type);
-  }
-
-  @Override
-  public <T> PCollection<T> flatMap(String name, IFlatMapFn<S, T> fn, PType<T> type) {
-    return parallelDo(name, IFnHelpers.wrapFlatMap(fn), type);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> flatMap(String name, IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
-    return parallelDo(name, IFnHelpers.wrapFlatMap(fn), type);
-  }
-
-  @Override
-  public <T> PCollection<T> map(IMapFn<S, T> fn, PType<T> type) {
-    return parallelDo(IFnHelpers.wrapMap(fn), type);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> map(IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
-    return parallelDo(IFnHelpers.wrapMap(fn), type);
-  }
-
-  @Override
-  public <T> PCollection<T> map(String name, IMapFn<S, T> fn, PType<T> type) {
-    return parallelDo(name, IFnHelpers.wrapMap(fn), type);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> map(String name, IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
-    return parallelDo(name, IFnHelpers.wrapMap(fn), type);
-  }
-
-  @Override
-  public PCollection<S> filter(IFilterFn<S> fn) {
-    return filter(IFnHelpers.wrapFilter(fn));
-  }
-
-  @Override
-  public PCollection<S> filter(String name, IFilterFn<S> fn) {
-    return filter(name, IFnHelpers.wrapFilter(fn));
-  }
-
   public PCollection<S> write(Target target) {
     if (materializedAt != null) {
       getPipeline().write(
@@ -355,11 +270,6 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
   }
 
   @Override
-  public <K> PTable<K, S> by(IMapFn<S, K> mapFn, PType<K> keyType) {
-    return parallelDo(new ExtractKeyFn<K, S>(IFnHelpers.wrapMap(mapFn)), getTypeFamily().tableOf(keyType, getPType()));
-  }
-
-  @Override
   public <K> PTable<K, S> by(String name, MapFn<S, K> mapFn, PType<K> keyType) {
     return parallelDo(name, new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType()));
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java
index 6bc3a41..4ba4d49 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PTableBase.java
@@ -21,8 +21,6 @@ import com.google.common.collect.Lists;
 import org.apache.crunch.CachingOptions;
 import org.apache.crunch.FilterFn;
 import org.apache.crunch.GroupingOptions;
-import org.apache.crunch.IFilterFn;
-import org.apache.crunch.IMapFn;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PObject;
@@ -31,7 +29,6 @@ import org.apache.crunch.Pair;
 import org.apache.crunch.ParallelDoOptions;
 import org.apache.crunch.TableSource;
 import org.apache.crunch.Target;
-import org.apache.crunch.fn.IFnHelpers;
 import org.apache.crunch.impl.dist.DistributedPipeline;
 import org.apache.crunch.lib.Aggregate;
 import org.apache.crunch.lib.Cogroup;
@@ -135,24 +132,10 @@ public abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> imple
   }
 
   @Override
-  public PTable<K, V> filter(IFilterFn<Pair<K, V>> filterFn) {
-    return parallelDo(IFnHelpers.wrapFilter(filterFn), getPTableType());
-  }
-
-  @Override
-  public PTable<K, V> filter(String name, IFilterFn<Pair<K, V>> filterFn) {
-    return parallelDo(name, IFnHelpers.wrapFilter(filterFn), getPTableType());
-  }
-
-  @Override
   public <U> PTable<K, U> mapValues(MapFn<V, U> mapFn, PType<U> ptype) {
     return PTables.mapValues(this, mapFn, ptype);
   }
 
-  @Override
-  public <U> PTable<K, U> mapValues(IMapFn<V, U> mapFn, PType<U> ptype) {
-    return PTables.mapValues(this, IFnHelpers.wrapMap(mapFn), ptype);
-  }
 
   @Override
   public <U> PTable<K, U> mapValues(String name, MapFn<V, U> mapFn, PType<U> ptype) {
@@ -165,11 +148,6 @@ public abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> imple
   }
 
   @Override
-  public <K2> PTable<K2, V> mapKeys(IMapFn<K, K2> mapFn, PType<K2> ptype) {
-    return PTables.mapKeys(this, IFnHelpers.wrapMap(mapFn), ptype);
-  }
-
-  @Override
   public <K2> PTable<K2, V> mapKeys(String name, MapFn<K, K2> mapFn, PType<K2> ptype) {
     return PTables.mapKeys(name, this, mapFn, ptype);
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
index 89671a3..087a31d 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -21,7 +21,6 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
-import java.io.ObjectStreamClass;
 import java.lang.reflect.Method;
 import java.util.Collection;
 import java.util.Set;
@@ -36,10 +35,6 @@ import org.apache.crunch.Aggregator;
 import org.apache.crunch.CachingOptions;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.FilterFn;
-import org.apache.crunch.IFilterFn;
-import org.apache.crunch.IFlatMapFn;
-import org.apache.crunch.IDoFn;
-import org.apache.crunch.IMapFn;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PObject;
@@ -51,7 +46,6 @@ import org.apache.crunch.ReadableData;
 import org.apache.crunch.PipelineCallable;
 import org.apache.crunch.Target;
 import org.apache.crunch.fn.ExtractKeyFn;
-import org.apache.crunch.fn.IFnHelpers;
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mem.emit.InMemoryEmitter;
 import org.apache.crunch.lib.Aggregate;
@@ -205,84 +199,6 @@ public class MemCollection<S> implements PCollection<S> {
   }
 
   @Override
-  public <T> PCollection<T> parallelDo(IDoFn<S, T> fn, PType<T> type) {
-    return parallelDo(null, fn, type);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> parallelDo(IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
-    return parallelDo(null, fn, type);
-  }
-
-  @Override
-  public <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type) {
-    return parallelDo(name, fn, type, null);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
-    return parallelDo(name, fn, type, null);
-  }
-
-  @Override
-  public <T> PCollection<T> parallelDo(String name, IDoFn<S, T> fn, PType<T> type, ParallelDoOptions options) {
-    return parallelDo(name, IFnHelpers.wrap(fn), type, options);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> parallelDo(String name, IDoFn<S, Pair<K, V>> fn, PTableType<K, V> type, ParallelDoOptions options) {
-    return parallelDo(name, IFnHelpers.wrap(fn), type, options);
-  }
-
-  @Override
-  public <T> PCollection<T> flatMap(IFlatMapFn<S, T> fn, PType<T> type) {
-    return parallelDo(IFnHelpers.wrapFlatMap(fn), type);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> flatMap(IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
-    return parallelDo(IFnHelpers.wrapFlatMap(fn), type);
-  }
-
-  @Override
-  public <T> PCollection<T> flatMap(String name, IFlatMapFn<S, T> fn, PType<T> type) {
-    return parallelDo(name, IFnHelpers.wrapFlatMap(fn), type);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> flatMap(String name, IFlatMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
-    return parallelDo(name, IFnHelpers.wrapFlatMap(fn), type);
-  }
-
-  @Override
-  public <T> PCollection<T> map(IMapFn<S, T> fn, PType<T> type) {
-    return parallelDo(IFnHelpers.wrapMap(fn), type);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> map(IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
-    return parallelDo(IFnHelpers.wrapMap(fn), type);
-  }
-
-  @Override
-  public <T> PCollection<T> map(String name, IMapFn<S, T> fn, PType<T> type) {
-    return parallelDo(name, IFnHelpers.wrapMap(fn), type);
-  }
-
-  @Override
-  public <K, V> PTable<K, V> map(String name, IMapFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
-    return parallelDo(name, IFnHelpers.wrapMap(fn), type);
-  }
-
-  public PCollection<S> filter(IFilterFn<S> fn) {
-    return filter(IFnHelpers.wrapFilter(fn));
-  }
-
-  public PCollection<S> filter(String name, IFilterFn<S> fn) {
-    return filter(name, IFnHelpers.wrapFilter(fn));
-  }
-
-  @Override
   public PCollection<S> write(Target target) {
     getPipeline().write(this, target);
     return this;
@@ -404,11 +320,6 @@ public class MemCollection<S> implements PCollection<S> {
   }
 
   @Override
-  public <K> PTable<K, S> by(IMapFn<S, K> mapFn, PType<K> keyType) {
-    return parallelDo(new ExtractKeyFn<K, S>(IFnHelpers.wrapMap(mapFn)), getTypeFamily().tableOf(keyType, getPType()));
-  }
-
-  @Override
   public <K> PTable<K, S> by(String name, MapFn<S, K> mapFn, PType<K> keyType) {
     return parallelDo(name, new ExtractKeyFn<K, S>(mapFn), getTypeFamily().tableOf(keyType, getPType()));
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
index 5451533..e8bf5e6 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
@@ -22,7 +22,6 @@ import org.apache.crunch.CombineFn;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.GroupingOptions;
-import org.apache.crunch.IMapFn;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PGroupedTable;
@@ -30,7 +29,6 @@ import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Target;
 import org.apache.crunch.fn.Aggregators;
-import org.apache.crunch.fn.IFnHelpers;
 import org.apache.crunch.lib.PTables;
 import org.apache.crunch.types.PGroupedTableType;
 import org.apache.crunch.types.PTableType;
@@ -125,11 +123,6 @@ class MemGroupedTable<K, V> extends MemCollection<Pair<K, Iterable<V>>> implemen
   }
 
   @Override
-  public <U> PTable<K, U> mapValues(IMapFn<Iterable<V>, U> mapFn, PType<U> ptype) {
-    return PTables.mapValues(this, IFnHelpers.wrapMap(mapFn), ptype);
-  }
-
-  @Override
   public <U> PTable<K, U> mapValues(String name, MapFn<Iterable<V>, U> mapFn, PType<U> ptype) {
     return PTables.mapValues(name, this, mapFn, ptype);
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
index 03b5a70..b90b656 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
@@ -24,8 +24,6 @@ import com.google.common.collect.ImmutableList;
 import org.apache.crunch.CachingOptions;
 import org.apache.crunch.FilterFn;
 import org.apache.crunch.GroupingOptions;
-import org.apache.crunch.IFilterFn;
-import org.apache.crunch.IMapFn;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PGroupedTable;
@@ -33,7 +31,6 @@ import org.apache.crunch.PObject;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Target;
-import org.apache.crunch.fn.IFnHelpers;
 import org.apache.crunch.lib.Aggregate;
 import org.apache.crunch.lib.Cogroup;
 import org.apache.crunch.lib.Join;
@@ -138,26 +135,11 @@ public class MemTable<K, V> extends MemCollection<Pair<K, V>> implements PTable<
   }
 
   @Override
-  public PTable<K, V> filter(IFilterFn<Pair<K, V>> filterFn) {
-    return parallelDo(IFnHelpers.wrapFilter(filterFn), getPTableType());
-  }
-
-  @Override
-  public PTable<K, V> filter(String name, IFilterFn<Pair<K, V>> filterFn) {
-    return parallelDo(name, IFnHelpers.wrapFilter(filterFn), getPTableType());
-  }
-
-  @Override
   public <U> PTable<K, U> mapValues(MapFn<V, U> mapFn, PType<U> ptype) {
     return PTables.mapValues(this, mapFn, ptype);
   }
 
   @Override
-  public <U> PTable<K, U> mapValues(IMapFn<V, U> mapFn, PType<U> ptype) {
-    return PTables.mapValues(this, IFnHelpers.wrapMap(mapFn), ptype);
-  }
-
-  @Override
   public <U> PTable<K, U> mapValues(String name, MapFn<V, U> mapFn, PType<U> ptype) {
     return PTables.mapValues(name, this, mapFn, ptype);
   }
@@ -168,11 +150,6 @@ public class MemTable<K, V> extends MemCollection<Pair<K, V>> implements PTable<
   }
 
   @Override
-  public <K2> PTable<K2, V> mapKeys(IMapFn<K, K2> mapFn, PType<K2> ptype) {
-    return PTables.mapKeys(this, IFnHelpers.wrapMap(mapFn), ptype);
-  }
-
-  @Override
   public <K2> PTable<K2, V> mapKeys(String name, MapFn<K, K2> mapFn, PType<K2> ptype) {
     return PTables.mapKeys(name, this, mapFn, ptype);
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-dist/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-dist/pom.xml b/crunch-dist/pom.xml
index b7cd0e9..48d9b05 100644
--- a/crunch-dist/pom.xml
+++ b/crunch-dist/pom.xml
@@ -61,6 +61,10 @@ under the License.
       <groupId>org.apache.crunch</groupId>
       <artifactId>crunch-contrib</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.crunch</groupId>
+      <artifactId>crunch-lambda</artifactId>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-lambda/pom.xml b/crunch-lambda/pom.xml
new file mode 100644
index 0000000..e910517
--- /dev/null
+++ b/crunch-lambda/pom.xml
@@ -0,0 +1,67 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.crunch</groupId>
+        <artifactId>crunch-parent</artifactId>
+        <version>0.14.0-SNAPSHOT</version>
+    </parent>
+    <properties>
+        <java.source.version>1.8</java.source.version>
+        <java.target.version>1.8</java.target.version>
+    </properties>
+
+    <artifactId>crunch-lambda</artifactId>
+    <name>Apache Crunch Lambda</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.crunch</groupId>
+            <artifactId>crunch-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.crunch</groupId>
+            <artifactId>crunch-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LAggregator.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LAggregator.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LAggregator.java
new file mode 100644
index 0000000..5b8611d
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LAggregator.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.crunch.fn.Aggregators;
+import org.apache.crunch.lambda.fn.SBiFunction;
+import org.apache.crunch.lambda.fn.SFunction;
+import org.apache.crunch.lambda.fn.SSupplier;
+
+/**
+ * Crunch Aggregator expressed as a composition of functional interface implementations
+ * @param <V> Type of values to be aggregated
+ * @param <A> Type of the accumulator object which holds the intermediate state while values are being aggregated
+ */
+public class LAggregator<V, A> extends Aggregators.SimpleAggregator<V> {
+
+    private final SSupplier<A> initialSupplier;
+    private final SBiFunction<A, V, A> combineFn;
+    private final SFunction<A, Iterable<V>> outputFn;
+    private A a;
+
+    public LAggregator(SSupplier<A> initialSupplier, SBiFunction<A, V, A> combineFn, SFunction<A, Iterable<V>> outputFn) {
+        this.initialSupplier = initialSupplier;
+        this.combineFn = combineFn;
+        this.outputFn = outputFn;
+    }
+
+    @Override
+    public void reset() {
+        a = initialSupplier.get();
+    }
+
+    @Override
+    public void update(V v) {
+        a = combineFn.apply(a, v);
+    }
+
+    @Override
+    public Iterable<V> results() {
+        return outputFn.apply(a);
+    }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollection.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollection.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollection.java
new file mode 100644
index 0000000..6a8dd62
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollection.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.crunch.*;
+import org.apache.crunch.fn.Aggregators;
+import org.apache.crunch.lambda.fn.SFunction;
+import org.apache.crunch.lambda.fn.SPredicate;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+
+import java.util.Optional;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+/**
+ * Java 8 friendly version of the {@link PCollection} interface, allowing distributed operations to be expressed in
+ * terms of lambda expressions and method references, instead of creating a new class implementation for each operation.
+ * @param <S> The type of the elements in this collection
+ */
+public interface LCollection<S> {
+    /**
+     * Get the underlying {@link PCollection} for this LCollection
+     */
+    PCollection<S> underlying();
+
+    /**
+     * Get the {@link LCollectionFactory} which can be used to create new Ltype instances
+     */
+    LCollectionFactory factory();
+
+    /**
+     * Transform this LCollection using a standard Crunch {@link DoFn}
+     */
+    default <T> LCollection<T> parallelDo(DoFn<S, T> fn, PType<T> pType) {
+        return factory().wrap(underlying().parallelDo(fn, pType));
+    }
+
+    /**
+     * Transform this LCollection to an {@link LTable} using a standard Crunch {@link DoFn}
+     */
+    default <K, V> LTable<K, V> parallelDo(DoFn<S, Pair<K, V>> fn, PTableType<K, V> pType) {
+        return factory().wrap(underlying().parallelDo(fn, pType));
+    }
+
+    /**
+     * Transform this LCollection using a Lambda-friendly {@link LDoFn}.
+     */
+    default <T> LCollection<T> parallelDo(LDoFn<S, T> fn, PType<T> pType) {
+        return parallelDo(new LDoFnWrapper<>(fn), pType);
+    }
+
+    /**
+     * Transform this LCollection using a Lambda-friendly {@link LDoFn}.
+     */
+    default <K, V> LTable<K, V> parallelDo(LDoFn<S, Pair<K, V>> fn, PTableType<K, V> pType) {
+        return parallelDo(new LDoFnWrapper<>(fn), pType);
+    }
+
+    /**
+     * Map the elements of this collection 1-1 through the supplied function.
+     */
+    default <T> LCollection<T> map(SFunction<S, T> fn, PType<T> pType) {
+        return parallelDo(ctx -> ctx.emit(fn.apply(ctx.element())), pType);
+    }
+
+    /**
+     * Map the elements of this collection 1-1 through the supplied function to yield an {@link LTable}
+     */
+    default <K, V> LTable<K, V> map(SFunction<S, Pair<K, V>> fn, PTableType<K, V> pType) {
+        return parallelDo(ctx -> ctx.emit(fn.apply(ctx.element())), pType);
+    }
+
+    /**
+     * Map each element to zero or more output elements using the provided stream-returning function.
+     */
+    default <T> LCollection<T> flatMap(SFunction<S, Stream<T>> fn, PType<T> pType) {
+        return parallelDo(ctx -> fn.apply(ctx.element()).forEach(ctx::emit), pType);
+    }
+
+    /**
+     * Map each element to zero or more output elements using the provided stream-returning function to yield an
+     * {@link LTable}
+     */
+    default <K, V> LTable<K, V> flatMap(SFunction<S, Stream<Pair<K, V>>> fn, PTableType<K, V> pType) {
+        return parallelDo(ctx -> fn.apply(ctx.element()).forEach(ctx::emit), pType);
+    }
+
+    /**
+     * Combination of a filter and map operation by using a function with {@link Optional} return type.
+     */
+    default <T> LCollection<T> filterMap(SFunction<S, Optional<T>> fn, PType<T> pType) {
+        return parallelDo(ctx -> fn.apply(ctx.element()).ifPresent(ctx::emit), pType);
+    }
+
+    /**
+     * Combination of a filter and map operation by using a function with {@link Optional} return type.
+     */
+    default <K, V> LTable<K, V> filterMap(SFunction<S, Optional<Pair<K, V>>> fn, PTableType<K, V> pType) {
+        return parallelDo(ctx -> fn.apply(ctx.element()).ifPresent(ctx::emit), pType);
+    }
+
+    /**
+     * Filter the collection using the supplied predicate.
+     */
+    default LCollection<S> filter(SPredicate<S> predicate) {
+        return parallelDo(ctx -> { if (predicate.test(ctx.element())) ctx.emit(ctx.element());}, pType());
+    }
+
+    /**
+     * Union this LCollection with another LCollection of the same type
+     */
+    default LCollection<S> union(LCollection<S> other) {
+        return factory().wrap(underlying().union(other.underlying()));
+    }
+
+    /**
+     * Union this LCollection with a {@link PCollection} of the same type
+     */
+    default LCollection<S> union(PCollection<S> other) {
+        return factory().wrap(underlying().union(other));
+    }
+
+    /**
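+     * Increment a counter for every element in the collection. NOTE(review): unlike filter(), the lambda below never calls ctx.emit() - confirm the returned collection still contains the elements.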
+     */
+    default LCollection<S> increment(Enum<?> counter) {
+        return parallelDo(ctx -> ctx.increment(counter), pType());
+    }
+
+    /**
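+     * Increment a counter for every element in the collection. NOTE(review): unlike filter(), the lambda below never calls ctx.emit() - confirm the returned collection still contains the elements.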
+     */
+    default LCollection<S> increment(String counterGroup, String counterName) {
+        return parallelDo(ctx -> ctx.increment(counterGroup, counterName), pType());
+    }
+
+    /**
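+     * Increment a counter for every element satisfying the conditional predicate supplied. NOTE(review): the lambda below never calls ctx.emit() - confirm the returned collection still contains the elements.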
+     */
+    default LCollection<S> incrementIf(Enum<?> counter, SPredicate<S> condition) {
+        return parallelDo(ctx -> {
+            if (condition.test(ctx.element())) ctx.increment(counter);
+        }, pType());
+    }
+
+    /**
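+     * Increment a counter for every element satisfying the conditional predicate supplied. NOTE(review): the lambda below never calls ctx.emit() - confirm the returned collection still contains the elements.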
+     */
+    default LCollection<S> incrementIf(String counterGroup, String counterName, SPredicate<S> condition) {
+        return parallelDo(ctx -> {
+            if (condition.test(ctx.element())) ctx.increment(counterGroup, counterName);
+        }, pType());
+    }
+
+    /**
+     * Cache the underlying {@link PCollection}
+     */
+    default LCollection<S> cache() {
+        underlying().cache();
+        return this;
+    }
+
+    /**
+     * Cache the underlying {@link PCollection}
+     */
+    default LCollection<S> cache(CachingOptions options) {
+        underlying().cache(options);
+        return this;
+    }
+
+    /**
+     * Key this LCollection by a key extracted from the element to yield a {@link LTable} mapping the key to the whole
+     * element.
+     */
+    default <K> LTable<K, S> by(SFunction<S, K> extractFn, PType<K> pType) {
+        return parallelDo(
+                ctx -> ctx.emit(Pair.of(extractFn.apply(ctx.element()), ctx.element())),
+                ptf().tableOf(pType, pType()));
+    }
+
+    /**
+     * Count distinct values in this LCollection, yielding an {@link LTable} mapping each value to the number
+     * of occurrences in the collection.
+     */
+    default LTable<S, Long> count() {
+        return map(a -> Pair.of(a, 1L), ptf().tableOf(pType(), ptf().longs()))
+                .groupByKey()
+                .combineValues(Aggregators.SUM_LONGS());
+    }
+
+    /**
+     * Obtain the contents of this LCollection as a {@link Stream} that can be processed locally. Note, this may trigger
+     * your job to execute in a distributed environment if the pipeline has not yet been run.
+     */
+    default Stream<S> materialize() {
+        return StreamSupport.stream(underlying().materialize().spliterator(), false);
+    }
+
+    /**
+     * Get the {@link PTypeFamily} representing how elements of this collection may be serialized.
+     */
+    default PTypeFamily ptf() {
+        return underlying().getPType().getFamily();
+    }
+
+    /**
+     * Get the {@link PType} representing how elements of this collection may be serialized.
+     */
+    default PType<S> pType() { return underlying().getPType(); }
+
+    /**
+     * Write this collection to the specified {@link Target}
+     */
+    default LCollection<S> write(Target target) {
+        underlying().write(target);
+        return this;
+    }
+
+    /**
+     * Write this collection to the specified {@link Target} with the given {@link org.apache.crunch.Target.WriteMode}
+     */
+    default LCollection<S> write(Target target, Target.WriteMode writeMode) {
+        underlying().write(target, writeMode);
+        return this;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollectionFactory.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollectionFactory.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollectionFactory.java
new file mode 100644
index 0000000..4b208d2
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollectionFactory.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.PTable;
+
+/**
+ * Factory for creating {@link LCollection}, {@link LTable} and {@link LGroupedTable} objects from their corresponding
+ * {@link PCollection}, {@link PTable} and {@link PGroupedTable} types. You probably don't want to use or implement this
+ * interface directly. You should start with the {@link Lambda} class instead.
+ */
+public interface LCollectionFactory {
+    /**
+     * Wrap a PCollection into an LCollection
+     */
+    <S> LCollection<S> wrap(PCollection<S> collection);
+
+    /**
+     * Wrap a PTable into an LTable
+     */
+    <K, V> LTable<K, V> wrap(PTable<K, V> collection);
+
+    /**
+     * Wrap a PGroupedTable into an LGroupedTable
+     */
+    <K, V> LGroupedTable<K, V> wrap(PGroupedTable<K, V> collection);
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollectionFactoryImpl.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollectionFactoryImpl.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollectionFactoryImpl.java
new file mode 100644
index 0000000..4bedfa7
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollectionFactoryImpl.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.PTable;
+
+class LCollectionFactoryImpl implements LCollectionFactory {
+
+    @Override
+    public <S> LCollection<S> wrap(final PCollection<S> collection) {
+        return new LCollection<S>() {
+            @Override
+            public PCollection<S> underlying() {
+                return collection;
+            }
+
+            @Override
+            public LCollectionFactory factory() {
+                return LCollectionFactoryImpl.this;
+            }
+        };
+    }
+
+    @Override
+    public <K, V> LTable<K, V> wrap(final PTable<K, V> collection) {
+        return new LTable<K, V>() {
+            @Override
+            public PTable<K, V> underlying() {
+                return collection;
+            }
+
+            @Override
+            public LCollectionFactory factory() {
+                return LCollectionFactoryImpl.this;
+            }
+        };
+    }
+
+    @Override
+    public <K, V> LGroupedTable<K, V> wrap(final PGroupedTable<K, V> collection) {
+        return new LGroupedTable<K, V>() {
+            @Override
+            public PGroupedTable<K, V> underlying() {
+                return collection;
+            }
+
+            @Override
+            public LCollectionFactory factory() {
+                return LCollectionFactoryImpl.this;
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFn.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFn.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFn.java
new file mode 100644
index 0000000..1be8085
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFn.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.crunch.DoFn;
+
+import java.io.Serializable;
+
+/**
+ * A Java lambda-friendly version of the {@link DoFn} class.
+ */
+public interface LDoFn<S, T> extends Serializable {
+
+  void process(LDoFnContext<S, T> context);
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFnContext.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFnContext.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFnContext.java
new file mode 100644
index 0000000..3743a2f
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFnContext.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * Context object for implementing distributed operations in terms of Lambda expressions.
+ * @param <S> Input type of LDoFn
+ * @param <T> Output type of LDoFn
+ */
+public interface LDoFnContext<S, T> {
+    /** Get the input element */
+    S element();
+
+    /** Emit t to the output */
+    void emit(T t);
+
+    /** Get the underlying {@link TaskInputOutputContext} (for special cases) */
+    TaskInputOutputContext getContext();
+
+    /** Get the current Hadoop {@link Configuration} */
+    Configuration getConfiguration();
+
+    /** Increment a counter by 1 */
+    void increment(String groupName, String counterName);
+
+    /** Increment a counter by value */
+    void increment(String groupName, String counterName, long value);
+
+    /** Increment a counter by 1 */
+    void increment(Enum<?> counterName);
+
+    /** Increment a counter by value */
+    void increment(Enum<?> counterName, long value);
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFnWrapper.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFnWrapper.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFnWrapper.java
new file mode 100644
index 0000000..76087d5
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LDoFnWrapper.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+class LDoFnWrapper<S, T> extends DoFn<S, T> {
+
+    private final LDoFn<S, T> fn;
+    private transient Context<S, T> ctxt;
+
+    public LDoFnWrapper(LDoFn<S, T> fn) {
+        this.fn = fn;
+    }
+
+    @Override
+    public void initialize() {
+        super.initialize();
+        if (getContext() == null) {
+            this.ctxt = new Context<>(getConfiguration());
+        } else {
+            this.ctxt = new Context<>(getContext());
+        }
+    }
+
+    @Override
+    public void process(S input, Emitter<T> emitter) {
+        fn.process(ctxt.update(input, emitter));
+    }
+    static class Context<S, T> implements LDoFnContext<S, T> {
+        private S element;
+        private Emitter<T> emitter;
+        private TaskInputOutputContext context;
+        private Configuration conf;
+
+        public Context(TaskInputOutputContext context) {
+            this.context = context;
+            this.conf = context.getConfiguration();
+        }
+
+        public Context(Configuration conf) {
+            this.context = null;
+            this.conf = conf;
+        }
+
+        public Context<S, T> update(S element, Emitter<T> emitter) {
+            this.element = element;
+            this.emitter = emitter;
+            return this;
+        }
+
+        public S element() {
+            return element;
+        }
+
+        public void emit(T t) {
+            emitter.emit(t);
+        }
+
+        public TaskInputOutputContext getContext() {
+            return context;
+        }
+
+        public Configuration getConfiguration() {
+            return conf;
+        }
+
+        public void increment(String groupName, String counterName) {
+            increment(groupName, counterName, 1);
+        }
+
+        public void increment(String groupName, String counterName, long value) {
+            if (context != null) {
+                context.getCounter(groupName, counterName).increment(value);
+            }
+        }
+
+        public void increment(Enum<?> counterName) {
+            increment(counterName, 1);
+        }
+
+        public void increment(Enum<?> counterName, long value) {
+            if (context != null) {
+                context.getCounter(counterName).increment(value);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LGroupedTable.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LGroupedTable.java
new file mode 100644
index 0000000..10209e0
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LGroupedTable.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.crunch.Aggregator;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.lambda.fn.SBiConsumer;
+import org.apache.crunch.lambda.fn.SBiFunction;
+import org.apache.crunch.lambda.fn.SBinaryOperator;
+import org.apache.crunch.lambda.fn.SFunction;
+import org.apache.crunch.lambda.fn.SSupplier;
+import org.apache.crunch.types.PType;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+/**
+ * Java 8 friendly version of the {@link PGroupedTable} interface, allowing distributed operations to be expressed in
+ * terms of lambda expressions and method references, instead of creating a new class implementation for each operation.
+ * @param <K> key type for this table
+ * @param <V> value type for this table
+ */
+public interface LGroupedTable<K, V> extends LCollection<Pair<K, Iterable<V>>>  {
+    /**
+     * Get the underlying {@link PGroupedTable} for this LGroupedTable
+     */
+    PGroupedTable<K, V> underlying();
+
+    /**
+     * Combine the value part of the table using the provided Crunch {@link Aggregator}. This will be optimised into
+     * both a combine and reduce in the MapReduce implementation, with similar optimisations available for other
+     * implementations.
+     */
+    default LTable<K, V> combineValues(Aggregator<V> aggregator) {
+        return factory().wrap(underlying().combineValues(aggregator));
+    }
+
+    /**
+     * Combine the value part of the table using the given functions. The supplier is used to create a new aggregating
+     * type, the combineFn adds a value into the aggregate, and the output function transforms the aggregate into
+     * an iterable of the original value type. For example, summation can be expressed as follows:
+     *
+     * <pre>{@code myGroupedTable.combineValues(() -> 0, (sum, value) -> sum + value, Collections::singleton) }</pre>
+     *
+     * <p>This will be optimised into both a combine and reduce in the MapReduce implementation, with similar
+     * optimisations available for other implementations.</p>
+     */
+    default <A> LTable<K, V> combineValues(
+            SSupplier<A> initialSupplier,
+            SBiFunction<A, V, A> combineFn,
+            SFunction<A, Iterable<V>> outputFn) {
+        return combineValues(new LAggregator<>(initialSupplier, combineFn, outputFn));
+    }
+
+    /**
+     * Map the values in this LGroupedTable using a custom function. This function operates over a stream which can
+     * be consumed only once.
+     *
+     * <p>Note that in serialization systems which heavily reuse objects (such as Avro), you may
+     * in fact get given the same object multiple times with different data as you consume the stream, meaning it may
+     * be necessary to detach values.</p>
+     */
+    default <T> LTable<K, T> mapValues(SFunction<Stream<V>, T> fn, PType<T> pType) {
+        return parallelDo(
+                ctx -> ctx.emit(Pair.of(
+                        ctx.element().first(),
+                        fn.apply(StreamSupport.stream(ctx.element().second().spliterator(), false)))
+                ), ptf().tableOf(keyType(), pType));
+    }
+
+    /**
+     * Collect the values into an aggregate type. This differs from combineValues in that it outputs the aggregate type
+     * rather than the value type, and is designed to happen in one step (rather than being optimised into multiple
+     * levels). This makes it much more suitable for assembling collections than computing simple numeric aggregates.
+     *
+     * <p>The supplier provides an "empty" object, then the consumer is called with each value. For example, to collect
+     * all values into a {@link Collection}, one can do this:</p>
+     * <pre>{@code
+     * lgt.collectValues(ArrayList::new, Collection::add, lgt.ptf().collections(lgt.valueType()))
+     * }</pre>
+     *
+     * <p>This is in fact the default implementation for the collectAllValues() method.</p>
+     *
+     * <p>Note that in serialization systems which heavily reuse objects (such as Avro), you may
+     * in fact get given the same object multiple times with different data as you iterate over the values, meaning
+     * it may be necessary to detach values before adding them to the aggregate.</p>
+     */
+    default <C> LTable<K, C> collectValues(SSupplier<C> emptySupplier, SBiConsumer<C, V> addFn, PType<C> pType) {
+        return parallelDo(ctx -> {
+            C coll = emptySupplier.get();
+            ctx.element().second().forEach(v -> addFn.accept(coll, v));
+            ctx.emit(Pair.of(ctx.element().first(), coll));
+        }, ptf().tableOf(keyType(), pType));
+    }
+
+    /**
+     * Collect all values for each key into a {@link Collection}
+     */
+    default LTable<K, Collection<V>> collectAllValues() {
+        return collectValues(ArrayList::new, Collection::add, ptf().collections(valueType()));
+    }
+
+    /**
+     * Collect all unique values for each key into a {@link Collection} (note that the value type must have correctly
+     * defined equals() and hashCode() methods).
+     */
+    default LTable<K, Collection<V>> collectUniqueValues() {
+        return collectValues(HashSet::new, Collection::add, ptf().collections(valueType()));
+    }
+
+    /**
+     * Reduce the values for each key using an associative binary operator.
+     * For example {@code reduceValues((a, b) -> a + b)} for summation, {@code reduceValues((a, b) -> a + ", " + b)}
+     * for comma-separated string concatenation and {@code reduceValues((a, b) -> a > b ? a : b)} for maximum value.
+     */
+    default LTable<K, V> reduceValues(SBinaryOperator<V> operator) {
+        return combineValues(() -> (V)null, (a, b) -> a == null ? b : operator.apply(a, b), Collections::singleton);
+    }
+
+    /**
+     * Ungroup this LGroupedTable back into an {@link LTable}. This will still trigger a "reduce" operation, so is
+     * usually only used in special cases like producing a globally-ordered list by feeding everything through
+     * a single reducer.
+     */
+    default LTable<K, V> ungroup() {
+        return factory().wrap(underlying().ungroup());
+    }
+
+    /**
+     * Get a {@link PType} which can be used to serialize the key part of this grouped table
+     */
+    default PType<K> keyType() {
+        return underlying().getGroupedTableType().getTableType().getKeyType();
+    }
+
+    /**
+     * Get a {@link PType} which can be used to serialize the value part of this grouped table
+     */
+    default PType<V> valueType() {
+        return underlying().getGroupedTableType().getTableType().getValueType();
+    }
+
+}


Mime
View raw message