hadoop-common-user mailing list archives

From "Ananth T. Sarathy" <ananth.t.sara...@gmail.com>
Subject Re: Faster alternative to FSDataInputStream
Date Wed, 19 Aug 2009 17:46:08 GMT
Well, we are using it to store large binary files. I get that distributing
the processing will allow faster times when doing multiple reads/writes,
but I didn't realize that map and reduce would help when writing one file.
Ananth T Sarathy


On Wed, Aug 19, 2009 at 12:14 PM, Edward Capriolo <edlinuxguru@gmail.com> wrote:

> Ananth,
>
> That is your issue really.
>
> For example: I have 20 web servers and I wish to download all the
> weblogs from all of them into Hadoop.
>
> If you write a top-down program that uses FSDataOutputStream, you are
> only using Hadoop halfway: you are using the distributed file system,
> but you are not doing any distributed processing.
>
> Better is to list every server/file you wish to download in your input
> file, tell Hadoop to use NLineInputFormat, and move your code inside a
> map function. Since Hadoop can run multiple mappers,
> -Dmapred.map.tasks=6 will cause 6 fetchers to run in parallel. You can
> raise this as high as you are comfortable with.
>
> Also, now that you are using m/r, you don't have to write files with
> FSDataOutputStream; you can use output.collect() to make a sequence
> file.
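>
> A rough sketch of what that mapper could look like (the class name, the
> destination paths, and the openRemote() helper are illustrative, not from
> this thread; it uses the classic org.apache.hadoop.mapred API):
>
>     import java.io.IOException;
>     import java.io.InputStream;
>
>     import org.apache.hadoop.fs.FSDataOutputStream;
>     import org.apache.hadoop.fs.FileSystem;
>     import org.apache.hadoop.fs.Path;
>     import org.apache.hadoop.io.LongWritable;
>     import org.apache.hadoop.io.NullWritable;
>     import org.apache.hadoop.io.Text;
>     import org.apache.hadoop.mapred.JobConf;
>     import org.apache.hadoop.mapred.MapReduceBase;
>     import org.apache.hadoop.mapred.Mapper;
>     import org.apache.hadoop.mapred.OutputCollector;
>     import org.apache.hadoop.mapred.Reporter;
>
>     // Each input line names one file to fetch, e.g. "web01 /var/log/access.log".
>     // NLineInputFormat hands one line (or N lines) to each map task.
>     public class LogFetchMapper extends MapReduceBase
>         implements Mapper<LongWritable, Text, Text, NullWritable> {
>
>         private JobConf conf;
>
>         public void configure(JobConf conf) {
>             this.conf = conf;
>         }
>
>         public void map(LongWritable key, Text value,
>                 OutputCollector<Text, NullWritable> output, Reporter reporter)
>                 throws IOException {
>             String[] parts = value.toString().split("\\s+");
>             String host = parts[0];
>             String remotePath = parts[1];
>
>             FileSystem fs = FileSystem.get(conf);
>             Path dest = new Path("/logs/" + host + remotePath);
>
>             InputStream in = openRemote(host, remotePath);
>             FSDataOutputStream out = fs.create(dest);
>             byte[] buf = new byte[64 * 1024];
>             int n;
>             while ((n = in.read(buf)) > 0) {
>                 out.write(buf, 0, n);
>                 reporter.progress();   // keep long fetches from being timed out
>             }
>             out.close();
>             in.close();
>
>             // Emit the HDFS path we wrote, just so the job has some output.
>             output.collect(new Text(dest.toString()), NullWritable.get());
>         }
>
>         private InputStream openRemote(String host, String path) throws IOException {
>             // Placeholder: wire up whatever transport you use (FTP, HTTP, scp...).
>             throw new UnsupportedOperationException("fetch transport not shown");
>         }
>     }
>
> The 64 KB buffer and the progress() call are only there to keep large
> transfers moving; adjust to taste.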
>
> In my case I am using commons FTP and FSDataOutputStream (not
> output.collect()), as I do not want one big sequence file; I want the
> actual files as they exist on the web server, and I will merge them down
> the line in my process. This works very well. I could turn the number of
> mappers higher, but I don't want to beat up my web servers and network
> any more. (Hint: turn off speculative execution.)
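>
> (For reference, a minimal way to do that with the classic JobConf API;
> the job setup shown here is illustrative:)
>
>     JobConf job = new JobConf(LogFetchMapper.class);
>     job.setMapSpeculativeExecution(false);
>     // equivalent to passing -Dmapred.map.tasks.speculative.execution=false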
>
> Now you know all my secrets. Good luck :)
>
>
> On Wed, Aug 19, 2009 at 11:45 AM, Ananth T. Sarathy
> <ananth.t.sarathy@gmail.com> wrote:
> > Right now it is just a top-down program. I am still learning this, so
> > if I need to put this in a map and reduce to optimize speed, I will.
> > Right now I am just testing certain things and getting a skeleton to
> > write and pull files from the S3 storage. The actual implementation is
> > still being engineered.
> >
> >
> > Ananth T Sarathy
> >
> >
> > On Wed, Aug 19, 2009 at 11:11 AM, Edward Capriolo
> > <edlinuxguru@gmail.com> wrote:
> >
> >> >> It would be as fast as the underlying filesystem goes.
> >>
> >> I would not agree with that statement. There is overhead. If you have
> >> a single-threaded process writing many small files, you do not get
> >> parallel write speed. In some testing I did, writing a small file can
> >> take 30-300 ms. So if you have 9000 small files (like I did) and you
> >> are single-threaded, this takes a long time.
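> >>
> >> (To put numbers on it: at 30-300 ms per file, 9000 single-threaded
> >> writes work out to roughly 4.5 to 45 minutes spent just creating
> >> files.)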
> >>
> >> If you orchestrate your task to use FSDataInputStream and
> >> FSDataOutputStream in the map or reduce phase, then each mapper or
> >> reducer is writing a file at a time. Now that is fast.
> >>
> >> Ananth, are you doing your r/w inside a map/reduce job or are you just
> >> using FS* in a top down program?
> >>
> >>
> >>
> >> On Wed, Aug 19, 2009 at 1:26 AM, Raghu Angadi <rangadi@yahoo-inc.com>
> >> wrote:
> >> > Ananth T. Sarathy wrote:
> >> >>
> >> >> I am trying to download binary files stored in Hadoop but there is
> >> >> like a 2 minute wait on a 20mb file when I try to execute the
> >> >> in.read(buf).
> >> >
> >> > What does this mean: 2 minutes to pipe 20mb, or one of your
> >> > in.read() calls took 2 minutes? Your code actually measures time for
> >> > both the read and the write.
> >> >
> >> > There is nothing in FSDataInputStream to cause this slowdown. Do you
> >> > think anyone would use Hadoop otherwise? It would be as fast as the
> >> > underlying filesystem goes.
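> >> >
> >> > (One way to separate the two, purely as an illustration against the
> >> > code below: time only the read loop,
> >> >
> >> >     long t0 = System.currentTimeMillis();
> >> >     while ((read = in.read(buf)) >= 0) { /* discard the data */ }
> >> >     long readMillis = System.currentTimeMillis() - t0;
> >> >
> >> > and compare that against the time for the full pipe.)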
> >> >
> >> > Raghu.
> >> >
> >> >> is there a better way to be doing this?
> >> >>
> >> >>    private void pipe(InputStream in, OutputStream out) throws IOException
> >> >>    {
> >> >>        System.out.println(System.currentTimeMillis()+" Starting to Pipe Data");
> >> >>        byte[] buf = new byte[1024];
> >> >>        int read = 0;
> >> >>        while ((read = in.read(buf)) >= 0)
> >> >>        {
> >> >>            out.write(buf, 0, read);
> >> >>            System.out.println(System.currentTimeMillis()+" Piping Data");
> >> >>        }
> >> >>        out.flush();
> >> >>        System.out.println(System.currentTimeMillis()+" Finished Piping Data");
> >> >>    }
> >> >>
> >> >>    public void readFile(String fileToRead, OutputStream out)
> >> >>            throws IOException
> >> >>    {
> >> >>        System.out.println(System.currentTimeMillis()+" Start Read File");
> >> >>        Path inFile = new Path(fileToRead);
> >> >>        System.out.println(System.currentTimeMillis()+" Set Path");
> >> >>        // Validate the input/output paths before reading/writing.
> >> >>        if (!fs.exists(inFile))
> >> >>        {
> >> >>            throw new HadoopFileException("Specified file " + fileToRead
> >> >>                    + " not found.");
> >> >>        }
> >> >>        if (!fs.isFile(inFile))
> >> >>        {
> >> >>            throw new HadoopFileException("Specified file " + fileToRead
> >> >>                    + " not found.");
> >> >>        }
> >> >>        // Open inFile for reading.
> >> >>        System.out.println(System.currentTimeMillis()+" Opening Data Stream");
> >> >>        FSDataInputStream in = fs.open(inFile);
> >> >>        System.out.println(System.currentTimeMillis()+" Opened Data Stream");
> >> >>        // Open outFile for writing.
> >> >>
> >> >>        // Read from input stream and write to output stream until EOF.
> >> >>        pipe(in, out);
> >> >>
> >> >>        // Close the streams when done.
> >> >>        out.close();
> >> >>        in.close();
> >> >>    }
> >> >>
> >> >> Ananth T Sarathy
> >> >>
> >> >
> >> >
> >>
> >
>
