incubator-crunch-dev mailing list archives

From Gabriel Reid <gabriel.r...@gmail.com>
Subject Re: map side ("replicated") joins in Crunch
Date Fri, 06 Jul 2012 16:50:42 GMT
Hi Josh,

I've just committed the map side joins, after doing some more
extensive testing on it today. Unfortunately, I forgot to squash the
multiple commits into a single one (as I was planning on doing), but
it's all in there and working.

- Gabriel
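
For reference, the core technique in the patch quoted further down (index the right side in memory, then stream the left side through it) can be sketched in plain Java. This is only an illustration under stated assumptions: the class name MapsideJoinSketch and its join method are hypothetical, it uses java.util collections in place of Crunch's PTable and Guava's Multimap, and it leaves out the DistributedCache step entirely.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone illustration; not part of the Crunch API.
final class MapsideJoinSketch {

  // Inner join on the key: the (small) right side is indexed in memory,
  // then each left-side record is matched against that index.
  static <K, U, V> List<Map.Entry<K, Map.Entry<U, V>>> join(
      List<Map.Entry<K, U>> left, List<Map.Entry<K, V>> right) {

    // Build the in-memory index; one key may map to several right-side values.
    Map<K, List<V>> joinMap = new HashMap<>();
    for (Map.Entry<K, V> r : right) {
      joinMap.computeIfAbsent(r.getKey(), k -> new ArrayList<>()).add(r.getValue());
    }

    // Stream the left side; keys without a right-side match emit nothing.
    List<Map.Entry<K, Map.Entry<U, V>>> out = new ArrayList<>();
    for (Map.Entry<K, U> l : left) {
      List<V> matches = joinMap.getOrDefault(l.getKey(), Collections.<V>emptyList());
      for (V v : matches) {
        Map.Entry<U, V> valuePair = new SimpleEntry<>(l.getValue(), v);
        out.add(new SimpleEntry<>(l.getKey(), valuePair));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Map.Entry<Integer, String>> left = new ArrayList<>();
    left.add(new SimpleEntry<>(1, "a"));
    left.add(new SimpleEntry<>(2, "b"));

    List<Map.Entry<Integer, String>> right = new ArrayList<>();
    right.add(new SimpleEntry<>(1, "x"));
    right.add(new SimpleEntry<>(1, "y"));

    // Key 2 has no right-side match, so only key 1 appears in the result.
    System.out.println(join(left, right));
  }
}
```

The result follows the order of the left side rather than key order, which mirrors the patch's note that the output of a map-side join is not sorted.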

On Fri, Jul 6, 2012 at 7:28 AM, Gabriel Reid <gabriel.reid@gmail.com> wrote:
> Thanks for making the JIRA issue.
>
> I was going to do some more testing on a "real" cluster later on today, and as long as that all checks out, this will be ready to go in as far as I'm concerned.
>
> I don't see any reason to hold back on the big renaming for this patch -- if you're ready to do the package renaming, please go ahead.
>
> As long as my testing later today checks out and there aren't any glaring issues from anyone who has tried the patch out, I'll probably be checking the patch in later today (so it'll probably be in before you do the renaming anyhow).
>
> - Gabriel
>
>
> On Friday 6 July 2012 at 00:11, Josh Wills wrote:
>
>> FYI: https://issues.apache.org/jira/browse/CRUNCH-3 to track this.
>>
>> Gabriel, is your feeling that this is ready to go in? I'm debating whether
>> or not to check this in before/after I do the massive com.cloudera ->
>> org.apache renaming. What do you think?
>>
>> On Tue, Jul 3, 2012 at 1:25 PM, Gabriel Reid <gabriel.reid@gmail.com> wrote:
>>
>> > Hi Joe,
>> >
>> > Looking forward to hearing your feedback! This is still just a patch and
>> > not committed yet, and there's still definitely room for help (i.e. ideas
>> > on how to improve it, or do it totally differently), so certainly let me
>> > know if you've got some ideas.
>> >
>> > - Gabriel
>> >
>> >
>> > On Tuesday 3 July 2012 at 18:01, Joseph Adler wrote:
>> >
>> > > Awesome! Will give this a try soon... wish I could have helped with this one...
>> > >
>> > > On Tue, Jul 3, 2012 at 1:53 AM, Gabriel Reid <gabriel.reid@gmail.com> wrote:
>> > > > Thanks for pointing that out Chris. I'm guessing the mailing list is
>> > > > stripping out attachments(?). Once the JIRA is up and running then
>> > > > that will be taken care of I guess.
>> > > >
>> > > > The mime type on the last attempt was application/octet-stream, so
>> > > > I've renamed this to a .txt file to try to ensure that it'll get a
>> > > > text/plain mime type (although I don't know if that'll make a
>> > > > difference). I've also pasted it inline below, hopefully one of those
>> > > > solutions works.
>> > > >
>> > > > - Gabriel
>> > > >
>> > > >
>> > > > diff --git src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
>> > > > index 420e8dc..c8ba596 100644
>> > > > --- src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
>> > > > +++ src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
>> > > > @@ -59,9 +59,9 @@ import com.google.common.collect.Sets;
>> > > > public class MRPipeline implements Pipeline {
>> > > >
>> > > > private static final Log LOG = LogFactory.getLog(MRPipeline.class);
>> > > > -
>> > > > +
>> > > > private static final Random RANDOM = new Random();
>> > > > -
>> > > > +
>> > > > private final Class<?> jarClass;
>> > > > private final String name;
>> > > > private final Map<PCollectionImpl<?>, Set<Target>> outputTargets;
>> > > > @@ -75,15 +75,15 @@ public class MRPipeline implements Pipeline {
>> > > > public MRPipeline(Class<?> jarClass) throws IOException {
>> > > > this(jarClass, new Configuration());
>> > > > }
>> > > > -
>> > > > - public MRPipeline(Class<?> jarClass, String name){
>> > > > +
>> > > > + public MRPipeline(Class<?> jarClass, String name) {
>> > > > this(jarClass, name, new Configuration());
>> > > > }
>> > > > -
>> > > > +
>> > > > public MRPipeline(Class<?> jarClass, Configuration conf) {
>> > > > - this(jarClass, jarClass.getName(), conf);
>> > > > + this(jarClass, jarClass.getName(), conf);
>> > > > }
>> > > > -
>> > > > +
>> > > > public MRPipeline(Class<?> jarClass, String name, Configuration conf) {
>> > > > this.jarClass = jarClass;
>> > > > this.name = name;
>> > > > @@ -102,9 +102,9 @@ public class MRPipeline implements Pipeline {
>> > > >
>> > > > @Override
>> > > > public void setConfiguration(Configuration conf) {
>> > > > - this.conf = conf;
>> > > > + this.conf = conf;
>> > > > }
>> > > > -
>> > > > +
>> > > > @Override
>> > > > public PipelineResult run() {
>> > > > MSCRPlanner planner = new MSCRPlanner(this, outputTargets);
>> > > > @@ -125,8 +125,8 @@ public class MRPipeline implements Pipeline {
>> > > > boolean materialized = false;
>> > > > for (Target t : outputTargets.get(c)) {
>> > > > if (!materialized && t instanceof Source) {
>> > > > - c.materializeAt((SourceTarget) t);
>> > > > - materialized = true;
>> > > > + c.materializeAt((SourceTarget) t);
>> > > > + materialized = true;
>> > > > }
>> > > > }
>> > > > }
>> > > > @@ -144,7 +144,7 @@ public class MRPipeline implements Pipeline {
>> > > > cleanup();
>> > > > return res;
>> > > > }
>> > > > -
>> > > > +
>> > > > public <S> PCollection<S> read(Source<S> source) {
>> > > > return new InputCollection<S>(source, this);
>> > > > }
>> > > > @@ -160,85 +160,120 @@ public class MRPipeline implements Pipeline {
>> > > > @SuppressWarnings("unchecked")
>> > > > public void write(PCollection<?> pcollection, Target target) {
>> > > > if (pcollection instanceof PGroupedTableImpl) {
>> > > > - pcollection = ((PGroupedTableImpl<?,?>) pcollection).ungroup();
>> > > > + pcollection = ((PGroupedTableImpl<?, ?>) pcollection).ungroup();
>> > > > } else if (pcollection instanceof UnionCollection || pcollection instanceof UnionTable) {
>> > > > - pcollection = pcollection.parallelDo("UnionCollectionWrapper",
>> > > > - (MapFn)IdentityFn.<Object>getInstance(), pcollection.getPType());
>> > > > + pcollection = pcollection.parallelDo("UnionCollectionWrapper",
>> > > > + (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
>> > > > }
>> > > > addOutput((PCollectionImpl<?>) pcollection, target);
>> > > > }
>> > > >
>> > > > private void addOutput(PCollectionImpl<?> impl, Target target) {
>> > > > if (!outputTargets.containsKey(impl)) {
>> > > > - outputTargets.put(impl, Sets.<Target>newHashSet());
>> > > > + outputTargets.put(impl, Sets.<Target> newHashSet());
>> > > > }
>> > > > outputTargets.get(impl).add(target);
>> > > > }
>> > > > -
>> > > > +
>> > > > @Override
>> > > > public <T> Iterable<T> materialize(PCollection<T> pcollection) {
>> > > > -
>> > > > - if (pcollection instanceof UnionCollection) {
>> > > > - pcollection = pcollection.parallelDo("UnionCollectionWrapper",
>> > > > - (MapFn)IdentityFn.<Object>getInstance(), pcollection.getPType());
>> > > > - }
>> > > > - PCollectionImpl<T> impl = (PCollectionImpl<T>) pcollection;
>> > > > +
>> > > > + PCollectionImpl<T> pcollectionImpl = toPcollectionImpl(pcollection);
>> > > > + ReadableSourceTarget<T> srcTarget = getMaterializeSourceTarget(pcollectionImpl);
>> > > > +
>> > > > + MaterializableIterable<T> c = new MaterializableIterable<T>(this, srcTarget);
>> > > > + if (!outputTargetsToMaterialize.containsKey(pcollectionImpl)) {
>> > > > + outputTargetsToMaterialize.put(pcollectionImpl, c);
>> > > > + }
>> > > > + return c;
>> > > > + }
>> > > > +
>> > > > + /**
>> > > > + * Retrieve a ReadableSourceTarget that provides access to the contents of a
>> > > > + * {@link PCollection}. This is primarily intended as a helper method to
>> > > > + * {@link #materialize(PCollection)}. The underlying data of the
>> > > > + * ReadableSourceTarget may not be actually present until the pipeline is run.
>> > > > + *
>> > > > + * @param pcollection
>> > > > + * The collection for which the ReadableSourceTarget is to be
>> > > > + * retrieved
>> > > > + * @return The ReadableSourceTarget
>> > > > + * @throws IllegalArgumentException
>> > > > + * If no ReadableSourceTarget can be retrieved for the given
>> > > > + * PCollection
>> > > > + */
>> > > > + public <T> ReadableSourceTarget<T> getMaterializeSourceTarget(PCollection<T> pcollection) {
>> > > > + PCollectionImpl<T> impl = toPcollectionImpl(pcollection);
>> > > > SourceTarget<T> matTarget = impl.getMaterializedAt();
>> > > > if (matTarget != null && matTarget instanceof ReadableSourceTarget) {
>> > > > - return new MaterializableIterable<T>(this, (ReadableSourceTarget<T>) matTarget);
>> > > > + return (ReadableSourceTarget<T>) matTarget;
>> > > > + }
>> > > > +
>> > > > + ReadableSourceTarget<T> srcTarget = null;
>> > > > + if (outputTargets.containsKey(pcollection)) {
>> > > > + for (Target target : outputTargets.get(impl)) {
>> > > > + if (target instanceof ReadableSourceTarget) {
>> > > > + srcTarget = (ReadableSourceTarget<T>) target;
>> > > > + break;
>> > > > + }
>> > > > + }
>> > > > }
>> > > > -
>> > > > - ReadableSourceTarget<T> srcTarget = null;
>> > > > - if (outputTargets.containsKey(pcollection)) {
>> > > > - for (Target target : outputTargets.get(impl)) {
>> > > > - if (target instanceof ReadableSourceTarget) {
>> > > > - srcTarget = (ReadableSourceTarget) target;
>> > > > - break;
>> > > > - }
>> > > > - }
>> > > > - }
>> > > > -
>> > > > - if (srcTarget == null) {
>> > > > - SourceTarget<T> st = createIntermediateOutput(pcollection.getPType());
>> > > > - if (!(st instanceof ReadableSourceTarget)) {
>> > > > - throw new IllegalArgumentException("The PType for the given PCollection is not readable"
>> > > > - + " and cannot be materialized");
>> > > > - } else {
>> > > > - srcTarget = (ReadableSourceTarget) st;
>> > > > - addOutput(impl, srcTarget);
>> > > > - }
>> > > > - }
>> > > > -
>> > > > - MaterializableIterable<T> c = new MaterializableIterable<T>(this, srcTarget);
>> > > > - outputTargetsToMaterialize.put(impl, c);
>> > > > - return c;
>> > > > +
>> > > > + if (srcTarget == null) {
>> > > > + SourceTarget<T> st = createIntermediateOutput(pcollection.getPType());
>> > > > + if (!(st instanceof ReadableSourceTarget)) {
>> > > > + throw new IllegalArgumentException("The PType for the given PCollection is not readable"
>> > > > + + " and cannot be materialized");
>> > > > + } else {
>> > > > + srcTarget = (ReadableSourceTarget<T>) st;
>> > > > + addOutput(impl, srcTarget);
>> > > > + }
>> > > > + }
>> > > > +
>> > > > + return srcTarget;
>> > > > + }
>> > > > +
>> > > > + /**
>> > > > + * Safely cast a PCollection into a PCollectionImpl, including handling the case of UnionCollections.
>> > > > + * @param pcollection The PCollection to be cast/transformed
>> > > > + * @return The PCollectionImpl representation
>> > > > + */
>> > > > + private <T> PCollectionImpl<T> toPcollectionImpl(PCollection<T> pcollection) {
>> > > > + PCollectionImpl<T> pcollectionImpl = null;
>> > > > + if (pcollection instanceof UnionCollection) {
>> > > > + pcollectionImpl = (PCollectionImpl<T>) pcollection.parallelDo("UnionCollectionWrapper",
>> > > > + (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
>> > > > + } else {
>> > > > + pcollectionImpl = (PCollectionImpl<T>) pcollection;
>> > > > + }
>> > > > + return pcollectionImpl;
>> > > > }
>> > > >
>> > > > public <T> SourceTarget<T> createIntermediateOutput(PType<T> ptype) {
>> > > > - return ptype.getDefaultFileSource(createTempPath());
>> > > > + return ptype.getDefaultFileSource(createTempPath());
>> > > > }
>> > > >
>> > > > public Path createTempPath() {
>> > > > tempFileIndex++;
>> > > > return new Path(tempDirectory, "p" + tempFileIndex);
>> > > > }
>> > > > -
>> > > > +
>> > > > private static Path createTempDirectory(Configuration conf) {
>> > > > Path dir = new Path("/tmp/crunch" + RANDOM.nextInt());
>> > > > - try {
>> > > > - FileSystem.get(conf).mkdirs(dir);
>> > > > - } catch (IOException e) {
>> > > > - LOG.error("Exception creating job output directory", e);
>> > > > - throw new RuntimeException(e);
>> > > > - }
>> > > > + try {
>> > > > + FileSystem.get(conf).mkdirs(dir);
>> > > > + } catch (IOException e) {
>> > > > + LOG.error("Exception creating job output directory", e);
>> > > > + throw new RuntimeException(e);
>> > > > + }
>> > > > return dir;
>> > > > }
>> > > > -
>> > > > +
>> > > > @Override
>> > > > public <T> void writeTextFile(PCollection<T> pcollection, String pathName) {
>> > > > // Ensure that this is a writable pcollection instance.
>> > > > - pcollection = pcollection.parallelDo("asText", IdentityFn.<T>getInstance(),
>> > > > - WritableTypeFamily.getInstance().as(pcollection.getPType()));
>> > > > + pcollection = pcollection.parallelDo("asText", IdentityFn.<T> getInstance(), WritableTypeFamily
>> > > > + .getInstance().as(pcollection.getPType()));
>> > > > write(pcollection, At.textFile(pathName));
>> > > > }
>> > > >
>> > > > @@ -256,7 +291,7 @@ public class MRPipeline implements Pipeline {
>> > > > LOG.info("Exception during cleanup", e);
>> > > > }
>> > > > }
>> > > > -
>> > > > +
>> > > > public int getNextAnonymousStageId() {
>> > > > return nextAnonymousStageId++;
>> > > > }
>> > > > @@ -265,7 +300,7 @@ public class MRPipeline implements Pipeline {
>> > > > public void enableDebug() {
>> > > > // Turn on Crunch runtime error catching.
>> > > > getConfiguration().setBoolean(RuntimeParameters.DEBUG, true);
>> > > > -
>> > > > +
>> > > > // Write Hadoop's WARN logs to the console.
>> > > > Logger crunchInfoLogger = LogManager.getLogger("com.cloudera.crunch");
>> > > > Appender console = crunchInfoLogger.getAppender("A");
>> > > > @@ -277,9 +312,9 @@ public class MRPipeline implements Pipeline {
>> > > > LOG.warn("Could not find console appender named 'A' for writing Hadoop warning logs");
>> > > > }
>> > > > }
>> > > > -
>> > > > +
>> > > > @Override
>> > > > public String getName() {
>> > > > - return name;
>> > > > + return name;
>> > > > }
>> > > > }
>> > > > diff --git src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
>> > > > index d41a52e..68ef054 100644
>> > > > --- src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
>> > > > +++ src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
>> > > > @@ -12,6 +12,10 @@ public class CrunchRuntimeException extends RuntimeException {
>> > > > super(e);
>> > > > }
>> > > >
>> > > > + public CrunchRuntimeException(String msg, Exception e) {
>> > > > + super(msg, e);
>> > > > + }
>> > > > +
>> > > > public boolean wasLogged() {
>> > > > return logged;
>> > > > }
>> > > > diff --git src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java
>> > > > index 1122d62..4debfeb 100644
>> > > > --- src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java
>> > > > +++ src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java
>> > > > @@ -45,7 +45,8 @@ public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSour
>> > > >
>> > > > @Override
>> > > > public Iterable<T> read(Configuration conf) throws IOException {
>> > > > - return CompositePathIterable.create(FileSystem.get(conf), path, new AvroFileReaderFactory<T>(
>> > > > + FileSystem fs = FileSystem.get(path.toUri(), conf);
>> > > > + return CompositePathIterable.create(fs, path, new AvroFileReaderFactory<T>(
>> > > > (AvroType<T>) ptype, conf));
>> > > > }
>> > > > }
>> > > > diff --git src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java
>> > > > index 24dec2d..462ef93 100644
>> > > > --- src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java
>> > > > +++ src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java
>> > > > @@ -26,18 +26,16 @@ import com.cloudera.crunch.io.ReadableSource;
>> > > > import com.cloudera.crunch.io.impl.FileSourceImpl;
>> > > > import com.cloudera.crunch.types.PType;
>> > > >
>> > > > -public class SeqFileSource<T> extends FileSourceImpl<T> implements
>> > > > - ReadableSource<T> {
>> > > > +public class SeqFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
>> > > >
>> > > > public SeqFileSource(Path path, PType<T> ptype) {
>> > > > - super(path, ptype, SequenceFileInputFormat.class);
>> > > > + super(path, ptype, SequenceFileInputFormat.class);
>> > > > }
>> > > > -
>> > > > +
>> > > > @Override
>> > > > public Iterable<T> read(Configuration conf) throws IOException {
>> > > > - FileSystem fs = FileSystem.get(conf);
>> > > > - return CompositePathIterable.create(fs, path,
>> > > > - new SeqFileReaderFactory<T>(ptype, conf));
>> > > > + FileSystem fs = FileSystem.get(path.toUri(), conf);
>> > > > + return CompositePathIterable.create(fs, path, new SeqFileReaderFactory<T>(ptype, conf));
>> > > > }
>> > > >
>> > > > @Override
>> > > > diff --git src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java
>> > > > index 69ca12b..4db6658 100644
>> > > > --- src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java
>> > > > +++ src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java
>> > > > @@ -42,7 +42,7 @@ public class SeqFileTableSource<K, V> extends FileTableSourceImpl<K, V> implemen
>> > > >
>> > > > @Override
>> > > > public Iterable<Pair<K, V>> read(Configuration conf) throws IOException {
>> > > > - FileSystem fs = FileSystem.get(conf);
>> > > > + FileSystem fs = FileSystem.get(path.toUri(), conf);
>> > > > return CompositePathIterable.create(fs, path,
>> > > > new SeqFileTableReaderFactory<K, V>((PTableType<K, V>) ptype, conf));
>> > > > }
>> > > > diff --git src/main/java/com/cloudera/crunch/io/text/TextFileSource.java src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
>> > > > index a876843..e0dbe68 100644
>> > > > --- src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
>> > > > +++ src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
>> > > > @@ -67,7 +67,7 @@ public class TextFileSource<T> extends FileSourceImpl<T> implements
>> > > >
>> > > > @Override
>> > > > public Iterable<T> read(Configuration conf) throws IOException {
>> > > > - return CompositePathIterable.create(FileSystem.get(conf), path,
>> > > > - new TextFileReaderFactory<T>(ptype, conf));
>> > > > + return CompositePathIterable.create(FileSystem.get(path.toUri(), conf), path,
>> > > > + new TextFileReaderFactory<T>(ptype, conf));
>> > > > }
>> > > > }
>> > > > diff --git src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java
>> > > > new file mode 100644
>> > > > index 0000000..8072e07
>> > > > --- /dev/null
>> > > > +++ src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java
>> > > > @@ -0,0 +1,143 @@
>> > > > +package com.cloudera.crunch.lib.join;
>> > > > +
>> > > > +import java.io.IOException;
>> > > > +
>> > > > +import org.apache.hadoop.filecache.DistributedCache;
>> > > > +import org.apache.hadoop.fs.FileSystem;
>> > > > +import org.apache.hadoop.fs.Path;
>> > > > +
>> > > > +import com.cloudera.crunch.DoFn;
>> > > > +import com.cloudera.crunch.Emitter;
>> > > > +import com.cloudera.crunch.PTable;
>> > > > +import com.cloudera.crunch.Pair;
>> > > > +import com.cloudera.crunch.impl.mr.MRPipeline;
>> > > > +import com.cloudera.crunch.impl.mr.run.CrunchRuntimeException;
>> > > > +import com.cloudera.crunch.io.ReadableSourceTarget;
>> > > > +import com.cloudera.crunch.io.impl.SourcePathTargetImpl;
>> > > > +import com.cloudera.crunch.types.PType;
>> > > > +import com.cloudera.crunch.types.PTypeFamily;
>> > > > +import com.google.common.collect.ArrayListMultimap;
>> > > > +import com.google.common.collect.Multimap;
>> > > > +
>> > > > +/**
>> > > > + * Utility for doing map side joins on a common key between two {@link PTable}s.
>> > > > + * <p>
>> > > > + * A map side join is an optimized join which doesn't use a reducer; instead,
>> > > > + * the right side of the join is loaded into memory and the join is performed in
>> > > > + * a mapper. This style of join has the important implication that the output of
>> > > > + * the join is not sorted, which is the case with a conventional (reducer-based)
>> > > > + * join.
>> > > > + * <p>
>> > > > + * <b>Note:</b>This utility is only supported when running with a
>> > > > + * {@link MRPipeline} as the pipeline.
>> > > > + */
>> > > > +public class MapsideJoin {
>> > > > +
>> > > > + /**
>> > > > + * Join two tables using a map side join. The right-side table will be loaded
>> > > > + * fully in memory, so this method should only be used if the right side
>> > > > + * table's contents can fit in the memory allocated to mappers. The join
>> > > > + * performed by this method is an inner join.
>> > > > + *
>> > > > + * @param left
>> > > > + * The left-side table of the join
>> > > > + * @param right
>> > > > + * The right-side table of the join, whose contents will be fully
>> > > > + * read into memory
>> > > > + * @return A table keyed on the join key, containing pairs of joined values
>> > > > + */
>> > > > + public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right) {
>> > > > +
>> > > > + if (!(right.getPipeline() instanceof MRPipeline)) {
>> > > > + throw new CrunchRuntimeException("Map-side join is only supported within a MapReduce context");
>> > > > + }
>> > > > +
>> > > > + MRPipeline pipeline = (MRPipeline) right.getPipeline();
>> > > > + pipeline.materialize(right);
>> > > > +
>> > > > + // TODO Move necessary logic to MRPipeline so that we can theoretically
>> > > > + // optimize this by running the setup of multiple map-side joins concurrently
>> > > > + pipeline.run();
>> > > > +
>> > > > + ReadableSourceTarget<Pair<K, V>> readableSourceTarget = pipeline
>> > > > + .getMaterializeSourceTarget(right);
>> > > > + if (!(readableSourceTarget instanceof SourcePathTargetImpl)) {
>> > > > + throw new CrunchRuntimeException("Right-side contents can't be read from a path");
>> > > > + }
>> > > > +
>> > > > + // Suppress warnings because we've just checked this cast via instanceof
>> > > > + @SuppressWarnings("unchecked")
>> > > > + SourcePathTargetImpl<Pair<K, V>> sourcePathTarget = (SourcePathTargetImpl<Pair<K, V>>) readableSourceTarget;
>> > > > +
>> > > > + Path path = sourcePathTarget.getPath();
>> > > > + DistributedCache.addCacheFile(path.toUri(), pipeline.getConfiguration());
>> > > > +
>> > > > + MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(path.toString(),
>> > > > + right.getPType());
>> > > > + PTypeFamily typeFamily = left.getTypeFamily();
>> > > > + return left.parallelDo(
>> > > > + "mapjoin",
>> > > > + mapJoinDoFn,
>> > > > + typeFamily.tableOf(left.getKeyType(),
>> > > > + typeFamily.pairs(left.getValueType(), right.getValueType())));
>> > > > +
>> > > > + }
>> > > > +
>> > > > + static class MapsideJoinDoFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
>> > > > +
>> > > > + private String inputPath;
>> > > > + private PType<Pair<K, V>> ptype;
>> > > > + private Multimap<K, V> joinMap;
>> > > > +
>> > > > + public MapsideJoinDoFn(String inputPath, PType<Pair<K, V>> ptype) {
>> > > > + this.inputPath = inputPath;
>> > > > + this.ptype = ptype;
>> > > > + }
>> > > > +
>> > > > + private Path getCacheFilePath() {
>> > > > + try {
>> > > > + for (Path localPath : DistributedCache.getLocalCacheFiles(getConfiguration())) {
>> > > > + if (localPath.toString().endsWith(inputPath)) {
>> > > > + return localPath.makeQualified(FileSystem.getLocal(getConfiguration()));
>> > > > +
>> > > > + }
>> > > > + }
>> > > > + } catch (IOException e) {
>> > > > + throw new CrunchRuntimeException(e);
>> > > > + }
>> > > > +
>> > > > + throw new CrunchRuntimeException("Can't find local cache file for '" + inputPath + "'");
>> > > > + }
>> > > > +
>> > > > + @Override
>> > > > + public void initialize() {
>> > > > + super.initialize();
>> > > > +
>> > > > + ReadableSourceTarget<Pair<K, V>> sourceTarget = (ReadableSourceTarget<Pair<K, V>>) ptype
>> > > > + .getDefaultFileSource(getCacheFilePath());
>> > > > + Iterable<Pair<K, V>> iterable = null;
>> > > > + try {
>> > > > + iterable = sourceTarget.read(getConfiguration());
>> > > > + } catch (IOException e) {
>> > > > + throw new CrunchRuntimeException("Error reading right-side of map side join: ", e);
>> > > > + }
>> > > > +
>> > > > + joinMap = ArrayListMultimap.create();
>> > > > + for (Pair<K, V> joinPair : iterable) {
>> > > > + joinMap.put(joinPair.first(), joinPair.second());
>> > > > + }
>> > > > + }
>> > > > +
>> > > > + @Override
>> > > > + public void process(Pair<K, U> input, Emitter<Pair<K, Pair<U, V>>> emitter) {
>> > > > + K key = input.first();
>> > > > + U value = input.second();
>> > > > + for (V joinValue : joinMap.get(key)) {
>> > > > + Pair<U, V> valuePair = Pair.of(value, joinValue);
>> > > > + emitter.emit(Pair.of(key, valuePair));
>> > > > + }
>> > > > + }
>> > > > +
>> > > > + }
>> > > > +
>> > > > +}
>> > > > diff --git src/main/java/com/cloudera/crunch/types/PType.java src/main/java/com/cloudera/crunch/types/PType.java
>> > > > index af4ef1b..ae480aa 100644
>> > > > --- src/main/java/com/cloudera/crunch/types/PType.java
>> > > > +++ src/main/java/com/cloudera/crunch/types/PType.java
>> > > > @@ -15,6 +15,7 @@
>> > > >
>> > > > package com.cloudera.crunch.types;
>> > > >
>> > > > +import java.io.Serializable;
>> > > > import java.util.List;
>> > > >
>> > > > import org.apache.hadoop.fs.Path;
>> > > > @@ -31,7 +32,7 @@ import com.cloudera.crunch.SourceTarget;
>> > > > * {@code PCollection}.
>> > > > *
>> > > > */
>> > > > -public interface PType<T> {
>> > > > +public interface PType<T> extends Serializable {
>> > > > /**
>> > > > * Returns the Java type represented by this {@code PType}.
>> > > > */
>> > > > diff --git src/main/java/com/cloudera/crunch/types/avro/AvroType.java src/main/java/com/cloudera/crunch/types/avro/AvroType.java
>> > > > index 3db00c0..29af9fb 100644
>> > > > --- src/main/java/com/cloudera/crunch/types/avro/AvroType.java
>> > > > +++ src/main/java/com/cloudera/crunch/types/avro/AvroType.java
>> > > > @@ -14,6 +14,7 @@
>> > > > */
>> > > > package com.cloudera.crunch.types.avro;
>> > > >
>> > > > +import java.io.Serializable;
>> > > > import java.util.List;
>> > > >
>> > > > import org.apache.avro.Schema;
>> > > > @@ -41,7 +42,8 @@ public class AvroType<T> implements PType<T> {
>> > > > private static final Converter AVRO_CONVERTER = new AvroKeyConverter();
>> > > >
>> > > > private final Class<T> typeClass;
>> > > > - private final Schema schema;
>> > > > + private final String schemaString;
>> > > > + private transient Schema schema;
>> > > > private final MapFn baseInputMapFn;
>> > > > private final MapFn baseOutputMapFn;
>> > > > private final List<PType> subTypes;
>> > > > @@ -55,6 +57,7 @@ public class AvroType<T> implements PType<T> {
>> > > > MapFn outputMapFn, PType... ptypes) {
>> > > > this.typeClass = typeClass;
>> > > > this.schema = Preconditions.checkNotNull(schema);
>> > > > + this.schemaString = schema.toString();
>> > > > this.baseInputMapFn = inputMapFn;
>> > > > this.baseOutputMapFn = outputMapFn;
>> > > > this.subTypes = ImmutableList.<PType> builder().add(ptypes).build();
>> > > > @@ -76,6 +79,9 @@ public class AvroType<T> implements PType<T> {
>> > > > }
>> > > >
>> > > > public Schema getSchema() {
>> > > > + if (schema == null){
>> > > > + schema = new Schema.Parser().parse(schemaString);
>> > > > + }
>> > > > return schema;
>> > > > }
>> > > >
>> > > > diff --git src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java
>> > > > new file mode 100644
>> > > > index 0000000..f265460
>> > > > --- /dev/null
>> > > > +++ src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java
>> > > > @@ -0,0 +1,60 @@
>> > > > +package com.cloudera.crunch.impl.mr;
>> > > > +
>> > > > +import static org.junit.Assert.assertEquals;
>> > > > +import static org.mockito.Mockito.doReturn;
>> > > > +import static org.mockito.Mockito.mock;
>> > > > +import static org.mockito.Mockito.spy;
>> > > > +import static org.mockito.Mockito.when;
>> > > > +
>> > > > +import java.io.IOException;
>> > > > +
>> > > > +import org.junit.Before;
>> > > > +import org.junit.Test;
>> > > > +
>> > > > +import com.cloudera.crunch.SourceTarget;
>> > > > +import com.cloudera.crunch.impl.mr.collect.PCollectionImpl;
>> > > > +import com.cloudera.crunch.io.ReadableSourceTarget;
>> > > > +import com.cloudera.crunch.types.avro.Avros;
>> > > > +
>> > > > +public class MRPipelineTest {
>> > > > +
>> > > > + private MRPipeline pipeline;
>> > > > +
>> > > > + @Before
>> > > > + public void setUp() throws IOException {
>> > > > + pipeline = spy(new MRPipeline(MRPipelineTest.class));
>> > > > + }
>> > > > +
>> > > > + @Test
>> > > > + public void testGetMaterializeSourceTarget_AlreadyMaterialized() {
>> > > > + PCollectionImpl<String> materializedPcollection = mock(PCollectionImpl.class);
>> > > > + ReadableSourceTarget<String> readableSourceTarget = mock(ReadableSourceTarget.class);
>> > > > + when(materializedPcollection.getMaterializedAt()).thenReturn(readableSourceTarget);
>> > > > +
>> > > > + assertEquals(readableSourceTarget, pipeline.getMaterializeSourceTarget(materializedPcollection));
>> > > > + }
>> > > > +
>> > > > + @Test
>> > > > + public void testGetMaterializeSourceTarget_NotMaterialized_HasOutput() {
>> > > > +
>> > > > + PCollectionImpl<String> pcollection = mock(PCollectionImpl.class);
>> > > > + ReadableSourceTarget<String> readableSourceTarget = mock(ReadableSourceTarget.class);
>> > > > + when(pcollection.getPType()).thenReturn(Avros.strings());
>> > > > + doReturn(readableSourceTarget).when(pipeline).createIntermediateOutput(Avros.strings());
>> > > > + when(pcollection.getMaterializedAt()).thenReturn(null);
>> > > > +
>> > > > + assertEquals(readableSourceTarget, pipeline.getMaterializeSourceTarget(pcollection));
>> > > > + }
>> > > > +
>> > > > + @Test(expected = IllegalArgumentException.class)
>> > > > + public void testGetMaterializeSourceTarget_NotMaterialized_NotReadableSourceTarget() {
>> > > > + PCollectionImpl<String> pcollection = mock(PCollectionImpl.class);
>> > > > + SourceTarget<String> nonReadableSourceTarget = mock(SourceTarget.class);
>> > > > + when(pcollection.getPType()).thenReturn(Avros.strings());
>> > > > + doReturn(nonReadableSourceTarget).when(pipeline).createIntermediateOutput(Avros.strings());
>> > > > + when(pcollection.getMaterializedAt()).thenReturn(null);
>> > > > +
>> > > > + pipeline.getMaterializeSourceTarget(pcollection);
>> > > > + }
>> > > > +
>> > > > +}
>> > > > diff --git src/test/java/com/cloudera/crunch/lib/join/MapsideJoinTest.java src/test/java/com/cloudera/crunch/lib/join/MapsideJoinTest.java
>> > > > new file mode 100644
>> > > > index 0000000..97e0c63
>> > > > --- /dev/null
>> > > > +++ src/test/java/com/cloudera/crunch/lib/join/MapsideJoinTest.java
>> > > > @@ -0,0 +1,102 @@
>> > > > +package com.cloudera.crunch.lib.join;
>> > > > +
>> > > > +import static org.junit.Assert.assertEquals;
>> > > > +import static org.junit.Assert.assertTrue;
>> > > > +
>> > > > +import java.io.IOException;
>> > > > +import java.util.Collections;
>> > > > +import java.util.List;
>> > > > +
>> > > > +import org.junit.Test;
>> > > > +
>> > > > +import com.cloudera.crunch.FilterFn;
>> > > > +import com.cloudera.crunch.MapFn;
>> > > > +import com.cloudera.crunch.PTable;
>> > > > +import com.cloudera.crunch.Pair;
>> > > > +import com.cloudera.crunch.Pipeline;
>> > > > +import com.cloudera.crunch.impl.mem.MemPipeline;
>> > > > +import com.cloudera.crunch.impl.mr.MRPipeline;
>> > > > +import com.cloudera.crunch.impl.mr.run.CrunchRuntimeException;
>> > > > +import com.cloudera.crunch.test.FileHelper;
>> > > > +import com.cloudera.crunch.types.writable.Writables;
>> > > > +import com.google.common.collect.Lists;
>> > > > +
>> > > > +public class MapsideJoinTest {
>> > > > +
>> > > > + private static class LineSplitter extends MapFn<String, Pair<Integer, String>> {
>> > > > +
>> > > > + @Override
>> > > > + public Pair<Integer, String> map(String input) {
>> > > > + String[] fields = input.split("\\|");
>> > > > + return Pair.of(Integer.parseInt(fields[0]), fields[1]);
>> > > > + }
>> > > > +
>> > > > + }
>> > > > +
>> > > > + private static class NegativeFilter extends FilterFn<Pair<Integer, String>> {
>> > > > +
>> > > > + @Override
>> > > > + public boolean accept(Pair<Integer, String> input) {
>> > > > + return false;
>> > > > + }
>> > > > +
>> > > > + }
>> > > > +
>> > > > + @Test(expected = CrunchRuntimeException.class)
>> > > > + public void testNonMapReducePipeline() {
>> > > > + runMapsideJoin(MemPipeline.getInstance());
>> > > > + }
>> > > > +
>> > > > + @Test
>> > > > + public void testMapsideJoin_RightSideIsEmpty() throws IOException {
>> > > > + MRPipeline pipeline = new MRPipeline(MapsideJoinTest.class);
>> > > > + PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
>> > > > + PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
>> > > > +
>> > > > + PTable<Integer, String> filteredOrderTable = orderTable.parallelDo(new NegativeFilter(),
>> > > > + orderTable.getPTableType());
>> > > > +
>> > > > + PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(customerTable,
>> > > > + filteredOrderTable);
>> > > > +
>> > > > + List<Pair<Integer, Pair<String, String>>> materializedJoin = Lists.newArrayList(joined
>> > > > + .materialize());
>> > > > +
>> > > > + assertTrue(materializedJoin.isEmpty());
>> > > > +
>> > > > + }
>> > > > +
>> > > > + @Test
>> > > > + public void testMapsideJoin() throws IOException {
>> > > > + runMapsideJoin(new MRPipeline(MapsideJoinTest.class));
>> > > > + }
>> > > > +
>> > > > + private void runMapsideJoin(Pipeline pipeline) {
>> > > > + PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
>> > > > + PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
>> > > > +
>> > > > + PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(customerTable, orderTable);
>> > > > +
>> > > > + List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList();
>> > > > + expectedJoinResult.add(Pair.of(111, Pair.of("John Doe", "Corn flakes")));
>> > > > + expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet paper")));
>> > > > + expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet plunger")));
>> > > > + expectedJoinResult.add(Pair.of(333, Pair.of("Someone Else", "Toilet brush")));
>> > > > +
>> > > > + List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(joined
>> > > > + .materialize());
>> > > > + Collections.sort(joinedResultList);
>> > > > +
>> > > > + assertEquals(expectedJoinResult, joinedResultList);
>> > > > + }
>> > > > +
>> > > > + private static PTable<Integer, String> readTable(Pipeline pipeline, String filename) {
>> > > > + try {
>> > > > + return pipeline.readTextFile(FileHelper.createTempCopyOf(filename)).parallelDo("asTable",
>> > > > + new LineSplitter(), Writables.tableOf(Writables.ints(), Writables.strings()));
>> > > > + } catch (IOException e) {
>> > > > + throw new RuntimeException(e);
>> > > > + }
>> > > > + }
>> > > > +
>> > > > +}
>> > > > diff --git src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
>> > > > index 74e2ad3..c6a0b46 100644
>> > > > --- src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
>> > > > +++ src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
>> > > > @@ -25,6 +25,7 @@ import org.apache.avro.Schema.Type;
>> > > > import org.apache.avro.generic.GenericData;
>> > > > import org.apache.avro.util.Utf8;
>> > > > import org.apache.hadoop.io.LongWritable;
>> > > > +import org.junit.Ignore;
>> > > > import org.junit.Test;
>> > > >
>> > > > import com.cloudera.crunch.Pair;
>> > > > @@ -103,6 +104,7 @@ public class AvrosTest {
>> > > > }
>> > > >
>> > > > @Test
>> > > > + @Ignore("This test creates an invalid schema that causes Schema#toString to fail")
>> > > > public void testNestedTables() throws Exception {
>> > > > PTableType<Long, Long> pll = Avros.tableOf(Avros.longs(), Avros.longs());
>> > > > PTableType<Pair<Long, Long>, String> nest = Avros.tableOf(pll, Avros.strings());
>> > > > diff --git src/test/resources/customers.txt src/test/resources/customers.txt
>> > > > new file mode 100644
>> > > > index 0000000..98f3f3d
>> > > > --- /dev/null
>> > > > +++ src/test/resources/customers.txt
>> > > > @@ -0,0 +1,4 @@
>> > > > +111|John Doe
>> > > > +222|Jane Doe
>> > > > +333|Someone Else
>> > > > +444|Has No Orders
>> > > > \ No newline at end of file
>> > > > diff --git src/test/resources/orders.txt src/test/resources/orders.txt
>> > > > new file mode 100644
>> > > > index 0000000..2f1383f
>> > > > --- /dev/null
>> > > > +++ src/test/resources/orders.txt
>> > > > @@ -0,0 +1,4 @@
>> > > > +222|Toilet plunger
>> > > > +333|Toilet brush
>> > > > +222|Toilet paper
>> > > > +111|Corn flakes
>> > > > \ No newline at end of file
>> > > >
>> > > >
>> > > >
>> > > > On Tue, Jul 3, 2012 at 10:42 AM, Christian Tzolov <christian.tzolov@gmail.com> wrote:
>> > > > > Hi Gabriel,
>> > > > >
>> > > > > Seems like the attachment is missing.
>> > > > >
>> > > > > Cheers,
>> > > > > Chris
>> > > > >
>> > > > > On Tue, Jul 3, 2012 at 9:23 AM, Gabriel Reid <gabriel.reid@gmail.com> wrote:
>> > > > >
>> > > > > > Hi guys,
>> > > > > >
>> > > > > > Attached (hopefully) is a patch for an initial implementation of map
>> > > > > > side joins. It's currently implemented as a static method in a class
>> > > > > > called MapsideJoin, with the same interface as the existing Join class
>> > > > > > (with only inner joins being implemented for now). The way it works is
>> > > > > > that the right-side PTable of the join is put in the distributed cache
>> > > > > > and then read by the join function at runtime.
>> > > > > >
>> > > > > > There's one spot that I can see for a potentially interesting
>> > > > > > optimization -- MRPipeline#run is called once for each map side join
>> > > > > > that is set up, but if the setup of the joins was done within
>> > > > > > MRPipeline, then we could set up multiple map side joins in parallel
>> > > > > > with a single call to MRPipeline#run. OTOH, a whole bunch of map side
>> > > > > > joins in parallel probably isn't that common of an operation.
>> > > > > >
>> > > > > > If anyone feels like taking a look at the patch, any feedback is
>> > > > > > appreciated. If nobody sees something that needs serious changes in
>> > > > > > the patch, I'll commit it.
>> > > > > >
>> > > > > > - Gabriel
>> > > > > >
>> > > > > >
>> > > > > > On Thu, Jun 21, 2012 at 9:09 AM, Gabriel Reid <gabriel.reid@gmail.com> wrote:
>> > > > > > > Replying to all...
>> > > > > > >
>> > > > > > > On Thu, Jun 21, 2012 at 8:40 AM, Josh Wills <jwills@cloudera.com> wrote:
>> > > > > > > >
>> > > > > > > > So there's a philosophical issue here: should Crunch ever make
>> > > > > > > > decisions about how to do something itself based on its estimates of
>> > > > > > > > the size of the data sets, or should it always do exactly what the
>> > > > > > > > developer indicates?
>> > > > > > > >
>> > > > > > > > I can make a case either way, but I think that no matter what, we
>> > > > > > > > would want to have explicit functions for performing a join that reads
>> > > > > > > > one data set into memory, so I think we can proceed w/the
>> > > > > > > > implementation while folks weigh in on what their preferences are for
>> > > > > > > > the default join() behavior (e.g., just do a reduce-side join, or try
>> > > > > > > > to figure out the best join given information about the input data and
>> > > > > > > > some configuration parameters.)
>> > > > > > >
>> > > > > > > I definitely agree on needing to have an explicit way to invoke one or
>> > > > > > > the other -- and in general I don't like having magic behind the
>> > > > > > > scenes to decide on behaviour (especially considering Crunch is
>> > > > > > > generally intended to be closer to the metal than Pig and Hive). I'm
>> > > > > > > not sure if the runtime decision is something specific to some of my
>> > > > > > > use cases or if it could be useful to a wider audience.
>> > > > > > >
>> > > > > > > The ability to dynamically decide at runtime whether a map side join
>> > > > > > > should be used can also easily be tacked on outside of Crunch, and
>> > > > > > > won't impact the underlying implementation (as you pointed out), so I
>> > > > > > > definitely also agree on focusing on the underlying implementation
>> > > > > > > first, and we can worry about the options used for exposing it later
>> > > > > > > on.
>> > > > > > >
>> > > > > > > - Gabriel
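
To make the "decide at runtime, outside of Crunch" idea in the quoted discussion a bit more concrete, here is a minimal sketch. Everything in it is hypothetical and invented for illustration: the class JoinStrategyChooser, the JoinStrategy enum and the byte threshold are not Crunch API, and how the size estimate for the right-hand side is obtained is deliberately left open.

```java
/**
 * Hypothetical helper (not part of Crunch) that picks a join strategy
 * from a caller-supplied size estimate of the right-hand side.
 */
class JoinStrategyChooser {

  /** The two strategies discussed in the thread. */
  enum JoinStrategy { MAP_SIDE, REDUCE_SIDE }

  // Assumed limit on how much data we are willing to hold in memory per task.
  private final long maxInMemoryBytes;

  JoinStrategyChooser(long maxInMemoryBytes) {
    if (maxInMemoryBytes < 0) {
      throw new IllegalArgumentException("maxInMemoryBytes must be >= 0");
    }
    this.maxInMemoryBytes = maxInMemoryBytes;
  }

  /** Returns MAP_SIDE only when the estimated right side fits under the limit. */
  JoinStrategy choose(long estimatedRightSideBytes) {
    return estimatedRightSideBytes <= maxInMemoryBytes
        ? JoinStrategy.MAP_SIDE
        : JoinStrategy.REDUCE_SIDE;
  }

  public static void main(String[] args) {
    JoinStrategyChooser chooser = new JoinStrategyChooser(64L * 1024 * 1024);
    System.out.println(chooser.choose(10L * 1024));          // small right side
    System.out.println(chooser.choose(10L * 1024 * 1024 * 1024)); // large right side
  }
}
```

A caller would then invoke either the map side join or the regular join explicitly based on the returned value, which keeps the choice visible in user code rather than hidden inside the library.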
>>
>>
>> --
>> Director of Data Science
>> Cloudera <http://www.cloudera.com>
>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>
>
>
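
For readers skimming the quoted patch, the core idea of the map side join (hold the right-hand table in memory, stream the left-hand table past it, emit one row per match) can be sketched in plain Java. This is only an illustration under stated assumptions, not the code from the patch: it uses no Crunch or Hadoop classes, the class name MapSideJoinSketch and its methods are made up for this example, and the in-memory list stands in for the file that the patch reads from the distributed cache. The input lines mirror customers.txt and orders.txt from the test resources.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration of a map side (replicated) inner join; not Crunch code. */
class MapSideJoinSketch {

  /** Parses an "id|value" line, in the same way as LineSplitter in the quoted test. */
  static Map.Entry<Integer, String> parse(String line) {
    String[] fields = line.split("\\|");
    return new SimpleImmutableEntry<>(Integer.parseInt(fields[0]), fields[1]);
  }

  /** Loads the (small) right side into an in-memory index keyed by join key. */
  static Map<Integer, List<String>> loadRightSide(List<String> rightLines) {
    Map<Integer, List<String>> index = new HashMap<>();
    for (String line : rightLines) {
      Map.Entry<Integer, String> entry = parse(line);
      index.computeIfAbsent(entry.getKey(), k -> new ArrayList<>()).add(entry.getValue());
    }
    return index;
  }

  /** Streams the left side and emits "key|left|right" for every match (inner join). */
  static List<String> join(List<String> leftLines, List<String> rightLines) {
    Map<Integer, List<String>> index = loadRightSide(rightLines);
    List<String> joined = new ArrayList<>();
    for (String line : leftLines) {
      Map.Entry<Integer, String> left = parse(line);
      // Keys with no match on the right side produce no output rows.
      for (String right : index.getOrDefault(left.getKey(), Collections.emptyList())) {
        joined.add(left.getKey() + "|" + left.getValue() + "|" + right);
      }
    }
    return joined;
  }

  public static void main(String[] args) {
    List<String> customers = Arrays.asList(
        "111|John Doe", "222|Jane Doe", "333|Someone Else", "444|Has No Orders");
    List<String> orders = Arrays.asList(
        "222|Toilet plunger", "333|Toilet brush", "222|Toilet paper", "111|Corn flakes");
    List<String> joined = join(customers, orders);
    Collections.sort(joined);
    for (String row : joined) {
      System.out.println(row);
    }
  }
}
```

Because the right side is indexed once and the left side is only iterated, no shuffle or sort of the left side is needed; the trade-off is that the right side has to fit in memory on every task.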
