hadoop-mapreduce-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Steve Lewis <lordjoe2...@gmail.com>
Subject Re: New to hadoop, trying to write a customary file split
Date Mon, 11 Jul 2011 19:11:33 GMT
Look at this sample
=============================================
package org.systemsbiology.hadoop;



import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;

import java.io.*;
import java.util.*;

/**
 * org.systemsbiology.xtandem.hadoop.XMLTagInputFormat
 * Splitter that reads scan tags from an XML file
 * No assumption is made about lines but tage and end tags MUST look like
<MyTag </MyTag> with no embedded spaces
 * usually you will subclass and hard code the tag you want to split on
 */
public class XMLTagInputFormat extends FileInputFormat<Text, Text> {
    public static final XMLTagInputFormat[] EMPTY_ARRAY = {};


    private static final double SPLIT_SLOP = 1.1;   // 10% slop


    public static final int BUFFER_SIZE = 4096;

    private final String m_BaseTag;
    private final String m_StartTag;
    private final String m_EndTag;
    private String m_Extension;

    public XMLTagInputFormat(final String pBaseTag) {
        m_BaseTag = pBaseTag;
        m_StartTag = "<" + pBaseTag;
        m_EndTag = "</" + pBaseTag + ">";

    }

    public String getExtension() {
        return m_Extension;
    }

    public void setExtension(final String pExtension) {
        m_Extension = pExtension;
    }

    public boolean isSplitReadable(InputSplit split) {
        if (!(split instanceof FileSplit))
            return true;
        FileSplit fsplit = (FileSplit) split;
        Path path1 = fsplit.getPath();
        return isPathAcceptable(path1);
    }

    protected boolean isPathAcceptable(final Path pPath1) {
        String path = pPath1.toString().toLowerCase();
        if(path.startsWith("part-r-"))
            return true;
        String extension = getExtension();
        if (extension != null && path.endsWith(extension.toLowerCase()))
            return true;
        if (extension != null && path.endsWith(extension.toLowerCase() +
".gz"))
            return true;
        if (extension == null )
            return true;
        return false;
    }

    public String getStartTag() {
        return m_StartTag;
    }

    public String getBaseTag() {
        return m_BaseTag;
    }

    public String getEndTag() {
        return m_EndTag;
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
                                                       TaskAttemptContext
context) {
        if (isSplitReadable(split))
            return new MyXMLFileReader();
        else
            return NullRecordReader.INSTANCE; // do not read
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        String fname = file.getName().toLowerCase();
        if(fname.endsWith(".gz"))
            return false;
        return true;
    }

    /**
     * Generate the list of files and make them into FileSplits.
     * This needs to be copied to insert a filter on acceptable data
     */
    @Override
    public List<InputSplit> getSplits(JobContext job
    ) throws IOException {
        long minSize = Math.max(getFormatMinSplitSize(),
getMinSplitSize(job));
        long maxSize = getMaxSplitSize(job);

        // generate splits
        List<InputSplit> splits = new ArrayList<InputSplit>();
        for (FileStatus file : listStatus(job)) {
            Path path = file.getPath();
            if (!isPathAcceptable(path))   // filter acceptable data
                continue;
            FileSystem fs = path.getFileSystem(job.getConfiguration());
            long length = file.getLen();
            BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0,
length);
            if ((length != 0) && isSplitable(job, path)) {
                long blockSize = file.getBlockSize();
                long splitSize = computeSplitSize(blockSize, minSize,
maxSize);

                long bytesRemaining = length;
                while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
                    int blkIndex = getBlockIndex(blkLocations, length -
bytesRemaining);
                    splits.add(new FileSplit(path, length - bytesRemaining,
splitSize,
                            blkLocations[blkIndex].getHosts()));
                    bytesRemaining -= splitSize;
                }

                if (bytesRemaining != 0) {
                    splits.add(new FileSplit(path, length - bytesRemaining,
bytesRemaining,
                            blkLocations[blkLocations.length -
1].getHosts()));
                }
            }
            else if (length != 0) {
                splits.add(new FileSplit(path, 0, length,
blkLocations[0].getHosts()));
            }
            else {
                //Create empty hosts array for zero length files
                splits.add(new FileSplit(path, 0, length, new String[0]));
            }
        }
    //    LOG.debug("Total # of splits: " + splits.size());
        return splits;
    }

    /**
     * Custom RecordReader which returns the entire file as a
     * single m_Value with the name as a m_Key
     * Value is the entire file
     * Key is the file name
     */
    public class MyXMLFileReader extends RecordReader<Text, Text> {

        private CompressionCodecFactory compressionCodecs = null;
        private long m_Start;
        private long m_End;
        private long m_Current;
        private BufferedReader m_Input;
        private Text m_Key;
        private Text m_Value = null;
        private char[] m_Buffer = new char[BUFFER_SIZE];
        StringBuilder m_Sb = new StringBuilder();

        public void initialize(InputSplit genericSplit,
                               TaskAttemptContext context) throws
IOException {
            FileSplit split = (FileSplit) genericSplit;
            Configuration job = context.getConfiguration();
            m_Sb.setLength(0);
            m_Start = split.getStart();
            m_End = m_Start + split.getLength();
            final Path file = split.getPath();
            compressionCodecs = new CompressionCodecFactory(job);
            final CompressionCodec codec = compressionCodecs.getCodec(file);

            // open the file and seek to the m_Start of the split
            FileSystem fs = file.getFileSystem(job);
            FSDataInputStream fileIn = fs.open(split.getPath());
            if (codec != null) {
                CompressionInputStream inputStream =
codec.createInputStream(fileIn);
                m_Input = new BufferedReader(new
InputStreamReader(inputStream));
                m_End = Long.MAX_VALUE;
            }
            else {
                m_Input = new BufferedReader(new InputStreamReader(fileIn));
            }
            m_Current = m_Start;
            if (m_Key == null) {
                m_Key = new Text();
            }
            m_Key.set(split.getPath().getName());
            if (m_Value == null) {
                m_Value = new Text();
            }

        }

        /**
         * look for a <scan tag then read until it closes
         *
         * @return true if there is data
         * @throws java.io.IOException
         */
        public boolean nextKeyValue() throws IOException {
            if(readFromCurrentBuffer())
                return true;
            int newSize = 0;
            String startTag = getStartTag() + " ";
            String startTag2 = getStartTag() + ">";
            newSize = m_Input.read(m_Buffer);

            while (newSize > 0) {
                m_Current += newSize;
                m_Sb.append(m_Buffer, 0, newSize);
                if( readFromCurrentBuffer())
                    return true;
                newSize = m_Input.read(m_Buffer);
            }
            // exit because we are at the m_End
            if (newSize <= 0) {
                m_Key = null;
                m_Value = null;
                return false;
            }

            return true;
        }

        protected boolean readFromCurrentBuffer()
        {
            String endTag = getEndTag();
              String startText = m_Sb.toString();
            if(!startText.contains(endTag))
                return false; // need more read
            String startTag = getStartTag() + " ";
             String startTag2 = getStartTag() + ">";
            int index = startText.indexOf(startTag);
            if (index == -1)
                index = startText.indexOf(startTag2);
            if(index == -1)
                return false;
            startText = startText.substring(index);
            m_Sb.setLength(0);
            m_Sb.append(startText);

            String s = startText;
            index = s.indexOf(endTag);
            if (index == -1)
                return false; // need more read
               // throw new IllegalStateException("unmatched tag " +
getBaseTag());
            index += endTag.length();
            String tag = s.substring(0, index).trim();
            m_Value.set(tag);

            // keep the remaining text to add to the next tag
            m_Sb.setLength(0);
            String rest = s.substring(index);
            m_Sb.append(rest);
            return true;
        }

        @Override
        public Text getCurrentKey() {
            return m_Key;
        }

        @Override
        public Text getCurrentValue() {
            return m_Value;
        }

        /**
         * Get the progress within the split
         */
        public float getProgress() {
            return ((float) m_Current - m_Start) / (m_Start - m_End);
        }

        public synchronized void close() throws IOException {
            if (m_Input != null) {
                m_Input.close();
            }
        }
    }
}

=============================================

On Mon, Jul 11, 2011 at 11:57 AM, Erik T <erik.shiken@gmail.com> wrote:

> Hello everyone,
>
> I'm new to Hadoop and I'm trying to figure out how to design a M/R program
> to parse a file and generate a PMML file as output.
>
> What I would like to do is split a file by a keyword instead of a given number
> of lines because the location of the split could change from time to time.
>
> I'm looking around and was thinking maybe KeyValueTextInputFormat would be
> the way to go but I'm not finding any clear examples how to use it. So I'm
> not sure if this is the right choice or not.
>
> Here is a basic input example of what I'm working with.
>
> [Input file info]
> more info
> more info
> etc.
> etc.
> *Keyword*
> different info
> different info
> *Keyword*
> some more info
>
> For the example above, each section can be generated separately from each
> other. However, within each section, different lines are dependent upon each
> other to generate a valid PMML file.
>
> Can anyone offer a suggestion what type of input format I should use?
>
> Thanks for your time
> Erik
>



-- 
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com

Mime
View raw message