hadoop-mapreduce-user mailing list archives

From Erik T <erik.shi...@gmail.com>
Subject Re: New to hadoop, trying to write a customary file split
Date Mon, 18 Jul 2011 18:47:06 GMT
I understand that part but I don't see startTag or startTag2 used in the
nextKeyValue method after they have been declared.
Erik
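
For reference, the only place the two strings are searched for is readFromCurrentBuffer; the copies declared in nextKeyValue are never read. Below is a minimal sketch of a single check that covers both `<Foo>` and `<Foo attr1="...` without also matching a longer name such as `<Foobar`. The names StartTagFinder and indexOfStartTag are hypothetical and not part of Steve's sample.

```java
final class StartTagFinder {

    /**
     * Returns the index of the first "<tag" that is followed by whitespace,
     * '>' or '/', or -1 if there is none. Hypothetical helper for illustration.
     */
    static int indexOfStartTag(CharSequence text, String tag) {
        String prefix = "<" + tag;
        String s = text.toString();
        int from = 0;
        while (true) {
            int index = s.indexOf(prefix, from);
            if (index == -1)
                return -1;
            int next = index + prefix.length();
            if (next >= s.length())
                return -1; // cannot decide yet, the caller needs to read more text
            char c = s.charAt(next);
            if (c == '>' || c == '/' || Character.isWhitespace(c))
                return index;
            from = index + 1; // a longer tag name such as <Foobar, keep looking
        }
    }

    public static void main(String[] args) {
        System.out.println(indexOfStartTag("<Foobar/><Foo a=\"1\">x</Foo>", "Foo")); // 9
        System.out.println(indexOfStartTag("<Root><Foo>x</Foo></Root>", "Foo"));     // 6
        System.out.println(indexOfStartTag("<Foobar>x</Foobar>", "Foo"));            // -1
    }
}
```

With something like this, readFromCurrentBuffer would need one lookup instead of two indexOf calls, and nextKeyValue would not need the tag strings at all.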


On 18 July 2011 14:20, Steve Lewis <lordjoe2000@gmail.com> wrote:

> The reason for the two is that it may say
> <Foo> ....
> or
> <Foo attr1="...
> - now I suppose you could just look for <Foo which would cover either case
>
> Also note I am cheating a bit: this will not properly handle tags which
> are commented out with
> the XML comment <!--, but I doubt it is possible to handle these without
> parsing the entire (potentially large) file.
>
>
> On Mon, Jul 18, 2011 at 9:40 AM, Erik T <erik.shiken@gmail.com> wrote:
>
>> Hi Steven,
>>
>> Thank you for the sample. I have one question though.
>>
>> In MyXMLFileReader, nextKeyValue, are startTag and startTag2 needed?
>>  Erik
>>
>>
>>
>> On 11 July 2011 15:11, Steve Lewis <lordjoe2000@gmail.com> wrote:
>>
>>> Look at this sample
>>> =============================================
>>> package org.systemsbiology.hadoop;
>>>
>>>
>>>
>>> import org.apache.hadoop.conf.*;
>>> import org.apache.hadoop.fs.*;
>>> import org.apache.hadoop.fs.FileSystem;
>>> import org.apache.hadoop.io.*;
>>> import org.apache.hadoop.io.compress.*;
>>> import org.apache.hadoop.mapreduce.*;
>>> import org.apache.hadoop.mapreduce.lib.input.*;
>>>
>>> import java.io.*;
>>> import java.util.*;
>>>
>>> /**
>>>  * org.systemsbiology.hadoop.XMLTagInputFormat
>>>  * Splitter that reads scan tags from an XML file
>>>  * No assumption is made about lines but start tags and end tags MUST look like
>>>  * <MyTag </MyTag> with no embedded spaces
>>>  * usually you will subclass and hard code the tag you want to split on
>>>  */
>>> public class XMLTagInputFormat extends FileInputFormat<Text, Text> {
>>>     public static final XMLTagInputFormat[] EMPTY_ARRAY = {};
>>>
>>>
>>>     private static final double SPLIT_SLOP = 1.1;   // 10% slop
>>>
>>>
>>>     public static final int BUFFER_SIZE = 4096;
>>>
>>>     private final String m_BaseTag;
>>>     private final String m_StartTag;
>>>     private final String m_EndTag;
>>>     private String m_Extension;
>>>
>>>     public XMLTagInputFormat(final String pBaseTag) {
>>>         m_BaseTag = pBaseTag;
>>>         m_StartTag = "<" + pBaseTag;
>>>         m_EndTag = "</" + pBaseTag + ">";
>>>
>>>     }
>>>
>>>     public String getExtension() {
>>>         return m_Extension;
>>>     }
>>>
>>>     public void setExtension(final String pExtension) {
>>>         m_Extension = pExtension;
>>>     }
>>>
>>>     public boolean isSplitReadable(InputSplit split) {
>>>         if (!(split instanceof FileSplit))
>>>             return true;
>>>         FileSplit fsplit = (FileSplit) split;
>>>         Path path1 = fsplit.getPath();
>>>         return isPathAcceptable(path1);
>>>     }
>>>
>>>     protected boolean isPathAcceptable(final Path pPath1) {
>>>         // test the file name: a fully qualified path does not start with "part-r-"
>>>         String path = pPath1.getName().toLowerCase();
>>>         if(path.startsWith("part-r-"))
>>>             return true;
>>>         String extension = getExtension();
>>>         if (extension != null && path.endsWith(extension.toLowerCase()))
>>>             return true;
>>>         if (extension != null && path.endsWith(extension.toLowerCase() + ".gz"))
>>>             return true;
>>>         if (extension == null )
>>>             return true;
>>>         return false;
>>>     }
>>>
>>>     public String getStartTag() {
>>>         return m_StartTag;
>>>     }
>>>
>>>     public String getBaseTag() {
>>>         return m_BaseTag;
>>>     }
>>>
>>>     public String getEndTag() {
>>>         return m_EndTag;
>>>     }
>>>
>>>     @Override
>>>     public RecordReader<Text, Text> createRecordReader(InputSplit split,
>>>                                                        TaskAttemptContext context) {
>>>         if (isSplitReadable(split))
>>>             return new MyXMLFileReader();
>>>         else
>>>             return NullRecordReader.INSTANCE; // do not read
>>>     }
>>>
>>>     @Override
>>>     protected boolean isSplitable(JobContext context, Path file) {
>>>         String fname = file.getName().toLowerCase();
>>>         if(fname.endsWith(".gz"))
>>>             return false;
>>>         return true;
>>>     }
>>>
>>>     /**
>>>      * Generate the list of files and make them into FileSplits.
>>>      * This needs to be copied to insert a filter on acceptable data
>>>      */
>>>     @Override
>>>     public List<InputSplit> getSplits(JobContext job
>>>     ) throws IOException {
>>>         long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
>>>         long maxSize = getMaxSplitSize(job);
>>>
>>>         // generate splits
>>>         List<InputSplit> splits = new ArrayList<InputSplit>();
>>>         for (FileStatus file : listStatus(job)) {
>>>             Path path = file.getPath();
>>>             if (!isPathAcceptable(path))   // filter acceptable data
>>>                 continue;
>>>             FileSystem fs = path.getFileSystem(job.getConfiguration());
>>>             long length = file.getLen();
>>>             BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
>>>             if ((length != 0) && isSplitable(job, path)) {
>>>                 long blockSize = file.getBlockSize();
>>>                 long splitSize = computeSplitSize(blockSize, minSize, maxSize);
>>>
>>>                 long bytesRemaining = length;
>>>                 while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
>>>                     int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
>>>                     splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
>>>                             blkLocations[blkIndex].getHosts()));
>>>                     bytesRemaining -= splitSize;
>>>                 }
>>>
>>>                 if (bytesRemaining != 0) {
>>>                     splits.add(new FileSplit(path, length - bytesRemaining, bytesRemaining,
>>>                             blkLocations[blkLocations.length - 1].getHosts()));
>>>                 }
>>>             }
>>>             else if (length != 0) {
>>>                 splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
>>>             }
>>>             else {
>>>                 //Create empty hosts array for zero length files
>>>                 splits.add(new FileSplit(path, 0, length, new String[0]));
>>>             }
>>>         }
>>>     //    LOG.debug("Total # of splits: " + splits.size());
>>>         return splits;
>>>     }
>>>
>>>     /**
>>>      * Custom RecordReader which returns each element delimited by the
>>>      * start and end tags as a single m_Value with the file name as the m_Key
>>>      * Value is the text of one element, from start tag through end tag
>>>      * Key is the file name
>>>      */
>>>     public class MyXMLFileReader extends RecordReader<Text, Text> {
>>>
>>>         private CompressionCodecFactory compressionCodecs = null;
>>>         private long m_Start;
>>>         private long m_End;
>>>         private long m_Current;
>>>         private BufferedReader m_Input;
>>>         private Text m_Key;
>>>         private Text m_Value = null;
>>>         private char[] m_Buffer = new char[BUFFER_SIZE];
>>>         StringBuilder m_Sb = new StringBuilder();
>>>
>>>         public void initialize(InputSplit genericSplit,
>>>                                TaskAttemptContext context) throws IOException {
>>>             FileSplit split = (FileSplit) genericSplit;
>>>             Configuration job = context.getConfiguration();
>>>             m_Sb.setLength(0);
>>>             m_Start = split.getStart();
>>>             m_End = m_Start + split.getLength();
>>>             final Path file = split.getPath();
>>>             compressionCodecs = new CompressionCodecFactory(job);
>>>             final CompressionCodec codec = compressionCodecs.getCodec(file);
>>>
>>>             // open the file; note that nothing here seeks to m_Start, so reading
>>>             // begins at the start of the file for every split
>>>             FileSystem fs = file.getFileSystem(job);
>>>             FSDataInputStream fileIn = fs.open(split.getPath());
>>>             if (codec != null) {
>>>                 CompressionInputStream inputStream = codec.createInputStream(fileIn);
>>>                 m_Input = new BufferedReader(new InputStreamReader(inputStream));
>>>                 m_End = Long.MAX_VALUE;
>>>             }
>>>             else {
>>>                 m_Input = new BufferedReader(new InputStreamReader(fileIn));
>>>             }
>>>             m_Current = m_Start;
>>>             if (m_Key == null) {
>>>                 m_Key = new Text();
>>>             }
>>>             m_Key.set(split.getPath().getName());
>>>             if (m_Value == null) {
>>>                 m_Value = new Text();
>>>             }
>>>
>>>         }
>>>
>>>         /**
>>>          * look for the start tag then read until the matching end tag
>>>          *
>>>          * @return true if there is data
>>>          * @throws java.io.IOException
>>>          */
>>>         public boolean nextKeyValue() throws IOException {
>>>             if(readFromCurrentBuffer())
>>>                 return true;
>>>             int newSize = 0;
>>>             String startTag = getStartTag() + " ";
>>>             String startTag2 = getStartTag() + ">";
>>>             newSize = m_Input.read(m_Buffer);
>>>
>>>             while (newSize > 0) {
>>>                 m_Current += newSize;
>>>                 m_Sb.append(m_Buffer, 0, newSize);
>>>                 if( readFromCurrentBuffer())
>>>                     return true;
>>>                 newSize = m_Input.read(m_Buffer);
>>>             }
>>>             // exit because we are at the m_End
>>>             if (newSize <= 0) {
>>>                 m_Key = null;
>>>                 m_Value = null;
>>>                 return false;
>>>             }
>>>
>>>             return true;
>>>         }
>>>
>>>         protected boolean readFromCurrentBuffer()
>>>         {
>>>             String endTag = getEndTag();
>>>             String startText = m_Sb.toString();
>>>             if(!startText.contains(endTag))
>>>                 return false; // need more read
>>>             String startTag = getStartTag() + " ";
>>>             String startTag2 = getStartTag() + ">";
>>>             int index = startText.indexOf(startTag);
>>>             if (index == -1)
>>>                 index = startText.indexOf(startTag2);
>>>             if(index == -1)
>>>                 return false;
>>>             startText = startText.substring(index);
>>>             m_Sb.setLength(0);
>>>             m_Sb.append(startText);
>>>
>>>             String s = startText;
>>>             index = s.indexOf(endTag);
>>>             if (index == -1)
>>>                 return false; // need more read
>>>             // throw new IllegalStateException("unmatched tag " + getBaseTag());
>>>             index += endTag.length();
>>>             String tag = s.substring(0, index).trim();
>>>             m_Value.set(tag);
>>>
>>>             // keep the remaining text to add to the next tag
>>>             m_Sb.setLength(0);
>>>             String rest = s.substring(index);
>>>             m_Sb.append(rest);
>>>             return true;
>>>         }
>>>
>>>         @Override
>>>         public Text getCurrentKey() {
>>>             return m_Key;
>>>         }
>>>
>>>         @Override
>>>         public Text getCurrentValue() {
>>>             return m_Value;
>>>         }
>>>
>>>         /**
>>>          * Get the progress within the split
>>>          */
>>>         public float getProgress() {
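>>>             if (m_End == m_Start)
>>>                 return 0.0f;
>>>             // fraction of the split consumed so far, capped at 1
>>>             return Math.min(1.0f, (m_Current - m_Start) / (float) (m_End - m_Start));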
>>>         }
>>>
>>>         public synchronized void close() throws IOException {
>>>             if (m_Input != null) {
>>>                 m_Input.close();
>>>             }
>>>         }
>>>     }
>>> }
>>>
>>> =============================================
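The scanning loop in the reader above does not depend on Hadoop: append each chunk to a buffer, then cut out every complete element. The sketch below isolates that loop so it can be tried against a StringReader. TagChunker and readAll are hypothetical names, and the chunk size of 8 is chosen only to force elements to straddle reads. Like the sample, it assumes elements of the same name are not nested, ignores XML comments, and matches on the bare `<Foo` prefix.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

final class TagChunker {

    /** Reads everything from in and returns each complete <tag ...>...</tag> element. */
    static List<String> readAll(Reader in, String tag, int chunkSize) {
        String startTag = "<" + tag;
        String endTag = "</" + tag + ">";
        List<String> elements = new ArrayList<>();
        StringBuilder sb = new StringBuilder();
        char[] buffer = new char[chunkSize];
        try {
            int n;
            while ((n = in.read(buffer)) > 0) {
                sb.append(buffer, 0, n);
                // one read may complete several elements, so drain in a loop
                while (true) {
                    int start = sb.indexOf(startTag);
                    if (start == -1)
                        break;
                    int end = sb.indexOf(endTag, start);
                    if (end == -1)
                        break; // element not complete yet, read more
                    end += endTag.length();
                    elements.add(sb.substring(start, end));
                    sb.delete(0, end); // keep the remainder for the next element
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e); // keeps the sketch's callers short
        }
        return elements;
    }

    public static void main(String[] args) {
        String xml = "<r><Foo a=\"1\">x</Foo>\n<Foo>y</Foo></r>";
        for (String element : readAll(new StringReader(xml), "Foo", 8))
            System.out.println(element);
    }
}
```

Searching for the end tag only after the position of the start tag avoids pairing a start tag with an end tag that precedes it in the buffer.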
>>>
>>>
>>> On Mon, Jul 11, 2011 at 11:57 AM, Erik T <erik.shiken@gmail.com> wrote:
>>>
>>>> Hello everyone,
>>>>
>>>> I'm new to Hadoop and I'm trying to figure out how to design a M/R
>>>> program to parse a file and generate a PMML file as output.
>>>>
>>>> What I would like to do is split a file by a keyword instead of a given
>>>> number of lines because the location of the split could change from time to
>>>> time.
>>>>
>>>> I'm looking around and was thinking maybe KeyValueTextInputFormat would
>>>> be the way to go but I'm not finding any clear examples of how to use it. So
>>>> I'm not sure if this is the right choice or not.
>>>>
>>>> Here is a basic input example of what I'm working with.
>>>>
>>>> [Input file info]
>>>> more info
>>>> more info
>>>> etc.
>>>> etc.
>>>> *Keyword*
>>>> different info
>>>> different info
>>>> *Keyword*
>>>> some more info
>>>>
>>>> For the example above, each section can be generated separately from
>>>> each other. However, within each section, different lines are dependent upon
>>>> each other to generate a valid PMML file.
>>>>
>>>> Can anyone offer a suggestion what type of input format I should use?
>>>>
>>>> Thanks for your time
>>>> Erik
>>>>
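The grouping asked for here can be tried outside Hadoop first: a section ends where the next keyword line begins. The following is a plain-Java sketch under the assumption that `*Keyword*` always sits on a line of its own, as in the example input. KeywordSections and split are hypothetical names. Inside a RecordReader, each returned section would become one value.

```java
import java.util.ArrayList;
import java.util.List;

final class KeywordSections {

    /** Splits text into sections; a line equal to keyword closes the current section. */
    static List<String> split(String text, String keyword) {
        List<String> sections = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (String line : text.split("\n")) {
            if (line.trim().equals(keyword)) {
                sections.add(current.toString());
                current.setLength(0);
            } else {
                current.append(line).append('\n');
            }
        }
        sections.add(current.toString()); // the last section has no trailing keyword
        return sections;
    }

    public static void main(String[] args) {
        String input = "[Input file info]\nmore info\n*Keyword*\n"
                + "different info\n*Keyword*\nsome more info\n";
        List<String> sections = split(input, "*Keyword*");
        System.out.println(sections.size()); // 3
        System.out.print(sections.get(1));   // different info
    }
}
```

Because the lines inside a section depend on each other, the whole section is kept together as one string rather than emitted line by line.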
>>>
>>>
>>>
>>> --
>>> Steven M. Lewis PhD
>>> 4221 105th Ave NE
>>> Kirkland, WA 98033
>>> 206-384-1340 (cell)
>>> Skype lordjoe_com
>>>
>>>
>>>
>>
>
