hadoop-common-user mailing list archives

From "Jim the Standing Bear" <standingb...@gmail.com>
Subject Re: can jobs be launched recursively within a mapper ?
Date Tue, 30 Oct 2007 19:20:14 GMT
Hi Jorgen,

Thank you for taking the time to show me the pseudo-code for a
QueueInputFormat.  I will see if I can fill in the details and use
it for my project.

-- Jim

On 10/30/07, Johnson, Jorgen <jorgenj@amazon.com> wrote:
> Hey all,
>
> Any reason why something like this wouldn't work?
>
> Create a QueueInputFormat, which provides a RecordReader implementation that pops values
> off a globally accessible queue*.  This would require filling the queue with values prior
> to loading the map/red job.  This would allow the mappers to cram values back into the queue
> for further processing when necessary.
>
> We could even avoid pre-loading the queue by combining this with some form of hybrid
> InputFormat which first reads a normal input file, but after each reader has finished reading
> its piece of the input file (and before it receives the OK to quit), it would try to pull
> items off the queue for further processing.
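>
> The interesting part of that hybrid reader would be its next() method, which drains the
> task's file split first and only then starts pulling from the queue.  A rough sketch (same
> imports as the QueueInputFormat below; HybridQueueRecordReader and pollQueue() are made-up
> names, and the details depend on your queueing impl):
>
> public class HybridQueueRecordReader implements RecordReader {
>     private final RecordReader fileReader;   // reader for this task's normal input split
>     private boolean fileDone = false;
>
>     public HybridQueueRecordReader(RecordReader fileReader) {
>         this.fileReader = fileReader;
>     }
>
>     public boolean next(Writable key, Writable value) throws IOException {
>         // First work through the assigned piece of the input file...
>         if (!fileDone) {
>             if (fileReader.next(key, value)) {
>                 return true;
>             }
>             fileDone = true;
>         }
>         // ...then keep pulling items off the shared queue until it is drained.
>         String queued = pollQueue();
>         if (queued == null) {
>             return false;   // file finished and queue empty: this mapper can quit
>         }
>         // assumes the wrapped reader also produces LongWritable/Text pairs
>         ((LongWritable) key).set(1);
>         ((Text) value).set(queued);
>         return true;
>     }
>
>     public WritableComparable createKey() { return fileReader.createKey(); }
>     public Writable createValue() { return fileReader.createValue(); }
>     public long getPos() throws IOException { return fileReader.getPos(); }
>     public float getProgress() throws IOException { return fileReader.getProgress(); }
>     public void close() throws IOException { fileReader.close(); }
>
>     /** Pop one item, or return null if the queue is empty */
>     private String pollQueue() {
>         throw new UnsupportedOperationException("Depends on your queueing impl");
>     }
> }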
>
> At the bottom is a pseudo-implementation to help illustrate what I'm saying.
>
> -jorgenj
>
> *A simple db table or file store in hdfs might be ways to accomplish this globally accessible
> queue, or something like SQS might work: http://www.amazon.com/Simple-Queue-Service-home-page/b?ie=UTF8&node=13584001
> =)
>
>
> import java.io.DataInput;
> import java.io.DataOutput;
> import java.io.IOException;
>
> import org.apache.hadoop.io.LongWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.io.Writable;
> import org.apache.hadoop.io.WritableComparable;
> import org.apache.hadoop.mapred.InputFormat;
> import org.apache.hadoop.mapred.InputSplit;
> import org.apache.hadoop.mapred.JobConf;
> import org.apache.hadoop.mapred.RecordReader;
> import org.apache.hadoop.mapred.Reporter;
>
> /** InputFormat which allows map/red jobs to process values
>  * out of a globally accessible queue.
>  *
>  * Note: The queue must be primed prior to kicking off any job which
>  * attempts to process values in the queue.
>  */
> public class QueueInputFormat implements InputFormat {
>
>     public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter)
>             throws IOException {
>         return new QueueRecordReader();
>     }
>
>     public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
>         int overrideNumSplits = job.getInt("input.queue.hardcoded.num.maps", numSplits);
>
>         InputSplit[] splits = new InputSplit[overrideNumSplits];
>         for ( int i = 0; i < overrideNumSplits; i++ ) {
>             splits[i] = new QueueInputSplit(i);
>         }
>
>         return splits;
>     }
>
>     public void validateInput(JobConf job) throws IOException {
>         throw new UnsupportedOperationException("Implement your validation, if needed");
>     }
>
>     private static final class QueueRecordReader implements RecordReader {
>         public void close() throws IOException {
>             // close the queue
>         }
>
>         public WritableComparable createKey() {
>             //this might be the id of this object from the queue, or whatever you want
>             return new LongWritable();
>         }
>
>         public Writable createValue() {
>             //replace this with a type representing what's in your queue
>             return new Text();
>         }
>
>         public long getPos() throws IOException {
>             return (queueIsExhausted()?1:0);
>         }
>
>         public float getProgress() throws IOException {
>             //could change this to report progress based on size of queue...
>             return (queueIsExhausted()?1:0);
>         }
>
>         public boolean next(Writable key, Writable value) throws IOException {
>             // Returning false here is what eventually stops the mappers
>             if (queueIsExhausted()) {
>                 return false;
>             }
>
>             ((LongWritable)key).set(1);
>             ((Text)value).set(fetchValueFromQueue());
>
>             return true;
>         }
>
>         /** Use this to determine when the queue is exhausted, which in
>          * turn determines when the mappers will stop (no more items to process)
>          */
>         private boolean queueIsExhausted() {
>             /*
>              * This could be as simple as checking if the queue is empty,
>              * or this could block until all mappers have signalled* that
>              * they are finished AND the queue is empty...
>              */
>             throw new UnsupportedOperationException("Depends on your queueing impl");
>         }
>
>         /** Fetches a value from the queue */
>         private String fetchValueFromQueue() {
>             throw new UnsupportedOperationException("Depends on your queueing impl");
>         }
>     }
>
>     private static final class QueueInputSplit implements InputSplit {
>         private final int id;
>
>         private QueueInputSplit(int id) {
>             this.id = id;
>         }
>
>         public long getLength() throws IOException {
>             return 1;
>         }
>
>         public String[] getLocations() throws IOException {
>             return new String[]{"queueIdentifierGoesHere?"+id};
>         }
>
>         public void readFields(DataInput in) throws IOException {
>             throw new UnsupportedOperationException("Real implementation needs to handle
this");
>         }
>
>         public void write(DataOutput out) throws IOException {
>             throw new UnsupportedOperationException("Real implementation needs to handle
this");
>         }
>
>     }
> }
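>
> And for what it's worth, a driver using this might be wired up roughly like so (just a
> sketch against the old JobConf/JobClient API; the number of maps, the output path, and the
> IdentityMapper stand-in are whatever you pick):
>
> import java.io.IOException;
>
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapred.JobClient;
> import org.apache.hadoop.mapred.JobConf;
> import org.apache.hadoop.mapred.lib.IdentityMapper;
>
> public class QueueJobDriver {
>     public static void main(String[] args) throws IOException {
>         JobConf conf = new JobConf(QueueJobDriver.class);
>         conf.setJobName("process-queue");
>
>         // Read records via the queue-backed InputFormat instead of from files
>         conf.setInputFormat(QueueInputFormat.class);
>         conf.setInt("input.queue.hardcoded.num.maps", 4);
>
>         // IdentityMapper just passes records through; swap in your real mapper
>         conf.setMapperClass(IdentityMapper.class);
>         conf.setNumReduceTasks(0);
>
>         conf.setOutputKeyClass(LongWritable.class);
>         conf.setOutputValueClass(Text.class);
>         conf.setOutputPath(new Path(args[0]));
>
>         // Remember: the queue must already be primed before this call
>         JobClient.runJob(conf);
>     }
> }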
>
>
>
> -----Original Message-----
> From: Jim the Standing Bear [mailto:standingbear@gmail.com]
> Sent: Tuesday, October 30, 2007 8:04 AM
> To: hadoop-user@lucene.apache.org
> Cc: stuhood@webmail.us
> Subject: Re: can jobs be launched recursively within a mapper ?
>
> Thank you for all the help. I think I am beginning to gain a clearer
> picture of Hadoop.  I will try the file solution.
>
> On 10/29/07, Aaron Kimball <ak@cs.washington.edu> wrote:
> > If you modify the JobConf for a running job within the context of a
> > mapper, the changes will not propagate back to the other machines.
> > JobConfs are serialized to XML and then distributed to the mapping nodes
> > where they are read back into the running Java tasks. There is no
> > "refresh" function that I am aware of.
> >
> > - Aaron
> >
> > Jim the Standing Bear wrote:
> > > Thanks, Stu...  Maybe my mind is way off track - but I still sense a
> > > problem with the mapper sending feedback to the job controller.  That
> > > is, when a mapper has reached the terminal condition, how can it tell
> > > the job controller to stop?
> > >
> > > If I keep a JobConf object in the mapper, and set a property
> > > "stop.processing" to true when a mapping task has reached the terminal
> > > condition, will it cause synchronization problems?  There could be
> > > other mapping tasks that still wish to go on.
> > >
> > > I tried to find a way so that the job controller can open the file in
> > > the output path at the end of the loop to read the contents; but thus
> > > far, I haven't seen a way to achieve this.
> > >
> > > Does this mean I have hit a dead-end?
> > >
> > > -- Jim
> > >
> > >
> > >
> > > On 10/29/07, Stu Hood <stuhood@webmail.us> wrote:
> > >
> > >> The iteration would take place in your control code (your 'main' method,
> > >> as shown in the examples).
> > >>
> > >> In order to prevent records from looping infinitely, each iteration would
> > >> need to use a separate output/input directory.
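> > >>
> > >> Roughly like this in the control code (sketch only; the path names and the
> > >> reachedBaseCase() test are placeholders you'd fill in):
> > >>
> > >> import java.io.IOException;
> > >>
> > >> import org.apache.hadoop.fs.Path;
> > >> import org.apache.hadoop.mapred.JobClient;
> > >> import org.apache.hadoop.mapred.JobConf;
> > >>
> > >> public class IterativeDriver {
> > >>     public static void main(String[] args) throws IOException {
> > >>         Path input = new Path("catalogs/seed");           // placeholder seed input
> > >>         for (int i = 0; ; i++) {
> > >>             Path output = new Path("catalogs/iter-" + i); // fresh dir per iteration
> > >>
> > >>             JobConf conf = new JobConf(IterativeDriver.class);
> > >>             conf.setJobName("catalog-pass-" + i);
> > >>             conf.setInputPath(input);      // set your mapper/reducer etc. here too
> > >>             conf.setOutputPath(output);
> > >>             JobClient.runJob(conf);
> > >>
> > >>             if (reachedBaseCase(output)) { // e.g. nothing new was emitted
> > >>                 break;
> > >>             }
> > >>             input = output;  // this pass's output becomes the next pass's input
> > >>         }
> > >>     }
> > >>
> > >>     /** Placeholder termination test: decide however makes sense for your data */
> > >>     private static boolean reachedBaseCase(Path output) {
> > >>         return true;
> > >>     }
> > >> }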
> > >>
> > >> Thanks,
> > >> Stu
> > >>
> > >>
> > >> -----Original Message-----
> > >> From: Jim the Standing Bear <standingbear@gmail.com>
> > >> Sent: Monday, October 29, 2007 5:45pm
> > >> To: hadoop-user@lucene.apache.org
> > >> Subject: Re: can jobs be launched recursively within a mapper ?
> > >>
> > >> thanks, Owen and David,
> > >>
> > >> I also thought of making a queue so that I can push catalog names onto
> > >> the end of it, while the job control loop keeps removing items off the
> > >> queue until there are none left.
> > >>
> > >> However, the problem is I don't see how I can do so within the
> > >> map/reduce context.  All the code examples are one-shot deals and
> > >> there is no iteration involved.
> > >>
> > >> Furthermore, what David said made sense, but to avoid an infinite loop,
> > >> the code must remove the record it just read from the input file.  How
> > >> do I do that using Hadoop's fs?  Or does Hadoop take care of it
> > >> automatically?
> > >>
> > >> -- Jim
> > >>
> > >>
> > >>
> > >> On 10/29/07, David Balatero <ezwelty@u.washington.edu> wrote:
> > >>
> > >>> Aren't these questions a little advanced for a bear to be asking?
> > >>> I'll be here all night...
> > >>>
> > >>> But seriously, if your job is inherently recursive, one possible way
> > >>> to do it would be to make sure that you output in the same format
> > >>> that you input. Then you can keep re-reading the outputted file back
> > >>> into a new map/reduce job, until you hit some base case and you
> > >>> terminate. I've had a main method before that would kick off a bunch
> > >>> of jobs in a row -- but I wouldn't really recommend starting another
> > >>> map/reduce job in the scope of a running map() or reduce() method.
> > >>>
> > >>> - David
> > >>>
> > >>>
> > >>> On Oct 29, 2007, at 2:17 PM, Jim the Standing Bear wrote:
> > >>>
> > >>>
> > >>>> then
> > >>>>
> > >>>
> > >> --
> > >> --------------------------------------
> > >> Standing Bear Has Spoken
> > >> --------------------------------------
> > >>
> > >>
> > >>
> > >>
> > >
> > >
> > >
> >
>
>
> --
> --------------------------------------
> Standing Bear Has Spoken
> --------------------------------------
>


-- 
--------------------------------------
Standing Bear Has Spoken
--------------------------------------
