hadoop-mapreduce-user mailing list archives

From Erik T <erik.shi...@gmail.com>
Subject Re: New to hadoop, trying to write a custom file split
Date Mon, 18 Jul 2011 16:40:19 GMT
Hi Steven,

Thank you for the sample. I have one question though.

In MyXMLFileReader's nextKeyValue, are startTag and startTag2 needed?
Erik


On 11 July 2011 15:11, Steve Lewis <lordjoe2000@gmail.com> wrote:

> Look at this sample
> =============================================
> package org.systemsbiology.hadoop;
>
> import org.apache.hadoop.conf.*;
> import org.apache.hadoop.fs.*;
> import org.apache.hadoop.io.*;
> import org.apache.hadoop.io.compress.*;
> import org.apache.hadoop.mapreduce.*;
> import org.apache.hadoop.mapreduce.lib.input.*;
>
> import java.io.*;
> import java.util.*;
>
> /**
>  * org.systemsbiology.xtandem.hadoop.XMLTagInputFormat
>  * Splitter that reads scan tags from an XML file.
>  * No assumption is made about line boundaries, but start and end tags MUST
>  * look like <MyTag ...> ... </MyTag> with no embedded spaces in the tag name,
>  * e.g. <scan num="1"> ... </scan>.
>  * Usually you will subclass and hard-code the tag you want to split on.
>  */
> public class XMLTagInputFormat extends FileInputFormat<Text, Text> {
>     public static final XMLTagInputFormat[] EMPTY_ARRAY = {};
>
>
>     private static final double SPLIT_SLOP = 1.1;   // 10% slop
>
>
>     public static final int BUFFER_SIZE = 4096;
>
>     private final String m_BaseTag;
>     private final String m_StartTag;
>     private final String m_EndTag;
>     private String m_Extension;
>
>     public XMLTagInputFormat(final String pBaseTag) {
>         m_BaseTag = pBaseTag;
>         m_StartTag = "<" + pBaseTag;
>         m_EndTag = "</" + pBaseTag + ">";
>
>     }
>
>     public String getExtension() {
>         return m_Extension;
>     }
>
>     public void setExtension(final String pExtension) {
>         m_Extension = pExtension;
>     }
>
>     public boolean isSplitReadable(InputSplit split) {
>         if (!(split instanceof FileSplit))
>             return true;
>         FileSplit fsplit = (FileSplit) split;
>         Path path1 = fsplit.getPath();
>         return isPathAcceptable(path1);
>     }
>
>     protected boolean isPathAcceptable(final Path pPath1) {
>         String path = pPath1.toString().toLowerCase();
>         if (pPath1.getName().startsWith("part-r-"))   // test the file name, not the full path
>             return true;
>         String extension = getExtension();
>         if (extension != null && path.endsWith(extension.toLowerCase()))
>             return true;
>         if (extension != null && path.endsWith(extension.toLowerCase() + ".gz"))
>             return true;
>         if (extension == null )
>             return true;
>         return false;
>     }
>
>     public String getStartTag() {
>         return m_StartTag;
>     }
>
>     public String getBaseTag() {
>         return m_BaseTag;
>     }
>
>     public String getEndTag() {
>         return m_EndTag;
>     }
>
>     @Override
>     public RecordReader<Text, Text> createRecordReader(InputSplit split,
>                                                        TaskAttemptContext context) {
>         if (isSplitReadable(split))
>             return new MyXMLFileReader();
>         else
>             return NullRecordReader.INSTANCE; // do not read
>     }
>
>     @Override
>     protected boolean isSplitable(JobContext context, Path file) {
>         String fname = file.getName().toLowerCase();
>         if(fname.endsWith(".gz"))
>             return false;
>         return true;
>     }
>
>     /**
>      * Generate the list of files and make them into FileSplits.
>      * This is copied from FileInputFormat so that a filter on
>      * acceptable files can be inserted
>      */
>     @Override
>     public List<InputSplit> getSplits(JobContext job) throws IOException {
>         long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
>         long maxSize = getMaxSplitSize(job);
>
>         // generate splits
>         List<InputSplit> splits = new ArrayList<InputSplit>();
>         for (FileStatus file : listStatus(job)) {
>             Path path = file.getPath();
>             if (!isPathAcceptable(path))   // filter acceptable data
>                 continue;
>             FileSystem fs = path.getFileSystem(job.getConfiguration());
>             long length = file.getLen();
>             BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
>             if ((length != 0) && isSplitable(job, path)) {
>                 long blockSize = file.getBlockSize();
>                 long splitSize = computeSplitSize(blockSize, minSize, maxSize);
>
>                 long bytesRemaining = length;
>                 while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
>                     int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
>                     splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
>                             blkLocations[blkIndex].getHosts()));
>                     bytesRemaining -= splitSize;
>                 }
>
>                 if (bytesRemaining != 0) {
>                     splits.add(new FileSplit(path, length - bytesRemaining, bytesRemaining,
>                             blkLocations[blkLocations.length - 1].getHosts()));
>                 }
>             }
>             else if (length != 0) {
>                 splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
>             }
>             else {
>                 //Create empty hosts array for zero length files
>                 splits.add(new FileSplit(path, 0, length, new String[0]));
>             }
>         }
>     //    LOG.debug("Total # of splits: " + splits.size());
>         return splits;
>     }
>
>     /**
>      * Custom RecordReader which returns each complete tag element as a
>      * separate m_Value, with the file name as the m_Key
>      * Value is one <tag ...> ... </tag> element
>      * Key is the file name
>      */
>     public class MyXMLFileReader extends RecordReader<Text, Text> {
>
>         private CompressionCodecFactory compressionCodecs = null;
>         private long m_Start;
>         private long m_End;
>         private long m_Current;
>         private BufferedReader m_Input;
>         private Text m_Key;
>         private Text m_Value = null;
>         private char[] m_Buffer = new char[BUFFER_SIZE];
>         StringBuilder m_Sb = new StringBuilder();
>
>         public void initialize(InputSplit genericSplit,
>                                TaskAttemptContext context) throws IOException {
>             FileSplit split = (FileSplit) genericSplit;
>             Configuration job = context.getConfiguration();
>             m_Sb.setLength(0);
>             m_Start = split.getStart();
>             m_End = m_Start + split.getLength();
>             final Path file = split.getPath();
>             compressionCodecs = new CompressionCodecFactory(job);
>             final CompressionCodec codec = compressionCodecs.getCodec(file);
>
>             // open the file and seek to the m_Start of the split
>             FileSystem fs = file.getFileSystem(job);
>             FSDataInputStream fileIn = fs.open(split.getPath());
>             if (codec != null) {
>                 CompressionInputStream inputStream = codec.createInputStream(fileIn);
>                 m_Input = new BufferedReader(new InputStreamReader(inputStream));
>                 m_End = Long.MAX_VALUE;   // a compressed stream cannot be split - read it to EOF
>             }
>             else {
>                 fileIn.seek(m_Start);   // start at this reader's own split, not at the head of the file
>                 m_Input = new BufferedReader(new InputStreamReader(fileIn));
>             }
>             m_Current = m_Start;
>             if (m_Key == null) {
>                 m_Key = new Text();
>             }
>             m_Key.set(split.getPath().getName());
>             if (m_Value == null) {
>                 m_Value = new Text();
>             }
>
>         }
>
>         /**
>          * look for a start tag (e.g. <scan) then read until the matching end tag closes
>          *
>          * @return true if there is data
>          * @throws java.io.IOException
>          */
>         public boolean nextKeyValue() throws IOException {
>             if (readFromCurrentBuffer())
>                 return true;
>             // startTag and startTag2 are built inside readFromCurrentBuffer(),
>             // so they are not needed here
>             int newSize = m_Input.read(m_Buffer);
>             while (newSize > 0) {
>                 m_Current += newSize;
>                 m_Sb.append(m_Buffer, 0, newSize);
>                 if (readFromCurrentBuffer())
>                     return true;
>                 // once past the end of the split, stop unless a start tag is already
>                 // buffered and its record still has to be closed
>                 if (m_Current >= m_End && !bufferContainsStartTag())
>                     break;
>                 newSize = m_Input.read(m_Buffer);
>             }
>             // end of the input (or of the split) with no complete tag
>             m_Key = null;
>             m_Value = null;
>             return false;
>         }
>
>         protected boolean bufferContainsStartTag() {
>             String text = m_Sb.toString();
>             return text.contains(getStartTag() + " ") ||
>                     text.contains(getStartTag() + ">");
>         }
>
>         protected boolean readFromCurrentBuffer()
>         {
>             String endTag = getEndTag();
>             String startText = m_Sb.toString();
>             if (!startText.contains(endTag))
>                 return false; // need more read
>             String startTag = getStartTag() + " ";
>             String startTag2 = getStartTag() + ">";
>             int index = startText.indexOf(startTag);
>             if (index == -1)
>                 index = startText.indexOf(startTag2);
>             if(index == -1)
>                 return false;
>             startText = startText.substring(index);
>             m_Sb.setLength(0);
>             m_Sb.append(startText);
>
>             String s = startText;
>             index = s.indexOf(endTag);
>             if (index == -1)
>                 return false; // need more read
>             // throw new IllegalStateException("unmatched tag " + getBaseTag());
>             index += endTag.length();
>             String tag = s.substring(0, index).trim();
>             m_Value.set(tag);
>
>             // keep the remaining text to add to the next tag
>             m_Sb.setLength(0);
>             String rest = s.substring(index);
>             m_Sb.append(rest);
>             return true;
>         }
>
>         @Override
>         public Text getCurrentKey() {
>             return m_Key;
>         }
>
>         @Override
>         public Text getCurrentValue() {
>             return m_Value;
>         }
>
>         /**
>          * Get the progress within the split
>          */
>         public float getProgress() {
>             if (m_End == m_Start)
>                 return 0.0f;
>             return (m_Current - m_Start) / (float) (m_End - m_Start);
>         }
>
>         public synchronized void close() throws IOException {
>             if (m_Input != null) {
>                 m_Input.close();
>             }
>         }
>     }
> }
>
> =============================================
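>
> In case it is useful, here is a minimal sketch of how this format might be
> wired into a job. ScanInputFormat, the "scan" tag, MyScanMapper and the paths
> are assumptions for illustration (the usual Job / FileInputFormat /
> FileOutputFormat imports apply):
>
> // hard-code the tag by subclassing, as the class comment suggests
> public class ScanInputFormat extends XMLTagInputFormat {
>     public ScanInputFormat() {
>         super("scan");   // every <scan ...> ... </scan> element becomes one record
>     }
> }
>
> // driver fragment - the mapper sees key = file name, value = one scan element
> Job job = new Job(new Configuration(), "scan-parser");
> job.setJarByClass(ScanInputFormat.class);
> job.setInputFormatClass(ScanInputFormat.class);
> job.setMapperClass(MyScanMapper.class);   // assumed Mapper<Text, Text, ?, ?>
> FileInputFormat.addInputPath(job, new Path("input"));
> FileOutputFormat.setOutputPath(job, new Path("output"));
> job.waitForCompletion(true);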
>
>
> On Mon, Jul 11, 2011 at 11:57 AM, Erik T <erik.shiken@gmail.com> wrote:
>
>> Hello everyone,
>>
>> I'm new to Hadoop, and I'm trying to figure out how to design an M/R program
>> that parses a file and generates a PMML file as output.
>>
>> What I would like to do is split a file by a keyword instead of a given
>> number of lines, because the location of the split could change from time to
>> time.
>>
>> I've been looking around and was thinking that maybe KeyValueTextInputFormat
>> would be the way to go, but I'm not finding any clear examples of how to use
>> it, so I'm not sure whether it is the right choice.
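>>
>> From what I can tell, basic usage would be something like the sketch below;
>> the separator property name is my best guess and may differ between Hadoop
>> versions:
>>
>> Job job = new Job(new Configuration(), "pmml-generator");
>> job.setInputFormatClass(KeyValueTextInputFormat.class);
>> // each line is split at the first separator into a Text key and a Text value;
>> // the default separator is a tab
>> job.getConfiguration().set(
>>         "mapreduce.input.keyvaluelinerecordreader.key.value.separator", "=");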
>>
>> Here is a basic input example of what I'm working with.
>>
>> [Input file info]
>> more info
>> more info
>> etc.
>> etc.
>> *Keyword*
>> different info
>> different info
>> *Keyword*
>> some more info
>>
>> For the example above, the output for each section can be generated
>> independently of the others. However, within each section, the lines depend
>> on each other and are all needed to generate a valid PMML file.
>>
>> Can anyone suggest what type of input format I should use?
>>
>> Thanks for your time
>> Erik
>>
>
>
>
> --
> Steven M. Lewis PhD
> 4221 105th Ave NE
> Kirkland, WA 98033
> 206-384-1340 (cell)
> Skype lordjoe_com
>
>
>
