crunch-dev mailing list archives

From Josh Wills <jwi...@cloudera.com>
Subject Re: Pipeline throwing No Output? exception
Date Mon, 31 Mar 2014 14:03:57 GMT
Hey guys,

Been banging on this for a couple of days, but I can't replicate the
failure. Here are the integration tests I wrote in EmptyPCollectionIT:

  @Test
  public void testLeftJoinEmptyWithData() throws Exception {
    MRPipeline p = new MRPipeline(EmptyPCollectionIT.class, tempDir.getDefaultConfiguration());
    String shakes = tempDir.copyResourceFileName("shakes.txt");
    PCollection<String> empty = p.emptyPCollection(Writables.strings());
    PCollection<String> withData = p.read(From.textFile(shakes));
    PTable<String, Long> emptyTable = empty.parallelDo(
        new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs()));
    PTable<String, Integer> dataTable = withData.parallelDo(
        new IntSplitFn(), Writables.tableOf(Writables.strings(), Writables.ints()));
    // The left side is empty, so the left join should produce no values.
    assertTrue(Iterables.isEmpty(Join.leftJoin(emptyTable, dataTable).values().materialize()));
  }

  @Test
  public void testLeftJoinDataWithEmpty() throws Exception {
    MRPipeline p = new MRPipeline(EmptyPCollectionIT.class, tempDir.getDefaultConfiguration());
    String shakes = tempDir.copyResourceFileName("shakes.txt");
    PCollection<String> empty = p.emptyPCollection(Writables.strings());
    PCollection<String> withData = p.read(From.textFile(shakes));
    PTable<String, Long> emptyTable = empty.parallelDo(
        new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs()));
    PTable<String, Integer> dataTable = withData.parallelDo(
        new IntSplitFn(), Writables.tableOf(Writables.strings(), Writables.ints()));
    // Every row on the left side survives a left join, so the result should not be empty.
    assertFalse(Iterables.isEmpty(Join.leftJoin(dataTable, emptyTable).values().materialize()));
  }

Could you tweak them to make them fail for me?

Josh
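To make the semantics the two tests assert concrete, here is a minimal plain-Java sketch of a left outer join over in-memory lists. KV, Joined and leftJoin are hypothetical names used only for illustration; this is not Crunch's Join.leftJoin implementation, just the behavior the assertions expect: an empty left side yields nothing, while data on the left survives an empty right side with a null right value.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * In-memory sketch of left-outer-join semantics (hypothetical helper,
 * not Crunch's Join.leftJoin).
 */
public class LeftJoinSketch {

    /** Minimal key/value entry, standing in for one row of a PTable. */
    static final class KV<K, V> {
        final K key;
        final V value;
        KV(K key, V value) { this.key = key; this.value = value; }
    }

    /** Joined values; right is null when the left key has no match. */
    static final class Joined<L, R> {
        final L left;
        final R right;
        Joined(L left, R right) { this.left = left; this.right = right; }
    }

    /** Every left entry is emitted at least once; nested loops keep the sketch short. */
    static <K, L, R> List<Joined<L, R>> leftJoin(List<KV<K, L>> left, List<KV<K, R>> right) {
        List<Joined<L, R>> out = new ArrayList<>();
        for (KV<K, L> l : left) {
            boolean matched = false;
            for (KV<K, R> r : right) {
                if (l.key.equals(r.key)) {
                    out.add(new Joined<>(l.value, r.value));
                    matched = true;
                }
            }
            if (!matched) {
                out.add(new Joined<>(l.value, null)); // unmatched left row is kept
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<KV<String, Long>> emptyTable = new ArrayList<>();
        List<KV<String, Integer>> dataTable = List.of(new KV<>("to", 1), new KV<>("be", 2));

        // Mirrors testLeftJoinEmptyWithData: empty left side.
        System.out.println(leftJoin(emptyTable, dataTable).size()); // prints 0
        // Mirrors testLeftJoinDataWithEmpty: data on the left, empty right side.
        System.out.println(leftJoin(dataTable, emptyTable).size()); // prints 2
    }
}
```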


On Sun, Mar 30, 2014 at 7:10 AM, Josh Wills <jwills@cloudera.com> wrote:

> I'm just back from vacation; I'll take a look at the planner error and see
> if I can come up with a workaround.
>
>
> On Sat, Mar 29, 2014 at 1:53 PM, Jinal Shah <jinalshah2007@gmail.com> wrote:
>
>> Any updates on this?
>> On Mar 20, 2014 2:27 PM, "Jinal Shah" <jinalshah2007@gmail.com> wrote:
>>
>> > Sorry Micah, U and V are totally different types. Just wanted to clarify
>> > that.
>> >
>> >
>> > On Thu, Mar 20, 2014 at 2:00 PM, Jinal Shah <jinalshah2007@gmail.com>
>> > wrote:
>> >
>> >> Hey Micah,
>> >>
>> >> Yes, you are right. Here is what is going on in that "// do something"
>> >> section (a higher-level overview):
>> >>
>> >> Here U and V are the same.
>> >>
>> >> PCollection<V> collectionWhichCouldBeEmpty = null;
>> >> if (path.exists) {
>> >>     collectionWhichCouldBeEmpty = pipeline.read(FromPath, PType.V);
>> >> } else {
>> >>     collectionWhichCouldBeEmpty = pipeline.emptyPCollection(PType.V);
>> >> }
>> >>
>> >> PCollection<U> collectionWhichHasData = DataComingFromDifferentSource();
>> >>
>> >> PTable<K,V> VTable = collectionWhichCouldBeEmpty.by(PType<K>);
>> >>
>> >> PTable<K,U> UTable = collectionWhichHasData.by(PType<K>);
>> >>
>> >> UVTable = Join.join(UTable, VTable, Join.LEFT);
>> >>
>> >> pipeline.write(UVTable.values(), somePath, PType<U>);
>> >>
>> >> pipeline.run(); // error is here
>> >>
>> >>
>> >> Hope this helps.
>> >>
>> >>
>> >>
>> >>
>> >> On Thu, Mar 20, 2014 at 12:50 PM, Micah Whitacre <mkwhit@gmail.com>
>> >> wrote:
>> >>
>> >>> Jinal, can you elaborate on the "// do something" section of the code? I
>> >>> thought that, when I heard it described, other PCollections were being
>> >>> joined with the emptyPCollection, and it was the outcome of the joins and
>> >>> additional processing that was actually being persisted.
>> >>>
>> >>>
>> >>> On Thu, Mar 20, 2014 at 10:18 AM, Chao Shi <stepinto@live.com>
>> >>> wrote:
>> >>>
>> >>> > Hi Josh and Jinal,
>> >>> >
>> >>> > This was introduced to help with the following case. In one of our MR
>> >>> > programs, there is a command-line option that lets one optionally
>> >>> > specify a path to data to be joined on. Before emptyPCollection() was
>> >>> > introduced, we had to do this:
>> >>> >
>> >>> > Path path = ...
>> >>> > PCollection in1 = null;
>> >>> > if (path != null) {
>> >>> >   in1 = pipeline.read(...);
>> >>> > }
>> >>> > PCollection in2 = pipeline.read(...);
>> >>> > if (in1 != null) {
>> >>> >   in2 = in2.join(in1);
>> >>> > }
>> >>> >
>> >>> > You can see null checks everywhere. With emptyPCollection(), we can do
>> >>> > this:
>> >>> >
>> >>> > if (path != null) {
>> >>> >   in1 = pipeline.read(...);
>> >>> > } else {
>> >>> >   in1 = pipeline.emptyPCollection(...);
>> >>> > }
>> >>> > in2 = in2.join(in1);
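The pattern Chao describes (return an empty collection instead of null, so downstream code runs unconditionally) can be sketched in plain Java with lists standing in for PCollections. readOptional and matching are hypothetical helpers for illustration only, not Crunch APIs; matching is a simple stand-in for a join that keeps entries of one list that also appear in the other.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/** Sketch of "empty collection instead of null" using plain Java lists. */
public class OptionalInputSketch {

    /** Returns the file's lines if the path is given and exists, otherwise an empty list (never null). */
    static List<String> readOptional(Path path) {
        if (path == null || !Files.exists(path)) {
            return Collections.emptyList();
        }
        try {
            return Files.readAllLines(path);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Stand-in for a join: keeps entries of 'main' that also appear in 'side'. No null checks needed. */
    static List<String> matching(List<String> main, List<String> side) {
        return main.stream().filter(side::contains).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> in2 = List.of("a", "b", "c");
        // The optional path comes from the command line, as in Chao's example.
        Path optionalPath = args.length > 0 ? Path.of(args[0]) : null;
        List<String> in1 = readOptional(optionalPath);
        // Runs the same way whether or not the optional input was supplied.
        System.out.println(matching(in2, in1));
    }
}
```

The point of the design is that the "missing input" case is handled once, where the input is created, instead of at every later use.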
>> >>> >
>> >>> > I think Jinal's case is one that our current implementation handles
>> >>> > badly. Perhaps we should change it to create an empty output directory
>> >>> > rather than report an error; that way the MR job is never started and
>> >>> > we save the job start-up time. This is the benefit of knowing at plan
>> >>> > time that a PCollection is empty.
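Chao's proposal (short-circuit at plan time instead of failing) can be illustrated with a toy model. PlanNode, Action and plan are hypothetical types invented for this sketch; they do not reflect how Crunch's MSCRPlanner is actually structured. The sketch only encodes the decision discussed in the thread: a vertex with inputs runs a job, a vertex with no inputs that is known to be empty writes an empty output, and anything else still fails.

```java
import java.util.List;

/** Toy model of a plan-time emptiness check (hypothetical, not MSCRPlanner). */
public class EmptyAwarePlannerSketch {

    /** A node that writes an output; 'sources' are the inputs feeding it. */
    static final class PlanNode {
        final String name;
        final List<String> sources;
        final boolean knownEmpty; // true if built only from emptyPCollection()-style inputs

        PlanNode(String name, List<String> sources, boolean knownEmpty) {
            this.name = name;
            this.sources = sources;
            this.knownEmpty = knownEmpty;
        }
    }

    enum Action { RUN_JOB, WRITE_EMPTY_OUTPUT }

    /**
     * A node with inputs runs a job. A node with no inputs that is known to be
     * empty skips the job and writes an empty output. A node with no inputs
     * that is not known to be empty still fails, like the error in this thread.
     */
    static Action plan(PlanNode node) {
        if (!node.sources.isEmpty()) {
            return Action.RUN_JOB;
        }
        if (node.knownEmpty) {
            return Action.WRITE_EMPTY_OUTPUT;
        }
        throw new IllegalStateException("No input sources for " + node.name);
    }

    public static void main(String[] args) {
        PlanNode withData = new PlanNode("joined", List.of("shakes.txt"), false);
        PlanNode onlyEmpty = new PlanNode("fromEmpty", List.of(), true);
        System.out.println(plan(withData));  // prints RUN_JOB
        System.out.println(plan(onlyEmpty)); // prints WRITE_EMPTY_OUTPUT
    }
}
```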
>> >>> >
>> >>> >
>> >>> > 2014-03-16 23:34 GMT+08:00 Josh Wills <josh.wills@gmail.com>:
>> >>> >
>> >>> > > +chao
>> >>> > >
>> >>> > > Inlined.
>> >>> > >
>> >>> > > On Sat, Mar 15, 2014 at 12:34 PM, Jinal Shah <jinalshah2007@gmail.com>
>> >>> > > wrote:
>> >>> > >
>> >>> > >> Hi,
>> >>> > >>
>> >>> > >> I came across a case where I'm not sure whether the behavior is
>> >>> > >> right. I am getting a "No Output" exception when I try to run my
>> >>> > >> Crunch job. On further investigation I found that it happens when I
>> >>> > >> use Pipeline.emptyPCollection(). Here is what my scenario looks like:
>> >>> > >>
>> >>> > >> PCollection<V> collectionWhichCouldBeEmpty = null;
>> >>> > >> if (path.exists) {
>> >>> > >>     collectionWhichCouldBeEmpty = pipeline.read(FromPath, PType.V);
>> >>> > >> } else {
>> >>> > >>     collectionWhichCouldBeEmpty = pipeline.emptyPCollection(PType.V);
>> >>> > >> }
>> >>> > >>
>> >>> > >> // do some operations
>> >>> > >>
>> >>> > >> pipeline.write(Target);
>> >>> > >>
>> >>> > >> pipeline.run(); // this is where it is throwing the error
>> >>> > >>
>> >>> > >>
>> >>> > >> https://github.com/apache/crunch/blob/master/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java#L287
>> >>> > >>
>> >>> > >> On further debugging I found that the Vertex didn't have an input:
>> >>> > >>
>> >>> > >>
>> >>> > >> https://github.com/apache/crunch/blob/master/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java#L275
>> >>> > >>
>> >>> > >>
>> >>> > >> So if I use pipeline.read() and it produces an empty PCollection, it
>> >>> > >> works, since there is an input source. But if I create an empty
>> >>> > >> PCollection using pipeline.emptyPCollection(), which doesn't have an
>> >>> > >> input source, it fails.
>> >>> > >>
>> >>> > >> I'm not sure whether this case was missed or whether it is intended.
>> >>> > >>
>> >>> > >
>> >>> > > It's a good question, and I'm not sure of the answer. Added Chao to the
>> >>> > > To: line to ask him what the intention was in this case.
>> >>> > >
>> >>> > >
>> >>> > >> Thanks
>> >>> > >> Jinal
>> >>> > >>
>> >>> > >
>> >>> > >
>> >>> >
>> >>>
>> >>
>> >>
>> >
>>
>
>
>
> --
> Director of Data Science
> Cloudera <http://www.cloudera.com>
> Twitter: @josh_wills <http://twitter.com/josh_wills>
>



