hadoop-common-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amitsingh <amitsi...@cse.iitb.ac.in>
Subject Re: Re: Re: File Splits in Hadoop
Date Thu, 11 Dec 2008 10:09:55 GMT
Hi ,

After some debugging it seems that i got the problem.

In parseARC(...)  length for bzip was incorrectly set. (last bzip in the 
split)
if (totalRead > splitEnd) {
               break;
} ... it should have ideally allowed to read from next block instead of 
setting length to offset-size for last block.

so modified the next to directly read from inputStream instead of byte 
array  populated using offset & length.


anyways, Is this the correct way to read records spanning multiple splits.
(Currently Its taking quite some time to do processing 100% CPU :(.  
Investigating what could be the potential cause )

So for me the conclusion seems to be reads spanning multiple physical 
blocks are transparent.

***I Guess, could have handled this in parseARC in a cleaner way****

 


*********************************************************************************************
modified srcCode
*********************************************************************************************

public boolean next(Text key, BytesWritable value) throws IOException {
        long start = 0;
        long len = 0;
        try {
            LOG.info("NEXT !!!!!!!!!!!!!! ");
            int index = recordIndex++;
            if (index >= startLens.length) {
                LOG.info("BAD ");
                return false;
            } else {
                LOG.info("GOOD");
            }

            start = startLens[index][0];
            len = startLens[index][1];
            byte[] zipbytes = new byte[(int) len];

            LOG.info("index" + index + "\tstartLens.length" + 
startLens.length +"\tstart:" + start + "\tlen" + len);

           
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ByteArrayInputStream zipin = new ByteArrayInputStream(zipbytes);
            GZIPInputStream zin = null;
            in.seek(start);                                            
                //Change
            if (index == startLens.length - 1) {                        
//Change
                LOG.info("zin = new GZIPInputStream(in);");    //Change
                zin = new GZIPInputStream(in);                          
//Change
            } else {                                                    
                //Change
                in.read(zipbytes);                                    
                //Change
                zin = new GZIPInputStream(zipin);                    
//Change
                LOG.info("zin = new GZIPInputStream(zipin);");//Change
            }

            int gzipRead = -1;
            int totalGzipRead = 0;
            baos.reset();
            try {
                while ((gzipRead = zin.read(buffer, 0, buffer.length)) 
!= -1) {
                    baos.write(buffer, 0, gzipRead);
                    totalGzipRead += gzipRead;
                }
            } catch (Exception ex) {
                ex.printStackTrace();
                LOG
                        .info(ex.toString() + "\nBANGstart:" + start + 
"\tlen"
                                + len);
                LOG.equals(StringUtils.stringifyException(ex));
            }

            byte[] pageBytes = baos.toByteArray();
            baos.close();
            if (index != startLens.length - 1) {                    //Change
                zin.close();                                            
           //Change
            }                                                            
               //Change
            zipin.close();
          
            Text keyText = (Text) key;
            keyText.set("" + index);
            BytesWritable valueBytes = (BytesWritable) value;
            valueBytes.set(pageBytes, 0, pageBytes.length);

            return true;
        } catch (Exception e) {
            e.printStackTrace();
            LOG.info(e.toString() + "start:" + start + "\tlen" + len);
            LOG.equals(StringUtils.stringifyException(e));
            return false;
        }

    }


amitsingh wrote:
> Thanks for discussion Taran,
>
> The problem still persists.
> What should be done if i have a record which spans multiple PSplits 
> (physcial splits on HDFS)?
> What happens if  we try to read beyond a pSplit?
> Is the next read transparently done from records corresponding to 
> next  block for the same file (might not be on the same machine) or
> next block (may not be of the same file) from the local disk is read.
>
> If its former i guess things should have worked fine (surprisingly 
> they arent !! i m goofing  it up somewhere).
> If its latter then i have no idea how to tackle this. (Any help would 
> be highly appreciated)
>
>
>
> **************************************************************************************************

>
>
> I Tried running a simple program where in I created a sample GZip file 
> by serailizing records
>           // serialize the objects sarah and sam
>           FileOutputStream fos = new 
> FileOutputStream("/home/amitsingh/OUTPUT/out.bin");
>           GZIPOutputStream gz = new GZIPOutputStream(fos);
>           ObjectOutputStream oos = new ObjectOutputStream(gz);
>
>           for (int i = 0; i < 500000; i++) {
>               Employee sam = new Employee(i + "name", i,   i + 50000);
>            // 3 fields , 2 int , 1 string
>               oos.writeObject(sam);
>           }
>           oos.flush();
>           oos.close();
>
> Now if i just run a simple map reduce on this binary file, it gives 
> exception java.io.EOFException: Unexpected end of ZLIB input stream
> It creates 2 splits
> Split 1: hdfs://localhost:54310/user/amitsingh/out1: start:0 
> length:1555001 hosts: sandpiper ,bytesRemaining: 1555001
> Split 2:  hdfs://localhost:54310/user/amitsingh/out1: start1555001 
> length:1555001 hosts: sandpiper ,
>
> For Map1--> Split1 i get java.io.EOFException: Unexpected end of ZLIB 
> input stream [for startLens[0]  start:0    len1556480]
> For Map2--> No valid GZip is found as startLens is empty
>
> I am not sure why in Map1 len1556480 and not 3110002(entire file) as  
> there is ONLY one GZip and thats the entire file.
> Any guidance would be of great help ??
>
>
>
>
>
>
>
> **************************************************************************************************************

>
> Source code
> **************************************************************************************************************

>
>
> package org.apache.hadoop.mapred;
> import java.io.ByteArrayInputStream;
> import java.io.ByteArrayOutputStream;
> import java.io.IOException;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.zip.GZIPInputStream;
>
> import org.apache.commons.logging.Log;
> import org.apache.commons.logging.LogFactory;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FSDataInputStream;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.BytesWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.util.ReflectionUtils;
> import org.apache.hadoop.util.StringUtils;
>
> public class CustomGzipRecordReader implements
>        RecordReader<Text, BytesWritable> {
>
>    public static final Log LOG = LogFactory
>            .getLog(CustomGzipRecordReader.class);
>
>    protected Configuration conf;
>    protected long splitStart = 0;
>    protected long pos = 0;
>    protected long splitEnd = 0;
>    protected long splitLen = 0;
>    protected long fileLen = 0;
>    protected FSDataInputStream in;
>    protected int recordIndex = 0;
>    protected long[][] startLens;
>    protected byte[] buffer = new byte[4096];
>
>    private static byte[] MAGIC = { (byte) 0x1F, (byte) 0x8B };
>
>    //chech the split and populate startLens indicating at which all 
> offset a Zlib file starts in this split
>    private void parseArcBytes() throws IOException {
>
>        long totalRead = in.getPos();
>        byte[] buffer = new byte[4096];
>        List<Long> starts = new ArrayList<Long>();
>
>        int read = -1;
>        while ((read = in.read(buffer)) > 0) {
>
>            for (int i = 0; i < (read - 1); i++) {
>
>                if ((buffer[i] == (byte) 0x1F)
>                        && (buffer[i + 1] == (byte) 0x8B)) {
>                    long curStart = totalRead + i;
>                    in.seek(curStart);
>                    byte[] zipbytes = null;
>                    try {
>                        zipbytes = new byte[32];
>                        in.read(zipbytes);
>                        ByteArrayInputStream zipin = new 
> ByteArrayInputStream(
>                                zipbytes);
>                        GZIPInputStream zin = new GZIPInputStream(zipin);
>                        zin.close();
>                        zipin.close();
>                        starts.add(curStart);
>                        LOG.info("curStart: " + (curStart));
>                    } catch (Exception e) {
>                        LOG.info("Ignoring position: " + (curStart));
>                        continue;
>                    }
>                }
>            }
>
>            totalRead += read;
>            in.seek(totalRead);
>            if (totalRead > splitEnd) {
>                break;
>            }
>        }
>
>        startLens = new long[starts.size()][2];
>        for (int i = 0; i < starts.size(); i++) {
>
>            long start = starts.get(i);
>            long length = ((i < starts.size() - 1) ? starts.get(i + 1)
>                    : totalRead)
>                    - start;
>            startLens[i][0] = start;
>            startLens[i][1] = length;
>            System.out.println("startLens[" + i + "][0] " + 
> startLens[i][0]
>                    + "\t\t" + startLens[i][1]);
>        }
>    }
>
>    public CustomGzipRecordReader(Configuration conf, FileSplit split)
>            throws IOException {
>
>        Path path = split.getPath();
>        FileSystem fs = path.getFileSystem(conf);
>        fileLen = fs.getFileStatus(split.getPath()).getLen();
>        this.conf = conf;
>        this.in = fs.open(split.getPath());
>        this.splitStart = split.getStart();
>        this.splitEnd = splitStart + split.getLength();
>        this.splitLen = split.getLength();
>        in.seek(splitStart);
>        parseArcBytes();
>
>    }
>
>    public void close() throws IOException {
>        this.in.close();
>    }
>
>    public Text createKey() {
>        return (Text) ReflectionUtils.newInstance(Text.class, conf);
>    }
>
>    public BytesWritable createValue() {
>        return (BytesWritable) 
> ReflectionUtils.newInstance(BytesWritable.class,
>                conf);
>    }
>
>    public long getPos() throws IOException {
>        return 0;
>    }
>
>    public float getProgress() throws IOException {
>
>        if (recordIndex == 0) {
>            return 0.0f;
>        } else {
>            // the progress is current pos - where we started / length 
> of the
>            // split
>            return Math.min(1.0f, (float) (recordIndex / 
> startLens.length));
>        }
>    }
>
>    public boolean next(Text key, BytesWritable value) throws 
> IOException {
>        long start = 0;
>        long len = 0;
>        try {
>            LOG.info("NEXT Called ");
>            int index = recordIndex++;
>            if (index >= startLens.length) {
>                LOG.info("BAD ");
>                return false;
>            } else {
>                LOG.info("GOOD");
>            }
>
>            start = startLens[index][0];
>            len = startLens[index][1];
>            byte[] zipbytes = new byte[(int) len];
>
>            LOG.info("start:" + start + "\tlen" + len);
>
>            in.seek(start);
>            in.read(zipbytes);
>
>            ByteArrayOutputStream baos = new ByteArrayOutputStream();
>            ByteArrayInputStream zipin = new 
> ByteArrayInputStream(zipbytes);
>            GZIPInputStream zin = new GZIPInputStream(zipin);
>
>            int gzipRead = -1;
>            int totalGzipRead = 0;
>            baos.reset();
>            try {
>                while ((gzipRead = zin.read(buffer, 0, buffer.length)) 
> != -1) {  // <--------SOURCE of exception
>                    baos.write(buffer, 0, gzipRead);
>                    totalGzipRead += gzipRead;
>                }
>            } catch (Exception ex) {
>                ex.printStackTrace();
>                LOG
>                        .info(ex.toString() + "\nstart:" + start + 
> "\tlen"                                       + len);
>                LOG.equals(StringUtils.stringifyException(ex));
>            }
>
>            byte[] pageBytes = baos.toByteArray();
>            baos.close();
>            zin.close();
>            zipin.close();
>
> //            GZIPInputStream gs = new GZIPInputStream(new 
> ByteArrayInputStream(
> //                    bytes.get()));
>
>            // ObjectInputStream ois = new ObjectInputStream(zin);
>            // for (int i = 0; i < 500000; i++) {
>            // Employee sarah = null;
>            // try {
>            // sarah = (Employee) ois.readObject();
>            // // LOG.info(sarah.printObject());
>            // } catch (ClassNotFoundException e) {
>            // // TODO Auto-generated catch block
>            // LOG.info(e + "start:" + start + "\tlen" + len);
>            // e.printStackTrace();
>            // }
>            // // sarah.print();
>            //
>            // }
>                       Text keyText = (Text) key;
>             keyText.set(""+index);
>             BytesWritable valueBytes = (BytesWritable) value;
>             valueBytes.set(pageBytes, 0, pageBytes.length);
>
>            return true;
>        } catch (Exception e) {
>            e.printStackTrace();
>            LOG.info(e.toString() + "start:" + start + "\tlen" + len);
>            LOG.equals(StringUtils.stringifyException(e));
>            return false;
>        }
>
>    }
> }
>
> *********************************************************************************** 
>
>
> package org.apache.hadoop.mapred;
> import java.io.IOException;
> import org.apache.hadoop.io.BytesWritable;
> import org.apache.hadoop.io.Text;
> /**
> * A input format the reads custom gzip files.
> */
> public class CustomGzipInputFormat extends FileInputFormat<Text, 
> BytesWritable> {
>    public RecordReader<Text, BytesWritable> getRecordReader(InputSplit 
> split,
>            JobConf job, Reporter reporter) throws IOException {
>        reporter.setStatus(split.toString());
>        return new CustomGzipRecordReader(job, (FileSplit) split);
>    }
>
> }
>
> *********************************************************************************** 
>
>
>
> package org.apache.hadoop.examples;
>
> import java.io.ByteArrayInputStream;
> import java.io.IOException;
> import java.io.ObjectInputStream;
> import java.util.ArrayList;
> import java.util.List;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.conf.Configured;
> import org.apache.hadoop.examples.WordCount.Reduce;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.BytesWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapred.CustomGzipInputFormat;
> import org.apache.hadoop.mapred.Employee;
> import org.apache.hadoop.mapred.FileInputFormat;
> import org.apache.hadoop.mapred.FileOutputFormat;
> import org.apache.hadoop.mapred.JobClient;
> import org.apache.hadoop.mapred.JobConf;
> import org.apache.hadoop.mapred.Mapper;
> import org.apache.hadoop.mapred.OutputCollector;
> import org.apache.hadoop.mapred.Reporter;
> import org.apache.hadoop.mapred.TextOutputFormat;
> import org.apache.hadoop.util.Tool;
> import org.apache.hadoop.util.ToolRunner;
>
> public class CustomObjectReader extends Configured implements Tool,
>        Mapper<Text, BytesWritable, Text, Text> {
>
>    private JobConf jobConf;
>
>    // public static final Log LOG = 
> LogFactory.getLog(ArcSegmentCreator.class);
>
>    public CustomObjectReader() {
>
>    }
>
>    public void configure(JobConf job) {
>        this.jobConf = job;
>    }
>
>    public void close() {
>    }
>
>    public CustomObjectReader(Configuration conf) {
>        setConf(conf);
>    }
>
>    public void map(Text key, BytesWritable bytes,
>            OutputCollector<Text, Text> output, Reporter reporter)
>            throws IOException {
>        ObjectInputStream ois = new ObjectInputStream(new 
> ByteArrayInputStream(
>                bytes.get()));
>        long count =0;
>        for (int i = 0; i < 500000; i++) {
>            Employee sarah = null;
>                      try {
>                sarah = (Employee) ois.readObject();               
>            } catch (ClassNotFoundException e) {
>                System.out.println("EXCEPTOPN" + e);
>                e.printStackTrace();
>            }
>           sarah.print();
>        }
>
>    }
>
>    public void readArcs(Path arcFiles, Path outDir) throws IOException {
>
>    }
>
>    public int run(String[] args) throws Exception {
>
>        String usage = "Usage: ArcReader <arcFiles>";
>
>        if (args.length < 2) {
>            System.err.println(usage);
>            return -1;
>        }
>
>
>
>        JobConf job = new JobConf(getConf(), CustomObjectReader.class);
>        job.setJobName("custom reader");
>
>        job.setInputFormat(CustomGzipInputFormat.class);
>        job.setMapperClass(CustomObjectReader.class);
>        job.setMapOutputKeyClass(Text.class);
>        job.setMapOutputValueClass(Text.class);
>
>        job.setOutputFormat(TextOutputFormat.class);
>        job.setOutputKeyClass(Text.class);
>        job.setOutputValueClass(Text.class);
>
>        job.setReducerClass(Reduce.class);
>
>        List<String> other_args = new ArrayList<String>();
>        for (int i = 0; i < args.length; ++i) {
>            try {
>                if ("-m".equals(args[i])) {
>                    job.setNumMapTasks(Integer.parseInt(args[++i]));
>                } else if ("-r".equals(args[i])) {
>                    job.setNumReduceTasks(Integer.parseInt(args[++i]));
>                } else {
>                    other_args.add(args[i]);
>                }
>            } catch (NumberFormatException except) {
>                System.out.println("ERROR: Integer expected instead of "
>                        + args[i]);
>                return printUsage();
>            } catch (ArrayIndexOutOfBoundsException except) {
>                System.out.println("ERROR: Required parameter missing 
> from "
>                        + args[i - 1]);
>                return printUsage();
>            }
>        }
>        // Make sure there are exactly 2 parameters left.
>        if (other_args.size() != 2) {
>            System.out.println("ERROR: Wrong number of parameters: "
>                    + other_args.size() + " instead of 2.");
>            return printUsage();
>        }
>        FileInputFormat.setInputPaths(job, other_args.get(0));
>        FileOutputFormat.setOutputPath(job, new Path(other_args.get(1)));
>
>        JobClient.runJob(job);
>        return 0;
>
>    }
>
>    static int printUsage() {
>        System.out
>                .println("wordcount [-m <maps>] [-r <reduces>] <input>

> <output>");
>        ToolRunner.printGenericCommandUsage(System.out);
>        return -1;
>    }
>
>    public static void main(String[] args) throws Exception {
>        int res = ToolRunner.run(new Configuration(), new 
> CustomObjectReader(),
>                args);
>        System.exit(res);
>    }
>
> }
>
>
> *********************************************************************************** 
>
> package org.apache.hadoop.mapred;
>
> import java.io.FileInputStream;
> import java.io.FileOutputStream;
> import java.io.ObjectInputStream;
> import java.io.ObjectOutputStream;
> import java.io.Serializable;
> import java.util.zip.GZIPInputStream;
> import java.util.zip.GZIPOutputStream;
>
> public class Employee implements Serializable {
>    String name;
>    int age;
>    int salary;
>
>    public Employee(String name, int age, int salary) {
>        this.name = name;
>        this.age = age;
>        this.salary = salary;
>    }
>
>    public void print() {
>        System.out.println("Record for: " + name);
>        System.out.println("Name: " + name);
>        System.out.println("Age: " + age);
>        System.out.println("Salary: " + salary);
>    }
>      public String printObject() {
>        return "name" +name +"\tage" + age +"\tSalary: " + salary;
>    }
>
>    public static void main(String argv[]) throws Exception {
>        // create some objects
>        org.apache.hadoop.mapred.Employee sarah = new Employee("S. 
> Jordan", 28, 56000);
>
>
>            // serialize the objects sarah and sam
>            FileOutputStream fos = new FileOutputStream(
>                    "/home/amitsingh/OUTPUT/out.bin");
>            GZIPOutputStream gz = new GZIPOutputStream(fos);
>            ObjectOutputStream oos = new ObjectOutputStream(gz);
>
>            for (int i = 0; i < 500000; i++) {
>                org.apache.hadoop.mapred.Employee sam = new Employee(i 
> + "MyNameIsGreat", i,
>                        i + 50000);
>                oos.writeObject(sam);
>            }
>            oos.flush();
>            oos.close();
>            //fos.close();
>
>            // deserialize objects sarah and sam
>            FileInputStream fis = new 
> FileInputStream("/home/amitsingh/OUTPUT/out.bin");
>            GZIPInputStream gs = new GZIPInputStream(fis);
>            ObjectInputStream ois = new ObjectInputStream(gs);
>            for (int i = 0; i < 10; i++) {
>                sarah = (org.apache.hadoop.mapred.Employee) 
> ois.readObject();
>                sarah.print();
>
>            }
>            ois.close();
>            //fis.close();
>        }
>  
> }
>


Mime
View raw message