crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gr...@apache.org
Subject git commit: CRUNCH-381 Use descriptions in parallelDo calls
Date Fri, 25 Apr 2014 20:21:05 GMT
Repository: crunch
Updated Branches:
  refs/heads/apache-crunch-0.8 a96e30892 -> b6f203e7a


CRUNCH-381 Use descriptions in parallelDo calls

Add simple descriptions to PCollection#parallelDo calls in library
methods so that it's easier to trace through job plans.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/b6f203e7
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/b6f203e7
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/b6f203e7

Branch: refs/heads/apache-crunch-0.8
Commit: b6f203e7a1313002722a0ae12a16882386af6e82
Parents: a96e308
Author: Gabriel Reid <greid@apache.org>
Authored: Fri Apr 25 17:09:13 2014 +0200
Committer: Gabriel Reid <greid@apache.org>
Committed: Fri Apr 25 22:15:28 2014 +0200

----------------------------------------------------------------------
 .../org/apache/crunch/impl/dist/collect/PCollectionImpl.java | 2 +-
 .../src/main/java/org/apache/crunch/lib/Cartesian.java       | 2 +-
 .../src/main/java/org/apache/crunch/lib/Channels.java        | 4 ++--
 crunch-core/src/main/java/org/apache/crunch/lib/Sample.java  | 8 ++++----
 crunch-core/src/main/java/org/apache/crunch/lib/Set.java     | 3 +++
 5 files changed, 11 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/b6f203e7/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
index 7f6984a..a1e70fe 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java
@@ -228,7 +228,7 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
 
   @Override
   public PCollection<S> filter(FilterFn<S> filterFn) {
-    return parallelDo(filterFn, getPType());
+    return parallelDo("Filter with " + filterFn.getClass().getSimpleName(), filterFn, getPType());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/b6f203e7/crunch-core/src/main/java/org/apache/crunch/lib/Cartesian.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Cartesian.java b/crunch-core/src/main/java/org/apache/crunch/lib/Cartesian.java
index 08327dd..6d47348 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/Cartesian.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Cartesian.java
@@ -205,7 +205,7 @@ public class Cartesian {
 
     PTypeFamily ctf = cg.getTypeFamily();
 
-    return cg.parallelDo(new MapFn<Pair<Pair<Integer, Integer>, Pair<U, V>>, Pair<U, V>>() {
+    return cg.parallelDo("Extract second element", new MapFn<Pair<Pair<Integer, Integer>, Pair<U, V>>, Pair<U, V>>() {
       @Override
       public Pair<U, V> map(Pair<Pair<Integer, Integer>, Pair<U, V>> input) {
         return input.second();

http://git-wip-us.apache.org/repos/asf/crunch/blob/b6f203e7/crunch-core/src/main/java/org/apache/crunch/lib/Channels.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Channels.java b/crunch-core/src/main/java/org/apache/crunch/lib/Channels.java
index 568ca20..2e0fe1d 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/Channels.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Channels.java
@@ -59,8 +59,8 @@ public class Channels {
   public static <T, U> Pair<PCollection<T>, PCollection<U>> split(PCollection<Pair<T, U>> pCollection,
       PType<T> firstPType, PType<U> secondPType) {
 
-    PCollection<T> first = pCollection.parallelDo(new FirstEmittingDoFn<T, U>(), firstPType);
-    PCollection<U> second = pCollection.parallelDo(new SecondEmittingDoFn<T, U>(), secondPType);
+    PCollection<T> first = pCollection.parallelDo("Extract first value", new FirstEmittingDoFn<T, U>(), firstPType);
+    PCollection<U> second = pCollection.parallelDo("Extract second value", new SecondEmittingDoFn<T, U>(), secondPType);
     return Pair.of(first, second);
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/b6f203e7/crunch-core/src/main/java/org/apache/crunch/lib/Sample.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Sample.java b/crunch-core/src/main/java/org/apache/crunch/lib/Sample.java
index 5266545..ec0006a 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/Sample.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Sample.java
@@ -119,7 +119,7 @@ public class Sample {
     PTypeFamily ptf = input.getTypeFamily();
     PType<Pair<T, Integer>> ptype = ptf.pairs(input.getPType(), ptf.ints());
     return weightedReservoirSample(
-        input.parallelDo(new MapFn<T, Pair<T, Integer>>() {
+        input.parallelDo("Map to pairs for reservoir sampling", new MapFn<T, Pair<T, Integer>>() {
           @Override
           public Pair<T, Integer> map(T t) { return Pair.of(t, 1); }
         }, ptype),
@@ -163,7 +163,7 @@ public class Sample {
         }, ptf.tableOf(ptf.ints(), input.getPType()));
     int[] ss = { sampleSize };
     return groupedWeightedReservoirSample(groupedIn, ss, seed)
-        .parallelDo(new MapFn<Pair<Integer, T>, T>() {
+        .parallelDo("Extract sampled value from pair", new MapFn<Pair<Integer, T>, T>() {
           @Override
           public T map(Pair<Integer, T> p) {
             return p.second();
@@ -204,10 +204,10 @@ public class Sample {
     PTableType<Integer, Pair<Double, T>> ptt = ptf.tableOf(ptf.ints(),
         ptf.pairs(ptf.doubles(), ttype));
     
-    return input.parallelDo(new ReservoirSampleFn<T, N>(sampleSizes, seed, ttype), ptt)
+    return input.parallelDo("Initial reservoir sampling", new ReservoirSampleFn<T, N>(sampleSizes, seed, ttype), ptt)
         .groupByKey(1)
         .combineValues(new WRSCombineFn<T>(sampleSizes, ttype))
-        .parallelDo(new MapFn<Pair<Integer, Pair<Double, T>>, Pair<Integer, T>>() {
+        .parallelDo("Extract sampled values", new MapFn<Pair<Integer, Pair<Double, T>>, Pair<Integer, T>>() {
           @Override
           public Pair<Integer, T> map(Pair<Integer, Pair<Double, T>> p) {
             return Pair.of(p.first(), p.second().second());

http://git-wip-us.apache.org/repos/asf/crunch/blob/b6f203e7/crunch-core/src/main/java/org/apache/crunch/lib/Set.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Set.java b/crunch-core/src/main/java/org/apache/crunch/lib/Set.java
index 0ba879c..bb16659 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/Set.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Set.java
@@ -42,6 +42,7 @@ public class Set {
    */
   public static <T> PCollection<T> difference(PCollection<T> coll1, PCollection<T> coll2) {
     return Cogroup.cogroup(toTable(coll1), toTable(coll2)).parallelDo(
+        "Calculate differences of sets",
         new DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>, T>() {
           @Override
           public void process(Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>> input, Emitter<T> emitter) {
@@ -61,6 +62,7 @@ public class Set {
    */
   public static <T> PCollection<T> intersection(PCollection<T> coll1, PCollection<T> coll2) {
     return Cogroup.cogroup(toTable(coll1), toTable(coll2)).parallelDo(
+        "Calculate intersection of sets",
         new DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>, T>() {
           @Override
           public void process(Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>> input, Emitter<T> emitter) {
@@ -91,6 +93,7 @@ public class Set {
     PTypeFamily typeFamily = coll1.getTypeFamily();
     PType<T> type = coll1.getPType();
     return Cogroup.cogroup(toTable(coll1), toTable(coll2)).parallelDo(
+        "Calculate common values of sets",
         new DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>, Tuple3<T, T, T>>() {
           @Override
           public void process(Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>> input,


Mime
View raw message