incubator-crunch-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Joseph Adler <joseph.ad...@gmail.com>
Subject Re: map side ("replicated") joins in Crunch
Date Tue, 03 Jul 2012 16:01:39 GMT
Awesome! Will give this a try soon... wish I could have helped with this one...

On Tue, Jul 3, 2012 at 1:53 AM, Gabriel Reid <gabriel.reid@gmail.com> wrote:
> Thanks for pointing that out Chris. I'm guessing the mailing list is
> stripping out attachments(?). Once the JIRA is up and running then
> that will be taken care of I guess.
>
> The mime type on the last attempt was application/octet-stream, so
> I've renamed this to a .txt file to try to ensure that it'll get a
> text/plain mime type (although I don't know if that'll make a
> difference). I've also pasted it inline below, hopefully one of those
> solutions works.
>
> - Gabriel
>
>
> diff --git src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
> src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
> index 420e8dc..c8ba596 100644
> --- src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
> +++ src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
> @@ -59,9 +59,9 @@ import com.google.common.collect.Sets;
>  public class MRPipeline implements Pipeline {
>
>    private static final Log LOG = LogFactory.getLog(MRPipeline.class);
> -
> +
>    private static final Random RANDOM = new Random();
> -
> +
>    private final Class<?> jarClass;
>    private final String name;
>    private final Map<PCollectionImpl<?>, Set<Target>> outputTargets;
> @@ -75,15 +75,15 @@ public class MRPipeline implements Pipeline {
>    public MRPipeline(Class<?> jarClass) throws IOException {
>      this(jarClass, new Configuration());
>    }
> -
> -  public MRPipeline(Class<?> jarClass, String name){
> +
> +  public MRPipeline(Class<?> jarClass, String name) {
>      this(jarClass, name, new Configuration());
>    }
> -
> +
>    public MRPipeline(Class<?> jarClass, Configuration conf) {
> -         this(jarClass, jarClass.getName(), conf);
> +    this(jarClass, jarClass.getName(), conf);
>    }
> -
> +
>    public MRPipeline(Class<?> jarClass, String name, Configuration conf) {
>      this.jarClass = jarClass;
>      this.name = name;
> @@ -102,9 +102,9 @@ public class MRPipeline implements Pipeline {
>
>    @Override
>    public void setConfiguration(Configuration conf) {
> -       this.conf = conf;
> +    this.conf = conf;
>    }
> -
> +
>    @Override
>    public PipelineResult run() {
>      MSCRPlanner planner = new MSCRPlanner(this, outputTargets);
> @@ -125,8 +125,8 @@ public class MRPipeline implements Pipeline {
>          boolean materialized = false;
>          for (Target t : outputTargets.get(c)) {
>            if (!materialized && t instanceof Source) {
> -           c.materializeAt((SourceTarget) t);
> -           materialized = true;
> +            c.materializeAt((SourceTarget) t);
> +            materialized = true;
>            }
>          }
>        }
> @@ -144,7 +144,7 @@ public class MRPipeline implements Pipeline {
>      cleanup();
>      return res;
>    }
> -
> +
>    public <S> PCollection<S> read(Source<S> source) {
>      return new InputCollection<S>(source, this);
>    }
> @@ -160,85 +160,120 @@ public class MRPipeline implements Pipeline {
>    @SuppressWarnings("unchecked")
>    public void write(PCollection<?> pcollection, Target target) {
>      if (pcollection instanceof PGroupedTableImpl) {
> -      pcollection = ((PGroupedTableImpl<?,?>) pcollection).ungroup();
> +      pcollection = ((PGroupedTableImpl<?, ?>) pcollection).ungroup();
>      } else if (pcollection instanceof UnionCollection || pcollection
> instanceof UnionTable) {
> -      pcollection = pcollection.parallelDo("UnionCollectionWrapper",
> -                 (MapFn)IdentityFn.<Object>getInstance(), pcollection.getPType());
> +      pcollection = pcollection.parallelDo("UnionCollectionWrapper",
> +          (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
>      }
>      addOutput((PCollectionImpl<?>) pcollection, target);
>    }
>
>    private void addOutput(PCollectionImpl<?> impl, Target target) {
>      if (!outputTargets.containsKey(impl)) {
> -      outputTargets.put(impl, Sets.<Target>newHashSet());
> +      outputTargets.put(impl, Sets.<Target> newHashSet());
>      }
>      outputTargets.get(impl).add(target);
>    }
> -
> +
>    @Override
>    public <T> Iterable<T> materialize(PCollection<T> pcollection) {
> -
> -    if (pcollection instanceof UnionCollection) {
> -       pcollection = pcollection.parallelDo("UnionCollectionWrapper",
> -               (MapFn)IdentityFn.<Object>getInstance(), pcollection.getPType());
> -       }
> -    PCollectionImpl<T> impl = (PCollectionImpl<T>) pcollection;
> +
> +    PCollectionImpl<T> pcollectionImpl = toPcollectionImpl(pcollection);
> +    ReadableSourceTarget<T> srcTarget =
> getMaterializeSourceTarget(pcollectionImpl);
> +
> +    MaterializableIterable<T> c = new MaterializableIterable<T>(this,
> srcTarget);
> +    if (!outputTargetsToMaterialize.containsKey(pcollectionImpl)) {
> +      outputTargetsToMaterialize.put(pcollectionImpl, c);
> +    }
> +    return c;
> +  }
> +
> +  /**
> +   * Retrieve a ReadableSourceTarget that provides access to the contents of a
> +   * {@link PCollection}. This is primarily intended as a helper method to
> +   * {@link #materialize(PCollection)}. The underlying data of the
> +   * ReadableSourceTarget may not be actually present until the
> pipeline is run.
> +   *
> +   * @param pcollection
> +   *          The collection for which the ReadableSourceTarget is to be
> +   *          retrieved
> +   * @return The ReadableSourceTarget
> +   * @throws IllegalArgumentException
> +   *           If no ReadableSourceTarget can be retrieved for the given
> +   *           PCollection
> +   */
> +  public <T> ReadableSourceTarget<T>
> getMaterializeSourceTarget(PCollection<T> pcollection) {
> +    PCollectionImpl<T> impl = toPcollectionImpl(pcollection);
>      SourceTarget<T> matTarget = impl.getMaterializedAt();
>      if (matTarget != null && matTarget instanceof ReadableSourceTarget) {
> -      return new MaterializableIterable<T>(this,
> (ReadableSourceTarget<T>) matTarget);
> +      return (ReadableSourceTarget<T>) matTarget;
> +    }
> +
> +    ReadableSourceTarget<T> srcTarget = null;
> +    if (outputTargets.containsKey(pcollection)) {
> +      for (Target target : outputTargets.get(impl)) {
> +        if (target instanceof ReadableSourceTarget) {
> +          srcTarget = (ReadableSourceTarget<T>) target;
> +          break;
> +        }
> +      }
>      }
> -
> -       ReadableSourceTarget<T> srcTarget = null;
> -       if (outputTargets.containsKey(pcollection)) {
> -         for (Target target : outputTargets.get(impl)) {
> -           if (target instanceof ReadableSourceTarget) {
> -                 srcTarget = (ReadableSourceTarget) target;
> -                 break;
> -           }
> -         }
> -       }
> -
> -       if (srcTarget == null) {
> -         SourceTarget<T> st = createIntermediateOutput(pcollection.getPType());
> -         if (!(st instanceof ReadableSourceTarget)) {
> -               throw new IllegalArgumentException("The PType for the given
> PCollection is not readable"
> -                   + " and cannot be materialized");
> -         } else {
> -               srcTarget = (ReadableSourceTarget) st;
> -               addOutput(impl, srcTarget);
> -         }
> -       }
> -
> -       MaterializableIterable<T> c = new MaterializableIterable<T>(this,
srcTarget);
> -       outputTargetsToMaterialize.put(impl, c);
> -       return c;
> +
> +    if (srcTarget == null) {
> +      SourceTarget<T> st = createIntermediateOutput(pcollection.getPType());
> +      if (!(st instanceof ReadableSourceTarget)) {
> +        throw new IllegalArgumentException("The PType for the given
> PCollection is not readable"
> +            + " and cannot be materialized");
> +      } else {
> +        srcTarget = (ReadableSourceTarget<T>) st;
> +        addOutput(impl, srcTarget);
> +      }
> +    }
> +
> +    return srcTarget;
> +  }
> +
> +  /**
> +   * Safely cast a PCollection into a PCollectionImpl, including
> handling the case of UnionCollections.
> +   * @param pcollection The PCollection to be cast/transformed
> +   * @return The PCollectionImpl representation
> +   */
> +  private <T> PCollectionImpl<T> toPcollectionImpl(PCollection<T>
> pcollection) {
> +    PCollectionImpl<T> pcollectionImpl = null;
> +    if (pcollection instanceof UnionCollection) {
> +      pcollectionImpl = (PCollectionImpl<T>)
> pcollection.parallelDo("UnionCollectionWrapper",
> +          (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
> +    } else {
> +      pcollectionImpl = (PCollectionImpl<T>) pcollection;
> +    }
> +    return pcollectionImpl;
>    }
>
>    public <T> SourceTarget<T> createIntermediateOutput(PType<T> ptype)
{
> -       return ptype.getDefaultFileSource(createTempPath());
> +    return ptype.getDefaultFileSource(createTempPath());
>    }
>
>    public Path createTempPath() {
>      tempFileIndex++;
>      return new Path(tempDirectory, "p" + tempFileIndex);
>    }
> -
> +
>    private static Path createTempDirectory(Configuration conf) {
>      Path dir = new Path("/tmp/crunch" + RANDOM.nextInt());
> -       try {
> -         FileSystem.get(conf).mkdirs(dir);
> -       } catch (IOException e) {
> -         LOG.error("Exception creating job output directory", e);
> -         throw new RuntimeException(e);
> -       }
> +    try {
> +      FileSystem.get(conf).mkdirs(dir);
> +    } catch (IOException e) {
> +      LOG.error("Exception creating job output directory", e);
> +      throw new RuntimeException(e);
> +    }
>      return dir;
>    }
> -
> +
>    @Override
>    public <T> void writeTextFile(PCollection<T> pcollection, String pathName)
{
>      // Ensure that this is a writable pcollection instance.
> -    pcollection = pcollection.parallelDo("asText", IdentityFn.<T>getInstance(),
> -        WritableTypeFamily.getInstance().as(pcollection.getPType()));
> +    pcollection = pcollection.parallelDo("asText", IdentityFn.<T>
> getInstance(), WritableTypeFamily
> +        .getInstance().as(pcollection.getPType()));
>      write(pcollection, At.textFile(pathName));
>    }
>
> @@ -256,7 +291,7 @@ public class MRPipeline implements Pipeline {
>        LOG.info("Exception during cleanup", e);
>      }
>    }
> -
> +
>    public int getNextAnonymousStageId() {
>      return nextAnonymousStageId++;
>    }
> @@ -265,7 +300,7 @@ public class MRPipeline implements Pipeline {
>    public void enableDebug() {
>      // Turn on Crunch runtime error catching.
>      getConfiguration().setBoolean(RuntimeParameters.DEBUG, true);
> -
> +
>      // Write Hadoop's WARN logs to the console.
>      Logger crunchInfoLogger = LogManager.getLogger("com.cloudera.crunch");
>      Appender console = crunchInfoLogger.getAppender("A");
> @@ -277,9 +312,9 @@ public class MRPipeline implements Pipeline {
>        LOG.warn("Could not find console appender named 'A' for writing
> Hadoop warning logs");
>      }
>    }
> -
> +
>    @Override
>    public String getName() {
> -         return name;
> +    return name;
>    }
>  }
> diff --git src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
> src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
> index d41a52e..68ef054 100644
> --- src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
> +++ src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
> @@ -12,6 +12,10 @@ public class CrunchRuntimeException extends
> RuntimeException {
>      super(e);
>    }
>
> +  public CrunchRuntimeException(String msg, Exception e) {
> +    super(msg, e);
> +  }
> +
>    public boolean wasLogged() {
>      return logged;
>    }
> diff --git src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java
> src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java
> index 1122d62..4debfeb 100644
> --- src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java
> +++ src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java
> @@ -45,7 +45,8 @@ public class AvroFileSource<T> extends
> FileSourceImpl<T> implements ReadableSour
>
>    @Override
>    public Iterable<T> read(Configuration conf) throws IOException {
> -    return CompositePathIterable.create(FileSystem.get(conf), path,
> new AvroFileReaderFactory<T>(
> +    FileSystem fs = FileSystem.get(path.toUri(), conf);
> +    return CompositePathIterable.create(fs, path, new AvroFileReaderFactory<T>(
>          (AvroType<T>) ptype, conf));
>    }
>  }
> diff --git src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java
> src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java
> index 24dec2d..462ef93 100644
> --- src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java
> +++ src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java
> @@ -26,18 +26,16 @@ import com.cloudera.crunch.io.ReadableSource;
>  import com.cloudera.crunch.io.impl.FileSourceImpl;
>  import com.cloudera.crunch.types.PType;
>
> -public class SeqFileSource<T> extends FileSourceImpl<T> implements
> -       ReadableSource<T> {
> +public class SeqFileSource<T> extends FileSourceImpl<T> implements
> ReadableSource<T> {
>
>    public SeqFileSource(Path path, PType<T> ptype) {
> -       super(path, ptype, SequenceFileInputFormat.class);
> +    super(path, ptype, SequenceFileInputFormat.class);
>    }
> -
> +
>    @Override
>    public Iterable<T> read(Configuration conf) throws IOException {
> -       FileSystem fs = FileSystem.get(conf);
> -       return CompositePathIterable.create(fs, path,
> -           new SeqFileReaderFactory<T>(ptype, conf));
> +    FileSystem fs = FileSystem.get(path.toUri(), conf);
> +    return CompositePathIterable.create(fs, path, new
> SeqFileReaderFactory<T>(ptype, conf));
>    }
>
>    @Override
> diff --git src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java
> src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java
> index 69ca12b..4db6658 100644
> --- src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java
> +++ src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java
> @@ -42,7 +42,7 @@ public class SeqFileTableSource<K, V> extends
> FileTableSourceImpl<K, V> implemen
>
>    @Override
>    public Iterable<Pair<K, V>> read(Configuration conf) throws IOException
{
> -    FileSystem fs = FileSystem.get(conf);
> +    FileSystem fs = FileSystem.get(path.toUri(), conf);
>      return CompositePathIterable.create(fs, path,
>          new SeqFileTableReaderFactory<K, V>((PTableType<K, V>) ptype, conf));
>    }
> diff --git src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
> src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
> index a876843..e0dbe68 100644
> --- src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
> +++ src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
> @@ -67,7 +67,7 @@ public class TextFileSource<T> extends
> FileSourceImpl<T> implements
>
>    @Override
>    public Iterable<T> read(Configuration conf) throws IOException {
> -       return CompositePathIterable.create(FileSystem.get(conf), path,
> -           new TextFileReaderFactory<T>(ptype, conf));
> +    return CompositePathIterable.create(FileSystem.get(path.toUri(),
> conf), path,
> +        new TextFileReaderFactory<T>(ptype, conf));
>    }
>  }
> diff --git src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java
> src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java
> new file mode 100644
> index 0000000..8072e07
> --- /dev/null
> +++ src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java
> @@ -0,0 +1,143 @@
> +package com.cloudera.crunch.lib.join;
> +
> +import java.io.IOException;
> +
> +import org.apache.hadoop.filecache.DistributedCache;
> +import org.apache.hadoop.fs.FileSystem;
> +import org.apache.hadoop.fs.Path;
> +
> +import com.cloudera.crunch.DoFn;
> +import com.cloudera.crunch.Emitter;
> +import com.cloudera.crunch.PTable;
> +import com.cloudera.crunch.Pair;
> +import com.cloudera.crunch.impl.mr.MRPipeline;
> +import com.cloudera.crunch.impl.mr.run.CrunchRuntimeException;
> +import com.cloudera.crunch.io.ReadableSourceTarget;
> +import com.cloudera.crunch.io.impl.SourcePathTargetImpl;
> +import com.cloudera.crunch.types.PType;
> +import com.cloudera.crunch.types.PTypeFamily;
> +import com.google.common.collect.ArrayListMultimap;
> +import com.google.common.collect.Multimap;
> +
> +/**
> + * Utility for doing map side joins on a common key between two
> {@link PTable}s.
> + * <p>
> + * A map side join is an optimized join which doesn't use a reducer; instead,
> + * the right side of the join is loaded into memory and the join is
> performed in
> + * a mapper. This style of join has the important implication that
> the output of
> + * the join is not sorted, which is the case with a conventional
> (reducer-based)
> + * join.
> + * <p>
> + * <b>Note:</b>This utility is only supported when running with a
> + * {@link MRPipeline} as the pipeline.
> + */
> +public class MapsideJoin {
> +
> +  /**
> +   * Join two tables using a map side join. The right-side table will be loaded
> +   * fully in memory, so this method should only be used if the right side
> +   * table's contents can fit in the memory allocated to mappers. The join
> +   * performed by this method is an inner join.
> +   *
> +   * @param left
> +   *          The left-side table of the join
> +   * @param right
> +   *          The right-side table of the join, whose contents will be fully
> +   *          read into memory
> +   * @return A table keyed on the join key, containing pairs of joined values
> +   */
> +  public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K,
U>
> left, PTable<K, V> right) {
> +
> +    if (!(right.getPipeline() instanceof MRPipeline)) {
> +      throw new CrunchRuntimeException("Map-side join is only
> supported within a MapReduce context");
> +    }
> +
> +    MRPipeline pipeline = (MRPipeline) right.getPipeline();
> +    pipeline.materialize(right);
> +
> +    // TODO Move necessary logic to MRPipeline so that we can theoretically
> +    // optimize his by running the setup of multiple map-side joins
> concurrently
> +    pipeline.run();
> +
> +    ReadableSourceTarget<Pair<K, V>> readableSourceTarget = pipeline
> +        .getMaterializeSourceTarget(right);
> +    if (!(readableSourceTarget instanceof SourcePathTargetImpl)) {
> +      throw new CrunchRuntimeException("Right-side contents can't be
> read from a path");
> +    }
> +
> +    // Suppress warnings because we've just checked this cast via instanceof
> +    @SuppressWarnings("unchecked")
> +    SourcePathTargetImpl<Pair<K, V>> sourcePathTarget =
> (SourcePathTargetImpl<Pair<K, V>>) readableSourceTarget;
> +
> +    Path path = sourcePathTarget.getPath();
> +    DistributedCache.addCacheFile(path.toUri(), pipeline.getConfiguration());
> +
> +    MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U,
> V>(path.toString(),
> +        right.getPType());
> +    PTypeFamily typeFamily = left.getTypeFamily();
> +    return left.parallelDo(
> +        "mapjoin",
> +        mapJoinDoFn,
> +        typeFamily.tableOf(left.getKeyType(),
> +            typeFamily.pairs(left.getValueType(), right.getValueType())));
> +
> +  }
> +
> +  static class MapsideJoinDoFn<K, U, V> extends DoFn<Pair<K, U>,
> Pair<K, Pair<U, V>>> {
> +
> +    private String inputPath;
> +    private PType<Pair<K, V>> ptype;
> +    private Multimap<K, V> joinMap;
> +
> +    public MapsideJoinDoFn(String inputPath, PType<Pair<K, V>> ptype) {
> +      this.inputPath = inputPath;
> +      this.ptype = ptype;
> +    }
> +
> +    private Path getCacheFilePath() {
> +      try {
> +        for (Path localPath :
> DistributedCache.getLocalCacheFiles(getConfiguration())) {
> +          if (localPath.toString().endsWith(inputPath)) {
> +            return
> localPath.makeQualified(FileSystem.getLocal(getConfiguration()));
> +
> +          }
> +        }
> +      } catch (IOException e) {
> +        throw new CrunchRuntimeException(e);
> +      }
> +
> +      throw new CrunchRuntimeException("Can't find local cache file
> for '" + inputPath + "'");
> +    }
> +
> +    @Override
> +    public void initialize() {
> +      super.initialize();
> +
> +      ReadableSourceTarget<Pair<K, V>> sourceTarget =
> (ReadableSourceTarget<Pair<K, V>>) ptype
> +          .getDefaultFileSource(getCacheFilePath());
> +      Iterable<Pair<K, V>> iterable = null;
> +      try {
> +        iterable = sourceTarget.read(getConfiguration());
> +      } catch (IOException e) {
> +        throw new CrunchRuntimeException("Error reading right-side of
> map side join: ", e);
> +      }
> +
> +      joinMap = ArrayListMultimap.create();
> +      for (Pair<K, V> joinPair : iterable) {
> +        joinMap.put(joinPair.first(), joinPair.second());
> +      }
> +    }
> +
> +    @Override
> +    public void process(Pair<K, U> input, Emitter<Pair<K, Pair<U,
> V>>> emitter) {
> +      K key = input.first();
> +      U value = input.second();
> +      for (V joinValue : joinMap.get(key)) {
> +        Pair<U, V> valuePair = Pair.of(value, joinValue);
> +        emitter.emit(Pair.of(key, valuePair));
> +      }
> +    }
> +
> +  }
> +
> +}
> diff --git src/main/java/com/cloudera/crunch/types/PType.java
> src/main/java/com/cloudera/crunch/types/PType.java
> index af4ef1b..ae480aa 100644
> --- src/main/java/com/cloudera/crunch/types/PType.java
> +++ src/main/java/com/cloudera/crunch/types/PType.java
> @@ -15,6 +15,7 @@
>
>  package com.cloudera.crunch.types;
>
> +import java.io.Serializable;
>  import java.util.List;
>
>  import org.apache.hadoop.fs.Path;
> @@ -31,7 +32,7 @@ import com.cloudera.crunch.SourceTarget;
>   * {@code PCollection}.
>   *
>   */
> -public interface PType<T> {
> +public interface PType<T> extends Serializable {
>    /**
>     * Returns the Java type represented by this {@code PType}.
>     */
> diff --git src/main/java/com/cloudera/crunch/types/avro/AvroType.java
> src/main/java/com/cloudera/crunch/types/avro/AvroType.java
> index 3db00c0..29af9fb 100644
> --- src/main/java/com/cloudera/crunch/types/avro/AvroType.java
> +++ src/main/java/com/cloudera/crunch/types/avro/AvroType.java
> @@ -14,6 +14,7 @@
>   */
>  package com.cloudera.crunch.types.avro;
>
> +import java.io.Serializable;
>  import java.util.List;
>
>  import org.apache.avro.Schema;
> @@ -41,7 +42,8 @@ public class AvroType<T> implements PType<T> {
>         private static final Converter AVRO_CONVERTER = new AvroKeyConverter();
>
>         private final Class<T> typeClass;
> -       private final Schema schema;
> +       private final String schemaString;
> +       private transient Schema schema;
>         private final MapFn baseInputMapFn;
>         private final MapFn baseOutputMapFn;
>         private final List<PType> subTypes;
> @@ -55,6 +57,7 @@ public class AvroType<T> implements PType<T> {
>                         MapFn outputMapFn, PType... ptypes) {
>                 this.typeClass = typeClass;
>                 this.schema = Preconditions.checkNotNull(schema);
> +               this.schemaString = schema.toString();
>                 this.baseInputMapFn = inputMapFn;
>                 this.baseOutputMapFn = outputMapFn;
>                 this.subTypes = ImmutableList.<PType> builder().add(ptypes).build();
> @@ -76,6 +79,9 @@ public class AvroType<T> implements PType<T> {
>         }
>
>         public Schema getSchema() {
> +         if (schema == null){
> +           schema = new Schema.Parser().parse(schemaString);
> +         }
>                 return schema;
>         }
>
> diff --git src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java
> src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java
> new file mode 100644
> index 0000000..f265460
> --- /dev/null
> +++ src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java
> @@ -0,0 +1,60 @@
> +package com.cloudera.crunch.impl.mr;
> +
> +import static org.junit.Assert.assertEquals;
> +import static org.mockito.Mockito.doReturn;
> +import static org.mockito.Mockito.mock;
> +import static org.mockito.Mockito.spy;
> +import static org.mockito.Mockito.when;
> +
> +import java.io.IOException;
> +
> +import org.junit.Before;
> +import org.junit.Test;
> +
> +import com.cloudera.crunch.SourceTarget;
> +import com.cloudera.crunch.impl.mr.collect.PCollectionImpl;
> +import com.cloudera.crunch.io.ReadableSourceTarget;
> +import com.cloudera.crunch.types.avro.Avros;
> +
> +public class MRPipelineTest {
> +
> +  private MRPipeline pipeline;
> +
> +  @Before
> +  public void setUp() throws IOException {
> +    pipeline = spy(new MRPipeline(MRPipelineTest.class));
> +  }
> +
> +  @Test
> +  public void testGetMaterializeSourceTarget_AlreadyMaterialized() {
> +    PCollectionImpl<String> materializedPcollection =
> mock(PCollectionImpl.class);
> +    ReadableSourceTarget<String> readableSourceTarget =
> mock(ReadableSourceTarget.class);
> +    when(materializedPcollection.getMaterializedAt()).thenReturn(readableSourceTarget);
> +
> +    assertEquals(readableSourceTarget,
> pipeline.getMaterializeSourceTarget(materializedPcollection));
> +  }
> +
> +  @Test
> +  public void testGetMaterializeSourceTarget_NotMaterialized_HasOutput() {
> +
> +    PCollectionImpl<String> pcollection = mock(PCollectionImpl.class);
> +    ReadableSourceTarget<String> readableSourceTarget =
> mock(ReadableSourceTarget.class);
> +    when(pcollection.getPType()).thenReturn(Avros.strings());
> +    doReturn(readableSourceTarget).when(pipeline).createIntermediateOutput(Avros.strings());
> +    when(pcollection.getMaterializedAt()).thenReturn(null);
> +
> +    assertEquals(readableSourceTarget,
> pipeline.getMaterializeSourceTarget(pcollection));
> +  }
> +
> +  @Test(expected = IllegalArgumentException.class)
> +  public void testGetMaterializeSourceTarget_NotMaterialized_NotReadableSourceTarget()
> {
> +    PCollectionImpl<String> pcollection = mock(PCollectionImpl.class);
> +    SourceTarget<String> nonReadableSourceTarget = mock(SourceTarget.class);
> +    when(pcollection.getPType()).thenReturn(Avros.strings());
> +    doReturn(nonReadableSourceTarget).when(pipeline).createIntermediateOutput(Avros.strings());
> +    when(pcollection.getMaterializedAt()).thenReturn(null);
> +
> +    pipeline.getMaterializeSourceTarget(pcollection);
> +  }
> +
> +}
> diff --git src/test/java/com/cloudera/crunch/lib/join/MapsideJoinTest.java
> src/test/java/com/cloudera/crunch/lib/join/MapsideJoinTest.java
> new file mode 100644
> index 0000000..97e0c63
> --- /dev/null
> +++ src/test/java/com/cloudera/crunch/lib/join/MapsideJoinTest.java
> @@ -0,0 +1,102 @@
> +package com.cloudera.crunch.lib.join;
> +
> +import static org.junit.Assert.assertEquals;
> +import static org.junit.Assert.assertTrue;
> +
> +import java.io.IOException;
> +import java.util.Collections;
> +import java.util.List;
> +
> +import org.junit.Test;
> +
> +import com.cloudera.crunch.FilterFn;
> +import com.cloudera.crunch.MapFn;
> +import com.cloudera.crunch.PTable;
> +import com.cloudera.crunch.Pair;
> +import com.cloudera.crunch.Pipeline;
> +import com.cloudera.crunch.impl.mem.MemPipeline;
> +import com.cloudera.crunch.impl.mr.MRPipeline;
> +import com.cloudera.crunch.impl.mr.run.CrunchRuntimeException;
> +import com.cloudera.crunch.test.FileHelper;
> +import com.cloudera.crunch.types.writable.Writables;
> +import com.google.common.collect.Lists;
> +
> +public class MapsideJoinTest {
> +
> +  private static class LineSplitter extends MapFn<String,
> Pair<Integer, String>> {
> +
> +    @Override
> +    public Pair<Integer, String> map(String input) {
> +      String[] fields = input.split("\\|");
> +      return Pair.of(Integer.parseInt(fields[0]), fields[1]);
> +    }
> +
> +  }
> +
> +  private static class NegativeFilter extends FilterFn<Pair<Integer, String>>
{
> +
> +    @Override
> +    public boolean accept(Pair<Integer, String> input) {
> +      return false;
> +    }
> +
> +  }
> +
> +  @Test(expected = CrunchRuntimeException.class)
> +  public void testNonMapReducePipeline() {
> +    runMapsideJoin(MemPipeline.getInstance());
> +  }
> +
> +  @Test
> +  public void testMapsideJoin_RightSideIsEmpty() throws IOException {
> +    MRPipeline pipeline = new MRPipeline(MapsideJoinTest.class);
> +    PTable<Integer, String> customerTable = readTable(pipeline,
> "customers.txt");
> +    PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
> +
> +    PTable<Integer, String> filteredOrderTable =
> orderTable.parallelDo(new NegativeFilter(),
> +        orderTable.getPTableType());
> +
> +    PTable<Integer, Pair<String, String>> joined =
> MapsideJoin.join(customerTable,
> +        filteredOrderTable);
> +
> +    List<Pair<Integer, Pair<String, String>>> materializedJoin =
> Lists.newArrayList(joined
> +        .materialize());
> +
> +    assertTrue(materializedJoin.isEmpty());
> +
> +  }
> +
> +  @Test
> +  public void testMapsideJoin() throws IOException {
> +    runMapsideJoin(new MRPipeline(MapsideJoinTest.class));
> +  }
> +
> +  private void runMapsideJoin(Pipeline pipeline) {
> +    PTable<Integer, String> customerTable = readTable(pipeline,
> "customers.txt");
> +    PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
> +
> +    PTable<Integer, Pair<String, String>> joined =
> MapsideJoin.join(customerTable, orderTable);
> +
> +    List<Pair<Integer, Pair<String, String>>> expectedJoinResult =
> Lists.newArrayList();
> +    expectedJoinResult.add(Pair.of(111, Pair.of("John Doe", "Corn flakes")));
> +    expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet paper")));
> +    expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet
> plunger")));
> +    expectedJoinResult.add(Pair.of(333, Pair.of("Someone Else",
> "Toilet brush")));
> +
> +    List<Pair<Integer, Pair<String, String>>> joinedResultList =
> Lists.newArrayList(joined
> +        .materialize());
> +    Collections.sort(joinedResultList);
> +
> +    assertEquals(expectedJoinResult, joinedResultList);
> +  }
> +
> +  private static PTable<Integer, String> readTable(Pipeline pipeline,
> String filename) {
> +    try {
> +      return pipeline.readTextFile(FileHelper.createTempCopyOf(filename)).parallelDo("asTable",
> +          new LineSplitter(), Writables.tableOf(Writables.ints(),
> Writables.strings()));
> +    } catch (IOException e) {
> +      throw new RuntimeException(e);
> +    }
> +  }
> +
> +}
> diff --git src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
> src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
> index 74e2ad3..c6a0b46 100644
> --- src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
> +++ src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
> @@ -25,6 +25,7 @@ import org.apache.avro.Schema.Type;
>  import org.apache.avro.generic.GenericData;
>  import org.apache.avro.util.Utf8;
>  import org.apache.hadoop.io.LongWritable;
> +import org.junit.Ignore;
>  import org.junit.Test;
>
>  import com.cloudera.crunch.Pair;
> @@ -103,6 +104,7 @@ public class AvrosTest {
>    }
>
>    @Test
> +  @Ignore("This test creates an invalid schema that causes
> Schema#toString to fail")
>    public void testNestedTables() throws Exception {
>         PTableType<Long, Long> pll = Avros.tableOf(Avros.longs(), Avros.longs());
>         PTableType<Pair<Long, Long>, String> nest = Avros.tableOf(pll,
> Avros.strings());
> diff --git src/test/resources/customers.txt src/test/resources/customers.txt
> new file mode 100644
> index 0000000..98f3f3d
> --- /dev/null
> +++ src/test/resources/customers.txt
> @@ -0,0 +1,4 @@
> +111|John Doe
> +222|Jane Doe
> +333|Someone Else
> +444|Has No Orders
> \ No newline at end of file
> diff --git src/test/resources/orders.txt src/test/resources/orders.txt
> new file mode 100644
> index 0000000..2f1383f
> --- /dev/null
> +++ src/test/resources/orders.txt
> @@ -0,0 +1,4 @@
> +222|Toilet plunger
> +333|Toilet brush
> +222|Toilet paper
> +111|Corn flakes
> \ No newline at end of file
>
>
>
> On Tue, Jul 3, 2012 at 10:42 AM, Christian Tzolov
> <christian.tzolov@gmail.com> wrote:
>> Hi Gabriel,
>>
>> Seems like the attachment is missing.
>>
>> Cheers,
>> Chris
>>
>> On Tue, Jul 3, 2012 at 9:23 AM, Gabriel Reid <gabriel.reid@gmail.com> wrote:
>>
>>> Hi guys,
>>>
>>> Attached (hopefully) is a patch for an initial implementation of map
>>> side joins. It's currently implemented as a static method in a class
>>> called MapsideJoin, with the same interface as the existing Join class
>>> (with only inner joins being implemented for now). The way it works is
>>> that the right-side PTable of the join is put in the distributed cache
>>> and then read by the join function at runtime.
>>>
>>> There's one spot that I can see for a potentially interesting
>>> optimization -- MRPipeline#run is called once for each map side join
>>> that is set up, but if the setup of the joins was done within
>>> MRPipeline, then we could set up multiple map side joins in parallel
>>> with a single call to MRPipeline#run. OTOH, a whole bunch of map side
>>> joins in parallel probably isn't that common of an operation.
>>>
>>> If anyone feels like taking a look at the patch, any feedback is
>>> appreciated. If nobody sees something that needs serious changes in
>>> the patch, I'll commit it.
>>>
>>> - Gabriel
>>>
>>>
>>> On Thu, Jun 21, 2012 at 9:09 AM, Gabriel Reid <gabriel.reid@gmail.com>
>>> wrote:
>>> > Replying to all...
>>> >
>>> > On Thu, Jun 21, 2012 at 8:40 AM, Josh Wills <jwills@cloudera.com>
wrote:
>>> >>
>>> >> So there's a philosophical issue here: should Crunch ever make
>>> >> decisions about how to do something itself based on its estimates of
>>> >> the size of the data sets, or should it always do exactly what the
>>> >> developer indicates?
>>> >>
>>> >> I can make a case either way, but I think that no matter what, we
>>> >> would want to have explicit functions for performing a join that reads
>>> >> one data set into memory, so I think we can proceed w/the
>>> >> implementation while folks weigh in on what their preferences are for
>>> >> the default join() behavior (e.g., just do a reduce-side join, or try
>>> >> to figure out the best join given information about the input data and
>>> >> some configuration parameters.)
>>> >>
>>> >
>>> > I definitely agree on needing to have an explicit way to invoke one or
>>> > the other -- and in general I don't like having magic behind the
>>> > scenes to decide on behaviour (especially considering Crunch is
>>> > generally intended to be closer to the metal than Pig and Hive). I'm
>>> > not sure if the runtime decision is something specific to some of my
>>> > use cases or if it could be useful to a wider audience.
>>> >
>>> > The ability to dynamically decide at runtime whether a map side join
>>> > should be used can also easily be tacked on outside of Crunch, and
>>> > won't impact the underlying implementation (as you pointed out), so I
>>> > definitely also agree on focusing on the underlying implementation
>>> > first, and we can worry about the options used for exposing it later
>>> > on.
>>> >
>>> > - Gabriel
>>>

Mime
View raw message