crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject [1/2] crunch git commit: Java 8 lambda support for Apache Crunch.
Date Tue, 12 Jan 2016 23:05:20 GMT
Repository: crunch
Updated Branches:
  refs/heads/master f8920d355 -> 7d7af4ef4


http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/LTable.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LTable.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LTable.java
new file mode 100644
index 0000000..0b4e4fa
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LTable.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.crunch.GroupingOptions;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Target;
+import org.apache.crunch.lambda.fn.SFunction;
+import org.apache.crunch.lambda.fn.SPredicate;
+import org.apache.crunch.lib.join.DefaultJoinStrategy;
+import org.apache.crunch.lib.join.JoinStrategy;
+import org.apache.crunch.lib.join.JoinType;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+
+import java.util.Collection;
+
+/**
+ * Java 8 friendly version of the {@link PTable} interface, allowing distributed operations
to be expressed in
+ * terms of lambda expressions and method references, instead of creating a new class implementation
for each operation.
+ * @param <K> key type for this table
+ * @param <V> value type for this table
+ */
+public interface LTable<K, V> extends LCollection<Pair<K, V>> {
+    /**
+     * Get the underlying {@link PTable} for this LTable
+     */
+    PTable<K, V> underlying();
+
+    /**
+     * Group this table by key to yield a {@link LGroupedTable}
+     */
+    default LGroupedTable<K, V> groupByKey() {
+        return factory().wrap(underlying().groupByKey());
+    }
+
+    /**
+     * Group this table by key to yield a {@link LGroupedTable}, passing {@code numReducers} to the underlying {@link PTable}
+     */
+    default LGroupedTable<K, V> groupByKey(int numReducers) {
+        return factory().wrap(underlying().groupByKey(numReducers));
+    }
+
+    /**
+     * Group this table by key to yield a {@link LGroupedTable}, passing the supplied {@link GroupingOptions} to the underlying {@link PTable}
+     */
+    default LGroupedTable<K, V> groupByKey(GroupingOptions opts) {
+        return factory().wrap(underlying().groupByKey(opts));
+    }
+
+    /**
+     * Get an {@link LCollection} containing just the keys from this table
+     */
+    default LCollection<K> keys() {
+        return factory().wrap(underlying().keys());
+    }
+
+    /**
+     * Get an {@link LCollection} containing just the values from this table
+     */
+    default LCollection<V> values() {
+        return factory().wrap(underlying().values());
+    }
+
+    /**
+     * Transform the keys of this table using the given function
+     */
+    default <T> LTable<T, V> mapKeys(SFunction<K, T> fn, PType<T>
pType) {
+        return parallelDo(
+                ctx -> ctx.emit(Pair.of(fn.apply(ctx.element().first()), ctx.element().second())),
+                ptf().tableOf(pType, valueType()));
+    }
+
+    /**
+     * Transform the values of this table using the given function
+     */
+    default <T> LTable<K, T> mapValues(SFunction<V, T> fn, PType<T>
pType) {
+        return parallelDo(
+                ctx -> ctx.emit(Pair.of(ctx.element().first(), fn.apply(ctx.element().second()))),
+                ptf().tableOf(keyType(), pType));
+    }
+
+    /**
+     * Join this table to another {@link LTable} which has the same key type using the provided
{@link JoinType} and
+     * {@link JoinStrategy}
+     */
+    default <U> LTable<K, Pair<V, U>> join(LTable<K, U> other, JoinType
joinType, JoinStrategy<K, V, U> joinStrategy) {
+        return factory().wrap(joinStrategy.join(underlying(), other.underlying(), joinType));
+    }
+
+    /**
+     * Join this table to another {@link LTable} which has the same key type using the provided
{@link JoinType} and
+     * the {@link DefaultJoinStrategy} (reduce-side join).
+     */
+    default <U> LTable<K, Pair<V, U>> join(LTable<K, U> other, JoinType
joinType) {
+        return join(other, joinType, new DefaultJoinStrategy<>());
+    }
+
+    /**
+     * Inner join this table to another {@link LTable} which has the same key type using
a reduce-side join
+     */
+    default <U> LTable<K, Pair<V, U>> join(LTable<K, U> other) {
+        return join(other, JoinType.INNER_JOIN);
+    }
+
+    /**
+     * Cogroup this table with another {@link LTable} with the same key type, collecting
the set of values from
+     * each side.
+     */
+    default <U> LTable<K, Pair<Collection<V>, Collection<U>>>
cogroup(LTable<K, U> other) {
+        return factory().wrap(underlying().cogroup(other.underlying()));
+    }
+
+    /**
+     * Get the underlying {@link PTableType} used to serialize key/value pairs in this table
+     */
+    default PTableType<K, V> pType() { return underlying().getPTableType(); }
+
+    /**
+     * Get a {@link PType} which can be used to serialize the key part of this table
+     */
+    default PType<K> keyType() {
+        return underlying().getKeyType();
+    }
+
+    /**
+     * Get a {@link PType} which can be used to serialize the value part of this table
+     */
+    default PType<V> valueType() {
+        return underlying().getValueType();
+    }
+
+    /**
+     * Write this table to the {@link Target} supplied.
+     */
+    default LTable<K, V> write(Target target) {
+        underlying().write(target);
+        return this;
+    }
+
+    /**
+     * Write this table to the {@link Target} supplied, using the given {@link Target.WriteMode}.
+     */
+    default LTable<K, V> write(Target target, Target.WriteMode writeMode) {
+        underlying().write(target, writeMode);
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    default LTable<K, V> increment(Enum<?> counter) {
+        return parallelDo(ctx -> ctx.increment(counter), pType());
+    }
+
+    /** {@inheritDoc} */
+    default LTable<K, V> increment(String counterGroup, String counterName) {
+        return parallelDo(ctx -> ctx.increment(counterGroup, counterName), pType());
+    }
+
+    /** {@inheritDoc} */
+    default LTable<K, V> incrementIf(Enum<?> counter, SPredicate<Pair<K,
V>> condition) {
+        return parallelDo(ctx -> {
+            if (condition.test(ctx.element())) ctx.increment(counter);
+        }, pType());
+    }
+
+    /** {@inheritDoc} */
+    default LTable<K, V> incrementIf(String counterGroup, String counterName, SPredicate<Pair<K,
V>> condition) {
+        return parallelDo(ctx -> {
+            if (condition.test(ctx.element())) ctx.increment(counterGroup, counterName);
+        }, pType());
+    }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/Lambda.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/Lambda.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/Lambda.java
new file mode 100644
index 0000000..07fad2b
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/Lambda.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.PTable;
+
+/**
+ * Entry point for the crunch-lambda API. Use this to create {@link LCollection}, {@link
LTable} and
+ * {@link LGroupedTable} objects from their corresponding {@link PCollection}, {@link PTable}
and {@link PGroupedTable}
+ * types.
+ *
+ * <p>The crunch-lambda API allows you to write Crunch pipelines using lambda expressions
and method references instead
+ * of creating classes (anonymous, inner, or top level) for each operation that needs to
be completed. Many pipelines
+ * are composed of a large number of simple operations, rather than a small number of complex
operations, making this
+ * strategy much more efficient to code and easy to read for those able to use Java 8 in
their distributed computation
+ * environments.</p>
+ *
+ * <p>You use the API by wrapping your Crunch type into an L-type object. This class
provides static methods for that.
+ * You can then use the lambda API methods on the L-type object, yielding more L-type objects.
If at any point you need
+ * to go back to the standard Crunch world (for compatibility with existing code or complex
use cases), you can at any
+ * time call {@code underlying()} on an L-type object to get a Crunch object.</p>
+ *
+ * <p>Example (the obligatory wordcount):</p>
+ *
+ * <pre>{@code
+ * Pipeline pipeline = new MRPipeline(getClass());
+ * LCollection<String> inputText = Lambda.wrap(pipeline.readTextFile("/path/to/input/file"));
+ * inputText.flatMap(line -> Arrays.stream(line.split(" ")), Writables.strings())
+ *          .count()
+ *          .map(wordCountPair -> wordCountPair.first() + ": " + wordCountPair.second(),
Writables.strings())
+ *          .write(To.textFile("/path/to/output/file"));
+ * pipeline.run();
+ * }</pre>
+ *
+ */
+public class Lambda {
+    private static LCollectionFactory INSTANCE = new LCollectionFactoryImpl();
+
+    public static <S> LCollection<S> wrap(PCollection<S> collection) {
return INSTANCE.wrap(collection); }
+    public static <K, V> LTable<K, V> wrap(PTable<K, V> collection) { return
INSTANCE.wrap(collection); }
+    public static <K, V> LGroupedTable<K, V> wrap(PGroupedTable<K, V> collection)
{ return INSTANCE.wrap(collection); }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBiConsumer.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBiConsumer.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBiConsumer.java
new file mode 100644
index 0000000..6e5030f
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBiConsumer.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda.fn;
+
+import java.io.Serializable;
+import java.util.function.BiConsumer;
+
+/**
+ * Serializable version of the Java BiConsumer functional interface.
+ */
+@FunctionalInterface
+public interface SBiConsumer<K, V> extends BiConsumer<K, V>, Serializable {
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBiFunction.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBiFunction.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBiFunction.java
new file mode 100644
index 0000000..5aac5bc
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBiFunction.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda.fn;
+
+import java.io.Serializable;
+import java.util.function.BiFunction;
+
+/**
+ * Serializable version of the Java BiFunction functional interface.
+ */
+@FunctionalInterface
+public interface SBiFunction<K, V, T> extends BiFunction<K, V, T>, Serializable
{
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBinaryOperator.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBinaryOperator.java
b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBinaryOperator.java
new file mode 100644
index 0000000..d1e4cc0
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SBinaryOperator.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda.fn;
+
+import java.io.Serializable;
+import java.util.function.BinaryOperator;
+
+/**
+ * Serializable version of the Java BinaryOperator functional interface.
+ */
+@FunctionalInterface
+public interface SBinaryOperator<T> extends BinaryOperator<T>, Serializable {
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SConsumer.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SConsumer.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SConsumer.java
new file mode 100644
index 0000000..90f4a99
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SConsumer.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda.fn;
+
+import java.io.Serializable;
+import java.util.function.Consumer;
+
+/**
+ * Serializable version of the Java Consumer functional interface.
+ */
+@FunctionalInterface
+public interface SConsumer<T> extends Consumer<T>, Serializable {
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SFunction.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SFunction.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SFunction.java
new file mode 100644
index 0000000..d120efe
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SFunction.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda.fn;
+
+import java.io.Serializable;
+import java.util.function.Function;
+
+/**
+ * Serializable version of the Java Function functional interface.
+ */
+@FunctionalInterface
+public interface SFunction<S, T> extends Function<S, T>, Serializable {
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SPredicate.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SPredicate.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SPredicate.java
new file mode 100644
index 0000000..9e90bab
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SPredicate.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda.fn;
+
+import java.io.Serializable;
+import java.util.function.Predicate;
+
+/**
+ * Serializable version of the Java Predicate functional interface.
+ */
+@FunctionalInterface
+public interface SPredicate<T> extends Predicate<T>, Serializable {
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SSupplier.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SSupplier.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SSupplier.java
new file mode 100644
index 0000000..eea254a
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/SSupplier.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda.fn;
+
+import java.io.Serializable;
+import java.util.function.Supplier;
+
+/**
+ * Serializable version of the Java Supplier functional interface.
+ */
+@FunctionalInterface
+public interface SSupplier<T> extends Supplier<T>, Serializable {
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/package-info.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/package-info.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/package-info.java
new file mode 100644
index 0000000..ad18232
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/fn/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Serializable versions of the functional interfaces that ship with Java 8
+ */
+package org.apache.crunch.lambda.fn;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/main/java/org/apache/crunch/lambda/package-info.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/package-info.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/package-info.java
new file mode 100644
index 0000000..9c03148
--- /dev/null
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/package-info.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <p>Alternative Crunch API using Java 8 features to allow construction of pipelines
using lambda functions and method
+ * references. It works by wrapping standard Java {@link org.apache.crunch.PCollection},
+ * {@link org.apache.crunch.PTable} and {@link org.apache.crunch.PGroupedTable} instances
into the corresponding
+ * {@link org.apache.crunch.lambda.LCollection}, {@link org.apache.crunch.lambda.LTable}
and
+ * {@link org.apache.crunch.lambda.LGroupedTable} classes.</p>
+ *
+ * <p>The static class {@link org.apache.crunch.lambda.Lambda} has methods to create
these. Please also see the Javadocs
+ * for {@link org.apache.crunch.lambda.Lambda} for usage examples.</p>
+ */
+package org.apache.crunch.lambda;
+

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/test/java/org/apache/crunch/lambda/LCollectionTest.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/test/java/org/apache/crunch/lambda/LCollectionTest.java b/crunch-lambda/src/test/java/org/apache/crunch/lambda/LCollectionTest.java
new file mode 100644
index 0000000..b819d0d
--- /dev/null
+++ b/crunch-lambda/src/test/java/org/apache/crunch/lambda/LCollectionTest.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.crunch.lambda.TestCommon.*;
+import static org.apache.crunch.lambda.TypedRecord.rec;
+import static org.apache.crunch.types.avro.Avros.*;
+import static org.junit.Assert.*;
+
+public class LCollectionTest {
+
+    private LCollection<TypedRecord> lc() {
+        return Lambda.wrap(MemPipeline.typedCollectionOf(Avros.reflects(TypedRecord.class),
+                rec(14, "Alice", 101L),
+                rec(25, "Bo B", 102L),
+                rec(21, "Char Lotte", 103L),
+                rec(28, "David", 104L),
+                rec(31, "Erik", 105L)));
+    }
+
+    @Test
+    public void testParallelDo() throws Exception {
+        LCollection<String> result = lc().parallelDo(ctx -> { if (ctx.element().key
> 26) ctx.emit(ctx.element().name); }, strings());
+        assertCollectionOf(result, "David", "Erik");
+    }
+
+    @Test
+    public void testParallelDoPair() throws Exception {
+        LTable<Integer, String> result = lc().parallelDo(ctx -> {
+            if (ctx.element().key > 26) ctx.emit(Pair.of(ctx.element().key, ctx.element().name));
}, tableOf(ints(), strings()));
+        assertCollectionOf(result, Pair.of(28, "David"), Pair.of(31, "Erik"));
+    }
+
+
+    @Test
+    public void testMap() throws Exception {
+        assertCollectionOf(lc().map(r -> r.key, ints()), 14, 25, 21, 28, 31);
+    }
+
+    @Test
+    public void testMapPair() throws Exception {
+        assertCollectionOf(lc().map(r -> Pair.of(r.key, r.value), tableOf(ints(), longs())),
+                Pair.of(14, 101L),
+                Pair.of(25, 102L),
+                Pair.of(21, 103L),
+                Pair.of(28, 104L),
+                Pair.of(31, 105L));
+    }
+
+    @Test
+    public void testFlatMap() throws Exception {
+        assertCollectionOf(
+                lc().flatMap(s -> Arrays.stream(s.name.split(" ")), strings()),
+                "Alice", "Bo", "B", "Char", "Lotte", "David", "Erik");
+    }
+
+
+    @Test
+    public void testFilterMap() throws Exception {
+        Map<String, String> lookupMap = ImmutableMap.of("Erik", "BOOM", "Alice", "POW");
+        assertCollectionOf(
+                lc().filterMap(r -> lookupMap.containsKey(r.name) ? Optional.of(lookupMap.get(r.name))
: Optional.empty(), strings()),
+                "BOOM", "POW"
+        );
+    }
+
+    @Test
+    public void testFilter() throws Exception {
+        assertCollectionOf(lc().filter(r -> r.key == 21), rec(21, "Char Lotte", 103L));
+    }
+
+
+    @Test
+    public void testIncrement() throws Exception {
+        lc().increment("hello", "world");
+        long value = MemPipeline.getCounters().findCounter("hello", "world").getValue();
+        assertEquals(5L, value);
+    }
+
+    @Test
+    public void testIncrementIf() throws Exception {
+        lc().incrementIf("hello", "conditional_world", r -> r.key < 25);
+        long value = MemPipeline.getCounters().findCounter("hello", "conditional_world").getValue();
+        assertEquals(2L, value);
+    }
+
+    @Test
+    public void testBy() throws Exception {
+        assertCollectionOf(
+                lc().filter(r -> r.key == 21).by(r -> r.key, ints()),
+                Pair.of(21, rec(21, "Char Lotte", 103L)));
+    }
+
+    @Test
+    public void testCount() throws Exception {
+        assertCollectionOf(
+                Lambda.wrap(MemPipeline.typedCollectionOf(strings(), "a", "a", "a", "b",
"b")).count(),
+                Pair.of("a", 3L),
+                Pair.of("b", 2L)
+        );
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/test/java/org/apache/crunch/lambda/LGroupedTableTest.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/test/java/org/apache/crunch/lambda/LGroupedTableTest.java b/crunch-lambda/src/test/java/org/apache/crunch/lambda/LGroupedTableTest.java
new file mode 100644
index 0000000..043387e
--- /dev/null
+++ b/crunch-lambda/src/test/java/org/apache/crunch/lambda/LGroupedTableTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.crunch.Pair;
+import org.apache.crunch.fn.Aggregators;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.apache.crunch.lambda.TestCommon.assertCollectionOf;
+import static org.apache.crunch.types.avro.Avros.*;
+
+
+public class LGroupedTableTest {
+
+    LGroupedTable<String, Integer> lgt = Lambda.wrap(MemPipeline.typedTableOf(tableOf(strings(), ints()),
+            "a", 2,
+            "a", 3,
+            "b", 5,
+            "c", 7,
+            "c", 11,
+            "c", 13,
+            "c", 13))
+            .groupByKey();
+
+    @Test
+    public void testCombineValues() throws Exception {
+        assertCollectionOf(lgt.combineValues(Aggregators.MAX_INTS()),
+                Pair.of("a", 3),
+                Pair.of("b", 5),
+                Pair.of("c", 13));
+    }
+
+    @Test
+    public void testCombineValues1() throws Exception {
+        assertCollectionOf(lgt.combineValues(() -> Integer.MIN_VALUE, Integer::max, Collections::singleton),
+                Pair.of("a", 3),
+                Pair.of("b", 5),
+                Pair.of("c", 13));
+    }
+
+    @Test
+    public void testMapValues() throws Exception {
+        assertCollectionOf(lgt.mapValues(vs -> vs.map(i -> i.toString()).reduce((a, b) -> a + "," + b).get(), strings()),
+                Pair.of("a", "2,3"),
+                Pair.of("b", "5"),
+                Pair.of("c", "7,11,13,13"));
+    }
+
+    @Test
+    public void testCollectValues() throws Exception {
+        assertCollectionOf(lgt.collectValues(ArrayList::new, Collection::add, collections(ints())),
+                Pair.of("a", ImmutableList.of(2,3)),
+                Pair.of("b", ImmutableList.of(5)),
+                Pair.of("c", ImmutableList.of(7, 11, 13, 13)));
+    }
+
+    @Test
+    public void testCollectAllValues() throws Exception {
+        assertCollectionOf(lgt.collectAllValues(),
+                Pair.of("a", ImmutableList.of(2,3)),
+                Pair.of("b", ImmutableList.of(5)),
+                Pair.of("c", ImmutableList.of(7, 11, 13, 13)));
+    }
+
+    @Test
+    public void testCollectUniqueValues() throws Exception {
+        assertCollectionOf(lgt.collectUniqueValues(),
+                Pair.of("a", ImmutableSet.of(2, 3)),
+                Pair.of("b", ImmutableSet.of(5)),
+                Pair.of("c", ImmutableSet.of(7, 11, 13)));
+    }
+
+    @Test
+    public void testReduceValues() throws Exception {
+        assertCollectionOf(lgt.reduceValues((a, b) -> a * b),
+                Pair.of("a", 6),
+                Pair.of("b", 5),
+                Pair.of("c", 7 * 11 * 13 * 13)
+                );
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/test/java/org/apache/crunch/lambda/LTableTest.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/test/java/org/apache/crunch/lambda/LTableTest.java b/crunch-lambda/src/test/java/org/apache/crunch/lambda/LTableTest.java
new file mode 100644
index 0000000..f66ada5
--- /dev/null
+++ b/crunch-lambda/src/test/java/org/apache/crunch/lambda/LTableTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.junit.Test;
+
+import static org.apache.crunch.lambda.TestCommon.assertCollectionOf;
+import static org.apache.crunch.types.avro.Avros.*;
+
+
+public class LTableTest {
+
+    private LTable<String, Integer> lt1 = Lambda.wrap(MemPipeline.typedTableOf(tableOf(strings(), ints()),
+            "a", 2,
+            "a", 3,
+            "b", 5,
+            "c", 7,
+            "c", 11,
+            "c", 13,
+            "c", 13));
+
+    private LTable<String, Long> lt2 = Lambda.wrap(MemPipeline.typedTableOf(tableOf(strings(), longs()),
+            "a", 101L,
+            "b", 102L,
+            "c", 103L
+            ));
+
+    @Test
+    public void testKeys() throws Exception {
+        assertCollectionOf(lt1.keys(), "a", "a", "b", "c", "c", "c", "c");
+    }
+
+    @Test
+    public void testValues() throws Exception {
+        assertCollectionOf(lt2.values(), 101L, 102L, 103L);
+    }
+
+    @Test
+    public void testMapKeys() throws Exception {
+        assertCollectionOf(lt2.mapKeys(String::toUpperCase, strings()),
+                Pair.of("A", 101L),
+                Pair.of("B", 102L),
+                Pair.of("C", 103L)
+                );
+    }
+
+    @Test
+    public void testMapValues() throws Exception {
+        assertCollectionOf(lt2.mapValues(v -> v * 2, longs()),
+                Pair.of("a", 202L),
+                Pair.of("b", 204L),
+                Pair.of("c", 206L)
+        );
+    }
+
+    @Test
+    public void testJoin() throws Exception {
+        assertCollectionOf(lt1.join(lt2).values(),
+                Pair.of(2, 101L),
+                Pair.of(3, 101L),
+                Pair.of(5, 102L),
+                Pair.of(7, 103L),
+                Pair.of(11, 103L),
+                Pair.of(13, 103L),
+                Pair.of(13, 103L));
+    }
+
+    @Test
+    public void testCogroup() throws Exception {
+        assertCollectionOf(lt1.cogroup(lt2).values(),
+                Pair.of(ImmutableList.of(2, 3), ImmutableList.of(101L)),
+                Pair.of(ImmutableList.of(5), ImmutableList.of(102L)),
+                Pair.of(ImmutableList.of(7, 11, 13, 13), ImmutableList.of(103L))
+                );
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/test/java/org/apache/crunch/lambda/TestCommon.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/test/java/org/apache/crunch/lambda/TestCommon.java b/crunch-lambda/src/test/java/org/apache/crunch/lambda/TestCommon.java
new file mode 100644
index 0000000..02101ab
--- /dev/null
+++ b/crunch-lambda/src/test/java/org/apache/crunch/lambda/TestCommon.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+import com.google.common.collect.Sets;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestCommon {
+    @SafeVarargs
+    public static <T> void assertCollectionOf(LCollection<T> actual, T... expected) {
+        Set<T> actualSet = actual.materialize().collect(Collectors.toSet());
+        Set<T> expectedSet = Sets.newHashSet(expected);
+        assertEquals(expectedSet, actualSet);
+    }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/crunch-lambda/src/test/java/org/apache/crunch/lambda/TypedRecord.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/test/java/org/apache/crunch/lambda/TypedRecord.java b/crunch-lambda/src/test/java/org/apache/crunch/lambda/TypedRecord.java
new file mode 100644
index 0000000..42540de
--- /dev/null
+++ b/crunch-lambda/src/test/java/org/apache/crunch/lambda/TypedRecord.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lambda;
+
+public class TypedRecord {
+    public int key;
+    public String name;
+    public long value;
+    public static TypedRecord rec(int key, String name, long value) {
+        TypedRecord record = new TypedRecord();
+        record.key = key;
+        record.name = name;
+        record.value = value;
+        return record;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        TypedRecord that = (TypedRecord) o;
+
+        if (key != that.key) return false;
+        if (value != that.value) return false;
+        return name != null ? name.equals(that.name) : that.name == null;
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = key;
+        result = 31 * result + (name != null ? name.hashCode() : 0);
+        result = 31 * result + (int) (value ^ (value >>> 32));
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/7d7af4ef/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0758e43..b628a77 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,6 +55,17 @@ under the License.
     <module>crunch-hive</module>
     <module>crunch-dist</module>
   </modules>
+  <profiles>
+    <profile>
+      <id>java-8</id>
+      <activation>
+        <jdk>[1.8,]</jdk>
+      </activation>
+      <modules>
+        <module>crunch-lambda</module>
+      </modules>
+    </profile>
+  </profiles>
 
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -163,6 +174,12 @@ under the License.
       </dependency>
 
       <dependency>
+        <groupId>org.apache.crunch</groupId>
+        <artifactId>crunch-lambda</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-client</artifactId>
         <version>${hadoop.version}</version>


Mime
View raw message