hadoop-mapreduce-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Arun C Murthy <...@hortonworks.com>
Subject Re: New to hadoop, trying to write a customary file split
Date Mon, 18 Jul 2011 20:17:56 GMT
Hey Steve,

 Want to contribute it as an example to MR? Would love to help.

thanks,
Arun

On Jul 11, 2011, at 12:11 PM, Steve Lewis wrote:

> Look at this sample 
> =============================================
> package org.systemsbiology.hadoop;
> 
> 
> 
> import org.apache.hadoop.conf.*;
> import org.apache.hadoop.fs.*;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.io.*;
> import org.apache.hadoop.io.compress.*;
> import org.apache.hadoop.mapreduce.*;
> import org.apache.hadoop.mapreduce.lib.input.*;
> 
> import java.io.*;
> import java.util.*;
> 
> /**
>  * org.systemsbiology.xtandem.hadoop.XMLTagInputFormat
>  * Splitter that reads scan tags from an XML file
>  * No assumption is made about lines but tage and end tags MUST look like <MyTag </MyTag>
with no embedded spaces
>  * usually you will subclass and hard code the tag you want to split on
>  */
> public class XMLTagInputFormat extends FileInputFormat<Text, Text> {
>     public static final XMLTagInputFormat[] EMPTY_ARRAY = {};
> 
> 
>     private static final double SPLIT_SLOP = 1.1;   // 10% slop
> 
> 
>     public static final int BUFFER_SIZE = 4096;
> 
>     private final String m_BaseTag;
>     private final String m_StartTag;
>     private final String m_EndTag;
>     private String m_Extension;
> 
>     public XMLTagInputFormat(final String pBaseTag) {
>         m_BaseTag = pBaseTag;
>         m_StartTag = "<" + pBaseTag;
>         m_EndTag = "</" + pBaseTag + ">";
> 
>     }
> 
>     public String getExtension() {
>         return m_Extension;
>     }
> 
>     public void setExtension(final String pExtension) {
>         m_Extension = pExtension;
>     }
> 
>     public boolean isSplitReadable(InputSplit split) {
>         if (!(split instanceof FileSplit))
>             return true;
>         FileSplit fsplit = (FileSplit) split;
>         Path path1 = fsplit.getPath();
>         return isPathAcceptable(path1);
>     }
> 
>     protected boolean isPathAcceptable(final Path pPath1) {
>         String path = pPath1.toString().toLowerCase();
>         if(path.startsWith("part-r-"))
>             return true;
>         String extension = getExtension();
>         if (extension != null && path.endsWith(extension.toLowerCase()))
>             return true;
>         if (extension != null && path.endsWith(extension.toLowerCase() + ".gz"))
>             return true;
>         if (extension == null )
>             return true;
>         return false;
>     }
> 
>     public String getStartTag() {
>         return m_StartTag;
>     }
> 
>     public String getBaseTag() {
>         return m_BaseTag;
>     }
> 
>     public String getEndTag() {
>         return m_EndTag;
>     }
> 
>     @Override
>     public RecordReader<Text, Text> createRecordReader(InputSplit split,
>                                                        TaskAttemptContext context) {
>         if (isSplitReadable(split))
>             return new MyXMLFileReader();
>         else
>             return NullRecordReader.INSTANCE; // do not read
>     }
> 
>     @Override
>     protected boolean isSplitable(JobContext context, Path file) {
>         String fname = file.getName().toLowerCase();
>         if(fname.endsWith(".gz"))
>             return false;
>         return true;
>     }
> 
>     /**
>      * Generate the list of files and make them into FileSplits.
>      * This needs to be copied to insert a filter on acceptable data
>      */
>     @Override
>     public List<InputSplit> getSplits(JobContext job
>     ) throws IOException {
>         long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
>         long maxSize = getMaxSplitSize(job);
> 
>         // generate splits
>         List<InputSplit> splits = new ArrayList<InputSplit>();
>         for (FileStatus file : listStatus(job)) {
>             Path path = file.getPath();
>             if (!isPathAcceptable(path))   // filter acceptable data
>                 continue;
>             FileSystem fs = path.getFileSystem(job.getConfiguration());
>             long length = file.getLen();
>             BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
>             if ((length != 0) && isSplitable(job, path)) {
>                 long blockSize = file.getBlockSize();
>                 long splitSize = computeSplitSize(blockSize, minSize, maxSize);
> 
>                 long bytesRemaining = length;
>                 while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
>                     int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
>                     splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
>                             blkLocations[blkIndex].getHosts()));
>                     bytesRemaining -= splitSize;
>                 }
> 
>                 if (bytesRemaining != 0) {
>                     splits.add(new FileSplit(path, length - bytesRemaining, bytesRemaining,
>                             blkLocations[blkLocations.length - 1].getHosts()));
>                 }
>             }
>             else if (length != 0) {
>                 splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
>             }
>             else {
>                 //Create empty hosts array for zero length files
>                 splits.add(new FileSplit(path, 0, length, new String[0]));
>             }
>         }
>     //    LOG.debug("Total # of splits: " + splits.size());
>         return splits;
>     }
> 
>     /**
>      * Custom RecordReader which returns the entire file as a
>      * single m_Value with the name as a m_Key
>      * Value is the entire file
>      * Key is the file name
>      */
>     public class MyXMLFileReader extends RecordReader<Text, Text> {
> 
>         private CompressionCodecFactory compressionCodecs = null;
>         private long m_Start;
>         private long m_End;
>         private long m_Current;
>         private BufferedReader m_Input;
>         private Text m_Key;
>         private Text m_Value = null;
>         private char[] m_Buffer = new char[BUFFER_SIZE];
>         StringBuilder m_Sb = new StringBuilder();
> 
>         public void initialize(InputSplit genericSplit,
>                                TaskAttemptContext context) throws IOException {
>             FileSplit split = (FileSplit) genericSplit;
>             Configuration job = context.getConfiguration();
>             m_Sb.setLength(0);
>             m_Start = split.getStart();
>             m_End = m_Start + split.getLength();
>             final Path file = split.getPath();
>             compressionCodecs = new CompressionCodecFactory(job);
>             final CompressionCodec codec = compressionCodecs.getCodec(file);
> 
>             // open the file and seek to the m_Start of the split
>             FileSystem fs = file.getFileSystem(job);
>             FSDataInputStream fileIn = fs.open(split.getPath());
>             if (codec != null) {
>                 CompressionInputStream inputStream = codec.createInputStream(fileIn);
>                 m_Input = new BufferedReader(new InputStreamReader(inputStream));
>                 m_End = Long.MAX_VALUE;
>             }
>             else {
>                 m_Input = new BufferedReader(new InputStreamReader(fileIn));
>             }
>             m_Current = m_Start;
>             if (m_Key == null) {
>                 m_Key = new Text();
>             }
>             m_Key.set(split.getPath().getName());
>             if (m_Value == null) {
>                 m_Value = new Text();
>             }
> 
>         }
> 
>         /**
>          * look for a <scan tag then read until it closes
>          *
>          * @return true if there is data
>          * @throws java.io.IOException
>          */
>         public boolean nextKeyValue() throws IOException {
>             if(readFromCurrentBuffer())
>                 return true;
>             int newSize = 0;
>             String startTag = getStartTag() + " ";
>             String startTag2 = getStartTag() + ">";
>             newSize = m_Input.read(m_Buffer);
> 
>             while (newSize > 0) {
>                 m_Current += newSize;
>                 m_Sb.append(m_Buffer, 0, newSize);
>                 if( readFromCurrentBuffer())
>                     return true;
>                 newSize = m_Input.read(m_Buffer);
>             }
>             // exit because we are at the m_End
>             if (newSize <= 0) {
>                 m_Key = null;
>                 m_Value = null;
>                 return false;
>             }
> 
>             return true;
>         }
> 
>         protected boolean readFromCurrentBuffer()
>         {
>             String endTag = getEndTag();
>               String startText = m_Sb.toString();
>             if(!startText.contains(endTag))
>                 return false; // need more read
>             String startTag = getStartTag() + " ";
>              String startTag2 = getStartTag() + ">";
>             int index = startText.indexOf(startTag);
>             if (index == -1)
>                 index = startText.indexOf(startTag2);
>             if(index == -1)
>                 return false;
>             startText = startText.substring(index);
>             m_Sb.setLength(0);
>             m_Sb.append(startText);
> 
>             String s = startText;
>             index = s.indexOf(endTag);
>             if (index == -1)
>                 return false; // need more read
>                // throw new IllegalStateException("unmatched tag " + getBaseTag());
>             index += endTag.length();
>             String tag = s.substring(0, index).trim();
>             m_Value.set(tag);
> 
>             // keep the remaining text to add to the next tag
>             m_Sb.setLength(0);
>             String rest = s.substring(index);
>             m_Sb.append(rest);
>             return true;
>         }
> 
>         @Override
>         public Text getCurrentKey() {
>             return m_Key;
>         }
> 
>         @Override
>         public Text getCurrentValue() {
>             return m_Value;
>         }
> 
>         /**
>          * Get the progress within the split
>          */
>         public float getProgress() {
>             return ((float) m_Current - m_Start) / (m_Start - m_End);
>         }
> 
>         public synchronized void close() throws IOException {
>             if (m_Input != null) {
>                 m_Input.close();
>             }
>         }
>     }
> }
> 
> =============================================
> 
> On Mon, Jul 11, 2011 at 11:57 AM, Erik T <erik.shiken@gmail.com> wrote:
> Hello everyone,
> 
> I'm new to Hadoop and I'm trying to figure out how to design a M/R program to parse a
file and generate a PMML file as output.
> 
> What I would like to do is split a file by a keyword instead of a given number of lines,
because the location of the split could change from time to time.
> 
> I'm looking around and was thinking maybe KeyValueTextInputFormat would be the way to
go but I'm not finding any clear examples how to use it. So I'm not sure if this is the right
choice or not.
> 
> Here is a basic input example of what I'm working with.
> 
> [Input file info]
> more info
> more info
> etc.
> etc.
> Keyword
> different info
> different info
> Keyword
> some more info
> 
> For the example above, each section can be generated separately from each other. However,
within each section, different lines are dependent upon each other to generate a valid PMML
file.
> 
> Can anyone offer a suggestion what type of input format I should use?
> 
> Thanks for your time
> Erik
> 
> 
> 
> -- 
> Steven M. Lewis PhD
> 4221 105th Ave NE
> Kirkland, WA 98033
> 206-384-1340 (cell)
> Skype lordjoe_com
> 
> 


Mime
View raw message