From: amitsingh <amitsingh@cse.iitb.ac.in>
To: amitsingh
CC: core-user@hadoop.apache.org
Date: Thu, 11 Dec 2008 15:39:55 +0530
Subject: Re: Re: Re: File Splits in Hadoop

Hi,

After some debugging it seems I found the problem. In parseArcBytes(...) the length for the last gzip member in the split was set incorrectly, because the scan stops as soon as it passes the split boundary:

    if (totalRead > splitEnd) {
        break;
    }

Ideally it should have allowed the last member to be read on into the next block, instead of clipping its length to the offset where the scan stopped. So I modified next() to read that member directly from the input stream, instead of from a byte array populated using the recorded offset and length.

Anyway, is this the correct way to read records spanning multiple splits? (Currently it is taking quite some time to process, with the CPU at 100% :(
I am still investigating the potential cause.)

So my conclusion seems to be that reads spanning multiple physical blocks are transparent.

***I guess this could have been handled more cleanly inside parseArcBytes.***

*********************************************************************************************
modified srcCode
*********************************************************************************************

public boolean next(Text key, BytesWritable value) throws IOException {
    long start = 0;
    long len = 0;
    try {
        LOG.info("NEXT !!!!!!!!!!!!!! ");
        int index = recordIndex++;
        if (index >= startLens.length) {
            LOG.info("BAD ");
            return false;
        } else {
            LOG.info("GOOD");
        }

        start = startLens[index][0];
        len = startLens[index][1];
        byte[] zipbytes = new byte[(int) len];
        LOG.info("index" + index + "\tstartLens.length" + startLens.length
                + "\tstart:" + start + "\tlen" + len);

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ByteArrayInputStream zipin = new ByteArrayInputStream(zipbytes);
        GZIPInputStream zin = null;

        in.seek(start);                                        // Change
        if (index == startLens.length - 1) {                   // Change
            // Last member in this split: decompress straight from the
            // FSDataInputStream so the read can continue past the split end.
            LOG.info("zin = new GZIPInputStream(in);");        // Change
            zin = new GZIPInputStream(in);                     // Change
        } else {                                               // Change
            in.read(zipbytes);                                 // Change
            zin = new GZIPInputStream(zipin);                  // Change
            LOG.info("zin = new GZIPInputStream(zipin);");     // Change
        }

        int gzipRead = -1;
        int totalGzipRead = 0;
        baos.reset();
        try {
            while ((gzipRead = zin.read(buffer, 0, buffer.length)) != -1) {
                baos.write(buffer, 0, gzipRead);
                totalGzipRead += gzipRead;
            }
        } catch (Exception ex) {
            ex.printStackTrace();
            LOG.info(ex.toString() + "\nBANGstart:" + start + "\tlen" + len);
            LOG.error(StringUtils.stringifyException(ex));
        }

        byte[] pageBytes = baos.toByteArray();
        baos.close();
        if (index != startLens.length - 1) {                   // Change
            zin.close();                                       // Change
        }                                                      // Change
        zipin.close();

        Text keyText = (Text) key;
        keyText.set("" + index);
        BytesWritable valueBytes = (BytesWritable) value;
        valueBytes.set(pageBytes, 0, pageBytes.length);
        return true;
    } catch (Exception e) {
        e.printStackTrace();
        LOG.info(e.toString() + "start:" + start + "\tlen" + len);
        LOG.error(StringUtils.stringifyException(e));
        return false;
    }
}
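For reference, here is a rough sketch of the cleaner parseArcBytes handling hinted at above (a hypothetical helper, not code from the job): instead of clipping the last entry at totalRead, let it run to the end of the file, so the original byte-array path in next() could read across the split boundary through the same FSDataInputStream -- HDFS serves reads past the physical block boundary transparently. The fields (startLens, fileLen) are the ones already declared in the reader below.

// Sketch only: a hypothetical replacement for the length-assignment loop at
// the end of parseArcBytes(). The last gzip member found in this split is
// allowed to extend to the end of the file instead of being clipped at the
// last 4 KB scan chunk that crossed splitEnd.
private void buildStartLens(List<Long> starts) {
    startLens = new long[starts.size()][2];
    for (int i = 0; i < starts.size(); i++) {
        long start = starts.get(i);
        long end = (i < starts.size() - 1)
                ? starts.get(i + 1)   // next member starts here
                : fileLen;            // last member: run to end of file, not totalRead
        startLens[i][0] = start;
        startLens[i][1] = end - start;
    }
}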

amitsingh wrote:
> Thanks for the discussion, Taran.
>
> The problem still persists.
> What should be done if I have a record which spans multiple pSplits
> (physical splits on HDFS)?
> What happens if we try to read beyond a pSplit?
> Is the next read done transparently from the next block of the same file
> (which might not be on the same machine), or is the next block on the
> local disk (which may not belong to the same file) read instead?
>
> If it is the former, I guess things should have worked fine (surprisingly
> they aren't -- I am goofing it up somewhere).
> If it is the latter, then I have no idea how to tackle this. (Any help
> would be highly appreciated.)
>
> **************************************************************************************************
>
> I tried running a simple program in which I created a sample GZip file by
> serializing records:
>
>     // serialize the Employee objects
>     FileOutputStream fos = new FileOutputStream("/home/amitsingh/OUTPUT/out.bin");
>     GZIPOutputStream gz = new GZIPOutputStream(fos);
>     ObjectOutputStream oos = new ObjectOutputStream(gz);
>
>     for (int i = 0; i < 500000; i++) {
>         // 3 fields: 1 String, 2 int
>         Employee sam = new Employee(i + "name", i, i + 50000);
>         oos.writeObject(sam);
>     }
>     oos.flush();
>     oos.close();
>
> Now if I just run a simple map-reduce job on this binary file, it throws
> java.io.EOFException: Unexpected end of ZLIB input stream.
> It creates 2 splits:
>
>     Split 1: hdfs://localhost:54310/user/amitsingh/out1: start:0       length:1555001 hosts: sandpiper, bytesRemaining: 1555001
>     Split 2: hdfs://localhost:54310/user/amitsingh/out1: start:1555001 length:1555001 hosts: sandpiper
>
> For Map1 --> Split1, I get java.io.EOFException: Unexpected end of ZLIB
> input stream [for startLens[0] start:0 len:1556480].
> For Map2 --> no valid GZip is found, as startLens is empty.
>
> I am not sure why in Map1 the length is 1556480 and not 3110002 (the
> entire file), as there is only ONE GZip stream and it spans the entire
> file. Any guidance would be of great help?
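That 1556480 looks consistent with how parseArcBytes() below scans the split: it reads in 4096-byte chunks and only stops once totalRead has passed splitEnd, so the last recorded length gets rounded up to the next 4 KB boundary past the split end instead of running to the end of the file. A quick standalone check of that arithmetic (the numbers are taken from the split log above; this is not part of the job code):

// Standalone sanity check: reproduce the clipped length that parseArcBytes()
// records for the only gzip member in split 1 (start 0, splitEnd 1555001,
// 4096-byte scan buffer).
public class SplitLenCheck {
    public static void main(String[] args) {
        long splitEnd = 1555001;   // length of split 1 from the log above
        int bufSize = 4096;        // scan buffer size used in parseArcBytes()
        long totalRead = 0;
        while (totalRead <= splitEnd) {   // same exit condition as the scan loop
            totalRead += bufSize;
        }
        // totalRead - start (start == 0) is what ends up in startLens[0][1]
        System.out.println(totalRead);    // prints 1556480, matching the log,
                                          // not 3110002 (the whole file)
    }
}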
>
> **************************************************************************************************************
> Source code
> **************************************************************************************************************
>
> package org.apache.hadoop.mapred;
>
> import java.io.ByteArrayInputStream;
> import java.io.ByteArrayOutputStream;
> import java.io.IOException;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.zip.GZIPInputStream;
>
> import org.apache.commons.logging.Log;
> import org.apache.commons.logging.LogFactory;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FSDataInputStream;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.BytesWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.util.ReflectionUtils;
> import org.apache.hadoop.util.StringUtils;
>
> public class CustomGzipRecordReader implements
>         RecordReader<Text, BytesWritable> {
>
>     public static final Log LOG = LogFactory
>             .getLog(CustomGzipRecordReader.class);
>
>     protected Configuration conf;
>     protected long splitStart = 0;
>     protected long pos = 0;
>     protected long splitEnd = 0;
>     protected long splitLen = 0;
>     protected long fileLen = 0;
>     protected FSDataInputStream in;
>     protected int recordIndex = 0;
>     protected long[][] startLens;
>     protected byte[] buffer = new byte[4096];
>
>     private static byte[] MAGIC = { (byte) 0x1F, (byte) 0x8B };
>
>     // Scan the split and populate startLens, recording the offsets at which
>     // a gzip member starts within this split.
>     private void parseArcBytes() throws IOException {
>
>         long totalRead = in.getPos();
>         byte[] buffer = new byte[4096];
>         List<Long> starts = new ArrayList<Long>();
>
>         int read = -1;
>         while ((read = in.read(buffer)) > 0) {
>
>             for (int i = 0; i < (read - 1); i++) {
>
>                 if ((buffer[i] == (byte) 0x1F)
>                         && (buffer[i + 1] == (byte) 0x8B)) {
>                     long curStart = totalRead + i;
>                     in.seek(curStart);
>                     byte[] zipbytes = null;
>                     try {
>                         zipbytes = new byte[32];
>                         in.read(zipbytes);
>                         ByteArrayInputStream zipin = new ByteArrayInputStream(
>                                 zipbytes);
>                         GZIPInputStream zin = new GZIPInputStream(zipin);
>                         zin.close();
>                         zipin.close();
>                         starts.add(curStart);
>                         LOG.info("curStart: " + curStart);
>                     } catch (Exception e) {
>                         LOG.info("Ignoring position: " + curStart);
>                         continue;
>                     }
>                 }
>             }
>
>             totalRead += read;
>             in.seek(totalRead);
>             if (totalRead > splitEnd) {
>                 break;
>             }
>         }
>
>         startLens = new long[starts.size()][2];
>         for (int i = 0; i < starts.size(); i++) {
>
>             long start = starts.get(i);
>             long length = ((i < starts.size() - 1) ? starts.get(i + 1)
>                     : totalRead) - start;
>             startLens[i][0] = start;
>             startLens[i][1] = length;
>             System.out.println("startLens[" + i + "][0] " + startLens[i][0]
>                     + "\t\t" + startLens[i][1]);
>         }
>     }
>
>     public CustomGzipRecordReader(Configuration conf, FileSplit split)
>             throws IOException {
>
>         Path path = split.getPath();
>         FileSystem fs = path.getFileSystem(conf);
>         fileLen = fs.getFileStatus(split.getPath()).getLen();
>         this.conf = conf;
>         this.in = fs.open(split.getPath());
>         this.splitStart = split.getStart();
>         this.splitEnd = splitStart + split.getLength();
>         this.splitLen = split.getLength();
>         in.seek(splitStart);
>         parseArcBytes();
>     }
>
>     public void close() throws IOException {
>         this.in.close();
>     }
>
>     public Text createKey() {
>         return (Text) ReflectionUtils.newInstance(Text.class, conf);
>     }
>
>     public BytesWritable createValue() {
>         return (BytesWritable) ReflectionUtils.newInstance(BytesWritable.class,
>                 conf);
>     }
>
>     public long getPos() throws IOException {
>         return 0;
>     }
>
>     public float getProgress() throws IOException {
>
>         if (recordIndex == 0) {
>             return 0.0f;
>         } else {
>             // progress is (records returned so far) / (records in the split)
>             return Math.min(1.0f, (float) recordIndex / (float) startLens.length);
>         }
>     }
>
>     public boolean next(Text key, BytesWritable value) throws IOException {
>         long start = 0;
>         long len = 0;
>         try {
>             LOG.info("NEXT Called ");
>             int index = recordIndex++;
>             if (index >= startLens.length) {
>                 LOG.info("BAD ");
>                 return false;
>             } else {
>                 LOG.info("GOOD");
>             }
>
>             start = startLens[index][0];
>             len = startLens[index][1];
>             byte[] zipbytes = new byte[(int) len];
>
>             LOG.info("start:" + start + "\tlen" + len);
>
>             in.seek(start);
>             in.read(zipbytes);
>
>             ByteArrayOutputStream baos = new ByteArrayOutputStream();
>             ByteArrayInputStream zipin = new ByteArrayInputStream(zipbytes);
>             GZIPInputStream zin = new GZIPInputStream(zipin);
>
>             int gzipRead = -1;
>             int totalGzipRead = 0;
>             baos.reset();
>             try {
>                 while ((gzipRead = zin.read(buffer, 0, buffer.length)) != -1) { // <-------- SOURCE of the exception
>                     baos.write(buffer, 0, gzipRead);
>                     totalGzipRead += gzipRead;
>                 }
>             } catch (Exception ex) {
>                 ex.printStackTrace();
>                 LOG.info(ex.toString() + "\nstart:" + start + "\tlen" + len);
>                 LOG.error(StringUtils.stringifyException(ex));
>             }
>
>             byte[] pageBytes = baos.toByteArray();
>             baos.close();
>             zin.close();
>             zipin.close();
>
>             // GZIPInputStream gs = new GZIPInputStream(new ByteArrayInputStream(
>             //         bytes.get()));
>
>             // ObjectInputStream ois = new ObjectInputStream(zin);
>             // for (int i = 0; i < 500000; i++) {
>             //     Employee sarah = null;
>             //     try {
>             //         sarah = (Employee) ois.readObject();
>             //         // LOG.info(sarah.printObject());
>             //     } catch (ClassNotFoundException e) {
>             //         // TODO Auto-generated catch block
>             //         LOG.info(e + "start:" + start + "\tlen" + len);
>             //         e.printStackTrace();
>             //     }
>             //     // sarah.print();
>             // }
>
>             Text keyText = (Text) key;
>             keyText.set("" + index);
>             BytesWritable valueBytes = (BytesWritable) value;
>             valueBytes.set(pageBytes, 0, pageBytes.length);
>
>             return true;
>         } catch (Exception e) {
>             e.printStackTrace();
>             LOG.info(e.toString() + "start:" + start + "\tlen" + len);
>             LOG.error(StringUtils.stringifyException(e));
>             return false;
>         }
>     }
> }
>
> ***********************************************************************************
>
> package org.apache.hadoop.mapred;
>
> import java.io.IOException;
>
> import org.apache.hadoop.io.BytesWritable;
> import org.apache.hadoop.io.Text;
>
> /**
>  * An input format that reads custom gzip files.
>  */
> public class CustomGzipInputFormat extends FileInputFormat<Text, BytesWritable> {
>
>     public RecordReader<Text, BytesWritable> getRecordReader(InputSplit split,
>             JobConf job, Reporter reporter) throws IOException {
>         reporter.setStatus(split.toString());
>         return new CustomGzipRecordReader(job, (FileSplit) split);
>     }
> }
>
> ***********************************************************************************
>
> package org.apache.hadoop.examples;
>
> import java.io.ByteArrayInputStream;
> import java.io.IOException;
> import java.io.ObjectInputStream;
> import java.util.ArrayList;
> import java.util.List;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.conf.Configured;
> import org.apache.hadoop.examples.WordCount.Reduce;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.BytesWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapred.CustomGzipInputFormat;
> import org.apache.hadoop.mapred.Employee;
> import org.apache.hadoop.mapred.FileInputFormat;
> import org.apache.hadoop.mapred.FileOutputFormat;
> import org.apache.hadoop.mapred.JobClient;
> import org.apache.hadoop.mapred.JobConf;
> import org.apache.hadoop.mapred.Mapper;
> import org.apache.hadoop.mapred.OutputCollector;
> import org.apache.hadoop.mapred.Reporter;
> import org.apache.hadoop.mapred.TextOutputFormat;
> import org.apache.hadoop.util.Tool;
> import org.apache.hadoop.util.ToolRunner;
>
> public class CustomObjectReader extends Configured implements Tool,
>         Mapper<Text, BytesWritable, Text, Text> {
>
>     private JobConf jobConf;
>
>     // public static final Log LOG = LogFactory.getLog(ArcSegmentCreator.class);
>
>     public CustomObjectReader() {
>     }
>
>     public void configure(JobConf job) {
>         this.jobConf = job;
>     }
>
>     public void close() {
>     }
>
>     public CustomObjectReader(Configuration conf) {
>         setConf(conf);
>     }
>
>     public void map(Text key, BytesWritable bytes,
>             OutputCollector<Text, Text> output, Reporter reporter)
>             throws IOException {
>         ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
>                 bytes.get()));
>         long count = 0;
>         for (int i = 0; i < 500000; i++) {
>             Employee sarah = null;
>             try {
>                 sarah = (Employee) ois.readObject();
>             } catch (ClassNotFoundException e) {
>                 System.out.println("EXCEPTION" + e);
>                 e.printStackTrace();
>             }
>             sarah.print();
>         }
>     }
>
>     public void readArcs(Path arcFiles, Path outDir) throws IOException {
>     }
>
>     public int run(String[] args) throws Exception {
>
>         String usage = "Usage: ArcReader <arcFiles> <outDir>";
>
>         if (args.length < 2) {
>             System.err.println(usage);
>             return -1;
>         }
>
>         JobConf job = new JobConf(getConf(), CustomObjectReader.class);
>         job.setJobName("custom reader");
>
>         job.setInputFormat(CustomGzipInputFormat.class);
>         job.setMapperClass(CustomObjectReader.class);
>         job.setMapOutputKeyClass(Text.class);
>         job.setMapOutputValueClass(Text.class);
>
>         job.setOutputFormat(TextOutputFormat.class);
>         job.setOutputKeyClass(Text.class);
>         job.setOutputValueClass(Text.class);
>
>         job.setReducerClass(Reduce.class);
>
>         List<String> other_args = new ArrayList<String>();
>         for (int i = 0; i < args.length; ++i) {
>             try {
>                 if ("-m".equals(args[i])) {
>                     job.setNumMapTasks(Integer.parseInt(args[++i]));
>                 } else if ("-r".equals(args[i])) {
>                     job.setNumReduceTasks(Integer.parseInt(args[++i]));
>                 } else {
>                     other_args.add(args[i]);
>                 }
>             } catch (NumberFormatException except) {
>                 System.out.println("ERROR: Integer expected instead of "
>                         + args[i]);
>                 return printUsage();
>             } catch (ArrayIndexOutOfBoundsException except) {
>                 System.out.println("ERROR: Required parameter missing from "
>                         + args[i - 1]);
>                 return printUsage();
>             }
>         }
>         // Make sure there are exactly 2 parameters left.
>         if (other_args.size() != 2) {
>             System.out.println("ERROR: Wrong number of parameters: "
>                     + other_args.size() + " instead of 2.");
>             return printUsage();
>         }
>         FileInputFormat.setInputPaths(job, other_args.get(0));
>         FileOutputFormat.setOutputPath(job, new Path(other_args.get(1)));
>
>         JobClient.runJob(job);
>         return 0;
>     }
>
>     static int printUsage() {
>         System.out.println("wordcount [-m <maps>] [-r <reduces>] <input> <output>");
>         ToolRunner.printGenericCommandUsage(System.out);
>         return -1;
>     }
>
>     public static void main(String[] args) throws Exception {
>         int res = ToolRunner.run(new Configuration(), new CustomObjectReader(),
>                 args);
>         System.exit(res);
>     }
> }
>
> ***********************************************************************************
>
> package org.apache.hadoop.mapred;
>
> import java.io.FileInputStream;
> import java.io.FileOutputStream;
> import java.io.ObjectInputStream;
> import java.io.ObjectOutputStream;
> import java.io.Serializable;
> import java.util.zip.GZIPInputStream;
> import java.util.zip.GZIPOutputStream;
>
> public class Employee implements Serializable {
>     String name;
>     int age;
>     int salary;
>
>     public Employee(String name, int age, int salary) {
>         this.name = name;
>         this.age = age;
>         this.salary = salary;
>     }
>
>     public void print() {
>         System.out.println("Record for: " + name);
>         System.out.println("Name: " + name);
>         System.out.println("Age: " + age);
>         System.out.println("Salary: " + salary);
>     }
>
>     public String printObject() {
>         return "name" + name + "\tage" + age + "\tSalary: " + salary;
>     }
>
>     public static void main(String argv[]) throws Exception {
>         // create an object
>         org.apache.hadoop.mapred.Employee sarah = new Employee("S. Jordan", 28, 56000);
>
>         // serialize the Employee objects into a gzipped file
>         FileOutputStream fos = new FileOutputStream(
>                 "/home/amitsingh/OUTPUT/out.bin");
>         GZIPOutputStream gz = new GZIPOutputStream(fos);
>         ObjectOutputStream oos = new ObjectOutputStream(gz);
>
>         for (int i = 0; i < 500000; i++) {
>             org.apache.hadoop.mapred.Employee sam = new Employee(i
>                     + "MyNameIsGreat", i, i + 50000);
>             oos.writeObject(sam);
>         }
>         oos.flush();
>         oos.close();
>         // fos.close();
>
>         // deserialize the first few objects back
>         FileInputStream fis = new FileInputStream("/home/amitsingh/OUTPUT/out.bin");
>         GZIPInputStream gs = new GZIPInputStream(fis);
>         ObjectInputStream ois = new ObjectInputStream(gs);
>         for (int i = 0; i < 10; i++) {
>             sarah = (org.apache.hadoop.mapred.Employee) ois.readObject();
>             sarah.print();
>         }
>         ois.close();
>         // fis.close();
>     }
> }
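
One more option, not used in the code above but worth noting since the out.bin test file is a single gzip stream end-to-end: the split problem can be sidestepped entirely by declaring the input format non-splittable, so each file becomes exactly one split and no record ever crosses a split boundary. A minimal sketch against the old mapred API used above (the subclass name is made up):

package org.apache.hadoop.mapred;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch only: a hypothetical variant of the input format above that tells
// the framework never to split its files; each gzip file is then read by a
// single map task from start to finish.
public class NonSplittableGzipInputFormat extends CustomGzipInputFormat {
    @Override
    protected boolean isSplitable(FileSystem fs, Path file) {
        return false;
    }
}

The trade-off is losing parallelism within a single file, which is the usual situation with gzip input anyway.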