hadoop-mapreduce-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Steve Lewis <lordjoe2...@gmail.com>
Subject Re: New to hadoop, trying to write a customary file split
Date Mon, 18 Jul 2011 18:20:35 GMT
The reason for the two id that it may say
<Foo> ....
or
<Foo attr1="...
- now I suppose you could just look for <Foo which would cover either case

Also note I am cheating a bit and this will not handle properly tags which
are commented out with
the xml comment <!-- but I doubt it is possible to handle these without
parsing the entire (potentially large file)


On Mon, Jul 18, 2011 at 9:40 AM, Erik T <erik.shiken@gmail.com> wrote:

> Hi Steven,
>
> Thank you for the sample. I have one question though.
>
> In MyXMLFileReader, nextKeyValue, is startTag and startTag2 needed?
> Erik
>
>
>
> On 11 July 2011 15:11, Steve Lewis <lordjoe2000@gmail.com> wrote:
>
>> Look at this sample
>> =============================================
>> package org.systemsbiology.hadoop;
>>
>>
>>
>> import org.apache.hadoop.conf.*;
>> import org.apache.hadoop.fs.*;
>> import org.apache.hadoop.fs.FileSystem;
>> import org.apache.hadoop.io.*;
>> import org.apache.hadoop.io.compress.*;
>> import org.apache.hadoop.mapreduce.*;
>> import org.apache.hadoop.mapreduce.lib.input.*;
>>
>> import java.io.*;
>> import java.util.*;
>>
>> /**
>>  * org.systemsbiology.xtandem.hadoop.XMLTagInputFormat
>>  * Splitter that reads scan tags from an XML file
>>  * No assumption is made about lines but tage and end tags MUST look like
>> <MyTag </MyTag> with no embedded spaces
>>  * usually you will subclass and hard code the tag you want to split on
>>  */
>> public class XMLTagInputFormat extends FileInputFormat<Text, Text> {
>>     public static final XMLTagInputFormat[] EMPTY_ARRAY = {};
>>
>>
>>     private static final double SPLIT_SLOP = 1.1;   // 10% slop
>>
>>
>>     public static final int BUFFER_SIZE = 4096;
>>
>>     private final String m_BaseTag;
>>     private final String m_StartTag;
>>     private final String m_EndTag;
>>     private String m_Extension;
>>
>>     public XMLTagInputFormat(final String pBaseTag) {
>>         m_BaseTag = pBaseTag;
>>         m_StartTag = "<" + pBaseTag;
>>         m_EndTag = "</" + pBaseTag + ">";
>>
>>     }
>>
>>     public String getExtension() {
>>         return m_Extension;
>>     }
>>
>>     public void setExtension(final String pExtension) {
>>         m_Extension = pExtension;
>>     }
>>
>>     public boolean isSplitReadable(InputSplit split) {
>>         if (!(split instanceof FileSplit))
>>             return true;
>>         FileSplit fsplit = (FileSplit) split;
>>         Path path1 = fsplit.getPath();
>>         return isPathAcceptable(path1);
>>     }
>>
>>     protected boolean isPathAcceptable(final Path pPath1) {
>>         String path = pPath1.toString().toLowerCase();
>>         if(path.startsWith("part-r-"))
>>             return true;
>>         String extension = getExtension();
>>         if (extension != null && path.endsWith(extension.toLowerCase()))
>>             return true;
>>         if (extension != null && path.endsWith(extension.toLowerCase() +
>> ".gz"))
>>             return true;
>>         if (extension == null )
>>             return true;
>>         return false;
>>     }
>>
>>     public String getStartTag() {
>>         return m_StartTag;
>>     }
>>
>>     public String getBaseTag() {
>>         return m_BaseTag;
>>     }
>>
>>     public String getEndTag() {
>>         return m_EndTag;
>>     }
>>
>>     @Override
>>     public RecordReader<Text, Text> createRecordReader(InputSplit split,
>>                                                        TaskAttemptContext
>> context) {
>>         if (isSplitReadable(split))
>>             return new MyXMLFileReader();
>>         else
>>             return NullRecordReader.INSTANCE; // do not read
>>     }
>>
>>     @Override
>>     protected boolean isSplitable(JobContext context, Path file) {
>>         String fname = file.getName().toLowerCase();
>>         if(fname.endsWith(".gz"))
>>             return false;
>>         return true;
>>     }
>>
>>     /**
>>      * Generate the list of files and make them into FileSplits.
>>      * This needs to be copied to insert a filter on acceptable data
>>      */
>>     @Override
>>     public List<InputSplit> getSplits(JobContext job
>>     ) throws IOException {
>>         long minSize = Math.max(getFormatMinSplitSize(),
>> getMinSplitSize(job));
>>         long maxSize = getMaxSplitSize(job);
>>
>>         // generate splits
>>         List<InputSplit> splits = new ArrayList<InputSplit>();
>>         for (FileStatus file : listStatus(job)) {
>>             Path path = file.getPath();
>>             if (!isPathAcceptable(path))   // filter acceptable data
>>                 continue;
>>             FileSystem fs = path.getFileSystem(job.getConfiguration());
>>             long length = file.getLen();
>>             BlockLocation[] blkLocations = fs.getFileBlockLocations(file,
>> 0, length);
>>             if ((length != 0) && isSplitable(job, path)) {
>>                 long blockSize = file.getBlockSize();
>>                 long splitSize = computeSplitSize(blockSize, minSize,
>> maxSize);
>>
>>                 long bytesRemaining = length;
>>                 while (((double) bytesRemaining) / splitSize > SPLIT_SLOP)
>> {
>>                     int blkIndex = getBlockIndex(blkLocations, length -
>> bytesRemaining);
>>                     splits.add(new FileSplit(path, length -
>> bytesRemaining, splitSize,
>>                             blkLocations[blkIndex].getHosts()));
>>                     bytesRemaining -= splitSize;
>>                 }
>>
>>                 if (bytesRemaining != 0) {
>>                     splits.add(new FileSplit(path, length -
>> bytesRemaining, bytesRemaining,
>>                             blkLocations[blkLocations.length -
>> 1].getHosts()));
>>                 }
>>              }
>>             else if (length != 0) {
>>                 splits.add(new FileSplit(path, 0, length,
>> blkLocations[0].getHosts()));
>>             }
>>             else {
>>                 //Create empty hosts array for zero length files
>>                 splits.add(new FileSplit(path, 0, length, new String[0]));
>>             }
>>         }
>>     //    LOG.debug("Total # of splits: " + splits.size());
>>         return splits;
>>     }
>>
>>     /**
>>      * Custom RecordReader which returns the entire file as a
>>      * single m_Value with the name as a m_Key
>>      * Value is the entire file
>>      * Key is the file name
>>      */
>>     public class MyXMLFileReader extends RecordReader<Text, Text> {
>>
>>         private CompressionCodecFactory compressionCodecs = null;
>>         private long m_Start;
>>         private long m_End;
>>         private long m_Current;
>>         private BufferedReader m_Input;
>>         private Text m_Key;
>>         private Text m_Value = null;
>>         private char[] m_Buffer = new char[BUFFER_SIZE];
>>         StringBuilder m_Sb = new StringBuilder();
>>
>>         public void initialize(InputSplit genericSplit,
>>                                TaskAttemptContext context) throws
>> IOException {
>>             FileSplit split = (FileSplit) genericSplit;
>>             Configuration job = context.getConfiguration();
>>             m_Sb.setLength(0);
>>             m_Start = split.getStart();
>>             m_End = m_Start + split.getLength();
>>             final Path file = split.getPath();
>>             compressionCodecs = new CompressionCodecFactory(job);
>>             final CompressionCodec codec =
>> compressionCodecs.getCodec(file);
>>
>>             // open the file and seek to the m_Start of the split
>>             FileSystem fs = file.getFileSystem(job);
>>             FSDataInputStream fileIn = fs.open(split.getPath());
>>             if (codec != null) {
>>                 CompressionInputStream inputStream =
>> codec.createInputStream(fileIn);
>>                 m_Input = new BufferedReader(new
>> InputStreamReader(inputStream));
>>                 m_End = Long.MAX_VALUE;
>>             }
>>             else {
>>                 m_Input = new BufferedReader(new
>> InputStreamReader(fileIn));
>>             }
>>             m_Current = m_Start;
>>             if (m_Key == null) {
>>                 m_Key = new Text();
>>             }
>>             m_Key.set(split.getPath().getName());
>>             if (m_Value == null) {
>>                 m_Value = new Text();
>>             }
>>
>>         }
>>
>>         /**
>>          * look for a <scan tag then read until it closes
>>          *
>>          * @return true if there is data
>>          * @throws java.io.IOException
>>          */
>>         public boolean nextKeyValue() throws IOException {
>>             if(readFromCurrentBuffer())
>>                 return true;
>>             int newSize = 0;
>>             String startTag = getStartTag() + " ";
>>             String startTag2 = getStartTag() + ">";
>>             newSize = m_Input.read(m_Buffer);
>>
>>             while (newSize > 0) {
>>                 m_Current += newSize;
>>                 m_Sb.append(m_Buffer, 0, newSize);
>>                 if( readFromCurrentBuffer())
>>                     return true;
>>                 newSize = m_Input.read(m_Buffer);
>>             }
>>             // exit because we are at the m_End
>>             if (newSize <= 0) {
>>                 m_Key = null;
>>                 m_Value = null;
>>                 return false;
>>             }
>>
>>             return true;
>>         }
>>
>>         protected boolean readFromCurrentBuffer()
>>         {
>>             String endTag = getEndTag();
>>               String startText = m_Sb.toString();
>>             if(!startText.contains(endTag))
>>                 return false; // need more read
>>             String startTag = getStartTag() + " ";
>>              String startTag2 = getStartTag() + ">";
>>             int index = startText.indexOf(startTag);
>>             if (index == -1)
>>                 index = startText.indexOf(startTag2);
>>             if(index == -1)
>>                 return false;
>>             startText = startText.substring(index);
>>             m_Sb.setLength(0);
>>             m_Sb.append(startText);
>>
>>             String s = startText;
>>             index = s.indexOf(endTag);
>>             if (index == -1)
>>                 return false; // need more read
>>                // throw new IllegalStateException("unmatched tag " +
>> getBaseTag());
>>             index += endTag.length();
>>             String tag = s.substring(0, index).trim();
>>             m_Value.set(tag);
>>
>>             // keep the remaining text to add to the next tag
>>             m_Sb.setLength(0);
>>             String rest = s.substring(index);
>>             m_Sb.append(rest);
>>             return true;
>>         }
>>
>>         @Override
>>         public Text getCurrentKey() {
>>             return m_Key;
>>         }
>>
>>         @Override
>>         public Text getCurrentValue() {
>>             return m_Value;
>>         }
>>
>>         /**
>>          * Get the progress within the split
>>          */
>>         public float getProgress() {
>>             return ((float) m_Current - m_Start) / (m_Start - m_End);
>>         }
>>
>>         public synchronized void close() throws IOException {
>>             if (m_Input != null) {
>>                 m_Input.close();
>>             }
>>         }
>>     }
>> }
>>
>> =============================================
>>
>>
>> On Mon, Jul 11, 2011 at 11:57 AM, Erik T <erik.shiken@gmail.com> wrote:
>>
>>> Hello everyone,
>>>
>>> I'm new to Hadoop and I'm trying to figure out how to design a M/R
>>> program to parse a file and generate a PMML file as output.
>>>
>>> What I would like to do is split a file by a keyword instead a given
>>> number of lines because the location of the split could change from time to
>>> time.
>>>
>>> I'm looking around and was thinking maybe KeyValueTextInputFormat would
>>> be the way to go but I'm not finding any clear examples how to use it. So
>>> I'm not sure if this is the right choice or not.
>>>
>>> Here is a basic input example of what I'm working with.
>>>
>>> [Input file info]
>>> more info
>>> more info
>>> etc.
>>> etc.
>>> *Keyword*
>>> different info
>>> different info
>>> *Keyword*
>>> some more info
>>>
>>> For the example above, each section can be generated separately from each
>>> other. However, within each section, different lines are dependent upon each
>>> other to generate a valid PMML file.
>>>
>>> Can anyone offer a suggestion what type of input format I should use?
>>>
>>> Thanks for your time
>>> Erik
>>>
>>
>>
>>
>> --
>> Steven M. Lewis PhD
>> 4221 105th Ave NE
>> Kirkland, WA 98033
>> 206-384-1340 (cell)
>> Skype lordjoe_com
>>
>>
>>
>


-- 
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com

Mime
View raw message