From: Gabriel Reid <gabriel.reid@gmail.com>
To: crunch-dev@incubator.apache.org
Cc: jadler@alum.mit.edu
Date: Fri, 6 Jul 2012 18:50:42 +0200
Subject: Re: map side ("replicated") joins in Crunch

Hi Josh,

I've just committed the map side joins, after doing some more extensive
testing on it today. Unfortunately, I forgot to squash the multiple
commits into a single one (as I was planning on doing), but it's all in
there and working.

- Gabriel

On Fri, Jul 6, 2012 at 7:28 AM, Gabriel Reid wrote:
> Thanks for making the JIRA issue.
>
> I was going to do some more testing on a "real" cluster later on today,
> and as long as that all checks out then this will be ready to go in as
> far as I'm concerned.
>
> I don't see any reason to hold back on the big renaming for this patch
> -- if you're ready to do the package renaming, please go ahead.
>
> As long as my testing later today checks out and there aren't any
> glaring issues from anyone who has tried the patch out, I'll probably
> be checking the patch in later today (so it'll probably be in before
> you do the renaming anyhow).
>
> - Gabriel
>
> On Friday 6 July 2012 at 00:11, Josh Wills wrote:
>
>> FYI: https://issues.apache.org/jira/browse/CRUNCH-3 to track this.
>>
>> Gabriel, is your feeling that this is ready to go in? I'm debating
>> whether or not to check this in before/after I do the massive
>> com.cloudera -> org.apache renaming. What do you think?
>>
>> On Tue, Jul 3, 2012 at 1:25 PM, Gabriel Reid wrote:
>>
>> > Hi Joe,
>> >
>> > Looking forward to hearing your feedback! This is still just a patch
>> > and not committed yet, and there's still definitely room for help
>> > (i.e. ideas on how to improve it, or do it totally differently), so
>> > certainly let me know if you've got some ideas.
>> >
>> > - Gabriel
>> >
>> > On Tuesday 3 July 2012 at 18:01, Joseph Adler wrote:
>> >
>> > > Awesome! Will give this a try soon... wish I could have helped
>> > > with this one...
>> > >
>> > > On Tue, Jul 3, 2012 at 1:53 AM, Gabriel Reid
>> > > <gabriel.reid@gmail.com> wrote:
>> > > > Thanks for pointing that out Chris. I'm guessing the mailing
>> > > > list is stripping out attachments(?). Once the JIRA is up and
>> > > > running then that will be taken care of I guess.
>> > > >
>> > > > The mime type on the last attempt was application/octet-stream,
>> > > > so I've renamed this to a .txt file to try to ensure that it'll
>> > > > get a text/plain mime type (although I don't know if that'll
>> > > > make a difference). I've also pasted it inline below, hopefully
>> > > > one of those solutions works.
>> > > >
>> > > > - Gabriel
>> > > >
>> > > > diff --git src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
>> > > > index 420e8dc..c8ba596 100644
>> > > > --- src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
>> > > > +++ src/main/java/com/cloudera/crunch/impl/mr/MRPipeline.java
>> > > > @@ -59,9 +59,9 @@ import com.google.common.collect.Sets;
>> > > >  public class MRPipeline implements Pipeline {
>> > > >
>> > > >    private static final Log LOG = LogFactory.getLog(MRPipeline.class);
>> > > > -
>> > > > +
>> > > >    private static final Random RANDOM = new Random();
>> > > > -
>> > > > +
>> > > >    private final Class jarClass;
>> > > >    private final String name;
>> > > >    private final Map, Set> outputTargets;
>> > > > @@ -75,15 +75,15 @@ public class MRPipeline implements Pipeline {
>> > > >    public MRPipeline(Class jarClass) throws IOException {
>> > > >      this(jarClass, new Configuration());
>> > > >    }
>> > > > -
>> > > > +
>> > > > -  public MRPipeline(Class jarClass, String name){
>> > > > +  public MRPipeline(Class jarClass, String name) {
>> > > >      this(jarClass, name, new Configuration());
>> > > >    }
>> > > > -
>> > > > +
>> > > >    public MRPipeline(Class jarClass, Configuration conf) {
>> > > > -    this(jarClass, jarClass.getName(), conf);
>> > > > +    this(jarClass, jarClass.getName(), conf);
>> > > >    }
>> > > > -
>> > > > +
>> > > >    public MRPipeline(Class jarClass, String name, Configuration conf) {
>> > > >      this.jarClass = jarClass;
>> > > >      this.name = name;
>> > > > @@ -102,9 +102,9 @@ public class MRPipeline implements Pipeline {
>> > > >
>> > > >    @Override
>> > > >    public void setConfiguration(Configuration conf) {
>> > > > -    this.conf = conf;
>> > > > +    this.conf = conf;
>> > > >    }
>> > > > -
>> > > > +
>> > > >    @Override
>> > > >    public PipelineResult run() {
>> > > >      MSCRPlanner planner = new MSCRPlanner(this, outputTargets);
>> > > > @@ -125,8 +125,8 @@ public class MRPipeline implements Pipeline {
>> > > >          boolean materialized = false;
>> > > >          for (Target t : outputTargets.get(c)) {
>> > > >            if (!materialized && t instanceof Source) {
>> > > > -            c.materializeAt((SourceTarget) t);
>> > > > -            materialized = true;
>> > > > +            c.materializeAt((SourceTarget) t);
>> > > > +            materialized = true;
>> > > >            }
>> > > >          }
>> > > >        }
>> > > > @@ -144,7 +144,7 @@ public class MRPipeline implements Pipeline {
>> > > >      cleanup();
>> > > >      return res;
>> > > >    }
>> > > > -
>> > > > +
>> > > >    public PCollection read(Source source) {
>> > > >      return new InputCollection(source, this);
>> > > >    }
>> > > > @@ -160,85 +160,120 @@ public class MRPipeline implements Pipeline {
>> > > >    @SuppressWarnings("unchecked")
>> > > >    public void write(PCollection pcollection, Target target) {
>> > > >      if (pcollection instanceof PGroupedTableImpl) {
>> > > > -      pcollection = ((PGroupedTableImpl) pcollection).ungroup();
>> > > > +      pcollection = ((PGroupedTableImpl) pcollection).ungroup();
>> > > >      } else if (pcollection instanceof UnionCollection || pcollection instanceof UnionTable) {
>> > > > -      pcollection = pcollection.parallelDo("UnionCollectionWrapper",
>> > > > -          (MapFn)IdentityFn.getInstance(), pcollection.getPType());
>> > > > +      pcollection = pcollection.parallelDo("UnionCollectionWrapper",
>> > > > +          (MapFn) IdentityFn.getInstance(), pcollection.getPType());
>> > > >      }
>> > > >      addOutput((PCollectionImpl) pcollection, target);
>> > > >    }
>> > > >
>> > > >    private void addOutput(PCollectionImpl impl, Target target) {
>> > > >      if (!outputTargets.containsKey(impl)) {
>> > > > -      outputTargets.put(impl, Sets.newHashSet());
>> > > > +      outputTargets.put(impl, Sets.newHashSet());
>> > > >      }
>> > > >      outputTargets.get(impl).add(target);
>> > > >    }
>> > > > -
>> > > > +
>> > > >    @Override
>> > > >    public Iterable materialize(PCollection pcollection) {
>> > > > -
>> > > > -    if (pcollection instanceof UnionCollection) {
>> > > > -      pcollection = pcollection.parallelDo("UnionCollectionWrapper",
>> > > > -          (MapFn)IdentityFn.getInstance(), pcollection.getPType());
>> > > > -    }
>> > > > -    PCollectionImpl impl = (PCollectionImpl) pcollection;
>> > > > +
>> > > > +    PCollectionImpl pcollectionImpl = toPcollectionImpl(pcollection);
>> > > > +    ReadableSourceTarget srcTarget = getMaterializeSourceTarget(pcollectionImpl);
>> > > > +
>> > > > +    MaterializableIterable c = new MaterializableIterable(this, srcTarget);
>> > > > +    if (!outputTargetsToMaterialize.containsKey(pcollectionImpl)) {
>> > > > +      outputTargetsToMaterialize.put(pcollectionImpl, c);
>> > > > +    }
>> > > > +    return c;
>> > > > +  }
>> > > > +
>> > > > +  /**
>> > > > +   * Retrieve a ReadableSourceTarget that provides access to the contents of a
>> > > > +   * {@link PCollection}. This is primarily intended as a helper method to
>> > > > +   * {@link #materialize(PCollection)}. The underlying data of the
>> > > > +   * ReadableSourceTarget may not be actually present until the pipeline is run.
>> > > > +   *
>> > > > +   * @param pcollection
>> > > > +   *          The collection for which the ReadableSourceTarget is to be
>> > > > +   *          retrieved
>> > > > +   * @return The ReadableSourceTarget
>> > > > +   * @throws IllegalArgumentException
>> > > > +   *           If no ReadableSourceTarget can be retrieved for the given
>> > > > +   *           PCollection
>> > > > +   */
>> > > > +  public ReadableSourceTarget getMaterializeSourceTarget(PCollection pcollection) {
>> > > > +    PCollectionImpl impl = toPcollectionImpl(pcollection);
>> > > >      SourceTarget matTarget = impl.getMaterializedAt();
>> > > >      if (matTarget != null && matTarget instanceof ReadableSourceTarget) {
>> > > > -      return new MaterializableIterable(this, (ReadableSourceTarget) matTarget);
>> > > > +      return (ReadableSourceTarget) matTarget;
>> > > > +    }
>> > > > +
>> > > > +    ReadableSourceTarget srcTarget = null;
>> > > > +    if (outputTargets.containsKey(pcollection)) {
>> > > > +      for (Target target : outputTargets.get(impl)) {
>> > > > +        if (target instanceof ReadableSourceTarget) {
>> > > > +          srcTarget = (ReadableSourceTarget) target;
>> > > > +          break;
>> > > > +        }
>> > > > +      }
>> > > >      }
>> > > > -
>> > > > -    ReadableSourceTarget srcTarget = null;
>> > > > -    if (outputTargets.containsKey(pcollection)) {
>> > > > -      for (Target target : outputTargets.get(impl)) {
>> > > > -        if (target instanceof ReadableSourceTarget) {
>> > > > -          srcTarget = (ReadableSourceTarget) target;
>> > > > -          break;
>> > > > -        }
>> > > > -      }
>> > > > -    }
>> > > > -
>> > > > -    if (srcTarget == null) {
>> > > > -      SourceTarget st = createIntermediateOutput(pcollection.getPType());
>> > > > -      if (!(st instanceof ReadableSourceTarget)) {
>> > > > -        throw new IllegalArgumentException("The PType for the given PCollection is not readable"
>> > > > -            + " and cannot be materialized");
>> > > > -      } else {
>> > > > -        srcTarget = (ReadableSourceTarget) st;
>> > > > -        addOutput(impl, srcTarget);
>> > > > -      }
>> > > > -    }
>> > > > -
>> > > > -    MaterializableIterable c = new MaterializableIterable(this, srcTarget);
>> > > > -    outputTargetsToMaterialize.put(impl, c);
>> > > > -    return c;
>> > > > +
>> > > > +    if (srcTarget == null) {
>> > > > +      SourceTarget st = createIntermediateOutput(pcollection.getPType());
>> > > > +      if (!(st instanceof ReadableSourceTarget)) {
>> > > > +        throw new IllegalArgumentException("The PType for the given PCollection is not readable"
>> > > > +            + " and cannot be materialized");
>> > > > +      } else {
>> > > > +        srcTarget = (ReadableSourceTarget) st;
>> > > > +        addOutput(impl, srcTarget);
>> > > > +      }
>> > > > +    }
>> > > > +
>> > > > +    return srcTarget;
>> > > > +  }
>> > > > +
>> > > > +  /**
>> > > > +   * Safely cast a PCollection into a PCollectionImpl, including handling
>> > > > +   * the case of UnionCollections.
>> > > > +   *
>> > > > +   * @param pcollection The PCollection to be cast/transformed
>> > > > +   * @return The PCollectionImpl representation
>> > > > +   */
>> > > > +  private PCollectionImpl toPcollectionImpl(PCollection pcollection) {
>> > > > +    PCollectionImpl pcollectionImpl = null;
>> > > > +    if (pcollection instanceof UnionCollection) {
>> > > > +      pcollectionImpl = (PCollectionImpl) pcollection.parallelDo("UnionCollectionWrapper",
>> > > > +          (MapFn) IdentityFn.getInstance(), pcollection.getPType());
>> > > > +    } else {
>> > > > +      pcollectionImpl = (PCollectionImpl) pcollection;
>> > > > +    }
>> > > > +    return pcollectionImpl;
>> > > >    }
>> > > >
>> > > >    public SourceTarget createIntermediateOutput(PType ptype) {
>> > > > -    return ptype.getDefaultFileSource(createTempPath());
>> > > > +    return ptype.getDefaultFileSource(createTempPath());
>> > > >    }
>> > > >
>> > > >    public Path createTempPath() {
>> > > >      tempFileIndex++;
>> > > >      return new Path(tempDirectory, "p" + tempFileIndex);
>> > > >    }
>> > > > -
>> > > > +
>> > > >    private static Path createTempDirectory(Configuration conf) {
>> > > >      Path dir = new Path("/tmp/crunch" + RANDOM.nextInt());
>> > > > -    try {
>> > > > -      FileSystem.get(conf).mkdirs(dir);
>> > > > -    } catch (IOException e) {
>> > > > -      LOG.error("Exception creating job output directory", e);
>> > > > -      throw new RuntimeException(e);
>> > > > -    }
>> > > > +    try {
>> > > > +      FileSystem.get(conf).mkdirs(dir);
>> > > > +    } catch (IOException e) {
>> > > > +      LOG.error("Exception creating job output directory", e);
>> > > > +      throw new RuntimeException(e);
>> > > > +    }
>> > > >      return dir;
>> > > >    }
>> > > > -
>> > > > +
>> > > >    @Override
>> > > >    public void writeTextFile(PCollection pcollection, String pathName) {
>> > > >      // Ensure that this is a writable pcollection instance.
>> > > > -    pcollection = pcollection.parallelDo("asText", IdentityFn.getInstance(),
>> > > > -        WritableTypeFamily.getInstance().as(pcollection.getPType()));
>> > > > +    pcollection = pcollection.parallelDo("asText", IdentityFn.getInstance(), WritableTypeFamily
>> > > > +        .getInstance().as(pcollection.getPType()));
>> > > >      write(pcollection, At.textFile(pathName));
>> > > >    }
>> > > >
>> > > > @@ -256,7 +291,7 @@ public class MRPipeline implements Pipeline {
>> > > >        LOG.info("Exception during cleanup", e);
>> > > >      }
>> > > >    }
>> > > > -
>> > > > +
>> > > >    public int getNextAnonymousStageId() {
>> > > >      return nextAnonymousStageId++;
>> > > >    }
>> > > > @@ -265,7 +300,7 @@ public class MRPipeline implements Pipeline {
>> > > >    public void enableDebug() {
>> > > >      // Turn on Crunch runtime error catching.
>> > > >      getConfiguration().setBoolean(RuntimeParameters.DEBUG, true);
>> > > > -
>> > > > +
>> > > >      // Write Hadoop's WARN logs to the console.
>> > > >      Logger crunchInfoLogger = LogManager.getLogger("com.cloudera.crunch");
>> > > >      Appender console = crunchInfoLogger.getAppender("A");
>> > > > @@ -277,9 +312,9 @@ public class MRPipeline implements Pipeline {
>> > > >        LOG.warn("Could not find console appender named 'A' for writing Hadoop warning logs");
>> > > >      }
>> > > >    }
>> > > > -
>> > > > +
>> > > >    @Override
>> > > >    public String getName() {
>> > > > -    return name;
>> > > > +    return name;
>> > > >    }
>> > > >  }
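To make the refactoring above concrete, here is a minimal driver-side
sketch (not part of the patch; the class name and input path are
hypothetical) of how materialize() is used, with getMaterializeSourceTarget()
doing the work of finding a readable location for the data:

import com.cloudera.crunch.PCollection;
import com.cloudera.crunch.impl.mr.MRPipeline;

public class MaterializeExample {
  public static void main(String[] args) throws Exception {
    MRPipeline pipeline = new MRPipeline(MaterializeExample.class);
    PCollection<String> lines = pipeline.readTextFile("/path/to/input.txt");

    // materialize() is lazy: iterating the returned Iterable runs the
    // pipeline (if it hasn't run yet) and then reads the results back via
    // the ReadableSourceTarget resolved by getMaterializeSourceTarget().
    for (String line : pipeline.materialize(lines)) {
      System.out.println(line);
    }
  }
}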
>> > > > diff --git src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
>> > > > index d41a52e..68ef054 100644
>> > > > --- src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
>> > > > +++ src/main/java/com/cloudera/crunch/impl/mr/run/CrunchRuntimeException.java
>> > > > @@ -12,6 +12,10 @@ public class CrunchRuntimeException extends RuntimeException {
>> > > >      super(e);
>> > > >    }
>> > > >
>> > > > +  public CrunchRuntimeException(String msg, Exception e) {
>> > > > +    super(msg, e);
>> > > > +  }
>> > > > +
>> > > >    public boolean wasLogged() {
>> > > >      return logged;
>> > > >    }
>> > > > diff --git src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java
>> > > > index 1122d62..4debfeb 100644
>> > > > --- src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java
>> > > > +++ src/main/java/com/cloudera/crunch/io/avro/AvroFileSource.java
>> > > > @@ -45,7 +45,8 @@ public class AvroFileSource extends FileSourceImpl implements ReadableSour
>> > > >
>> > > >    @Override
>> > > >    public Iterable read(Configuration conf) throws IOException {
>> > > > -    return CompositePathIterable.create(FileSystem.get(conf), path, new AvroFileReaderFactory(
>> > > > +    FileSystem fs = FileSystem.get(path.toUri(), conf);
>> > > > +    return CompositePathIterable.create(fs, path, new AvroFileReaderFactory(
>> > > >          (AvroType) ptype, conf));
>> > > >    }
>> > > >  }
>> > > > diff --git src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java
>> > > > index 24dec2d..462ef93 100644
>> > > > --- src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java
>> > > > +++ src/main/java/com/cloudera/crunch/io/seq/SeqFileSource.java
>> > > > @@ -26,18 +26,16 @@ import com.cloudera.crunch.io.ReadableSource;
>> > > >  import com.cloudera.crunch.io.impl.FileSourceImpl;
>> > > >  import com.cloudera.crunch.types.PType;
>> > > >
>> > > > -public class SeqFileSource extends FileSourceImpl implements
>> > > > -    ReadableSource {
>> > > > +public class SeqFileSource extends FileSourceImpl implements ReadableSource {
>> > > >
>> > > >    public SeqFileSource(Path path, PType ptype) {
>> > > > -    super(path, ptype, SequenceFileInputFormat.class);
>> > > > +    super(path, ptype, SequenceFileInputFormat.class);
>> > > >    }
>> > > > -
>> > > > +
>> > > >    @Override
>> > > >    public Iterable read(Configuration conf) throws IOException {
>> > > > -    FileSystem fs = FileSystem.get(conf);
>> > > > -    return CompositePathIterable.create(fs, path,
>> > > > -        new SeqFileReaderFactory(ptype, conf));
>> > > > +    FileSystem fs = FileSystem.get(path.toUri(), conf);
>> > > > +    return CompositePathIterable.create(fs, path, new SeqFileReaderFactory(ptype, conf));
>> > > >    }
>> > > >
>> > > >    @Override
>> > > > diff --git src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java
>> > > > index 69ca12b..4db6658 100644
>> > > > --- src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java
>> > > > +++ src/main/java/com/cloudera/crunch/io/seq/SeqFileTableSource.java
>> > > > @@ -42,7 +42,7 @@ public class SeqFileTableSource extends FileTableSourceImpl implemen
>> > > >
>> > > >    @Override
>> > > >    public Iterable> read(Configuration conf) throws IOException {
>> > > > -    FileSystem fs = FileSystem.get(conf);
>> > > > +    FileSystem fs = FileSystem.get(path.toUri(), conf);
>> > > >      return CompositePathIterable.create(fs, path,
>> > > >          new SeqFileTableReaderFactory((PTableType) ptype, conf));
>> > > >    }
>> > > > diff --git src/main/java/com/cloudera/crunch/io/text/TextFileSource.java src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
>> > > > index a876843..e0dbe68 100644
>> > > > --- src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
>> > > > +++ src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
>> > > > @@ -67,7 +67,7 @@ public class TextFileSource extends FileSourceImpl implements
>> > > >
>> > > >    @Override
>> > > >    public Iterable read(Configuration conf) throws IOException {
>> > > > -    return CompositePathIterable.create(FileSystem.get(conf), path,
>> > > > -        new TextFileReaderFactory(ptype, conf));
>> > > > +    return CompositePathIterable.create(FileSystem.get(path.toUri(), conf), path,
>> > > > +        new TextFileReaderFactory(ptype, conf));
>> > > >    }
>> > > >  }
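The recurring change in the four source classes above swaps
FileSystem.get(conf) for FileSystem.get(path.toUri(), conf). The
difference matters as soon as a path doesn't live on the default
filesystem, e.g. a local cache file read back while the default is HDFS.
A small sketch (host name and paths hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileSystemLookup {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.default.name", "hdfs://namenode:8020");

    Path localPath = new Path("file:///tmp/cache/part-00000");

    // Always the default filesystem (HDFS here), regardless of the path:
    FileSystem defaultFs = FileSystem.get(conf);

    // The filesystem that actually owns the path (the local FS here) --
    // this is what the patched sources use, so materialized data can be
    // read back no matter which filesystem it lives on:
    FileSystem pathFs = FileSystem.get(localPath.toUri(), conf);

    System.out.println(defaultFs.getUri()); // hdfs://namenode:8020
    System.out.println(pathFs.getUri());    // file:///
  }
}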
>> > > > diff --git src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java
>> > > > new file mode 100644
>> > > > index 0000000..8072e07
>> > > > --- /dev/null
>> > > > +++ src/main/java/com/cloudera/crunch/lib/join/MapsideJoin.java
>> > > > @@ -0,0 +1,143 @@
>> > > > +package com.cloudera.crunch.lib.join;
>> > > > +
>> > > > +import java.io.IOException;
>> > > > +
>> > > > +import org.apache.hadoop.filecache.DistributedCache;
>> > > > +import org.apache.hadoop.fs.FileSystem;
>> > > > +import org.apache.hadoop.fs.Path;
>> > > > +
>> > > > +import com.cloudera.crunch.DoFn;
>> > > > +import com.cloudera.crunch.Emitter;
>> > > > +import com.cloudera.crunch.PTable;
>> > > > +import com.cloudera.crunch.Pair;
>> > > > +import com.cloudera.crunch.impl.mr.MRPipeline;
>> > > > +import com.cloudera.crunch.impl.mr.run.CrunchRuntimeException;
>> > > > +import com.cloudera.crunch.io.ReadableSourceTarget;
>> > > > +import com.cloudera.crunch.io.impl.SourcePathTargetImpl;
>> > > > +import com.cloudera.crunch.types.PType;
>> > > > +import com.cloudera.crunch.types.PTypeFamily;
>> > > > +import com.google.common.collect.ArrayListMultimap;
>> > > > +import com.google.common.collect.Multimap;
>> > > > +
>> > > > +/**
>> > > > + * Utility for doing map side joins on a common key between two {@link PTable}s.
>> > > > + * <p>
>> > > > + * A map side join is an optimized join which doesn't use a reducer; instead,
>> > > > + * the right side of the join is loaded into memory and the join is performed
>> > > > + * in a mapper. This style of join has the important implication that the
>> > > > + * output of the join is not sorted, as it would be with a conventional
>> > > > + * (reducer-based) join.
>> > > > + * <p>
>> > > > + * Note: This utility is only supported when running with a
>> > > > + * {@link MRPipeline} as the pipeline.
>> > > > + */
>> > > > +public class MapsideJoin {
>> > > > +
>> > > > +  /**
>> > > > +   * Join two tables using a map side join. The right-side table will be
>> > > > +   * loaded fully in memory, so this method should only be used if the right
>> > > > +   * side table's contents can fit in the memory allocated to mappers. The
>> > > > +   * join performed by this method is an inner join.
>> > > > +   *
>> > > > +   * @param left
>> > > > +   *          The left-side table of the join
>> > > > +   * @param right
>> > > > +   *          The right-side table of the join, whose contents will be fully
>> > > > +   *          read into memory
>> > > > +   * @return A table keyed on the join key, containing pairs of joined values
>> > > > +   */
>> > > > +  public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right) {
>> > > > +
>> > > > +    if (!(right.getPipeline() instanceof MRPipeline)) {
>> > > > +      throw new CrunchRuntimeException("Map-side join is only supported within a MapReduce context");
>> > > > +    }
>> > > > +
>> > > > +    MRPipeline pipeline = (MRPipeline) right.getPipeline();
>> > > > +    pipeline.materialize(right);
>> > > > +
>> > > > +    // TODO Move necessary logic to MRPipeline so that we can theoretically
>> > > > +    // optimize this by running the setup of multiple map-side joins concurrently
>> > > > +    pipeline.run();
>> > > > +
>> > > > +    ReadableSourceTarget<Pair<K, V>> readableSourceTarget = pipeline
>> > > > +        .getMaterializeSourceTarget(right);
>> > > > +    if (!(readableSourceTarget instanceof SourcePathTargetImpl)) {
>> > > > +      throw new CrunchRuntimeException("Right-side contents can't be read from a path");
>> > > > +    }
>> > > > +
>> > > > +    // Suppress warnings because we've just checked this cast via instanceof
>> > > > +    @SuppressWarnings("unchecked")
>> > > > +    SourcePathTargetImpl<Pair<K, V>> sourcePathTarget = (SourcePathTargetImpl<Pair<K, V>>) readableSourceTarget;
>> > > > +
>> > > > +    Path path = sourcePathTarget.getPath();
>> > > > +    DistributedCache.addCacheFile(path.toUri(), pipeline.getConfiguration());
>> > > > +
>> > > > +    MapsideJoinDoFn<K, U, V> mapJoinDoFn = new MapsideJoinDoFn<K, U, V>(path.toString(),
>> > > > +        right.getPType());
>> > > > +    PTypeFamily typeFamily = left.getTypeFamily();
>> > > > +    return left.parallelDo(
>> > > > +        "mapjoin",
>> > > > +        mapJoinDoFn,
>> > > > +        typeFamily.tableOf(left.getKeyType(),
>> > > > +            typeFamily.pairs(left.getValueType(), right.getValueType())));
>> > > > +  }
>> > > > +
>> > > > +  static class MapsideJoinDoFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
>> > > > +
>> > > > +    private String inputPath;
>> > > > +    private PType<Pair<K, V>> ptype;
>> > > > +    private Multimap<K, V> joinMap;
>> > > > +
>> > > > +    public MapsideJoinDoFn(String inputPath, PType<Pair<K, V>> ptype) {
>> > > > +      this.inputPath = inputPath;
>> > > > +      this.ptype = ptype;
>> > > > +    }
>> > > > +
>> > > > +    private Path getCacheFilePath() {
>> > > > +      try {
>> > > > +        for (Path localPath : DistributedCache.getLocalCacheFiles(getConfiguration())) {
>> > > > +          if (localPath.toString().endsWith(inputPath)) {
>> > > > +            return localPath.makeQualified(FileSystem.getLocal(getConfiguration()));
>> > > > +          }
>> > > > +        }
>> > > > +      } catch (IOException e) {
>> > > > +        throw new CrunchRuntimeException(e);
>> > > > +      }
>> > > > +
>> > > > +      throw new CrunchRuntimeException("Can't find local cache file for '" + inputPath + "'");
>> > > > +    }
>> > > > +
>> > > > +    @Override
>> > > > +    public void initialize() {
>> > > > +      super.initialize();
>> > > > +
>> > > > +      ReadableSourceTarget<Pair<K, V>> sourceTarget = (ReadableSourceTarget<Pair<K, V>>) ptype
>> > > > +          .getDefaultFileSource(getCacheFilePath());
>> > > > +      Iterable<Pair<K, V>> iterable = null;
>> > > > +      try {
>> > > > +        iterable = sourceTarget.read(getConfiguration());
>> > > > +      } catch (IOException e) {
>> > > > +        throw new CrunchRuntimeException("Error reading right-side of map side join: ", e);
>> > > > +      }
>> > > > +
>> > > > +      joinMap = ArrayListMultimap.create();
>> > > > +      for (Pair<K, V> joinPair : iterable) {
>> > > > +        joinMap.put(joinPair.first(), joinPair.second());
>> > > > +      }
>> > > > +    }
>> > > > +
>> > > > +    @Override
>> > > > +    public void process(Pair<K, U> input, Emitter<Pair<K, Pair<U, V>>> emitter) {
>> > > > +      K key = input.first();
>> > > > +      U value = input.second();
>> > > > +      for (V joinValue : joinMap.get(key)) {
>> > > > +        Pair<U, V> valuePair = Pair.of(value, joinValue);
>> > > > +        emitter.emit(Pair.of(key, valuePair));
>> > > > +      }
>> > > > +    }
>> > > > +  }
>> > > > +}
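A rough usage sketch for the new utility (not part of the patch; paths
are hypothetical, and the LineSplitter mirrors the one in
MapsideJoinTest further down in this mail):

import com.cloudera.crunch.MapFn;
import com.cloudera.crunch.PTable;
import com.cloudera.crunch.Pair;
import com.cloudera.crunch.Pipeline;
import com.cloudera.crunch.impl.mr.MRPipeline;
import com.cloudera.crunch.lib.join.MapsideJoin;
import com.cloudera.crunch.types.writable.Writables;

public class MapsideJoinExample {

  // Parses "id|value" lines, like the test data at the end of this mail.
  static class LineSplitter extends MapFn<String, Pair<Integer, String>> {
    @Override
    public Pair<Integer, String> map(String input) {
      String[] fields = input.split("\\|");
      return Pair.of(Integer.parseInt(fields[0]), fields[1]);
    }
  }

  public static void main(String[] args) throws Exception {
    Pipeline pipeline = new MRPipeline(MapsideJoinExample.class);

    PTable<Integer, String> customers = pipeline.readTextFile("/data/customers.txt")
        .parallelDo("customers", new LineSplitter(),
            Writables.tableOf(Writables.ints(), Writables.strings()));
    PTable<Integer, String> orders = pipeline.readTextFile("/data/orders.txt")
        .parallelDo("orders", new LineSplitter(),
            Writables.tableOf(Writables.ints(), Writables.strings()));

    // The right-side table (orders) is materialized and shipped to every
    // mapper via the distributed cache, so it must fit in mapper memory.
    // Note that MapsideJoin.join() triggers an extra pipeline run.
    PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(customers, orders);

    pipeline.writeTextFile(joined, "/data/joined");
    pipeline.run();
  }
}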
>> > > > diff --git src/main/java/com/cloudera/crunch/types/PType.java src/main/java/com/cloudera/crunch/types/PType.java
>> > > > index af4ef1b..ae480aa 100644
>> > > > --- src/main/java/com/cloudera/crunch/types/PType.java
>> > > > +++ src/main/java/com/cloudera/crunch/types/PType.java
>> > > > @@ -15,6 +15,7 @@
>> > > >
>> > > >  package com.cloudera.crunch.types;
>> > > >
>> > > > +import java.io.Serializable;
>> > > >  import java.util.List;
>> > > >
>> > > >  import org.apache.hadoop.fs.Path;
>> > > > @@ -31,7 +32,7 @@ import com.cloudera.crunch.SourceTarget;
>> > > >   * {@code PCollection}.
>> > > >   *
>> > > >   */
>> > > > -public interface PType {
>> > > > +public interface PType extends Serializable {
>> > > >    /**
>> > > >     * Returns the Java type represented by this {@code PType}.
>> > > >     */
>> > > > diff --git src/main/java/com/cloudera/crunch/types/avro/AvroType.java src/main/java/com/cloudera/crunch/types/avro/AvroType.java
>> > > > index 3db00c0..29af9fb 100644
>> > > > --- src/main/java/com/cloudera/crunch/types/avro/AvroType.java
>> > > > +++ src/main/java/com/cloudera/crunch/types/avro/AvroType.java
>> > > > @@ -14,6 +14,7 @@
>> > > >   */
>> > > >  package com.cloudera.crunch.types.avro;
>> > > >
>> > > > +import java.io.Serializable;
>> > > >  import java.util.List;
>> > > >
>> > > >  import org.apache.avro.Schema;
>> > > > @@ -41,7 +42,8 @@ public class AvroType implements PType {
>> > > >    private static final Converter AVRO_CONVERTER = new AvroKeyConverter();
>> > > >
>> > > >    private final Class typeClass;
>> > > > -  private final Schema schema;
>> > > > +  private final String schemaString;
>> > > > +  private transient Schema schema;
>> > > >    private final MapFn baseInputMapFn;
>> > > >    private final MapFn baseOutputMapFn;
>> > > >    private final List subTypes;
>> > > > @@ -55,6 +57,7 @@ public class AvroType implements PType {
>> > > >        MapFn outputMapFn, PType... ptypes) {
>> > > >      this.typeClass = typeClass;
>> > > >      this.schema = Preconditions.checkNotNull(schema);
>> > > > +    this.schemaString = schema.toString();
>> > > >      this.baseInputMapFn = inputMapFn;
>> > > >      this.baseOutputMapFn = outputMapFn;
>> > > >      this.subTypes = ImmutableList.builder().add(ptypes).build();
>> > > > @@ -76,6 +79,9 @@ public class AvroType implements PType {
>> > > >    }
>> > > >
>> > > >    public Schema getSchema() {
>> > > > +    if (schema == null) {
>> > > > +      schema = new Schema.Parser().parse(schemaString);
>> > > > +    }
>> > > >      return schema;
>> > > >    }
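The AvroType change works around Avro's Schema not implementing
Serializable: the schema travels as its JSON string and is re-parsed
lazily after deserialization. The same pattern in isolation (a sketch,
not code from the patch):

import java.io.Serializable;
import org.apache.avro.Schema;

public class SerializableSchemaHolder implements Serializable {

  private final String schemaString;
  private transient Schema schema; // skipped during Java serialization

  public SerializableSchemaHolder(Schema schema) {
    this.schema = schema;
    this.schemaString = schema.toString(); // JSON representation
  }

  public Schema getSchema() {
    if (schema == null) { // true after deserialization
      schema = new Schema.Parser().parse(schemaString);
    }
    return schema;
  }
}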
>> > > > diff --git src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java
>> > > > new file mode 100644
>> > > > index 0000000..f265460
>> > > > --- /dev/null
>> > > > +++ src/test/java/com/cloudera/crunch/impl/mr/MRPipelineTest.java
>> > > > @@ -0,0 +1,60 @@
>> > > > +package com.cloudera.crunch.impl.mr;
>> > > > +
>> > > > +import static org.junit.Assert.assertEquals;
>> > > > +import static org.mockito.Mockito.doReturn;
>> > > > +import static org.mockito.Mockito.mock;
>> > > > +import static org.mockito.Mockito.spy;
>> > > > +import static org.mockito.Mockito.when;
>> > > > +
>> > > > +import java.io.IOException;
>> > > > +
>> > > > +import org.junit.Before;
>> > > > +import org.junit.Test;
>> > > > +
>> > > > +import com.cloudera.crunch.SourceTarget;
>> > > > +import com.cloudera.crunch.impl.mr.collect.PCollectionImpl;
>> > > > +import com.cloudera.crunch.io.ReadableSourceTarget;
>> > > > +import com.cloudera.crunch.types.avro.Avros;
>> > > > +
>> > > > +public class MRPipelineTest {
>> > > > +
>> > > > +  private MRPipeline pipeline;
>> > > > +
>> > > > +  @Before
>> > > > +  public void setUp() throws IOException {
>> > > > +    pipeline = spy(new MRPipeline(MRPipelineTest.class));
>> > > > +  }
>> > > > +
>> > > > +  @Test
>> > > > +  public void testGetMaterializeSourceTarget_AlreadyMaterialized() {
>> > > > +    PCollectionImpl materializedPcollection = mock(PCollectionImpl.class);
>> > > > +    ReadableSourceTarget readableSourceTarget = mock(ReadableSourceTarget.class);
>> > > > +    when(materializedPcollection.getMaterializedAt()).thenReturn(readableSourceTarget);
>> > > > +
>> > > > +    assertEquals(readableSourceTarget, pipeline.getMaterializeSourceTarget(materializedPcollection));
>> > > > +  }
>> > > > +
>> > > > +  @Test
>> > > > +  public void testGetMaterializeSourceTarget_NotMaterialized_HasOutput() {
>> > > > +    PCollectionImpl pcollection = mock(PCollectionImpl.class);
>> > > > +    ReadableSourceTarget readableSourceTarget = mock(ReadableSourceTarget.class);
>> > > > +    when(pcollection.getPType()).thenReturn(Avros.strings());
>> > > > +    doReturn(readableSourceTarget).when(pipeline).createIntermediateOutput(Avros.strings());
>> > > > +    when(pcollection.getMaterializedAt()).thenReturn(null);
>> > > > +
>> > > > +    assertEquals(readableSourceTarget, pipeline.getMaterializeSourceTarget(pcollection));
>> > > > +  }
>> > > > +
>> > > > +  @Test(expected = IllegalArgumentException.class)
>> > > > +  public void testGetMaterializeSourceTarget_NotMaterialized_NotReadableSourceTarget() {
>> > > > +    PCollectionImpl pcollection = mock(PCollectionImpl.class);
>> > > > +    SourceTarget nonReadableSourceTarget = mock(SourceTarget.class);
>> > > > +    when(pcollection.getPType()).thenReturn(Avros.strings());
>> > > > +    doReturn(nonReadableSourceTarget).when(pipeline).createIntermediateOutput(Avros.strings());
>> > > > +    when(pcollection.getMaterializedAt()).thenReturn(null);
>> > > > +
>> > > > +    pipeline.getMaterializeSourceTarget(pcollection);
>> > > > +  }
>> > > > +
>> > > > +}
>> > > > diff --git src/test/java/com/cloudera/crunch/lib/join/MapsideJoinTest.java src/test/java/com/cloudera/crunch/lib/join/MapsideJoinTest.java
>> > > > new file mode 100644
>> > > > index 0000000..97e0c63
>> > > > --- /dev/null
>> > > > +++ src/test/java/com/cloudera/crunch/lib/join/MapsideJoinTest.java
>> > > > @@ -0,0 +1,102 @@
>> > > > +package com.cloudera.crunch.lib.join;
>> > > > +
>> > > > +import static org.junit.Assert.assertEquals;
>> > > > +import static org.junit.Assert.assertTrue;
>> > > > +
>> > > > +import java.io.IOException;
>> > > > +import java.util.Collections;
>> > > > +import java.util.List;
>> > > > +
>> > > > +import org.junit.Test;
>> > > > +
>> > > > +import com.cloudera.crunch.FilterFn;
>> > > > +import com.cloudera.crunch.MapFn;
>> > > > +import com.cloudera.crunch.PTable;
>> > > > +import com.cloudera.crunch.Pair;
>> > > > +import com.cloudera.crunch.Pipeline;
>> > > > +import com.cloudera.crunch.impl.mem.MemPipeline;
>> > > > +import com.cloudera.crunch.impl.mr.MRPipeline;
>> > > > +import com.cloudera.crunch.impl.mr.run.CrunchRuntimeException;
>> > > > +import com.cloudera.crunch.test.FileHelper;
>> > > > +import com.cloudera.crunch.types.writable.Writables;
>> > > > +import com.google.common.collect.Lists;
>> > > > +
>> > > > +public class MapsideJoinTest {
>> > > > +
>> > > > +  private static class LineSplitter extends MapFn<String, Pair<Integer, String>> {
>> > > > +
>> > > > +    @Override
>> > > > +    public Pair<Integer, String> map(String input) {
>> > > > +      String[] fields = input.split("\\|");
>> > > > +      return Pair.of(Integer.parseInt(fields[0]), fields[1]);
>> > > > +    }
>> > > > +
>> > > > +  }
>> > > > +
>> > > > +  private static class NegativeFilter extends FilterFn<Pair<Integer, String>> {
>> > > > +
>> > > > +    @Override
>> > > > +    public boolean accept(Pair<Integer, String> input) {
>> > > > +      return false;
>> > > > +    }
>> > > > +
>> > > > +  }
>> > > > +
>> > > > +  @Test(expected = CrunchRuntimeException.class)
>> > > > +  public void testNonMapReducePipeline() {
>> > > > +    runMapsideJoin(MemPipeline.getInstance());
>> > > > +  }
>> > > > +
>> > > > +  @Test
>> > > > +  public void testMapsideJoin_RightSideIsEmpty() throws IOException {
>> > > > +    MRPipeline pipeline = new MRPipeline(MapsideJoinTest.class);
>> > > > +    PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
>> > > > +    PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
>> > > > +
>> > > > +    PTable<Integer, String> filteredOrderTable = orderTable.parallelDo(new NegativeFilter(),
>> > > > +        orderTable.getPTableType());
>> > > > +
>> > > > +    PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(customerTable,
>> > > > +        filteredOrderTable);
>> > > > +
>> > > > +    List<Pair<Integer, Pair<String, String>>> materializedJoin = Lists.newArrayList(joined
>> > > > +        .materialize());
>> > > > +
>> > > > +    assertTrue(materializedJoin.isEmpty());
>> > > > +  }
>> > > > +
>> > > > +  @Test
>> > > > +  public void testMapsideJoin() throws IOException {
>> > > > +    runMapsideJoin(new MRPipeline(MapsideJoinTest.class));
>> > > > +  }
>> > > > +
>> > > > +  private void runMapsideJoin(Pipeline pipeline) {
>> > > > +    PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
>> > > > +    PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
>> > > > +
>> > > > +    PTable<Integer, Pair<String, String>> joined = MapsideJoin.join(customerTable, orderTable);
>> > > > +
>> > > > +    List<Pair<Integer, Pair<String, String>>> expectedJoinResult = Lists.newArrayList();
>> > > > +    expectedJoinResult.add(Pair.of(111, Pair.of("John Doe", "Corn flakes")));
>> > > > +    expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet paper")));
>> > > > +    expectedJoinResult.add(Pair.of(222, Pair.of("Jane Doe", "Toilet plunger")));
>> > > > +    expectedJoinResult.add(Pair.of(333, Pair.of("Someone Else", "Toilet brush")));
>> > > > +
>> > > > +    List<Pair<Integer, Pair<String, String>>> joinedResultList = Lists.newArrayList(joined
>> > > > +        .materialize());
>> > > > +    Collections.sort(joinedResultList);
>> > > > +
>> > > > +    assertEquals(expectedJoinResult, joinedResultList);
>> > > > +  }
>> > > > +
>> > > > +  private static PTable<Integer, String> readTable(Pipeline pipeline, String filename) {
>> > > > +    try {
>> > > > +      return pipeline.readTextFile(FileHelper.createTempCopyOf(filename)).parallelDo("asTable",
>> > > > +          new LineSplitter(), Writables.tableOf(Writables.ints(), Writables.strings()));
>> > > > +    } catch (IOException e) {
>> > > > +      throw new RuntimeException(e);
>> > > > +    }
>> > > > +  }
>> > > > +
>> > > > +}
>> > > > diff --git src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
>> > > > index 74e2ad3..c6a0b46 100644
>> > > > --- src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
>> > > > +++ src/test/java/com/cloudera/crunch/types/avro/AvrosTest.java
>> > > > @@ -25,6 +25,7 @@ import org.apache.avro.Schema.Type;
>> > > >  import org.apache.avro.generic.GenericData;
>> > > >  import org.apache.avro.util.Utf8;
>> > > >  import org.apache.hadoop.io.LongWritable;
>> > > > +import org.junit.Ignore;
>> > > >  import org.junit.Test;
>> > > >
>> > > >  import com.cloudera.crunch.Pair;
>> > > > @@ -103,6 +104,7 @@ public class AvrosTest {
>> > > >    }
>> > > >
>> > > >    @Test
>> > > > +  @Ignore("This test creates an invalid schema that causes Schema#toString to fail")
>> > > >    public void testNestedTables() throws Exception {
>> > > >      PTableType<Long, Long> pll = Avros.tableOf(Avros.longs(), Avros.longs());
>> > > >      PTableType<Pair<Long, Long>, String> nest = Avros.tableOf(pll, Avros.strings());
>> > > > diff --git src/test/resources/customers.txt src/test/resources/customers.txt
>> > > > new file mode 100644
>> > > > index 0000000..98f3f3d
>> > > > --- /dev/null
>> > > > +++ src/test/resources/customers.txt
>> > > > @@ -0,0 +1,4 @@
>> > > > +111|John Doe
>> > > > +222|Jane Doe
>> > > > +333|Someone Else
>> > > > +444|Has No Orders
>> > > > \ No newline at end of file
>> > > > diff --git src/test/resources/orders.txt src/test/resources/orders.txt
>> > > > new file mode 100644
>> > > > index 0000000..2f1383f
>> > > > --- /dev/null
>> > > > +++ src/test/resources/orders.txt
>> > > > @@ -0,0 +1,4 @@
>> > > > +222|Toilet plunger
>> > > > +333|Toilet brush
>> > > > +222|Toilet paper
>> > > > +111|Corn flakes
>> > > > \ No newline at end of file
>> > > >
>> > > > On Tue, Jul 3, 2012 at 10:42 AM, Christian Tzolov wrote:
>> > > > > Hi Gabriel,
>> > > > >
>> > > > > Seems like the attachment is missing.
>> > > > >
>> > > > > Cheers,
>> > > > > Chris
>> > > > >
>> > > > > On Tue, Jul 3, 2012 at 9:23 AM, Gabriel Reid
>> > > > > <gabriel.reid@gmail.com> wrote:
>> > > > >
>> > > > > > Hi guys,
>> > > > > >
>> > > > > > Attached (hopefully) is a patch for an initial implementation of map
>> > > > > > side joins. It's currently implemented as a static method in a class
>> > > > > > called MapsideJoin, with the same interface as the existing Join class
>> > > > > > (with only inner joins being implemented for now). The way it works is
>> > > > > > that the right-side PTable of the join is put in the distributed cache
>> > > > > > and then read by the join function at runtime.
>> > > > > >
>> > > > > > There's one spot that I can see for a potentially interesting
>> > > > > > optimization -- MRPipeline#run is called once for each map side join
>> > > > > > that is set up, but if the setup of the joins was done within
>> > > > > > MRPipeline, then we could set up multiple map side joins in parallel
>> > > > > > with a single call to MRPipeline#run. OTOH, a whole bunch of map side
>> > > > > > joins in parallel probably isn't that common of an operation.
>> > > > > >
>> > > > > > If anyone feels like taking a look at the patch, any feedback is
>> > > > > > appreciated. If nobody sees something that needs serious changes in
>> > > > > > the patch, I'll commit it.
>> > > > > >
>> > > > > > - Gabriel
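For reference, the DistributedCache round trip described above boils
down to two calls; this sketch (not from the patch) separates the
driver-side and task-side halves, matching on a path suffix the way
MapsideJoinDoFn does:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;

public class CacheLookup {

  // Driver side: ship an HDFS file to every task node.
  public static void addToCache(Path path, Configuration conf) {
    DistributedCache.addCacheFile(path.toUri(), conf);
  }

  // Task side: find the local copy of the shipped file by matching on
  // the original path suffix.
  public static Path findLocalCopy(String inputPath, Configuration conf) throws IOException {
    for (Path localPath : DistributedCache.getLocalCacheFiles(conf)) {
      if (localPath.toString().endsWith(inputPath)) {
        return localPath;
      }
    }
    throw new IOException("No local cache file for '" + inputPath + "'");
  }
}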
>> > > > > > On Thu, Jun 21, 2012 at 9:09 AM, Gabriel Reid
>> > > > > > <gabriel.reid@gmail.com> wrote:
>> > > > > > > Replying to all...
>> > > > > > >
>> > > > > > > On Thu, Jun 21, 2012 at 8:40 AM, Josh Wills <jwills@cloudera.com> wrote:
>> > > > > > > >
>> > > > > > > > So there's a philosophical issue here: should Crunch ever make
>> > > > > > > > decisions about how to do something itself based on its estimates of
>> > > > > > > > the size of the data sets, or should it always do exactly what the
>> > > > > > > > developer indicates?
>> > > > > > > >
>> > > > > > > > I can make a case either way, but I think that no matter what, we
>> > > > > > > > would want to have explicit functions for performing a join that
>> > > > > > > > reads one data set into memory, so I think we can proceed w/ the
>> > > > > > > > implementation while folks weigh in on what their preferences are
>> > > > > > > > for the default join() behavior (e.g., just do a reduce-side join,
>> > > > > > > > or try to figure out the best join given information about the
>> > > > > > > > input data and some configuration parameters.)
>> > > > > > >
>> > > > > > > I definitely agree on needing to have an explicit way to invoke one
>> > > > > > > or the other -- and in general I don't like having magic behind the
>> > > > > > > scenes to decide on behaviour (especially considering Crunch is
>> > > > > > > generally intended to be closer to the metal than Pig and Hive). I'm
>> > > > > > > not sure if the runtime decision is something specific to some of my
>> > > > > > > use cases or if it could be useful to a wider audience.
>> > > > > > >
>> > > > > > > The ability to dynamically decide at runtime whether a map side join
>> > > > > > > should be used can also easily be tacked on outside of Crunch, and
>> > > > > > > won't impact the underlying implementation (as you pointed out), so I
>> > > > > > > definitely also agree on focusing on the underlying implementation
>> > > > > > > first, and we can worry about the options used for exposing it later
>> > > > > > > on.
>> > > > > > >
>> > > > > > > - Gabriel
>>
>> --
>> Director of Data Science
>> Cloudera
>> Twitter: @josh_wills