hadoop-common-user mailing list archives

From Alan Ho <karlu...@yahoo.ca>
Subject Re: map/reduce function on xml string
Date Wed, 19 Mar 2008 08:23:56 GMT
Check out this patch - it uses a StAX parser and is more extensible.

Alan Ho
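
For readers unfamiliar with StAX, the following is a minimal sketch of pull-parsing records with the JDK's `javax.xml.stream` API. It is not the patch mentioned above and says nothing about how that patch handles splits; the class name `StaxRecordSketch`, the method `recordTexts`, and the `rec` tag are hypothetical, and it assumes each record element contains text only.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

// Hypothetical helper, for illustration only.
class StaxRecordSketch {
    // Returns the text content of every element named recordTag.
    static List<String> recordTexts(String xml, String recordTag) {
        List<String> out = new ArrayList<>();
        try {
            XMLStreamReader reader =
                XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(xml));
            while (reader.hasNext()) {
                // Pull the next event and react only to the start of a record element.
                if (reader.next() == XMLStreamConstants.START_ELEMENT
                        && recordTag.equals(reader.getLocalName())) {
                    // getElementText() consumes up to the matching end tag;
                    // it throws if the element has child elements.
                    out.add(reader.getElementText());
                }
            }
            reader.close();
        } catch (XMLStreamException e) {
            throw new IllegalStateException("malformed XML", e);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(recordTexts("<doc><rec>a</rec><rec>b</rec></doc>", "rec"));
    }
}
```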

----- Original Message ----
From: Colin Evans <colin@metaweb.com>
To: core-user@hadoop.apache.org
Sent: Tuesday, March 4, 2008 11:27:34 AM
Subject: Re: map/reduce function on xml string

Here's the code.  If folks are interested, I can submit it as a patch as well.

Prasan Ary wrote:
Colin,
Is it possible that you share some of the code with us?

thx,
Prasan

Colin Evans <colin@metaweb.com> wrote:
We ended up subclassing TextInputFormat and adding a custom RecordReader that starts and ends record reads on tags. The StreamXmlRecordReader class is a good reference for this.

Prasan Ary wrote:
Hi All,
I am writing a Java implementation for my map/reduce function on Hadoop.
Input to this is an XML file, and the map function has to process well-formed XML records.
So far I have been unable to split the XML file at XML record boundaries to feed into my map.
Can anybody point me to resources where forcing a file split at a desired boundary is explained?



-----Inline Attachment Follows-----

package com.metaweb.hadoop.util;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.*;

import java.io.IOException;

/**
 * Reads records that are delimited by a specific begin/end tag.
 */
public class XmlInputFormat extends TextInputFormat {
    public static final String START_TAG_KEY = "xmlinput.start";
    public static final String END_TAG_KEY = "xmlinput.end";

    public void configure(JobConf jobConf) {
        // let TextInputFormat perform its own setup
        super.configure(jobConf);
    }

    public RecordReader getRecordReader(InputSplit inputSplit, JobConf jobConf, Reporter reporter)
            throws IOException {
        return new XmlRecordReader((FileSplit) inputSplit, jobConf);
    }

    public static class XmlRecordReader implements RecordReader {
        private byte[] startTag;
        private byte[] endTag;
        private long start;
        private long end;
        private FSDataInputStream fsin;
        private DataOutputBuffer buffer = new DataOutputBuffer();

        public XmlRecordReader(FileSplit split, JobConf jobConf) throws IOException {
            startTag = jobConf.get(START_TAG_KEY).getBytes("utf-8");
            endTag = jobConf.get(END_TAG_KEY).getBytes("utf-8");

            // open the file and seek to the start of the split
            start = split.getStart();
            end = start + split.getLength();
            Path file = split.getPath();
            FileSystem fs = file.getFileSystem(jobConf);
            fsin = fs.open(file);
            fsin.seek(start);
        }

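        public boolean next(WritableComparable key, Writable value) throws IOException {
            if (fsin.getPos() < end) {
                if (readUntilMatch(startTag, false)) {
                    try {
                        // the start tag was consumed while scanning, so put it at the head of the record
                        buffer.write(startTag);
                        if (readUntilMatch(endTag, true)) {
                            ((Text) key).set(Long.toString(fsin.getPos()));
                            ((Text) value).set(buffer.getData(), 0, buffer.getLength());
                            return true;
                        }
                    } finally {
                        // clear the buffer so the next record starts empty
                        buffer.reset();
                    }
                }
            }
            return false;
        }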

        public WritableComparable createKey() {
            return new Text();
        }

        public Writable createValue() {
            return new Text();
        }

        public long getPos() throws IOException {
            return fsin.getPos();
        }

        public void close() throws IOException {
            fsin.close();
        }

        public float getProgress() throws IOException {
            return ((float) (fsin.getPos() - start)) / ((float) (end - start));
        }

        private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
            int i = 0;
            while (true) {
                int b = fsin.read();
                // end of file:
                if (b == -1) return false;
                // save to buffer:
                if (withinBlock) buffer.write(b);

                // check if we're matching:
                if (b == match[i]) {
                    i++;
                    if (i >= match.length) return true;
                } else i = 0;
                // see if we've passed the stop point:
                if (!withinBlock && i == 0 && fsin.getPos() >= end) return false;
            }
        }
    }
}
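
The tag-scanning loop can be tried without a Hadoop cluster. The following is a minimal sketch that mirrors readUntilMatch on an in-memory byte array; the class name `TagScanSketch`, the `records` helper, and the sample input are hypothetical and not part of the attachment. It assumes ASCII tags. A record belongs to the split in which its start tag begins, even if the record runs past the split end, so two adjacent splits read each record exactly once.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the record reader, backed by a byte array.
class TagScanSketch {
    private final ByteArrayInputStream in;
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final int end; // split end offset (exclusive)
    private int pos;       // absolute offset of the next unread byte

    private TagScanSketch(byte[] data, int start, int end) {
        this.in = new ByteArrayInputStream(data, start, data.length - start);
        this.pos = start;
        this.end = end;
    }

    // Reads bytes until `match` has been seen; copies them to the buffer when withinBlock.
    private boolean readUntilMatch(byte[] match, boolean withinBlock) {
        int i = 0;
        while (true) {
            int b = in.read();
            if (b == -1) return false;           // end of data
            pos++;
            if (withinBlock) buffer.write(b);
            if (b == match[i]) {
                i++;
                if (i >= match.length) return true;
            } else {
                i = 0;
            }
            // stop looking for a start tag once we are past the split end
            if (!withinBlock && i == 0 && pos >= end) return false;
        }
    }

    // Returns every record whose start tag begins inside [start, end).
    static List<String> records(byte[] data, int start, int end, String startTag, String endTag) {
        byte[] st = startTag.getBytes(StandardCharsets.UTF_8);
        byte[] et = endTag.getBytes(StandardCharsets.UTF_8);
        TagScanSketch s = new TagScanSketch(data, start, end);
        List<String> out = new ArrayList<>();
        while (s.pos < s.end && s.readUntilMatch(st, false)) {
            s.buffer.reset();
            s.buffer.write(st, 0, st.length);    // the start tag was consumed while scanning
            if (!s.readUntilMatch(et, true)) break;
            out.add(new String(s.buffer.toByteArray(), StandardCharsets.UTF_8));
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] data = "<doc><rec>a</rec><rec>b</rec></doc>".getBytes(StandardCharsets.UTF_8);
        System.out.println(records(data, 0, 12, "<rec>", "</rec>"));           // [<rec>a</rec>]
        System.out.println(records(data, 12, data.length, "<rec>", "</rec>")); // [<rec>b</rec>]
    }
}
```

The split boundary at offset 12 falls in the middle of the first record; the first split still returns that record whole, and the second split skips ahead to the next start tag.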
