hadoop-mapreduce-user mailing list archives

From Steve Lewis <lordjoe2...@gmail.com>
Subject Re: New to hadoop, trying to write a customary file split
Date Mon, 18 Jul 2011 20:32:39 GMT
You are correct: they got refactored into readFromCurrentBuffer, and the copies left in nextKeyValue are unused.
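
For anyone who wants to try the control flow without a cluster, here is a minimal Hadoop-free sketch of the same loop: try the buffered text first, read another chunk only when no complete element is available. TagScanner and its next() method are hypothetical names used for illustration; they are not part of the posted sample or of Hadoop. Unlike the posted code, the sketch takes whichever start-tag form occurs first in the buffer.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;

/** Hadoop-free sketch of the read loop; TagScanner is a hypothetical name. */
final class TagScanner {
    private final Reader input;
    private final String startTag;   // e.g. "<Foo"
    private final String endTag;     // e.g. "</Foo>"
    private final char[] buffer;
    private final StringBuilder pending = new StringBuilder();

    TagScanner(Reader input, String baseTag, int bufferSize) {
        this.input = input;
        this.startTag = "<" + baseTag;
        this.endTag = "</" + baseTag + ">";
        this.buffer = new char[bufferSize];
    }

    /** Returns the next complete element, or null when the input is exhausted. */
    String next() {
        String element = readFromCurrentBuffer();
        while (element == null) {
            int n;
            try {
                n = input.read(buffer);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            if (n <= 0) {
                return null;                 // end of input, no complete element left
            }
            pending.append(buffer, 0, n);
            element = readFromCurrentBuffer();
        }
        return element;
    }

    /** Extracts one element from the buffered text, or null if more input is needed. */
    private String readFromCurrentBuffer() {
        String text = pending.toString();
        int withAttrs = text.indexOf(startTag + " ");
        int bare = text.indexOf(startTag + ">");
        int start = (withAttrs == -1) ? bare
                  : (bare == -1) ? withAttrs
                  : Math.min(withAttrs, bare);
        if (start == -1) {
            return null;
        }
        int end = text.indexOf(endTag, start);
        if (end == -1) {
            return null;
        }
        end += endTag.length();
        // keep the text after the element for the next call
        pending.setLength(0);
        pending.append(text, end, text.length());
        return text.substring(start, end);
    }

    public static void main(String[] args) {
        Reader in = new StringReader("junk<Foo>a</Foo> x <Foo id=\"1\">b</Foo>tail");
        TagScanner scanner = new TagScanner(in, "Foo", 4);
        for (String e = scanner.next(); e != null; e = scanner.next()) {
            System.out.println(e);
        }
    }
}
```

The tiny buffer size in main is deliberate: it forces elements to straddle several reads, which is the case the loop exists to handle.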

On Mon, Jul 18, 2011 at 11:47 AM, Erik T <erik.shiken@gmail.com> wrote:

> I understand that part but I don't see startTag or startTag2 used in the
> nextKeyValue method after they have been declared.
> Erik
>
>
>
> On 18 July 2011 14:20, Steve Lewis <lordjoe2000@gmail.com> wrote:
>
>> The reason for the two is that the tag may appear as
>> <Foo> ....
>> or
>> <Foo attr1="...
>> - now I suppose you could just look for <Foo, which would cover either case
>>
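
One caveat on searching for the bare prefix: "<Foo" is also a prefix of longer tag names such as <Foobar, so the two-variant check is slightly stricter. A small sketch of the difference follows; StartTagMatch and indexOfStartTag are hypothetical names, and the sketch still misses a tag name followed by a tab, a newline or "/>".

```java
/** Compares a bare prefix search with the two-variant start-tag check. */
final class StartTagMatch {

    /** Index of the first "<tag " or "<tag>" in text, or -1 if neither occurs. */
    static int indexOfStartTag(String text, String tag) {
        int withAttrs = text.indexOf("<" + tag + " ");
        int bare = text.indexOf("<" + tag + ">");
        if (withAttrs == -1) {
            return bare;
        }
        if (bare == -1) {
            return withAttrs;
        }
        return Math.min(withAttrs, bare);
    }

    public static void main(String[] args) {
        String text = "<Foobar/><Foo attr1=\"x\">...</Foo>";
        System.out.println(text.indexOf("<Foo"));          // 0: also matches <Foobar
        System.out.println(indexOfStartTag(text, "Foo"));  // 9: skips <Foobar
    }
}
```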
>> Also note I am cheating a bit: this will not properly handle tags that
>> are commented out with
>> the XML comment <!--, but I doubt it is possible to handle these without
>> parsing the entire (potentially large) file
>>
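
A partial check is possible when the comment opener happens to be in the text already buffered. The sketch below is only that: CommentCheck and insideComment are hypothetical names, and the method cannot know about a comment that was opened before the start of the buffer (for example in an earlier split), which is exactly the limitation described above.

```java
/** Best-effort test for "is this position inside an XML comment", limited to the buffered text. */
final class CommentCheck {

    /** True if pos falls after a "<!--" in text that is not closed by "-->" before pos. */
    static boolean insideComment(String text, int pos) {
        int open = text.lastIndexOf("<!--", pos);
        if (open == -1) {
            return false;                    // no comment opener visible before pos
        }
        int close = text.indexOf("-->", open + 4);
        return close == -1 || close >= pos;  // still open, or closed only after pos
    }

    public static void main(String[] args) {
        String text = "<!-- <Foo>old</Foo> --><Foo>live</Foo>";
        int first = text.indexOf("<Foo");
        int second = text.indexOf("<Foo", first + 1);
        System.out.println(insideComment(text, first));   // true
        System.out.println(insideComment(text, second));  // false
    }
}
```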
>>
>> On Mon, Jul 18, 2011 at 9:40 AM, Erik T <erik.shiken@gmail.com> wrote:
>>
>>> Hi Steven,
>>>
>>> Thank you for the sample. I have one question though.
>>>
>>> In MyXMLFileReader, nextKeyValue, are startTag and startTag2 needed?
>>>  Erik
>>>
>>>
>>>
>>> On 11 July 2011 15:11, Steve Lewis <lordjoe2000@gmail.com> wrote:
>>>
>>>> Look at this sample
>>>> =============================================
>>>> package org.systemsbiology.hadoop;
>>>>
>>>>
>>>>
>>>> import org.apache.hadoop.conf.*;
>>>> import org.apache.hadoop.fs.*;
>>>> import org.apache.hadoop.fs.FileSystem;
>>>> import org.apache.hadoop.io.*;
>>>> import org.apache.hadoop.io.compress.*;
>>>> import org.apache.hadoop.mapreduce.*;
>>>> import org.apache.hadoop.mapreduce.lib.input.*;
>>>>
>>>> import java.io.*;
>>>> import java.util.*;
>>>>
>>>> /**
>>>>  * org.systemsbiology.xtandem.hadoop.XMLTagInputFormat
>>>>  * Splitter that reads scan tags from an XML file
>>>>  * No assumption is made about lines but tags and end tags MUST look
>>>>  * like <MyTag </MyTag> with no embedded spaces
>>>>  * usually you will subclass and hard code the tag you want to split on
>>>>  */
>>>> public class XMLTagInputFormat extends FileInputFormat<Text, Text> {
>>>>     public static final XMLTagInputFormat[] EMPTY_ARRAY = {};
>>>>
>>>>
>>>>     private static final double SPLIT_SLOP = 1.1;   // 10% slop
>>>>
>>>>
>>>>     public static final int BUFFER_SIZE = 4096;
>>>>
>>>>     private final String m_BaseTag;
>>>>     private final String m_StartTag;
>>>>     private final String m_EndTag;
>>>>     private String m_Extension;
>>>>
>>>>     public XMLTagInputFormat(final String pBaseTag) {
>>>>         m_BaseTag = pBaseTag;
>>>>         m_StartTag = "<" + pBaseTag;
>>>>         m_EndTag = "</" + pBaseTag + ">";
>>>>
>>>>     }
>>>>
>>>>     public String getExtension() {
>>>>         return m_Extension;
>>>>     }
>>>>
>>>>     public void setExtension(final String pExtension) {
>>>>         m_Extension = pExtension;
>>>>     }
>>>>
>>>>     public boolean isSplitReadable(InputSplit split) {
>>>>         if (!(split instanceof FileSplit))
>>>>             return true;
>>>>         FileSplit fsplit = (FileSplit) split;
>>>>         Path path1 = fsplit.getPath();
>>>>         return isPathAcceptable(path1);
>>>>     }
>>>>
>>>>     protected boolean isPathAcceptable(final Path pPath1) {
>>>>         // compare against the file name, not the full path, so the part-r- test can match
>>>>         String path = pPath1.getName().toLowerCase();
>>>>         if(path.startsWith("part-r-"))
>>>>             return true;
>>>>         String extension = getExtension();
>>>>         if (extension != null && path.endsWith(extension.toLowerCase()))
>>>>             return true;
>>>>         if (extension != null && path.endsWith(extension.toLowerCase() + ".gz"))
>>>>             return true;
>>>>         if (extension == null )
>>>>             return true;
>>>>         return false;
>>>>     }
>>>>
>>>>     public String getStartTag() {
>>>>         return m_StartTag;
>>>>     }
>>>>
>>>>     public String getBaseTag() {
>>>>         return m_BaseTag;
>>>>     }
>>>>
>>>>     public String getEndTag() {
>>>>         return m_EndTag;
>>>>     }
>>>>
>>>>     @Override
>>>>     public RecordReader<Text, Text> createRecordReader(InputSplit split,
>>>>                                                        TaskAttemptContext context) {
>>>>         if (isSplitReadable(split))
>>>>             return new MyXMLFileReader();
>>>>         else
>>>>             return NullRecordReader.INSTANCE; // do not read
>>>>     }
>>>>
>>>>     @Override
>>>>     protected boolean isSplitable(JobContext context, Path file) {
>>>>         String fname = file.getName().toLowerCase();
>>>>         if(fname.endsWith(".gz"))
>>>>             return false;
>>>>         return true;
>>>>     }
>>>>
>>>>     /**
>>>>      * Generate the list of files and make them into FileSplits.
>>>>      * This needs to be copied to insert a filter on acceptable data
>>>>      */
>>>>     @Override
>>>>     public List<InputSplit> getSplits(JobContext job
>>>>     ) throws IOException {
>>>>         long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
>>>>         long maxSize = getMaxSplitSize(job);
>>>>
>>>>         // generate splits
>>>>         List<InputSplit> splits = new ArrayList<InputSplit>();
>>>>         for (FileStatus file : listStatus(job)) {
>>>>             Path path = file.getPath();
>>>>             if (!isPathAcceptable(path))   // filter acceptable data
>>>>                 continue;
>>>>             FileSystem fs = path.getFileSystem(job.getConfiguration());
>>>>             long length = file.getLen();
>>>>             BlockLocation[] blkLocations =
>>>>                     fs.getFileBlockLocations(file, 0, length);
>>>>             if ((length != 0) && isSplitable(job, path)) {
>>>>                 long blockSize = file.getBlockSize();
>>>>                 long splitSize = computeSplitSize(blockSize, minSize, maxSize);
>>>>
>>>>                 long bytesRemaining = length;
>>>>                 while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
>>>>                     int blkIndex = getBlockIndex(blkLocations,
>>>>                             length - bytesRemaining);
>>>>                     splits.add(new FileSplit(path, length - bytesRemaining,
>>>>                             splitSize, blkLocations[blkIndex].getHosts()));
>>>>                     bytesRemaining -= splitSize;
>>>>                 }
>>>>
>>>>                 if (bytesRemaining != 0) {
>>>>                     splits.add(new FileSplit(path, length - bytesRemaining,
>>>>                             bytesRemaining,
>>>>                             blkLocations[blkLocations.length - 1].getHosts()));
>>>>                 }
>>>>              }
>>>>             else if (length != 0) {
>>>>                 splits.add(new FileSplit(path, 0, length,
>>>>                         blkLocations[0].getHosts()));
>>>>             }
>>>>             else {
>>>>                 //Create empty hosts array for zero length files
>>>>                 splits.add(new FileSplit(path, 0, length, new String[0]));
>>>>             }
>>>>         }
>>>>     //    LOG.debug("Total # of splits: " + splits.size());
>>>>         return splits;
>>>>     }
>>>>
>>>>     /**
>>>>      * Custom RecordReader which returns each element delimited by the
>>>>      * start and end tags as a single m_Value with the file name as the m_Key
>>>>      * Value is the text of one tag element
>>>>      * Key is the file name
>>>>      */
>>>>     public class MyXMLFileReader extends RecordReader<Text, Text> {
>>>>
>>>>         private CompressionCodecFactory compressionCodecs = null;
>>>>         private long m_Start;
>>>>         private long m_End;
>>>>         private long m_Current;
>>>>         private BufferedReader m_Input;
>>>>         private Text m_Key;
>>>>         private Text m_Value = null;
>>>>         private char[] m_Buffer = new char[BUFFER_SIZE];
>>>>         StringBuilder m_Sb = new StringBuilder();
>>>>
>>>>         public void initialize(InputSplit genericSplit,
>>>>                                TaskAttemptContext context) throws IOException {
>>>>             FileSplit split = (FileSplit) genericSplit;
>>>>             Configuration job = context.getConfiguration();
>>>>             m_Sb.setLength(0);
>>>>             m_Start = split.getStart();
>>>>             m_End = m_Start + split.getLength();
>>>>             final Path file = split.getPath();
>>>>             compressionCodecs = new CompressionCodecFactory(job);
>>>>             final CompressionCodec codec = compressionCodecs.getCodec(file);
>>>>
>>>>             // open the file and seek to the m_Start of the split
>>>>             FileSystem fs = file.getFileSystem(job);
>>>>             FSDataInputStream fileIn = fs.open(split.getPath());
>>>>             if (codec != null) {
>>>>                 CompressionInputStream inputStream =
>>>>                         codec.createInputStream(fileIn);
>>>>                 m_Input = new BufferedReader(new InputStreamReader(inputStream));
>>>>                 m_End = Long.MAX_VALUE;
>>>>             }
>>>>             else {
>>>>                 m_Input = new BufferedReader(new InputStreamReader(fileIn));
>>>>             }
>>>>             m_Current = m_Start;
>>>>             if (m_Key == null) {
>>>>                 m_Key = new Text();
>>>>             }
>>>>             m_Key.set(split.getPath().getName());
>>>>             if (m_Value == null) {
>>>>                 m_Value = new Text();
>>>>             }
>>>>
>>>>         }
>>>>
>>>>         /**
>>>>          * look for a <scan tag then read until it closes
>>>>          *
>>>>          * @return true if there is data
>>>>          * @throws java.io.IOException
>>>>          */
>>>>         public boolean nextKeyValue() throws IOException {
>>>>             if(readFromCurrentBuffer())
>>>>                 return true;
>>>>             int newSize = 0;
>>>>             String startTag = getStartTag() + " ";
>>>>             String startTag2 = getStartTag() + ">";
>>>>             newSize = m_Input.read(m_Buffer);
>>>>
>>>>             while (newSize > 0) {
>>>>                 m_Current += newSize;
>>>>                 m_Sb.append(m_Buffer, 0, newSize);
>>>>                 if( readFromCurrentBuffer())
>>>>                     return true;
>>>>                 newSize = m_Input.read(m_Buffer);
>>>>             }
>>>>             // exit because we are at the m_End
>>>>             if (newSize <= 0) {
>>>>                 m_Key = null;
>>>>                 m_Value = null;
>>>>                 return false;
>>>>             }
>>>>
>>>>             return true;
>>>>         }
>>>>
>>>>         protected boolean readFromCurrentBuffer()
>>>>         {
>>>>             String endTag = getEndTag();
>>>>               String startText = m_Sb.toString();
>>>>             if(!startText.contains(endTag))
>>>>                 return false; // need more read
>>>>             String startTag = getStartTag() + " ";
>>>>              String startTag2 = getStartTag() + ">";
>>>>             int index = startText.indexOf(startTag);
>>>>             if (index == -1)
>>>>                 index = startText.indexOf(startTag2);
>>>>             if(index == -1)
>>>>                 return false;
>>>>             startText = startText.substring(index);
>>>>             m_Sb.setLength(0);
>>>>             m_Sb.append(startText);
>>>>
>>>>             String s = startText;
>>>>             index = s.indexOf(endTag);
>>>>             if (index == -1)
>>>>                 return false; // need more read
>>>>                // throw new IllegalStateException("unmatched tag " + getBaseTag());
>>>>             index += endTag.length();
>>>>             String tag = s.substring(0, index).trim();
>>>>             m_Value.set(tag);
>>>>
>>>>             // keep the remaining text to add to the next tag
>>>>             m_Sb.setLength(0);
>>>>             String rest = s.substring(index);
>>>>             m_Sb.append(rest);
>>>>             return true;
>>>>         }
>>>>
>>>>         @Override
>>>>         public Text getCurrentKey() {
>>>>             return m_Key;
>>>>         }
>>>>
>>>>         @Override
>>>>         public Text getCurrentValue() {
>>>>             return m_Value;
>>>>         }
>>>>
>>>>         /**
>>>>          * Get the progress within the split
>>>>          */
>>>>         public float getProgress() {
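>>>>             // fraction of the split consumed; guard against an empty split
>>>>             if (m_End == m_Start)
>>>>                 return 0.0f;
>>>>             return Math.min(1.0f, (m_Current - m_Start) / (float) (m_End - m_Start));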
>>>>         }
>>>>
>>>>         public synchronized void close() throws IOException {
>>>>             if (m_Input != null) {
>>>>                 m_Input.close();
>>>>             }
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> =============================================
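
The effect of SPLIT_SLOP in getSplits is easier to see with numbers. The following Hadoop-free sketch repeats only the offset arithmetic of that loop; SplitMath and splitRanges are hypothetical names, and the zero-length case that the sample handles separately is simply an empty list here.

```java
import java.util.ArrayList;
import java.util.List;

/** Reproduces only the offset/length arithmetic of the getSplits loop. */
final class SplitMath {
    static final double SPLIT_SLOP = 1.1;   // 10% slop, as in the sample

    /** Returns {offset, length} pairs for a file of the given length. */
    static List<long[]> splitRanges(long length, long splitSize) {
        List<long[]> ranges = new ArrayList<>();
        long bytesRemaining = length;
        // cut full-size splits while more than 1.1 split sizes remain
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            ranges.add(new long[] {length - bytesRemaining, splitSize});
            bytesRemaining -= splitSize;
        }
        // whatever is left (up to 1.1 split sizes) becomes the final split
        if (bytesRemaining != 0) {
            ranges.add(new long[] {length - bytesRemaining, bytesRemaining});
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (long[] r : splitRanges(268, 128)) {
            System.out.println(r[0] + " +" + r[1]);
        }
        // prints:
        // 0 +128
        // 128 +140
    }
}
```

With a length of 268 and a split size of 128, the tail of 140 is within 10% of a full split, so it stays in one piece instead of becoming 128 plus a 12-byte remainder. With a length of 300 the same method yields three ranges: 0 +128, 128 +128 and 256 +44.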
>>>>
>>>>
>>>> On Mon, Jul 11, 2011 at 11:57 AM, Erik T <erik.shiken@gmail.com> wrote:
>>>>
>>>>> Hello everyone,
>>>>>
>>>>> I'm new to Hadoop and I'm trying to figure out how to design a M/R
>>>>> program to parse a file and generate a PMML file as output.
>>>>>
>>>>> What I would like to do is split a file by a keyword instead of a given
>>>>> number of lines because the location of the split could change from time
>>>>> to time.
>>>>>
>>>>> I'm looking around and was thinking maybe KeyValueTextInputFormat would
>>>>> be the way to go but I'm not finding any clear examples of how to use it.
>>>>> So I'm not sure if this is the right choice or not.
>>>>>
>>>>> Here is a basic input example of what I'm working with.
>>>>>
>>>>> [Input file info]
>>>>> more info
>>>>> more info
>>>>> etc.
>>>>> etc.
>>>>> *Keyword*
>>>>> different info
>>>>> different info
>>>>> *Keyword*
>>>>> some more info
>>>>>
>>>>> For the example above, each section can be generated separately from
>>>>> each other. However, within each section, different lines are dependent
>>>>> upon each other to generate a valid PMML file.
>>>>>
>>>>> Can anyone offer a suggestion what type of input format I should use?
>>>>>
>>>>> Thanks for your time
>>>>> Erik
>>>>>
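
For the keyword case in the question, the record boundary logic itself is small. Below is a plain-Java sketch that cuts a text into sections at every line equal to the keyword; KeywordSections and split are hypothetical names, and it works on an in-memory string, so it only illustrates the grouping that a custom reader would have to perform per split.

```java
import java.util.ArrayList;
import java.util.List;

/** Cuts a text into sections at every line that equals the keyword. */
final class KeywordSections {

    static List<String> split(String text, String keyword) {
        List<String> sections = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (String line : text.split("\n")) {
            if (line.trim().equals(keyword)) {
                // keyword line ends the current section and is not kept
                if (current.length() > 0) {
                    sections.add(current.toString());
                    current.setLength(0);
                }
            } else {
                current.append(line).append('\n');
            }
        }
        if (current.length() > 0) {
            sections.add(current.toString());   // last section has no trailing keyword
        }
        return sections;
    }

    public static void main(String[] args) {
        String input = "[Input file info]\nmore info\n*Keyword*\n"
                + "different info\n*Keyword*\nsome more info\n";
        List<String> sections = split(input, "*Keyword*");
        System.out.println(sections.size());    // 3
    }
}
```

Because lines within a section depend on each other, each section is kept together as a single record here, which is the same role the start and end tags play in the XML sample.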
>>>>
>>>>
>>>>
>>>> --
>>>> Steven M. Lewis PhD
>>>> 4221 105th Ave NE
>>>> Kirkland, WA 98033
>>>> 206-384-1340 (cell)
>>>> Skype lordjoe_com
>>>>
>>>>
>>>>
>>>
>>
>>
>>
>>
>>
>


