crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-287: Switch internal APIs and integration tests to use ReadableData.
Date Fri, 25 Oct 2013 19:58:18 GMT
Updated Branches:
  refs/heads/master 655df3c45 -> 09624fe3c


CRUNCH-287: Switch internal APIs and integration tests to use ReadableData.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/09624fe3
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/09624fe3
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/09624fe3

Branch: refs/heads/master
Commit: 09624fe3c79fe8bdd94b1449b394911e9d931e12
Parents: 655df3c
Author: Josh Wills <jwills@apache.org>
Authored: Fri Oct 25 12:56:05 2013 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Fri Oct 25 12:56:05 2013 -0700

----------------------------------------------------------------------
 .../org/apache/crunch/DependentSourcesIT.java   |  7 +-
 .../apache/crunch/LongPipelinePlannerIT.java    |  4 +-
 .../lib/join/BloomFilterJoinStrategy.java       | 80 +++++++-------------
 3 files changed, 33 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/09624fe3/crunch-core/src/it/java/org/apache/crunch/DependentSourcesIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/DependentSourcesIT.java b/crunch-core/src/it/java/org/apache/crunch/DependentSourcesIT.java
index 36bd7a7..ab2fed6 100644
--- a/crunch-core/src/it/java/org/apache/crunch/DependentSourcesIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/DependentSourcesIT.java
@@ -23,6 +23,7 @@ import static org.apache.crunch.types.avro.Avros.tableOf;
 import java.util.List;
 
 import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRJob;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.impl.mr.MRPipelineExecution;
@@ -47,7 +48,7 @@ public class DependentSourcesIT {
         tmpDir.copyResourcePath("shakes.txt"),
         tmpDir.getFileName("out"));
   }
-  
+
   public static void run(MRPipeline p, Path inputPath, String out) throws Exception {
      PCollection<String> in = p.read(From.textFile(inputPath));
      PTable<String, String> op = in.parallelDo("op1", new DoFn<String, Pair<String,
String>>() {
@@ -59,10 +60,10 @@ public class DependentSourcesIT {
       } 
      }, tableOf(strings(), strings()));
      
-     SourceTarget src = (SourceTarget)((MaterializableIterable<Pair<String, String>>)
op.materialize()).getSource();
+     ReadableData<Pair<String, String>> rd = op.asReadable(true);
      
      op = op.parallelDo("op2", IdentityFn.<Pair<String,String>>getInstance(),
tableOf(strings(), strings()),
-         ParallelDoOptions.builder().sourceTargets(src).build());
+         ParallelDoOptions.builder().sourceTargets(rd.getSourceTargets()).build());
      
      PCollection<String> output = op.values();
      output.write(To.textFile(out));

http://git-wip-us.apache.org/repos/asf/crunch/blob/09624fe3/crunch-core/src/it/java/org/apache/crunch/LongPipelinePlannerIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/LongPipelinePlannerIT.java b/crunch-core/src/it/java/org/apache/crunch/LongPipelinePlannerIT.java
index 2cd63f2..5e0b423 100644
--- a/crunch-core/src/it/java/org/apache/crunch/LongPipelinePlannerIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/LongPipelinePlannerIT.java
@@ -68,8 +68,8 @@ public class LongPipelinePlannerIT {
       } 
     }, strings());
 
-    MaterializableIterable matIt = (MaterializableIterable)iso.materialize();
-    ParallelDoOptions.Builder builder = ParallelDoOptions.builder().sourceTargets((SourceTarget)matIt.getSource());
+    ReadableData<String> isoRD = iso.asReadable(true);
+    ParallelDoOptions.Builder builder = ParallelDoOptions.builder().sourceTargets(isoRD.getSourceTargets());
 
     PTable<Integer, String> splitMap = keyedLower.parallelDo("split-map",
         new MapFn<Pair<Integer, String>, Pair<Integer, String>>() {

http://git-wip-us.apache.org/repos/asf/crunch/blob/09624fe3/crunch-core/src/main/java/org/apache/crunch/lib/join/BloomFilterJoinStrategy.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/BloomFilterJoinStrategy.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/BloomFilterJoinStrategy.java
index 6faef56..a62c39e 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/join/BloomFilterJoinStrategy.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/BloomFilterJoinStrategy.java
@@ -34,10 +34,8 @@ import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.ParallelDoOptions;
-import org.apache.crunch.SourceTarget;
+import org.apache.crunch.ReadableData;
 import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.ReadableSourceTarget;
-import org.apache.crunch.materialize.MaterializableIterable;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroType;
@@ -46,9 +44,7 @@ import org.apache.crunch.types.avro.Avros;
 import org.apache.crunch.types.writable.WritableType;
 import org.apache.crunch.types.writable.WritableTypeFamily;
 import org.apache.crunch.types.writable.Writables;
-import org.apache.crunch.util.DistCache;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.bloom.BloomFilter;
@@ -127,40 +123,30 @@ public class BloomFilterJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
   
   @Override
   public PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V>
right, JoinType joinType) {
-    
+
     if (joinType != JoinType.INNER_JOIN && joinType != JoinType.LEFT_OUTER_JOIN)
{
       throw new IllegalStateException("JoinType " + joinType + " is not supported for BloomFilter
joins");
     }
-    
+
     PTable<K,V> filteredRightSide;
-    if (left.getPipeline() instanceof MRPipeline) {
-      PType<BloomFilter> bloomFilterType = getBloomFilterType(left.getTypeFamily());
-      PCollection<BloomFilter> bloomFilters = left.keys().parallelDo(
-                                                    "Create bloom filters", 
-                                                    new CreateBloomFilterFn(vectorSize, nbHash,
left.getKeyType()), 
-                                                    bloomFilterType);
-      
-      MaterializableIterable<BloomFilter> materializableIterable = (MaterializableIterable<BloomFilter>)
bloomFilters.materialize();
-      FilterKeysWithBloomFilterFn<K, V> filterKeysFn = new FilterKeysWithBloomFilterFn<K,
V>(
-                                                              materializableIterable.getPath().toString(),

-                                                              vectorSize, nbHash, 
-                                                              left.getKeyType(), bloomFilterType);
-      
-      ParallelDoOptions.Builder optionsBuilder = ParallelDoOptions.builder();
-      if (materializableIterable.isSourceTarget()) {
-        optionsBuilder.sourceTargets((SourceTarget) materializableIterable.getSource());
-      }
-      
-      filteredRightSide = right.parallelDo("Filter right side with BloomFilters",
-                                                filterKeysFn, right.getPTableType(), optionsBuilder.build());
-  
-      // TODO This shouldn't be necessary due to the ParallelDoOptions, but it seems to be
needed somehow
-      left.getPipeline().run();
-    } else {
-      LOG.warn("Not using Bloom filters outside of MapReduce context");
-      filteredRightSide = right;
-    }
-    
+    PType<BloomFilter> bloomFilterType = getBloomFilterType(left.getTypeFamily());
+    PCollection<BloomFilter> bloomFilters = left.keys().parallelDo(
+        "Create bloom filters",
+        new CreateBloomFilterFn(vectorSize, nbHash, left.getKeyType()),
+        bloomFilterType);
+
+    ReadableData<BloomFilter> bloomData = bloomFilters.asReadable(true);
+    FilterKeysWithBloomFilterFn<K, V> filterKeysFn = new FilterKeysWithBloomFilterFn<K,
V>(
+        bloomData,
+        vectorSize, nbHash,
+        left.getKeyType());
+
+    ParallelDoOptions.Builder optionsBuilder = ParallelDoOptions.builder();
+    optionsBuilder.sourceTargets(bloomData.getSourceTargets());
+
+    filteredRightSide = right.parallelDo("Filter right side with BloomFilters",
+        filterKeysFn, right.getPTableType(), optionsBuilder.build());
+
     return delegateJoinStrategy.join(left, filteredRightSide, joinType);
   }
   
@@ -206,50 +192,38 @@ public class BloomFilterJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
    */
   private static class FilterKeysWithBloomFilterFn<K,V> extends FilterFn<Pair<K,
V>> {
     
-    private String inputPath;
     private int vectorSize;
     private int nbHash;
     private PType<K> keyType;
     private PType<BloomFilter> bloomFilterPType;
     private BloomFilter bloomFilter;
     private transient MapFn<K,byte[]> keyToBytesFn;
-    
-    FilterKeysWithBloomFilterFn(String inputPath, int vectorSize, int nbHash, PType<K>
keyType, PType<BloomFilter> bloomFilterPtype) {
-      this.inputPath = inputPath;
+    private ReadableData<BloomFilter> bloomData;
+
+    FilterKeysWithBloomFilterFn(ReadableData<BloomFilter> bloomData, int vectorSize,
int nbHash, PType<K> keyType) {
+      this.bloomData = bloomData;
       this.vectorSize = vectorSize;
       this.nbHash = nbHash;
       this.keyType = keyType;
-      this.bloomFilterPType = bloomFilterPtype;
     }
     
     
-    private Path getCacheFilePath() {
-      Path local = DistCache.getPathToCacheFile(new Path(inputPath), getConfiguration());
-      if (local == null) {
-        throw new CrunchRuntimeException("Can't find local cache file for '" + inputPath
+ "'");
-      }
-      return local;
-    }
-
     @Override
     public void configure(Configuration conf) {
-      DistCache.addCacheFile(new Path(inputPath), conf);
+      bloomData.configure(conf);
     }
     
     @Override
     public void initialize() {
       super.initialize();
       
-      bloomFilterPType.initialize(getConfiguration());
       keyType.initialize(getConfiguration());
       
       keyToBytesFn = getKeyToBytesMapFn(keyType, getConfiguration());
 
-      ReadableSourceTarget<BloomFilter> sourceTarget = bloomFilterPType.getDefaultFileSource(
-          getCacheFilePath());
       Iterable<BloomFilter> iterable;
       try {
-        iterable = sourceTarget.read(getConfiguration());
+        iterable = bloomData.read(getContext());
       } catch (IOException e) {
         throw new CrunchRuntimeException("Error reading right-side of map side join: ", e);
       }


Mime
View raw message