crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gr...@apache.org
Subject git commit: CRUNCH-183 Handle object reuse in reservoir sampling
Date Sun, 24 Mar 2013 19:19:46 GMT
Updated Branches:
  refs/heads/master 0b8677579 -> 5d30af0c0


CRUNCH-183 Handle object reuse in reservoir sampling


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/5d30af0c
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/5d30af0c
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/5d30af0c

Branch: refs/heads/master
Commit: 5d30af0c03a565894aa1cd7226ab13f90437548a
Parents: 0b86775
Author: Gabriel Reid <greid@apache.org>
Authored: Sun Mar 24 18:07:56 2013 +0100
Committer: Gabriel Reid <greid@apache.org>
Committed: Sun Mar 24 20:17:01 2013 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/crunch/lib/Sample.java    |    4 +-
 .../java/org/apache/crunch/lib/SampleUtils.java    |   17 +++++++++-----
 2 files changed, 13 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/5d30af0c/crunch/src/main/java/org/apache/crunch/lib/Sample.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Sample.java b/crunch/src/main/java/org/apache/crunch/lib/Sample.java
index be75ae2..5a66101 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/Sample.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/Sample.java
@@ -203,9 +203,9 @@ public class Sample {
     PTableType<Integer, Pair<Double, T>> ptt = ptf.tableOf(ptf.ints(),
         ptf.pairs(ptf.doubles(), ttype));
     
-    return input.parallelDo(new ReservoirSampleFn<T, N>(sampleSizes, seed), ptt)
+    return input.parallelDo(new ReservoirSampleFn<T, N>(sampleSizes, seed, ttype), ptt)
         .groupByKey(1)
-        .combineValues(new WRSCombineFn<T>(sampleSizes))
+        .combineValues(new WRSCombineFn<T>(sampleSizes, ttype))
         .parallelDo(new MapFn<Pair<Integer, Pair<Double, T>>, Pair<Integer, T>>() {
           @Override
           public Pair<Integer, T> map(Pair<Integer, Pair<Double, T>> p) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/5d30af0c/crunch/src/main/java/org/apache/crunch/lib/SampleUtils.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/SampleUtils.java b/crunch/src/main/java/org/apache/crunch/lib/SampleUtils.java
index cbc30e4..d1cad35 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/SampleUtils.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/SampleUtils.java
@@ -27,6 +27,7 @@ import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.FilterFn;
 import org.apache.crunch.Pair;
+import org.apache.crunch.types.PType;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -65,12 +66,14 @@ class SampleUtils {
   
     private int[] sampleSizes;
     private Long seed;
+    private PType<T> valueType;
     private transient List<SortedMap<Double, T>> reservoirs;
     private transient Random random;
     
-    public ReservoirSampleFn(int[] sampleSizes, Long seed) {
+    public ReservoirSampleFn(int[] sampleSizes, Long seed, PType<T> valueType) {
       this.sampleSizes = sampleSizes;
       this.seed = seed;
+      this.valueType = valueType;
     }
     
     @Override
@@ -98,10 +101,10 @@ class SampleUtils {
         double score = Math.log(random.nextDouble()) / weight;
         SortedMap<Double, T> reservoir = reservoirs.get(id);
         if (reservoir.size() < sampleSizes[id]) { 
-          reservoir.put(score, p.first());        
+          reservoir.put(score, valueType.getDetachedValue(p.first()));        
         } else if (score > reservoir.firstKey()) {
           reservoir.remove(reservoir.firstKey());
-          reservoir.put(score, p.first());
+          reservoir.put(score, valueType.getDetachedValue(p.first()));
         }
       }
     }
@@ -120,10 +123,12 @@ class SampleUtils {
   static class WRSCombineFn<T> extends CombineFn<Integer, Pair<Double, T>> {
 
     private int[] sampleSizes;
+    private PType<T> valueType;
     private List<SortedMap<Double, T>> reservoirs;
     
-    public WRSCombineFn(int[] sampleSizes) {
+    public WRSCombineFn(int[] sampleSizes, PType<T> valueType) {
       this.sampleSizes = sampleSizes;
+      this.valueType = valueType;
     }
 
     @Override
@@ -140,10 +145,10 @@ class SampleUtils {
       SortedMap<Double, T> reservoir = reservoirs.get(input.first());
       for (Pair<Double, T> p : input.second()) {
         if (reservoir.size() < sampleSizes[input.first()]) { 
-          reservoir.put(p.first(), p.second());        
+          reservoir.put(p.first(), valueType.getDetachedValue(p.second()));        
         } else if (p.first() > reservoir.firstKey()) {
           reservoir.remove(reservoir.firstKey());
-          reservoir.put(p.first(), p.second());  
+          reservoir.put(p.first(), valueType.getDetachedValue(p.second()));  
         }
       }
     }


Mime
View raw message