Subject: Re: How to evenly split data file
From: Shevek <shevek@karmasphere.com>
To: common-user@hadoop.apache.org
Date: Fri, 14 Oct 2011 10:36:22 -0700

I can't answer your question fully without sitting in front of it with a
debugger, but the principle is this:

Hadoop's JobClient splits the data into blocks of approximately equal size
in bytes. The trick is then to align those blocks with record boundaries.
All file formats behave in roughly the same way, and as I understand it the
rough algorithm is this:

Each file must have "synchronization points" (a term I define here; it is
not common Hadoop terminology). A synchronization point in a text file is a
newline. A synchronization point in an RCFile or SequenceFile is a block
header, which is recognized by a randomly generated 16-byte marker. Offset 0
is also a synchronization point. Some file formats cannot detect sync points
from the data itself and must rely on an external index (e.g. LZO); in that
case a sync point is a reinitialization point for the compression algorithm
(which is always a block compressor), looked up in that external index.

A task is given (start, end) as byte offsets. It finds the first sync point
at or after 'start'. It then reads records in any block starting at or after
'start', but not in a block starting after 'end'. In the case of a text
file, block = line = record; in other formats the concepts are distinct. If
a task is given a very short range (x, x+1), it will find the first sync
point after x, which will also be after x+1, so it will read no records.
Thus no record is read twice, and splits must be large enough to give each
task at least some records between the sync points in its range.
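Roughly (untested, from memory, and with class and method names of my own
invention), the reading side of that rule looks something like the sketch
below for a SequenceFile: sync forward to the first sync point after
'start', then read records until one starts beyond 'end' on the far side of
a sync point. This loosely mirrors what SequenceFileRecordReader does
internally.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

// Sketch only: counts the records a task owns in the byte range
// (start, end) of a SequenceFile, using sync points as described above.
public class SyncPointReadSketch {
  public static long countRecords(Configuration conf, Path file,
                                  long start, long end) throws IOException {
    FileSystem fs = file.getFileSystem(conf);
    SequenceFile.Reader in = new SequenceFile.Reader(fs, file, conf);
    try {
      if (start > in.getPosition()) {
        in.sync(start);              // skip to the next sync point past 'start'
      }
      // Instantiate key/value objects of whatever classes the file declares.
      Writable key = (Writable)
          ReflectionUtils.newInstance(in.getKeyClass(), conf);
      Writable value = (Writable)
          ReflectionUtils.newInstance(in.getValueClass(), conf);

      long count = 0;
      boolean more = in.getPosition() < end;
      while (more) {
        long pos = in.getPosition();       // where the next record starts
        boolean remaining = in.next(key, value);
        if (pos >= end && in.syncSeen()) {
          // The record starts past 'end' and past a sync point, so it
          // belongs to the next task's range: stop without counting it.
          more = false;
        } else {
          more = remaining;
          if (remaining) {
            count++;
          }
        }
      }
      return count;
    } finally {
      in.close();
    }
  }
}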
If your file format syncs every 64Mb, your record is 10 bytes, and you give
out 1Mb splits in the hope of getting 100K records per mapper, then 1 mapper
in 64 will process roughly 6 million records while the other 63 do nothing.
Each task therefore processes a roughly equal number of bytes, but not an
equal number of records.

I'm afraid I can't help more, but these are the principles you are looking
for.

By the way, does anyone know how or whether SequenceFile avoids a heavy HDFS
hit on the first block of the file, where it looks up the magic byte
signature for that file? I'm too lazy to look this morning, but the thought
occurs to me.

S.

On 5 October 2011 22:35, Thomas Anderson wrote:

> I don't use MapReduce; I'm just practising using the Hadoop common API to
> manually split a data file, in which the data is stored in a form readable
> by SequenceFileInputFormat.
>
> The file is split by dividing the file length by the total number of
> tasks. The InputSplit that is created is passed to a RecordReader, which
> reads from the designated path. The code is as below:
>
>   private void readPartOfDataFile() throws IOException {
>     int taskid = getTaskId();
>     InputSplit split = getSplit(taskid);
>     SequenceFileRecordReader<Text, CustomData> input =
>         new SequenceFileRecordReader<Text, CustomData>(conf, (FileSplit) split);
>     Text url = input.createKey();
>     CustomData d = input.createValue();
>     int count = 0;
>     while (input.next(url, d)) {
>       count++;
>     }
>   }
>
>   private InputSplit getSplit(final int taskid) throws IOException {
>     FileSystem fs = FileSystem.get(conf);
>     Path filePath = new Path("path/to/", "file");
>     FileStatus[] status = fs.listStatus(filePath);
>     int maxTasks = conf.getInt("test.maxtasks", 12);
>     for (FileStatus file : status) {
>       if (file.isDir()) { // get data file
>         Path dataFile = new Path(file.getPath(), "data");
>         FileStatus data = fs.getFileStatus(dataFile);
>         long dataLength = data.getLen();
>         BlockLocation[] locations =
>             fs.getFileBlockLocations(data, 0, dataLength);
>         if (0 < dataLength) {
>           long chunk = dataLength / (long) maxTasks;
>           long beg = (taskid * chunk) + (long) 1;
>           long end = (taskid + 1) * chunk;
>           if (maxTasks == (taskid + 1)) {
>             end = dataLength;
>           }
>           return new FileSplit(dataFile, beg, end,
>               locations[locations.length - 1].getHosts());
>         } else {
>           LOG.info("No data for file: " + file.getPath());
>         }
>       } // is dir
>     } // for
>     return null;
>   }
>
> However, it seems that the records read from the data file are not evenly
> distributed. For instance, the data file may contain 1200 records with a
> data length of around 74250, so with 12 max tasks each split is roughly
> 6187 bytes. But the counts show that each task reads a different number of
> records (e.g. task 4 reads 526, task 5 reads 632, task 6 reads 600), and
> the total count is larger than the number of records stored. I checked
> JobClient.writeOldSplits(), which seems similar to the way I divide the
> data. What is missing when splitting data with the Hadoop common API?
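For comparison with the quoted getSplit() above: the mapred FileSplit
constructor is FileSplit(Path file, long start, long length, String[]
hosts), i.e. its third argument is a length, not an end offset, so passing
(taskid+1)*chunk there would make consecutive splits overlap, which alone
could explain a total record count larger than the file holds; the sync
point behaviour above then accounts for the uneven per-task counts. Split
arithmetic in the JobClient/FileInputFormat style looks roughly like this
sketch (class and method names are my own, untested):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;

// Sketch only: build (start, length) splits with 0-based starts; the last
// split absorbs any remainder so every byte is covered exactly once.
public class SplitArithmeticSketch {
  public static List<FileSplit> makeSplits(FileSystem fs, Path dataFile,
                                           int numSplits) throws IOException {
    List<FileSplit> splits = new ArrayList<FileSplit>();
    FileStatus status = fs.getFileStatus(dataFile);
    long length = status.getLen();
    if (length == 0 || numSplits <= 0) {
      return splits;                           // nothing to split
    }
    BlockLocation[] locations = fs.getFileBlockLocations(status, 0, length);
    long splitSize = length / numSplits;

    for (int i = 0; i < numSplits; i++) {
      long start = i * splitSize;              // no +1: byte offsets are 0-based
      long size = (i == numSplits - 1)
          ? length - start                     // last split takes the remainder
          : splitSize;
      // For simplicity this sketch reuses the first block's hosts; a real
      // implementation would pick the block that contains 'start'.
      splits.add(new FileSplit(dataFile, start, size, locations[0].getHosts()));
    }
    return splits;
  }
}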