incubator-crunch-dev mailing list archives

From Josh Wills <jwi...@cloudera.com>
Subject Re: map side ("replicated") joins in Crunch
Date Fri, 06 Jul 2012 17:53:01 GMT
On Fri, Jul 6, 2012 at 10:09 AM, Gabriel Reid <gabriel.reid@gmail.com> wrote:

> It definitely has a significant performance gain when dealing with big
> records that result in a slow shuffle.
>
> By the way, I discovered (confirmed) an issue with the normal join
> implementation, where objects are held on to without using a deep copy. I
> was thinking of fixing that this evening -- are you planning on doing the
> big package rename today?
>

It's a python script that does most of the renaming work, so I can do it
this weekend. It'll be a pleasant surprise when everyone comes into work on
Monday AM. ;)
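The join issue mentioned above stems from Hadoop's object reuse: the runtime
recycles a single value instance while iterating over grouped values, so
holding a plain reference means every held element ends up aliasing the last
record read. A minimal sketch of the pitfall and one possible fix, assuming
Writable values (the class and method names here are hypothetical, not part
of the patch):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableUtils;

    public class DeepCopyPitfall {
      static List<Text> buffer(Iterable<Text> values, Configuration conf) {
        List<Text> buffered = new ArrayList<Text>();
        for (Text value : values) {
          // BUG: Hadoop may reuse one Text instance across iterations, so
          // every element of "buffered" would alias the last record read:
          // buffered.add(value);

          // Fix: make a deep copy of each value before holding on to it.
          buffered.add(WritableUtils.clone(value, conf));
        }
        return buffered;
      }
    }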


>
> On 06 Jul 2012, at 18:57, Josh Wills <josh.wills@gmail.com> wrote:
>
> > Not a problem man-- can't wait to play with it this weekend!
> >
> > On Fri, Jul 6, 2012 at 9:50 AM, Gabriel Reid <gabriel.reid@gmail.com>
> wrote:
> >> Hi Josh,
> >>
> >> I've just committed the map side joins, after doing some more
> >> extensive testing on it today. Unfortunately, I forgot to squash the
> >> multiple commits into a single one (as I was planning on doing), but
> >> it's all in there and working.
> >>
> >> - Gabriel
> >>
> >> On Fri, Jul 6, 2012 at 7:28 AM, Gabriel Reid <gabriel.reid@gmail.com>
> wrote:
> >>> Thanks for making the JIRA issue.
> >>>
> >>> I was going to do some more testing on a "real" cluster later on
> >>> today, and as long as that all checks out then this will be ready to go in
> >>> as far as I'm concerned.
> >>>
> >>> I don't see any reason to hold back on the big renaming for this patch
> >>> -- if you're ready to do the package renaming, please go ahead.
> >>>
> >>> As long as my testing later today checks out and there aren't any
> >>> glaring issues from anyone who has tried the patch out, I'll probably be
> >>> checking the patch in later today (so it'll probably be in before you do
> >>> the renaming anyhow).
> >>>
> >>> - Gabriel
> >>>
> >>>
> >>> On Friday 6 July 2012 at 00:11, Josh Wills wrote:
> >>>
> >>>> FYI: https://issues.apache.org/jira/browse/CRUNCH-3 to track this.
> >>>>
> >>>> Gabriel, is your feeling that this is ready to go in? I'm debating whether
> >>>> or not to check this in before/after I do the massive com.cloudera ->
> >>>> org.apache renaming. What do you think?
> >>>>
> >>>> On Tue, Jul 3, 2012 at 1:25 PM, Gabriel Reid <gabriel.reid@gmail.com> wrote:
> >>>>
> >>>>> Hi Joe,
> >>>>>
> >>>>> Looking forward to hearing your feedback! This is still just a patch and
> >>>>> not committed yet, and there's still definitely room for help (i.e. ideas
> >>>>> on how to improve it, or do it totally differently), so certainly let me
> >>>>> know if you've got some ideas.
> >>>>>
> >>>>> - Gabriel
> >>>>>
> >>>>>
> >>>>> On Tuesday 3 July 2012 at 18:01, Joseph Adler wrote:
> >>>>>
> >>>>>> Awesome! Will give this a try soon... wish I could have helped with this
> >>>>>> one...
> >>>>>>
> >>>>>> On Tue, Jul 3, 2012 at 1:53 AM, Gabriel Reid <gabriel.reid@gmail.com> wrote:
> >>>>>>> Thanks for pointing that out, Chris. I'm guessing the mailing list is
> >>>>>>> stripping out attachments(?). Once the JIRA is up and running then
> >>>>>>> that will be taken care of I guess.
> >>>>>>>
> >>>>>>> The mime type on the last attempt was application/octet-stream, so
> >>>>>>> I've renamed this to a .txt file to try to ensure that it'll get a
> >>>>>>> text/plain mime type (although I don't know if that'll make a
> >>>>>>> difference). I've also pasted it inline below, hopefully one of those
> >>>>>>> solutions works.
> >>>>>>>
> >>>>>>> - Gabriel
> >>>>>>>
> >>>>>>>
> >>>>>>> diff --git src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
> >>>>>>> index 420e8dc..c8ba596 100644
> >>>>>>> --- src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
> >>>>>>> +++ src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
> >>>>>>> @@ -59,9 +59,9 @@ import com.google.common.collect.Sets;
> >>>>>>> public class MRPipeline implements Pipeline {
> >>>>>>>
> >>>>>>> private static final Log LOG = LogFactory.getLog(MRPipeline.class);
> >>>>>>> -
> >>>>>>> +
> >>>>>>> private static final Random RANDOM = new Random();
> >>>>>>> -
> >>>>>>> +
> >>>>>>> private final Class<?> jarClass;
> >>>>>>> private final String name;
> >>>>>>> private final Map<PCollectionImpl<?>, Set<Target>> outputTargets;
> >>>>>>> @@ -75,15 +75,15 @@ public class MRPipeline implements Pipeline {
> >>>>>>> public MRPipeline(Class<?> jarClass) throws IOException {
> >>>>>>> this(jarClass, new Configuration());
> >>>>>>> }
> >>>>>>> -
> >>>>>>> - public MRPipeline(Class<?> jarClass, String name){
> >>>>>>> +
> >>>>>>> + public MRPipeline(Class<?> jarClass, String name) {
> >>>>>>> this(jarClass, name, new Configuration());
> >>>>>>> }
> >>>>>>> -
> >>>>>>> +
> >>>>>>> public MRPipeline(Class<?> jarClass, Configuration conf) {
> >>>>>>> - this(jarClass, jarClass.getName(), conf);
> >>>>>>> + this(jarClass, jarClass.getName(), conf);
> >>>>>>> }
> >>>>>>> -
> >>>>>>> +
> >>>>>>> public MRPipeline(Class<?> jarClass, String name, Configuration conf) {
> >>>>>>> this.jarClass = jarClass;
> >>>>>>> this.name = name;
> >>>>>>> @@ -102,9 +102,9 @@ public class MRPipeline implements Pipeline {
> >>>>>>>
> >>>>>>> @Override
> >>>>>>> public void setConfiguration(Configuration conf) {
> >>>>>>> - this.conf = conf;
> >>>>>>> + this.conf = conf;
> >>>>>>> }
> >>>>>>> -
> >>>>>>> +
> >>>>>>> @Override
> >>>>>>> public PipelineResult run() {
> >>>>>>> MSCRPlanner planner = new MSCRPlanner(this, outputTargets);
> >>>>>>> @@ -125,8 +125,8 @@ public class MRPipeline implements Pipeline {
> >>>>>>> boolean materialized = false;
> >>>>>>> for (Target t : outputTargets.get(c)) {
> >>>>>>> if (!materialized && t instanceof Source) {
> >>>>>>> - c.materializeAt((SourceTarget) t);
> >>>>>>> - materialized = true;
> >>>>>>> + c.materializeAt((SourceTarget) t);
> >>>>>>> + materialized = true;
> >>>>>>> }
> >>>>>>> }
> >>>>>>> }
> >>>>>>> @@ -144,7 +144,7 @@ public class MRPipeline implements Pipeline {
> >>>>>>> cleanup();
> >>>>>>> return res;
> >>>>>>> }
> >>>>>>> -
> >>>>>>> +
> >>>>>>> public <S> PCollection<S> read(Source<S> source) {
> >>>>>>> return new InputCollection<S>(source, this);
> >>>>>>> }
> >>>>>>> @@ -160,85 +160,120 @@ public class MRPipeline implements Pipeline {
> >>>>>>> @SuppressWarnings("unchecked")
> >>>>>>> public void write(PCollection<?> pcollection, Target target) {
> >>>>>>> if (pcollection instanceof PGroupedTableImpl) {
> >>>>>>> - pcollection = ((PGroupedTableImpl<?,?>) pcollection).ungroup();
> >>>>>>> + pcollection = ((PGroupedTableImpl<?, ?>) pcollection).ungroup();
> >>>>>>> } else if (pcollection instanceof UnionCollection || pcollection instanceof UnionTable) {
> >>>>>>> - pcollection = pcollection.parallelDo("UnionCollectionWrapper",
> >>>>>>> - (MapFn)IdentityFn.<Object>getInstance(), pcollection.getPType());
> >>>>>>> + pcollection = pcollection.parallelDo("UnionCollectionWrapper",
> >>>>>>> + (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
> >>>>>>> }
> >>>>>>> addOutput((PCollectionImpl<?>) pcollection, target);
> >>>>>>> }
> >>>>>>>
> >>>>>>> private void addOutput(PCollectionImpl<?> impl, Target target) {
> >>>>>>> if (!outputTargets.containsKey(impl)) {
> >>>>>>> - outputTargets.put(impl, Sets.<Target>newHashSet());
> >>>>>>> + outputTargets.put(impl, Sets.<Target> newHashSet());
> >>>>>>> }
> >>>>>>> outputTargets.get(impl).add(target);
> >>>>>>> }
> >>>>>>> -
> >>>>>>> +
> >>>>>>> @Override
> >>>>>>> public <T> Iterable<T> materialize(PCollection<T> pcollection) {
> >>>>>>> -
> >>>>>>> - if (pcollection instanceof UnionCollection) {
> >>>>>>> - pcollection = pcollection.parallelDo("UnionCollectionWrapper",
> >>>>>>> - (MapFn)IdentityFn.<Object>getInstance(), pcollection.getPType());
> >>>>>>> - }
> >>>>>>> - PCollectionImpl<T> impl = (PCollectionImpl<T>) pcollection;
> >>>>>>> +
> >>>>>>> + PCollectionImpl<T> pcollectionImpl = toPcollectionImpl(pcollection);
> >>>>>>> + ReadableSourceTarget<T> srcTarget = getMaterializeSourceTarget(pcollectionImpl);
> >>>>>>> +
> >>>>>>> + MaterializableIterable<T> c = new MaterializableIterable<T>(this, srcTarget);
> >>>>>>> + if (!outputTargetsToMaterialize.containsKey(pcollectionImpl)) {
> >>>>>>> + outputTargetsToMaterialize.put(pcollectionImpl, c);
> >>>>>>> + }
> >>>>>>> + return c;
> >>>>>>> + }
> >>>>>>> +
> >>>>>>> + /**
> >>>>>>> + * Retrieve a ReadableSourceTarget that provides access to the contents of a
> >>>>>>> + * {@link PCollection}. This is primarily intended as a helper method to
> >>>>>>> + * {@link #materialize(PCollection)}. The underlying data of the
> >>>>>>> + * ReadableSourceTarget may not be actually present until the pipeline is run.
> >>>>>>> + *
> >>>>>>> + * @param pcollection
> >>>>>>> + * The collection for which the ReadableSourceTarget is to be
> >>>>>>> + * retrieved
> >>>>>>> + * @return The ReadableSourceTarget
> >>>>>>> + * @throws IllegalArgumentException
> >>>>>>> + * If no ReadableSourceTarget can be retrieved for the given
> >>>>>>> + * PCollection
> >>>>>>> + */
> >>>>>>> + public <T> ReadableSourceTarget<T> getMaterializeSourceTarget(PCollection<T> pcollection) {
> >>>>>>> + PCollectionImpl<T> impl = toPcollectionImpl(pcollection);
> >>>>>>> SourceTarget<T> matTarget = impl.getMaterializedAt();
> >>>>>>> if (matTarget != null && matTarget instanceof ReadableSourceTarget) {
> >>>>>>> - return new MaterializableIterable<T>(this, (ReadableSourceTarget<T>) matTarget);
> >>>>>>> + return (ReadableSourceTarget<T>) matTarget;
> >>>>>>> + }
> >>>>>>> +
> >>>>>>> + ReadableSourceTarget<T> srcTarget = null;
> >>>>>>> + if (outputTargets.containsKey(pcollection)) {
> >>>>>>> + for (Target target : outputTargets.get(impl)) {
> >>>>>>> + if (target instanceof ReadableSourceTarget) {
> >>>>>>> + srcTarget = (ReadableSourceTarget<T>) target;
> >>>>>>> + break;
> >>>>>>> + }
> >>>>>>> + }
> >>>>>>> }
> >>>>>>> -
> >>>>>>> - ReadableSourceTarget<T> srcTarget = null;
> >>>>>>> - if (outputTargets.containsKey(pcollection)) {
> >>>>>>> - for (Target target : outputTargets.get(impl)) {
> >>>>>>> - if (target instanceof ReadableSourceTarget) {
> >>>>>>> - srcTarget = (ReadableSourceTarget) target;
> >>>>>>> - break;
> >>>>>>> - }
> >>>>>>> - }
> >>>>>>> - }
> >>>>>>> -
> >>>>>>> - if (srcTarget == null) {
> >>>>>>> - SourceTarget<T> st = createIntermediateOutput(pcollection.getPType());
> >>>>>>> - if (!(st instanceof ReadableSourceTarget)) {
> >>>>>>> - throw new IllegalArgumentException("The PType for the given PCollection is not readable"
> >>>>>>> - + " and cannot be materialized");
> >>>>>>> - } else {
> >>>>>>> - srcTarget = (ReadableSourceTarget) st;
> >>>>>>> - addOutput(impl, srcTarget);
> >>>>>>> - }
> >>>>>>> - }
> >>>>>>> -
> >>>>>>> - MaterializableIterable<T> c = new MaterializableIterable<T>(this, srcTarget);
> >>>>>>> - outputTargetsToMaterialize.put(impl, c);
> >>>>>>> - return c;
> >>>>>>> +
> >>>>>>> + if (srcTarget == null) {
> >>>>>>> + SourceTarget<T> st = createIntermediateOutput(pcollection.getPType());
> >>>>>>> + if (!(st instanceof ReadableSourceTarget)) {
> >>>>>>> + throw new IllegalArgumentException("The PType for the given PCollection is not readable"
> >>>>>>> + + " and cannot be materialized");
> >>>>>>> + } else {
> >>>>>>> + srcTarget = (ReadableSourceTarget<T>) st;
> >>>>>>> + addOutput(impl, srcTarget);
> >>>>>>> + }
> >>>>>>> + }
> >>>>>>> +
> >>>>>>> + return srcTarget;
> >>>>>>> + }
> >>>>>>> +
> >>>>>>> + /**
> >>>>>>> + * Safely cast a PCollection into a PCollectionImpl, including handling the case of UnionCollections.
> >>>>>>> + * @param pcollection The PCollection to be cast/transformed
> >>>>>>> + * @return The PCollectionImpl representation
> >>>>>>> + */
> >>>>>>> + private <T> PCollectionImpl<T> toPcollectionImpl(PCollection<T> pcollection) {
> >>>>>>> + PCollectionImpl<T> pcollectionImpl = null;
> >>>>>>> + if (pcollection instanceof UnionCollection) {
> >>>>>>> + pcollectionImpl = (PCollectionImpl<T>) pcollection.parallelDo("UnionCollectionWrapper",
> >>>>>>> + (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
> >>>>>>> + } else {
> >>>>>>> + pcollectionImpl = (PCollectionImpl<T>) pcollection;
> >>>>>>> + }
> >>>>>>> + return pcollectionImpl;
> >>>>>>> }
> >>>>>>>
> >>>>>>> public <T> SourceTarget<T> createIntermediateOutput(PType<T> ptype) {
> >>>>>>> - return ptype.getDefaultFileSource(createTempPath());
> >>>>>>> + return ptype.getDefaultFileSource(createTempPath());
> >>>>>>> }
> >>>>>>>
> >>>>>>> public Path createTempPath() {
> >>>>>>> tempFileIndex++;
> >>>>>>> return new Path(tempDirectory, "p" + tempFileIndex);
> >>>>>>> }
> >>>>>>> -
> >>>>>>> +
> >>>>>>> private static Path createTempDirectory(Configuration conf) {
> >>>>>>> Path dir = new Path("/tmp/crunch" + RANDOM.nextInt());
> >>>>>>> - try {
> >>>>>>> - FileSystem.get(conf).mkdirs(dir);
> >>>>>>> - } catch (IOException e) {
> >>>>>>> - LOG.error("Exception creating job output directory", e);
> >>>>>>> - throw new RuntimeException(e);
> >>>>>>> - }
> >>>>>>> + try {
> >>>>>>> + FileSystem.get(conf).mkdirs(dir);
> >>>>>>> + } catch (IOException e) {
> >>>>>>> + LOG.error("Exception creating job output directory", e);
> >>>>>>> + throw new RuntimeException(e);
> >>>>>>> + }
> >>>>>>> return dir;
> >>>>>>> }
> >>>>>>> -
> >>>>>>> +
> >>>>>>> @Override
> >>>>>>> public <T> void writeTextFile(PCollection<T> pcollection, String pathName) {
> >>>>>>> // Ensure that this is a writable pcollection instance.
> >>>>>>> - pcollection = pcollection.parallelDo("asText",
> >>>>>>
> >>>>>
> >>>>>
> >>>>> IdentityFn.<T>getInstance(),
> >>>>>>> - WritableTypeFamily.getInstance().as(pcollection.getPType()));
> >>>>>>> + pcollection = pcollection.parallelDo("asText", IdentityFn.<T>
> >>>>>>> getInstance(), WritableTypeFamily
> >>>>>>> + .getInstance().as(pcollection.getPType()));
> >>>>>>> write(pcollection, At.textFile(pathName));
> >>>>>>> }
> >>>>>>>
> >>>>>>> @@ -256,7 +291,7 @@ public class MRPipeline implements Pipeline {
> >>>>>>> LOG.info("Exception during cleanup", e);
> >>>>>>> }
> >>>>>>> }
> >>>>>>> -
> >>>>>>> +
> >>>>>>> public int getNextAnonymousStageId() {
> >>>>>>> return nextAnonymousStageId++;
> >>>>>>> }
> >>>>>>> @@ -265,7 +300,7 @@ public class MRPipeline implements Pipeline {
> >>>>>>> public void enableDebug() {
> >>>>>>> // Turn on Crunch runtime error catching.
> >>>>>>> getConfiguration().setBoolean(RuntimeParameters.DEBUG, true);
> >>>>>>> -
> >>>>>>> +
> >>>>>>> // Write Hadoop's WARN logs to the console.
> >>>>>>> Logger crunchInfoLogger = LogManager.getLogger("com.cloudera.crunch");
> >>>>>>> Appender console = crunchInfoLogger.getAppender("A");
> >>>>>>> @@ -277,9 +312,9 @@ public class MRPipeline implements Pipeline {
> >>>>>>> LOG.warn("Could not find console appender named 'A' for writing
> >>>>>>> Hadoop warning logs");
> >>>>>>> }
> >>>>>>> }
> >>>>>>> -
> >>>>>>> +
> >>>>>>> @Override
> >>>>>>> public String getName() {
> >>>>>>> - return name;
> >>>>>>> + return name;
> >>>>>>> }
> >>>>>>> }
> >>>>>>> diff --git src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
> >>>>>>> index d41a52e..68ef054 100644
> >>>>>>> --- src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
> >>>>>>> +++ src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
> >>>>>>> @@ -12,6 +12,10 @@ public class CrunchRuntimeException extends RuntimeException {
> >>>>>>> super(e);
> >>>>>>> }
> >>>>>>>
> >>>>>>> + public CrunchRuntimeException(String msg, Exception e) {
> >>>>>>> + super(msg, e);
> >>>>>>> + }
> >>>>>>> +
> >>>>>>> public boolean wasLogged() {
> >>>>>>> return logged;
> >>>>>>> }
> >>>>>>> diff --git src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java
> >>>>>>> index 1122d62..4debfeb 100644
> >>>>>>> --- src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java
> >>>>>>> +++ src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java
> >>>>>>> @@ -45,7 +45,8 @@ public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSour
> >>>>>>>
> >>>>>>> @Override
> >>>>>>> public Iterable<T> read(Configuration conf) throws IOException {
> >>>>>>> - return CompositePathIterable.create(FileSystem.get(conf), path, new AvroFileReaderFactory<T>(
> >>>>>>> + FileSystem fs = FileSystem.get(path.toUri(), conf);
> >>>>>>> + return CompositePathIterable.create(fs, path, new AvroFileReaderFactory<T>(
> >>>>>>> (AvroType<T>) ptype, conf));
> >>>>>>> }
> >>>>>>> }
> >>>>>>> diff --git src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java
> >>>>>>> index 24dec2d..462ef93 100644
> >>>>>>> --- src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java
> >>>>>>> +++ src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java
> >>>>>>> @@ -26,18 +26,16 @@ import com.cloudera.crunch.io.ReadableSource;
> >>>>>>> import com.cloudera.crunch.io.impl.FileSourceImpl;
> >>>>>>> import com.cloudera.crunch.types.PType;
> >>>>>>>
> >>>>>>> -public class SeqFileSource<T> extends FileSourceImpl<T> implements
> >>>>>>> - ReadableSource<T> {
> >>>>>>> +public class SeqFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
> >>>>>>>
> >>>>>>> public SeqFileSource(Path path, PType<T> ptype) {
> >>>>>>> - super(path, ptype, SequenceFileInputFormat.class);
> >>>>>>> + super(path, ptype, SequenceFileInputFormat.class);
> >>>>>>> }
> >>>>>>> -
> >>>>>>> +
> >>>>>>> @Override
> >>>>>>> public Iterable<T> read(Configuration conf) throws IOException {
> >>>>>>> - FileSystem fs = FileSystem.get(conf);
> >>>>>>> - return CompositePathIterable.create(fs, path,
> >>>>>>> - new SeqFileReaderFactory<T>(ptype, conf));
> >>>>>>> + FileSystem fs = FileSystem.get(path.toUri(), conf);
> >>>>>>> + return CompositePathIterable.create(fs, path, new SeqFileReaderFactory<T>(ptype, conf));
> >>>>>>> }
> >>>>>>>
> >>>>>>> @Override
> >>>>>>> diff --git src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java
> >>>>>>> index 69ca12b..4db6658 100644
> >>>>>>> --- src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java
> >>>>>>> +++ src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java
> >>>>>>> @@ -42,7 +42,7 @@ public class SeqFileTableSource<K, V> extends FileTableSourceImpl<K, V> implemen
> >>>>>>>
> >>>>>>> @Override
> >>>>>>> public Iterable<Pair<K, V>> read(Configuration conf) throws IOException {
> >>>>>>> - FileSystem fs = FileSystem.get(conf);
> >>>>>>> + FileSystem fs = FileSystem.get(path.toUri(), conf);
> >>>>>>> return CompositePathIterable.create(fs, path,
> >>>>>>> new SeqFileTableReaderFactory<K, V>((PTableType<K, V>) ptype, conf));
> >>>>>>> }
> >>>>>>> diff --git src/main/java/com/cloudera/crunch/io/text/TextFileSource.java src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
> >>>>>>> index a876843..e0dbe68 100644
> >>>>>>> --- src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
> >>>>>>> +++ src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
> >>>>>>> @@ -67,7 +67,7 @@ public class TextFileSource<T> extends FileSourceImpl<T> implements
> >>>>>>>
> >>>>>>> @Override
> >>>>>>> public Iterable<T> read(Configuration conf) throws IOException {
> >>>>>>> - return CompositePathIterable.create(FileSystem.get(conf), path,
> >>>>>>> - new TextFileReaderFactory<T>(ptype, conf));
> >>>>>>> + return CompositePathIterable.create(FileSystem.get(path.toUri(), conf), path,
> >>>>>>> + new TextFileReaderFactory<T>(ptype, conf));
> >>>>>>> }
> >>>>>>> }
> >>>>>>> diff --git src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java
> >>>>>>> new file mode 100644
> >>>>>>> index 0000000..8072e07
> >>>>>>> --- /dev/null
> >>>>>>> +++ src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java
> >>>>>>> @@ -0,0 +1,143 @@
> >>>>>>> +package com.cloudera.crunch.lib.join;
> >>>>>>> +
> >>>>>>> +import java.io.IOException;
> >>>>>>> +
> >>>>>>> +import org.apache.hadoop.filecache.DistributedCache;
> >>>>>>> +import org.apache.hadoop.fs.FileSystem;
> >>>>>>> +import org.apache.hadoop.fs.Path;
> >>>>>>> +
> >>>>>>> +import com.cloudera.crunch.DoFn;
> >>>>>>> +import com.cloudera.crunch.Emitter;
> >>>>>>> +import com.cloudera.crunch.PTable;
> >>>>>>> +import com.cloudera.crunch.Pair;
> >>>>>>> +import com.cloudera.crunch.impl.mr.MRPipeline;
> >>>>>>> +import com.cloudera.crunch.impl.mr.run.CrunchRuntimeException;
> >>>>>>> +import com.cloudera.crunch.io.ReadableSourceTarget;
> >>>>>>> +import com.cloudera.crunch.io.impl.SourcePathTargetImpl;
> >>>>>>> +import com.cloudera.crunch.types.PType;
> >>>>>>> +import com.cloudera.crunch.types.PTypeFamily;
> >>>>>>> +import com.google.common.collect.ArrayListMultimap;
> >>>>>>> +import com.google.common.collect.Multimap;
> >>>>>>> +
> >>>>>>> +/**
> >>>>>>> + * Utility for doing map side joins on a common key between two
> >>>>>>> {@link PTable}s.
> >>>>>>> + * <p>
> >>>>>>> + * A map side join is an optimized join which doesn't use a reducer; instead,
> >>>>>>> + * the right side of the join is loaded into memory and the join is performed in
> >>>>>>> + * a mapper. This style of join has the important implication that the output of
> >>>>>>> + * the join is not sorted, as it would be with a conventional (reducer-based)
> >>>>>>> + * join.
> >>>>>>> + * <p>
> >>>>>>> + * <b>Note:</b> This utility is only supported when running with a
> >>>>>>> + * {@link MRPipeline} as the pipeline.
> >>>>>>> + */
> >>>>>>> +public class MapsideJoin {
> >>>>>>> +
> >>>>>>> + /**
> >>>>>>> + * Join two tables using a map side join. The right-side table will be loaded
> >>>>>>> + * fully in memory, so this method should only be used if the right side
> >>>>>>> + * table's contents can fit in the memory allocated to mappers. The join
> >>>>>>> + * performed by this method is an inner join.
> >>>>>>> + *
> >>>>>>> + * @param left
> >>>>>>> + * The left-side table of the join
> >>>>>>> + * @param right
> >>>>>>> + * The right-side table of the join, whose contents will be fully
> >>>>>>> + * read into memory
> >>>>>>> + * @return A table keyed on the join key, containing pairs of joined values
> >>>>>>> + */
> >>>>>>> + public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right) {
> >>>>>>> +
> >>>>>>> + if (!(right.getPipeline() instanceof MRPipeline)) {
> >>>>>>> + throw new CrunchRuntimeException("Map-side join is only
> >>>>>>> supported within a MapReduce context");
> >>>>>>> + }
> >>>>>>> +
> >>>>>>> + MRPipeline pipeline = (MRPipeline) right.getPipeline();
> >>>>>>> + pipeline.materialize(right);
> >>>>>>> +
> >>>>>>> + // TODO Move necessary logic to MRPipeline so that we can theoretically
> >>>>>>> + // optimize this by running the setup of multiple map-side joins concurrently
> >>>>>>> + pipeline.run();
> >>>>>>> +
> >>>>>>> + ReadableSourceTarget<Pair<K, V>> readableSourceTarget = pipeline
> >>>>>>> + .getMaterializeSourceTarget(right);
> >>>>>>> + if (!(readableSourceTarget instanceof SourcePathTargetImpl)) {
> >>>>>>> + throw new CrunchRuntimeException("Right-side contents can't be
> >>>>>>> read from a path");
> >>>>>>> + }
> >>>>>>> +
> >>>>>>> + // Suppress warnings because we've just checked this cast via instanceof
> >>>>>>> + @SuppressWarnings("unchecked")
> >>>>>>> + SourcePathTargetImpl<Pair<K, V>> sourcePathTarget = (SourcePathTargetImpl<Pair<K, V>>) readableSourceTarget;
> >>>>>>> +
> >>>>>>> + Path path = sourcePathTarget.getPath();
> >>>>>>> + DistributedCache.addCacheFile(path.toUri(), pipeline.getConfiguration());
> >>>>>>> +
> >>>>>>> + MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(path.toString(),
> >>>>>>> + right.getPType());
> >>>>>>> + PTypeFamily typeFamily = left.getTypeFamily();
> >>>>>>> + return left.parallelDo(
> >>>>>>> + "mapjoin",
> >>>>>>> + mapJoinDoFn,
> >>>>>>> + typeFamily.tableOf(left.getKeyType(),
> >>>>>>> + typeFamily.pairs(left.getValueType(), right.getValueType())));
> >>>>>>> +
> >>>>>>> + }
> >>>>>>> +
> >>>>>>> + static class MapsideJoinDoFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
> >>>>>>> +
> >>>>>>> + private String inputPath;
> >>>>>>> + private PType<Pair<K, V>> ptype;
> >>>>>>> + private Multimap<K, V> joinMap;
> >>>>>>> +
> >>>>>>> + public MapsideJoinDoFn(String inputPath, PType<Pair<K, V>> ptype) {
> >>>>>>> + this.inputPath = inputPath;
> >>>>>>> + this.ptype = ptype;
> >>>>>>> + }
> >>>>>>> +
> >>>>>>> + private Path getCacheFilePath() {
> >>>>>>> + try {
> >>>>>>> + for (Path localPath : DistributedCache.getLocalCacheFiles(getConfiguration())) {
> >>>>>>> + if (localPath.toString().endsWith(inputPath)) {
> >>>>>>> + return localPath.makeQualified(FileSystem.getLocal(getConfiguration()));
> >>>>>>> +
> >>>>>>> + }
> >>>>>>> + }
> >>>>>>> + } catch (IOException e) {
> >>>>>>> + throw new CrunchRuntimeException(e);
> >>>>>>> + }
> >>>>>>> +
> >>>>>>> + throw new CrunchRuntimeException("Can't find local cache file
> >>>>>>> for '" + inputPath + "'");
> >>>>>>> + }
> >>>>>>> +
> >>>>>>> + @Override
> >>>>>>> + public void initialize() {
> >>>>>>> + super.initialize();
> >>>>>>> +
> >>>>>>> + ReadableSourceTarget<Pair<K, V>> sourceTarget = (ReadableSourceTarget<Pair<K, V>>) ptype
> >>>>>>> + .getDefaultFileSource(getCacheFilePath());
> >>>>>>> + Iterable<Pair<K, V>> iterable = null;
> >>>>>>> + try {
> >>>>>>> + iterable = sourceTarget.read(getConfiguration());
> >>>>>>> + } catch (IOException e) {
> >>>>>>> + throw new CrunchRuntimeException("Error reading right-side of
> >>>>>>> map side join: ", e);
> >>>>>>> + }
> >>>>>>> +
> >>>>>>> + joinMap = ArrayListMultimap.create();
> >>>>>>> + for (Pair<K, V> joinPair : iterable) {
> >>>>>>> + joinMap.put(joinPair.first(), joinPair.second());
> >>>>>>> + }
> >>>>>>> + }
> >>>>>>> +
> >>>>>>> + @Override
> >>>>>>> + public void process(Pair<K, U> input, Emitter<Pair<K, Pair<U, V>>> emitter) {
> >>>>>>> + K key = input.first();
> >>>>>>> + U value = input.second();
> >>>>>>> + for (V joinValue : joinMap.get(key)) {
> >>>>>>> + Pair<U, V> valuePair = Pair.of(value, joinValue);
> >>>>>>> + emitter.emit(Pair.of(key, valuePair));
> >>>>>>> + }
> >>>>>>> + }
> >>>>>>> +
> >>>>>>> + }
> >>>>>>> +
> >>>>>>> +}
> >>>>>>> diff --git src/main/java/com/cloudera/crunch/types/PType.java src/main/java/com/cloudera/crunch/types/PType.java
> >>>>>>> index af4ef1b..ae480aa 100644
> >>>>>>> --- src/main/java/com/cloudera/crunch/types/PType.java
> >>>>>>> +++ src/main/java/com/cloudera/crunch/types/PType.java
> >>>>>>> @@ -15,6 +15,7 @@
> >>>>>>>
> >>>>>>> package com.cloudera.crunch.types;
> >>>>>>>
> >>>>>>> +import java.io.Serializable;
> >>>>>>> import java.util.List;
> >>>>>>>
> >>>>>>> import org.apache.hadoop.fs.Path;
> >>>>>>> @@ -31,7 +32,7 @@ import com.cloudera.crunch.SourceTarget;
> >>>>>>> * {@code PCollection}.
> >>>>>>> *
> >>>>>>> */
> >>>>>>> -public interface PType<T> {
> >>>>>>> +public interface PType<T> extends Serializable {
> >>>>>>> /**
> >>>>>>> * Returns the Java type represented by this {@code PType}.
> >>>>>>> */
> >>>>>>> diff --git src/main/java/com/cloudera/crunch/types/avro/AvroType.java src/main/java/com/cloudera/crunch/types/avro/AvroType.java
> >>>>>>> index 3db00c0..29af9fb 100644
> >>>>>>> --- src/main/java/com/cloudera/crunch/types/avro/AvroType.java
> >>>>>>> +++ src/main/java/com/cloudera/crunch/types/avro/AvroType.java
> >>>>>>> @@ -14,6 +14,7 @@
> >>>>>>> */
> >>>>>>> package com.cloudera.crunch.types.avro;
> >>>>>>>
> >>>>>>> +import java.io.Serializable;
> >>>>>>> import java.util.List;
> >>>>>>>
> >>>>>>> import org.apache.avro.Schema;
> >>>>>>> @@ -41,7 +42,8 @@ public class AvroType<T> implements PType<T> {
> >>>>>>> private static final Converter AVRO_CONVERTER = new AvroKeyConverter();
> >>>>>>>
> >>>>>>> private final Class<T> typeClass;
> >>>>>>> - private final Schema schema;
> >>>>>>> + private final String schemaString;
> >>>>>>> + private transient Schema schema;
> >>>>>>> private final MapFn baseInputMapFn;
> >>>>>>> private final MapFn baseOutputMapFn;
> >>>>>>> private final List<PType> subTypes;
> >>>>>>> @@ -55,6 +57,7 @@ public class AvroType<T> implements PType<T> {
> >>>>>>> MapFn outputMapFn, PType... ptypes) {
> >>>>>>> this.typeClass = typeClass;
> >>>>>>> this.schema = Preconditions.checkNotNull(schema);
> >>>>>>> + this.schemaString = schema.toString();
> >>>>>>> this.baseInputMapFn = inputMapFn;
> >>>>>>> this.baseOutputMapFn = outputMapFn;
> >>>>>>> this.subTypes = ImmutableList.<PType> builder().add(ptypes).build();
> >>>>>>> @@ -76,6 +79,9 @@ public class AvroType<T> implements PType<T> {
> >>>>>>> }
> >>>>>>>
> >>>>>>> public Schema getSchema() {
> >>>>>>> + if (schema == null){
> >>>>>>> + schema = new Schema.Parser().parse(schemaString);
> >>>>>>> + }
> >>>>>>> return schema;
> >>>>>>> }
> >>>>>>>
> >>>>>>> diff --git src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java
> >>>>>>> new file mode 100644
> >>>>>>> index 0000000..f265460
> >>>>>>> --- /dev/null
> >>>>>>> +++ src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java
> >>>>>>> @@ -0,0 +1,60 @@
> >>>>>>> +package com.cloudera.crunch.impl.mr;
> >>>>>>> +
> >>>>>>> +import static org.junit.Assert.assertEquals;
> >>>>>>> +import static org.mockito.Mockito.doReturn;
> >>>>>>> +import static org.mockito.Mockito.mock;
> >>>>>>> +import static org.mockito.Mockito.spy;
> >>>>>>> +import static org.mockito.Mockito.when;
> >>>>>>> +
> >>>>>>> +import java.io.IOException;
> >>>>>>> +
> >>>>>>> +import org.junit.Before;
> >>>>>>> +import org.junit.Test;
> >>>>>>> +
> >>>>>>> +import com.cloudera.crunch.SourceTarget;
> >>>>>>> +import com.cloudera.crunch.impl.mr.collect.PCollectionImpl;
> >>>>>>> +import com.cloudera.crunch.io.ReadableSourceTarget;
> >>>>>>> +import com.cloudera.crunch.types.avro.Avros;
> >>>>>>> +
> >>>>>>> +public class MRPipelineTest {
> >>>>>>> +
> >>>>>>> + private MRPipeline pipeline;
> >>>>>>> +
> >>>>>>> + @Before
> >>>>>>> + public void setUp() throws IOException {
> >>>>>>> + pipeline = spy(new MRPipeline(MRPipelineTest.class));
> >>>>>>> + }
> >>>>>>> +
> >>>>>>> + @Test
> >>>>>>> + public void testGetMaterializeSourceTarget_AlreadyMaterialized() {
> >>>>>>> + PCollectionImpl<String> materializedPcollection = mock(PCollectionImpl.class);
> >>>>>>> + ReadableSourceTarget<String> readableSourceTarget = mock(ReadableSourceTarget.class);
> >>>>>>> +
> >>>>>>
> >>>>>
> >>>>>
> >>>>>
> when(materializedPcollection.getMaterializedAt()).thenReturn(readableSourceTarget);
> >>>>>>> +
> >>>>>>> + assertEquals(readableSourceTarget,
> >>>>>>> pipeline.getMaterializeSourceTarget(materializedPcollection));
> >>>>>>> + }
> >>>>>>> +
> >>>>>>> + @Test
> >>>>>>> + public void testGetMaterializeSourceTarget_NotMaterialized_HasOutput() {
> >>>>>>> +
> >>>>>>> + PCollectionImpl<String> pcollection = mock(PCollectionImpl.class);
> >>>>>>> + ReadableSourceTarget<String> readableSourceTarget = mock(ReadableSourceTarget.class);
> >>>>>>> + when(pcollection.getPType()).thenReturn(Avros.strings());
> >>>>>>> + doReturn(readableSourceTarget).when(pipeline).createIntermediateOutput(Avros.strings());
> >>>>>>> + when(pcollection.getMaterializedAt()).thenReturn(null);
> >>>>>>> +
> >>>>>>> + assertEquals(readableSourceTarget, pipeline.getMaterializeSourceTarget(pcollection));
> >>>>>>> + }
> >>>>>>> +
> >>>>>>> + @Test(expected = IllegalArgumentException.class)
> >>>>>>> + public void testGetMaterializeSourceTarget_NotMaterialized_NotReadableSourceTarget() {
> >>>>>>> + PCollectionImpl<String> pcollection = mock(PCollectionImpl.class);
> >>>>>>> + SourceTarget<String> nonReadableSourceTarget = mock(SourceTarget.class);
> >>>>>>> + when(pcollection.getPType()).thenReturn(Avros.strings());
> >>>>>>> + doReturn(nonReadableSourceTarget).when(pipeline).createIntermediateOutput(Avros.strings());
> >>>>>>> + when(pcollection.getMaterializedAt()).thenReturn(null);
> >>>>>>> +
> >>>>>>> + pipeline.getMaterializeSourceTarget(pcollection);
> >>>>>>> + }
> >>>>>>> +
> >>>>>>> +}
> >>>>>>> diff --git src/test/java/com/cloudera/crunch/lib/join/MapsideJoinTest.java src/test/java/com/cloudera/crunch/lib/join/MapsideJoinTest.java
> >>>>>>> new file mode 100644
> >>>>>>> index 0000000..97e0c63
> >>>>>>> --- /dev/null
> >>>>>>> +++ src/test/java/com/cloudera/crunch/lib/join/MapsideJoinTest.java
> >>>>>>> @@ -0,0 +1,102 @@
> >>>>>>> +package com.cloudera.crunch.lib.join;
> >>>>>>> +
> >>>>>>> +import static org.junit.Assert.assertEquals;
> >>>>>>> +import static org.junit.Assert.assertTrue;
> >>>>>>> +
> >>>>>>> +import java.io.IOException;
> >>>>>>> +import java.util.Collections;
> >>>>>>> +import java.util.List;
> >>>>>>> +
> >>>>>>> +import org.junit.Test;
> >>>>>>> +
> >>>>>>> +import com.cloudera.crunch.FilterFn;
> >>>>>>> +import com.cloudera.crunch.MapFn;
> >>>>>>> +import com.cloudera.crunch.PTable;
> >>>>>>> +import com.cloudera.crunch.Pair;
> >>>>>>> +import com.cloudera.crunch.Pipeline;
> >>>>>>> +import com.cloudera.crunch.impl.mem.MemPipeline;
> >>>>>>> +import com.cloudera.crunch.impl.mr.MRPipeline;
> >>>>>>> +import com.cloudera.crunch.impl.mr.run.CrunchRuntimeException;
> >>>>>>> +import com.cloudera.crunch.test.FileHelper;
> >>>>>>> +import com.cloudera.crunch.types.writable.Writables;
> >>>>>>> +import com.google.common.collect.Lists;
> >>>>>>> +
> >>>>>>> +public class MapsideJoinTest {
> >>>>>>> +
> >>>>>>> + private static class LineSplitter extends MapFn<String, Pair<Integer, String>> {
> >>>>>>> +
> >>>>>>> + @Override
> >>>>>>> + public Pair<Integer, String> map(String input) {
> >>>>>>> + String[] fields = input.split("\\|");
> >>>>>>> + return Pair.of(Integer.parseInt(fields[0]), fields[1]);
> >>>>>>> + }
> >>>>>>> +
> >>>>>>> + }
> >>>>>>> +
> >>>>>>> + private static class NegativeFilter extends FilterFn<Pair<Integer, String>> {
> >>>>>>> +
> >>>>>>> + @Override
> >>>>>>> + public boolean accept(Pair<Integer, String> input) {
> >>>>>>> + return false;
> >>>>>>> + }
> >>>>>>> +
> >>>>>>> + }
> >>>>>>> +
> >>>>>>> + @Test(expected = CrunchRuntimeException.class)
> >>>>>>> + public void testNonMapReducePipeline() {
> >>>>>>> + runMapsideJoin(MemPipeline.getInstance());
> >>>>>>> + }
> >>>>>>> +
> >>>>>>> + @Test
> >>>>>>> + public void testMapsideJoin_RightSideIsEmpty() throws IOException {
> >>>>>>> + MRPipeline pipeline = new MRPipeline(MapsideJoinTest.class);
> >>>>>>> + PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
> >>>>>>> + PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
> >>>>>>> +
> >>>>>>> + PTable<Integer, String> filteredOrderTable = orderTable.parallelDo(new NegativeFilter(),
> >>>>>>> + orderTable.getPTableType());
> >>>>>>> +
> >>>>>>> + PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(customerTable,
> >>>>>>> + filteredOrderTable);
> >>>>>>> +
> >>>>>>> + List<Pair<Integer, Pair<String, String>>> materializedJoin = Lists.newArrayList(joined
> >>>>>>> + .materialize());
> >>>>>>> +
> >>>>>>> + assertTrue(materializedJoin.isEmpty());
> >>>>>>> +
> >>>>>>> + }
> >>>>>>> +
> >>>>>>> + @Test
> >>>>>>> + public void testMapsideJoin() throws IOException {
> >>>>>>> + runMapsideJoin(new MRPipeline(MapsideJoinTest.class));
> >>>>>>> + }
> >>>>>>> +
> >>>>>>> + private void runMapsideJoin(Pipeline pipeline) {
> >>>>>>> + PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
> >>>>>>> + PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
> >>>>>>> +
> >>>>>>> + PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(customerTable, orderTable);
> >>>>>>> +
> >>>>>>> + List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList();
> >>>>>>> + expectedJoinResult.add(Pair.of(111, Pair.of("John Doe", "Corn flakes")));
> >>>>>>> + expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet paper")));
> >>>>>>> + expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet plunger")));
> >>>>>>> + expectedJoinResult.add(Pair.of(333, Pair.of("Someone Else", "Toilet brush")));
> >>>>>>> +
> >>>>>>> + List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(joined
> >>>>>>> + .materialize());
> >>>>>>> + Collections.sort(joinedResultList);
> >>>>>>> +
> >>>>>>> + assertEquals(expectedJoinResult, joinedResultList);
> >>>>>>> + }
> >>>>>>> +
> >>>>>>> + private static PTable<Integer, String> readTable(Pipeline pipeline, String filename) {
> >>>>>>> + try {
> >>>>>>> + return pipeline.readTextFile(FileHelper.createTempCopyOf(filename)).parallelDo("asTable",
> >>>>>>> + new LineSplitter(), Writables.tableOf(Writables.ints(), Writables.strings()));
> >>>>>>> + } catch (IOException e) {
> >>>>>>> + throw new RuntimeException(e);
> >>>>>>> + }
> >>>>>>> + }
> >>>>>>> +
> >>>>>>> +}
> >>>>>>> diff --git src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
> >>>>>>> index 74e2ad3..c6a0b46 100644
> >>>>>>> --- src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
> >>>>>>> +++ src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
> >>>>>>> @@ -25,6 +25,7 @@ import org.apache.avro.Schema.Type;
> >>>>>>> import org.apache.avro.generic.GenericData;
> >>>>>>> import org.apache.avro.util.Utf8;
> >>>>>>> import org.apache.hadoop.io.LongWritable;
> >>>>>>> +import org.junit.Ignore;
> >>>>>>> import org.junit.Test;
> >>>>>>>
> >>>>>>> import com.cloudera.crunch.Pair;
> >>>>>>> @@ -103,6 +104,7 @@ public class AvrosTest {
> >>>>>>> }
> >>>>>>>
> >>>>>>> @Test
> >>>>>>> + @Ignore("This test creates an invalid schema that causes Schema#toString to fail")
> >>>>>>> public void testNestedTables() throws Exception {
> >>>>>>> PTableType<Long, Long> pll = Avros.tableOf(Avros.longs(), Avros.longs());
> >>>>>>> PTableType<Pair<Long, Long>, String> nest = Avros.tableOf(pll, Avros.strings());
> >>>>>>> diff --git src/test/resources/customers.txt src/test/resources/customers.txt
> >>>>>>> new file mode 100644
> >>>>>>> index 0000000..98f3f3d
> >>>>>>> --- /dev/null
> >>>>>>> +++ src/test/resources/customers.txt
> >>>>>>> @@ -0,0 +1,4 @@
> >>>>>>> +111|John Doe
> >>>>>>> +222|Jane Doe
> >>>>>>> +333|Someone Else
> >>>>>>> +444|Has No Orders
> >>>>>>> \ No newline at end of file
> >>>>>>> diff --git src/test/resources/orders.txt src/test/resources/orders.txt
> >>>>>>> new file mode 100644
> >>>>>>> index 0000000..2f1383f
> >>>>>>> --- /dev/null
> >>>>>>> +++ src/test/resources/orders.txt
> >>>>>>> @@ -0,0 +1,4 @@
> >>>>>>> +222|Toilet plunger
> >>>>>>> +333|Toilet brush
> >>>>>>> +222|Toilet paper
> >>>>>>> +111|Corn flakes
> >>>>>>> \ No newline at end of file
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Tue, Jul 3, 2012 at 10:42 AM, Christian Tzolov <christian.tzolov@gmail.com> wrote:
> >>>>>>>> Hi Gabriel,
> >>>>>>>>
> >>>>>>>> Seems like the attachment is missing.
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Chris
> >>>>>>>>
> >>>>>>>> On Tue, Jul 3, 2012 at 9:23 AM, Gabriel Reid <gabriel.reid@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> Hi guys,
> >>>>>>>>>
> >>>>>>>>> Attached (hopefully) is a patch for an initial implementation of map
> >>>>>>>>> side joins. It's currently implemented as a static method in a class
> >>>>>>>>> called MapsideJoin, with the same interface as the existing Join class
> >>>>>>>>> (with only inner joins being implemented for now). The way it works is
> >>>>>>>>> that the right-side PTable of the join is put in the distributed cache
> >>>>>>>>> and then read by the join function at runtime.
> >>>>>>>>>
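A minimal end-to-end sketch of the call pattern described above, assembled
from the patch and its MapsideJoinTest; the driver class, file names, and
output path are illustrative, not part of the patch:

    import java.io.IOException;

    import com.cloudera.crunch.MapFn;
    import com.cloudera.crunch.PTable;
    import com.cloudera.crunch.Pair;
    import com.cloudera.crunch.Pipeline;
    import com.cloudera.crunch.impl.mr.MRPipeline;
    import com.cloudera.crunch.lib.join.MapsideJoin;
    import com.cloudera.crunch.types.writable.Writables;

    public class MapsideJoinExample {

      // Parses "id|value" lines, mirroring the LineSplitter in MapsideJoinTest.
      static class LineSplitter extends MapFn<String, Pair<Integer, String>> {
        @Override
        public Pair<Integer, String> map(String input) {
          String[] fields = input.split("\\|");
          return Pair.of(Integer.parseInt(fields[0]), fields[1]);
        }
      }

      public static void main(String[] args) throws IOException {
        // MapsideJoin#join requires an MRPipeline; it throws a
        // CrunchRuntimeException for other pipeline implementations.
        Pipeline pipeline = new MRPipeline(MapsideJoinExample.class);

        PTable<Integer, String> customers = pipeline.readTextFile("customers.txt")
            .parallelDo("asTable", new LineSplitter(),
                Writables.tableOf(Writables.ints(), Writables.strings()));
        PTable<Integer, String> orders = pipeline.readTextFile("orders.txt")
            .parallelDo("asTable", new LineSplitter(),
                Writables.tableOf(Writables.ints(), Writables.strings()));

        // Inner join: the right side (orders) is materialized, shipped via the
        // DistributedCache, and loaded into an in-memory multimap in each mapper,
        // so it must fit in the memory allocated to a map task.
        PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(customers, orders);

        pipeline.writeTextFile(joined, "/tmp/joined-output");
        pipeline.run();
      }
    }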
> >>>>>>>>> There's one spot that I can see for a potentially interesting
> >>>>>>>>> optimization -- MRPipeline#run is called once for each map side join
> >>>>>>>>> that is set up, but if the setup of the joins was done within
> >>>>>>>>> MRPipeline, then we could set up multiple map side joins in parallel
> >>>>>>>>> with a single call to MRPipeline#run. OTOH, a whole bunch of map side
> >>>>>>>>> joins in parallel probably isn't that common of an operation.
> >>>>>>>>>
> >>>>>>>>> If anyone feels like taking a look at the patch, any feedback is
> >>>>>>>>> appreciated. If nobody sees something that needs serious changes in
> >>>>>>>>> the patch, I'll commit it.
> >>>>>>>>>
> >>>>>>>>> - Gabriel
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Thu, Jun 21, 2012 at 9:09 AM, Gabriel Reid <gabriel.reid@gmail.com> wrote:
> >>>>>>>>>> Replying to all...
> >>>>>>>>>>
> >>>>>>>>>> On Thu, Jun 21, 2012 at 8:40 AM, Josh Wills <jwills@cloudera.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> So there's a philosophical issue here: should Crunch ever make
> >>>>>>>>>>> decisions about how to do something itself based on its estimates of
> >>>>>>>>>>> the size of the data sets, or should it always do exactly what the
> >>>>>>>>>>> developer indicates?
> >>>>>>>>>>>
> >>>>>>>>>>> I can make a case either way, but I think that no matter what, we
> >>>>>>>>>>> would want to have explicit functions for performing a join that reads
> >>>>>>>>>>> one data set into memory, so I think we can proceed w/the
> >>>>>>>>>>> implementation while folks weigh in on what their preferences are for
> >>>>>>>>>>> the default join() behavior (e.g., just do a reduce-side join, or try
> >>>>>>>>>>> to figure out the best join given information about the input data and
> >>>>>>>>>>> some configuration parameters.)
> >>>>>>>>>>
> >>>>>>>>>> I definitely agree on needing to have an explicit way to invoke one or
> >>>>>>>>>> the other -- and in general I don't like having magic behind the
> >>>>>>>>>> scenes to decide on behaviour (especially considering Crunch is
> >>>>>>>>>> generally intended to be closer to the metal than Pig and Hive). I'm
> >>>>>>>>>> not sure if the runtime decision is something specific to some of my
> >>>>>>>>>> use cases or if it could be useful to a wider audience.
> >>>>>>>>>>
> >>>>>>>>>> The ability to dynamically decide at runtime whether a map side join
> >>>>>>>>>> should be used can also easily be tacked on outside of Crunch, and
> >>>>>>>>>> won't impact the underlying implementation (as you pointed out), so I
> >>>>>>>>>> definitely also agree on focusing on the underlying implementation
> >>>>>>>>>> first, and we can worry about the options used for exposing it later
> >>>>>>>>>> on.
> >>>>>>>>>>
> >>>>>>>>>> - Gabriel
> >>>>
> >>>>
> >>>> --
> >>>> Director of Data Science
> >>>> Cloudera <http://www.cloudera.com>
> >>>> Twitter: @josh_wills <http://twitter.com/josh_wills>
> >>>
> >>>
> >>>
>



-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>
