crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject crunch git commit: CRUNCH-599: Fix increment and incrementIf methods in crunch-lambda so they also emit the incoming element
Date Fri, 01 Apr 2016 14:16:02 GMT
Repository: crunch
Updated Branches:
  refs/heads/master 1252e7f91 -> 65f39198e


CRUNCH-599: Fix increment and incrementIf methods in crunch-lambda so they also emit the incoming element


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/65f39198
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/65f39198
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/65f39198

Branch: refs/heads/master
Commit: 65f39198ebc9ba5f1557afd4e350227919c80229
Parents: 1252e7f
Author: David Whiting <davw@apache.org>
Authored: Thu Mar 31 12:06:45 2016 +0200
Committer: David Whiting <davw@apache.org>
Committed: Thu Mar 31 12:06:45 2016 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/crunch/lambda/LCollection.java | 12 ++++++++++--
 .../src/main/java/org/apache/crunch/lambda/LTable.java  | 12 ++++++++++--
 2 files changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/65f39198/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollection.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollection.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollection.java
index 6a8dd62..a7ca310 100644
--- a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollection.java
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollection.java
@@ -141,14 +141,20 @@ public interface LCollection<S> {
      * Increment a counter for every element in the collection
      */
     default LCollection<S> increment(Enum<?> counter) {
-        return parallelDo(ctx -> ctx.increment(counter), pType());
+        return parallelDo(ctx -> {
+            ctx.increment(counter);
+            ctx.emit(ctx.element());
+        }, pType());
     }
 
     /**
      * Increment a counter for every element in the collection
      */
     default LCollection<S> increment(String counterGroup, String counterName) {
-        return parallelDo(ctx -> ctx.increment(counterGroup, counterName), pType());
+        return parallelDo(ctx -> {
+            ctx.increment(counterGroup, counterName);
+            ctx.emit(ctx.element());
+        }, pType());
     }
 
     /**
@@ -157,6 +163,7 @@ public interface LCollection<S> {
     default LCollection<S> incrementIf(Enum<?> counter, SPredicate<S> condition) {
         return parallelDo(ctx -> {
             if (condition.test(ctx.element())) ctx.increment(counter);
+            ctx.emit(ctx.element());
         }, pType());
     }
 
@@ -166,6 +173,7 @@ public interface LCollection<S> {
     default LCollection<S> incrementIf(String counterGroup, String counterName, SPredicate<S> condition) {
         return parallelDo(ctx -> {
             if (condition.test(ctx.element())) ctx.increment(counterGroup, counterName);
+            ctx.emit(ctx.element());
         }, pType());
     }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/65f39198/crunch-lambda/src/main/java/org/apache/crunch/lambda/LTable.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LTable.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LTable.java
index 9f6616e..8360a33 100644
--- a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LTable.java
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LTable.java
@@ -185,18 +185,25 @@ public interface LTable<K, V> extends LCollection<Pair<K, V>> {
 
     /** {@inheritDoc} */
     default LTable<K, V> increment(Enum<?> counter) {
-        return parallelDo(ctx -> ctx.increment(counter), pType());
+        return parallelDo(ctx -> {
+            ctx.increment(counter);
+            ctx.emit(ctx.element());
+        }, pType());
     }
 
     /** {@inheritDoc} */
     default LTable<K, V> increment(String counterGroup, String counterName) {
-        return parallelDo(ctx -> ctx.increment(counterGroup, counterName), pType());
+        return parallelDo(ctx -> {
+            ctx.increment(counterGroup, counterName);
+            ctx.emit(ctx.element());
+        }, pType());
     }
 
     /** {@inheritDoc} */
     default LTable<K, V> incrementIf(Enum<?> counter, SPredicate<Pair<K, V>> condition) {
         return parallelDo(ctx -> {
             if (condition.test(ctx.element())) ctx.increment(counter);
+            ctx.emit(ctx.element());
         }, pType());
     }
 
@@ -204,6 +211,7 @@ public interface LTable<K, V> extends LCollection<Pair<K, V>> {
     default LTable<K, V> incrementIf(String counterGroup, String counterName, SPredicate<Pair<K, V>> condition) {
         return parallelDo(ctx -> {
             if (condition.test(ctx.element())) ctx.increment(counterGroup, counterName);
+            ctx.emit(ctx.element());
         }, pType());
     }
 }


Mime
View raw message