hadoop-common-user mailing list archives

From Shevek <she...@karmasphere.com>
Subject Re: How to evenly split data file
Date Fri, 14 Oct 2011 17:36:22 GMT
I can't answer your question fully without stepping through it in a
debugger, but the principle is this:

Hadoop's JobClient splits the data into blocks of approximately equal size
in bytes. The trick is then to synchronize those blocks on record boundaries.
All file formats behave in approximately the same way, and as I understand
it, the rough algorithm is this:

Each file must have "synchronization points" (a term I am defining here; it
is not common terminology in Hadoop). A synchronization point in a text file
is a newline. A synchronization point in an RCFile or SequenceFile is a block
header, which is recognized by a randomly selected sequence of 16(?) bytes.
Offset 0 is also a synchronization point. Some file formats cannot detect
sync points using the data and must rely on external indexes (e.g. LZO), in
which case a sync point is a reinitialization point for the compression
algorithm (which is always a block compressor), looked up in that external
index.

A task is given (start, end) as byte offsets. It finds the first sync point
at or after 'start'. It then reads records in every block that starts at or
after 'start' but does not start after 'end'. In the case of a text file,
block = line = record. In other formats, the concepts are distinct.
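
The rule above can be sketched for a newline-delimited text file. This is a
simplified illustration in plain Java, not Hadoop's actual reader code: the
class SplitSketch and the method readSplit are hypothetical names, the "file"
is an in-memory string, and I assume a half-open convention in which a record
belongs to the split that contains its first byte.

```java
import java.util.ArrayList;
import java.util.List;

class SplitSketch {
    // Returns the records whose first byte lies in [start, end).
    // Assumption: records are lines, and a sync point is offset 0 or the
    // position immediately after a newline.
    static List<String> readSplit(String data, int start, int end) {
        List<String> records = new ArrayList<>();
        int pos = start;
        // Advance to the first sync point at or after 'start'.
        if (pos > 0) {
            while (pos < data.length() && data.charAt(pos - 1) != '\n') {
                pos++;
            }
        }
        // Read every record that starts before 'end', even if it runs past it.
        while (pos < end && pos < data.length()) {
            int nl = data.indexOf('\n', pos);
            int recordEnd = (nl < 0) ? data.length() : nl;
            records.add(data.substring(pos, recordEnd));
            pos = recordEnd + 1;
        }
        return records;
    }

    public static void main(String[] args) {
        String data = "alpha\nbeta\ngamma\ndelta\nepsilon\n";
        int tasks = 3;
        int chunk = data.length() / tasks;
        int total = 0;
        for (int t = 0; t < tasks; t++) {
            int start = t * chunk;
            int end = (t == tasks - 1) ? data.length() : start + chunk;
            List<String> recs = readSplit(data, start, end);
            total += recs.size();
            System.out.println("task " + t + " [" + start + "," + end + ") -> " + recs);
        }
        System.out.println("total records read: " + total);
    }
}
```

With this 31-byte sample and three splits, the tasks read 2, 2 and 1 records:
every record is read exactly once, but the counts per task are not equal.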

If a task is given a short split (x, x+1), then it will find the first sync
point after x, which will also be after x+1, so it will read no records.
Thus no records are read twice, and splits must be large enough to give each
task at least some records between sync points in its split. If your file
format syncs every 64 MB, your records are 10 bytes each, and you give out
1 MB splits in the hope of getting 100K records per mapper, then 1 in 64
mappers will process about 6.7 million records and the other 63 will do
nothing.
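
The arithmetic in that example can be checked with a short sketch. It is plain
Java with hypothetical names (SyncArithmetic, recordsForSplit), and it assumes
an idealized format: sync points sit at exact multiples of the sync interval,
every block is full, and a task reads the whole block that follows each sync
point inside its split.

```java
class SyncArithmetic {
    // Records read by the split [start, end) under the assumptions above.
    static long recordsForSplit(long start, long end, long syncInterval, long recordSize) {
        // First multiple of syncInterval at or after 'start'.
        long firstSync = ((start + syncInterval - 1) / syncInterval) * syncInterval;
        if (firstSync >= end) {
            return 0; // no sync point inside this split, so nothing to read
        }
        // Number of sync points in [firstSync, end).
        long syncPoints = (end - 1 - firstSync) / syncInterval + 1;
        return syncPoints * (syncInterval / recordSize);
    }

    public static void main(String[] args) {
        long mb = 1L << 20;
        long syncInterval = 64 * mb; // format syncs every 64 MB
        long recordSize = 10;        // 10-byte records
        long splitSize = mb;         // 1 MB splits
        long fileLength = 128 * mb;  // hypothetical file size for the demo

        int busy = 0;
        int idle = 0;
        for (long start = 0; start < fileLength; start += splitSize) {
            long n = recordsForSplit(start, start + splitSize, syncInterval, recordSize);
            if (n > 0) {
                busy++;
                System.out.println("split at " + (start / mb) + " MB reads " + n + " records");
            } else {
                idle++;
            }
        }
        System.out.println(busy + " busy splits, " + idle + " idle splits");
    }
}
```

Under these assumptions only the splits that contain a multiple of 64 MB read
anything, and each of them reads 64 MB / 10 bytes of records.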

Thus each task processes a roughly equal number of bytes, but not an equal
number of records.

I'm afraid I can't help more, but these are the principles you are looking
for.

By the way, does anyone know how or whether SequenceFile avoids a heavy HDFS
hit on the first block of the file, where it looks up the magic byte
signature for that file? I'm too lazy to look this morning, but the thought
occurs to me.

S.

On 5 October 2011 22:35, Thomas Anderson <t.dt.aanderson@gmail.com> wrote:

> I don't use MapReduce; I am just practicing with the Hadoop common API to
> manually split a data file whose data is stored in
> SequenceFileInputFormat form.
>
> I split the file by dividing the file length by the total number of
> tasks. The InputSplit created is passed to a RecordReader, which reads
> from the designated path. The code is below:
>
>   private void readPartOfDataFile() throws IOException {
>      int taskid = getTaskId();
>      InputSplit split = getSplit(taskid);
>      SequenceFileRecordReader<Text, CustomData> input =
>        new SequenceFileRecordReader<Text, CustomData>(conf, (FileSplit) split);
>      Text url = input.createKey();
>      CustomData d = input.createValue();
>      int count = 0; // number of records read by this task
>      while(input.next(url, d)) {
>        count++;
>      }
>    }
>
>    private InputSplit getSplit(final int taskid) throws IOException {
>      FileSystem fs = FileSystem.get(conf);
>      Path filePath = new Path("path/to/", "file");
>      FileStatus[] status = fs.listStatus(filePath);
>      int maxTasks = conf.getInt("test.maxtasks", 12);
>      for(FileStatus file: status) {
>        if(file.isDir()) { // get data file
>          Path dataFile = new Path(file.getPath(), "data");
>          FileStatus data = fs.getFileStatus(dataFile);
>          long dataLength = data.getLen();
>          BlockLocation[] locations =
>            fs.getFileBlockLocations(data, 0, dataLength);
>          if(0 < dataLength) {
>            long chunk = dataLength/(long)maxTasks;
>            long beg = (taskid*chunk)+(long)1;
>            long end = (taskid+1)*chunk;
>            if(maxTasks == (taskid+1)) {
>              end = dataLength;
>            }
>            return new FileSplit(dataFile, beg, end,
>              locations[locations.length-1].getHosts());
>          } else {
>            LOG.info("No Data for file:"+file.getPath());
>          }
>        }// is dir
>      }// for
>      return null;
>    }
>
> However, the records read from the data file do not seem to be evenly
> distributed. For instance, the data file may contain 1200 records with a
> length of around 74250 bytes. With 12 tasks, each split should cover
> roughly 6187 bytes. But the counts show that each task reads a different
> number of records (e.g. task 4 read 526 records, task 5 read 632, task 6
> read 600), and the total number of records read is larger than the number
> of records stored. I checked JobClient.writeOldSplits(), and it seems
> similar to the way I divide the data. What am I missing when splitting
> data with the Hadoop common API?
>
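
One thing that may explain the overlapping counts in the code quoted above: as
far as I know, the FileSplit constructor in org.apache.hadoop.mapred takes
(file, start, length, hosts), so passing 'end' as the third argument would
make each split extend well past its intended end. Below is a minimal sketch
in plain Java of computing (start, length) pairs instead. The class
SplitOffsets and the helper splitFor are hypothetical names used only for
illustration, and the numbers are the ones from the message above.

```java
class SplitOffsets {
    // Returns {start, length} for one task, covering the file with
    // contiguous, non-overlapping byte ranges. The last task takes the
    // remainder left over by the integer division.
    static long[] splitFor(int taskId, int maxTasks, long fileLength) {
        long chunk = fileLength / maxTasks;
        long start = taskId * chunk; // no +1: ranges are half-open
        long end = (taskId == maxTasks - 1) ? fileLength : start + chunk;
        return new long[] { start, end - start };
    }

    public static void main(String[] args) {
        long fileLength = 74250; // data length quoted in the message
        int maxTasks = 12;
        long covered = 0;
        for (int t = 0; t < maxTasks; t++) {
            long[] s = splitFor(t, maxTasks, fileLength);
            covered += s[1];
            System.out.println("task " + t + ": start=" + s[0] + " length=" + s[1]);
        }
        System.out.println("bytes covered: " + covered + " of " + fileLength);
    }
}
```

The two values would then be passed as the second and third arguments of
FileSplit. Even with correct ranges, the record counts per task will still
vary, for the reasons given earlier in this reply.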
