hadoop-hdfs-user mailing list archives

From madhu phatak <phatak....@gmail.com>
Subject Re: Processing xml documents using StreamXmlRecordReader
Date Tue, 19 Jun 2012 12:41:59 GMT
Hi,
 Yes, you have the class, but it is for the old API.

 Please find below the code for the classes ported to the new API. I have not
tested the code, so try these classes and let me know if they work for you.
Rough sketches of a driver and a mapper for the new API follow the three
classes below.


StreamInputFormat (new API)

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.streaming;

import java.io.IOException;
import java.lang.reflect.Constructor;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

/** An input format that selects a RecordReader based on a configuration property.
 *  This should be used only for non-standard record readers such as
 *  StreamXmlRecordReader. For all other standard
 *  record readers, the appropriate input format classes should be used.
 */
public class StreamInputFormat extends KeyValueTextInputFormat {

  @Override
  public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit,
      TaskAttemptContext context) throws IOException {

    Configuration conf = context.getConfiguration();
    String c = conf.get("stream.recordreader.class");
    if (c == null || c.indexOf("LineRecordReader") >= 0) {
      return super.createRecordReader(genericSplit, context);
    }

    // handling a non-standard record reader (likely StreamXmlRecordReader)
    FileSplit split = (FileSplit) genericSplit;
    //LOG.info("getRecordReader start.....split=" + split);
    context.setStatus(split.toString());
    context.progress();

    // Open the file and seek to the start of the split
    FileSystem fs = split.getPath().getFileSystem(conf);
    FSDataInputStream in = fs.open(split.getPath());

    // Factory dispatch based on available params..
    Class readerClass = StreamUtil.goodClassOrNull(conf, c, null);
    if (readerClass == null) {
      throw new RuntimeException("Class not found: " + c);
    }

    Constructor ctor;
    try {
      ctor = readerClass.getConstructor(new Class[] { FSDataInputStream.class,
          FileSplit.class, TaskAttemptContext.class, Configuration.class,
          FileSystem.class });
    } catch (NoSuchMethodException nsm) {
      throw new RuntimeException(nsm);
    }

    RecordReader<Text, Text> reader;
    try {
      reader = (RecordReader<Text, Text>) ctor.newInstance(new Object[] {
          in, split, context, conf, fs });
    } catch (Exception nsm) {
      throw new RuntimeException(nsm);
    }
    return reader;
  }

}

StreamXmlRecordReader

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.streaming;

import java.io.*;
import java.util.regex.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/** A way to interpret XML fragments as Mapper input records.
 *  Values are XML subtrees delimited by configurable tags.
 *  Keys could be the value of a certain attribute in the XML subtree,
 *  but this is left to the stream processor application.
 *
 *  The name-value properties that StreamXmlRecordReader understands are:
 *    String begin (chars marking beginning of record)
 *    String end   (chars marking end of record)
 *    int maxrec   (maximum record size)
 *    int lookahead(maximum lookahead to sync CDATA)
 *    boolean slowmatch
 */
public class StreamXmlRecordReader extends StreamBaseRecordReader {

  public StreamXmlRecordReader(FSDataInputStream in, FileSplit split,
                               TaskAttemptContext context, Configuration conf,
                               FileSystem fs) throws IOException {
    super(in, split, context, conf, fs);

    beginMark_ = checkJobGet(CONF_NS + "begin");
    endMark_ = checkJobGet(CONF_NS + "end");

    maxRecSize_ = conf_.getInt(CONF_NS + "maxrec", 50 * 1000);
    lookAhead_ = conf_.getInt(CONF_NS + "lookahead", 2 * maxRecSize_);
    synched_ = false;

    slowMatch_ = conf_.getBoolean(CONF_NS + "slowmatch", false);
    if (slowMatch_) {
      beginPat_ = makePatternCDataOrMark(beginMark_);
      endPat_ = makePatternCDataOrMark(endMark_);
    }
    init();
  }

  public void init() throws IOException {
    LOG.info("StreamBaseRecordReader.init: " + " start_=" + start_ + "
end_=" + end_ + " length_="
             + length_ + " start_ > in_.getPos() =" + (start_ >
in_.getPos()) + " " + start_ + " > "
             + in_.getPos());
    if (start_ > in_.getPos()) {
      in_.seek(start_);
    }
    pos_ = start_;
    bin_ = new BufferedInputStream(in_);
    seekNextRecordBoundary();
  }

  int numNext = 0;

  public synchronized boolean next(Text key, Text value) throws IOException {
    numNext++;
    if (pos_ >= end_) {
      return false;
    }

    DataOutputBuffer buf = new DataOutputBuffer();
    if (!readUntilMatchBegin()) {
      return false;
    }
    if (pos_ >= end_ || !readUntilMatchEnd(buf)) {
      return false;
    }

    // There is only one element; key/value splitting is not done here.
    byte[] record = new byte[buf.getLength()];
    System.arraycopy(buf.getData(), 0, record, 0, record.length);

    numRecStats(record, 0, record.length);

    key.set(record);
    value.set("");

    return true;
  }

  public void seekNextRecordBoundary() throws IOException {
    readUntilMatchBegin();
  }

  boolean readUntilMatchBegin() throws IOException {
    if (slowMatch_) {
      return slowReadUntilMatch(beginPat_, false, null);
    } else {
      return fastReadUntilMatch(beginMark_, false, null);
    }
  }

  private boolean readUntilMatchEnd(DataOutputBuffer buf) throws IOException {
    if (slowMatch_) {
      return slowReadUntilMatch(endPat_, true, buf);
    } else {
      return fastReadUntilMatch(endMark_, true, buf);
    }
  }

  private boolean slowReadUntilMatch(Pattern markPattern, boolean includePat,
                                     DataOutputBuffer outBufOrNull) throws IOException {
    byte[] buf = new byte[Math.max(lookAhead_, maxRecSize_)];
    int read = 0;
    bin_.mark(Math.max(lookAhead_, maxRecSize_) + 2); // mark to invalidate if we read more
    read = bin_.read(buf);
    if (read == -1) return false;

    String sbuf = new String(buf, 0, read, "UTF-8");
    Matcher match = markPattern.matcher(sbuf);

    firstMatchStart_ = NA;
    firstMatchEnd_ = NA;
    int bufPos = 0;
    int state = synched_ ? CDATA_OUT : CDATA_UNK;
    int s = 0;

    while (match.find(bufPos)) {
      int input;
      if (match.group(1) != null) {
        input = CDATA_BEGIN;
      } else if (match.group(2) != null) {
        input = CDATA_END;
        firstMatchStart_ = NA; // |<DOC CDATA[ </DOC> ]]> should keep it
      } else {
        input = RECORD_MAYBE;
      }
      if (input == RECORD_MAYBE) {
        if (firstMatchStart_ == NA) {
          firstMatchStart_ = match.start();
          firstMatchEnd_ = match.end();
        }
      }
      state = nextState(state, input, match.start());
      if (state == RECORD_ACCEPT) {
        break;
      }
      bufPos = match.end();
      s++;
    }
    if (state != CDATA_UNK) {
      synched_ = true;
    }
    boolean matched = (firstMatchStart_ != NA)
                      && (state == RECORD_ACCEPT || state == CDATA_UNK);
    if (matched) {
      int endPos = includePat ? firstMatchEnd_ : firstMatchStart_;
      bin_.reset();

      for (long skiplen = endPos; skiplen > 0; ) {
        skiplen -= bin_.skip(skiplen); // skip succeeds as we have read this buffer
      }

      pos_ += endPos;
      if (outBufOrNull != null) {
        outBufOrNull.writeBytes(sbuf.substring(0,endPos));
      }
    }
    return matched;
  }

  // states
  final static int CDATA_IN = 10;
  final static int CDATA_OUT = 11;
  final static int CDATA_UNK = 12;
  final static int RECORD_ACCEPT = 13;
  // inputs
  final static int CDATA_BEGIN = 20;
  final static int CDATA_END = 21;
  final static int RECORD_MAYBE = 22;

  /* also updates firstMatchStart_;*/
  int nextState(int state, int input, int bufPos) {
    switch (state) {
    case CDATA_UNK:
    case CDATA_OUT:
      switch (input) {
      case CDATA_BEGIN:
        return CDATA_IN;
      case CDATA_END:
        if (state == CDATA_OUT) {
          //System.out.println("buggy XML " + bufPos);
        }
        return CDATA_OUT;
      case RECORD_MAYBE:
        return (state == CDATA_UNK) ? CDATA_UNK : RECORD_ACCEPT;
      }
      break;
    case CDATA_IN:
      return (input == CDATA_END) ? CDATA_OUT : CDATA_IN;
    }
    throw new IllegalStateException(state + " " + input + " " + bufPos + " " + splitName_);
  }

  Pattern makePatternCDataOrMark(String escapedMark) {
    StringBuffer pat = new StringBuffer();
    addGroup(pat, StreamUtil.regexpEscape("CDATA[")); // CDATA_BEGIN
    addGroup(pat, StreamUtil.regexpEscape("]]>")); // CDATA_END
    addGroup(pat, escapedMark); // RECORD_MAYBE
    return Pattern.compile(pat.toString());
  }

  void addGroup(StringBuffer pat, String escapedGroup) {
    if (pat.length() > 0) {
      pat.append("|");
    }
    pat.append("(");
    pat.append(escapedGroup);
    pat.append(")");
  }

  boolean fastReadUntilMatch(String textPat, boolean includePat,
                             DataOutputBuffer outBufOrNull) throws IOException {
    byte[] cpat = textPat.getBytes("UTF-8");
    int m = 0;
    boolean match = false;
    int msup = cpat.length;
    int LL = 120000 * 10;

    bin_.mark(LL); // large number to invalidate mark
    while (true) {
      int b = bin_.read();
      if (b == -1) break;

      byte c = (byte) b; // this assumes eight-bit matching. OK with UTF-8
      if (c == cpat[m]) {
        m++;
        if (m == msup) {
          match = true;
          break;
        }
      } else {
        bin_.mark(LL); // reset mark so we can jump back if we find a match
        if (outBufOrNull != null) {
          outBufOrNull.write(cpat, 0, m);
          outBufOrNull.write(c);
        }
        pos_ += m + 1; // skip m chars, +1 for 'c'
        m = 0;
      }
    }
    if (!includePat && match) {
      bin_.reset();
    } else if (outBufOrNull != null) {
      outBufOrNull.write(cpat);
      pos_ += msup;
    }
    return match;
  }

  String checkJobGet(String prop) throws IOException {
    String val = conf_.get(prop);
    if (val == null) {
      throw new IOException("JobConf: missing required property: " + prop);
    }
    return val;
  }

  String beginMark_;
  String endMark_;

  Pattern beginPat_;
  Pattern endPat_;

  boolean slowMatch_;
  int lookAhead_; // bytes to read to try to synch CDATA/non-CDATA; should be more than max record size
  int maxRecSize_;

  BufferedInputStream bin_; // wraps FSDataInputStream for efficient backward seeks
  long pos_; // keeps track of the position in the encapsulated FSDataInputStream

  final static int NA = -1;
  int firstMatchStart_ = 0; // candidate record boundary; might just be CDATA
  int firstMatchEnd_ = 0;

  boolean synched_;
}

StreamBaseRecordReader

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.streaming;

import java.io.*;

import org.apache.commons.logging.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Shared functionality for hadoopStreaming formats.
 * A custom reader can be defined to be a RecordReader with the constructor
 * below and is selected with the option bin/hadoopStreaming -inputreader ...
 * @see StreamXmlRecordReader
 */
public abstract class StreamBaseRecordReader extends RecordReader<Text, Text> {

  protected static final Log LOG =
      LogFactory.getLog(StreamBaseRecordReader.class.getName());

  // custom configuration properties for this class are prefixed with this namespace
  final static String CONF_NS = "stream.recordreader.";

  public StreamBaseRecordReader(FSDataInputStream in, FileSplit split,
                                TaskAttemptContext context, Configuration conf,
                                FileSystem fs) throws IOException {
    in_ = in;
    split_ = split;
    start_ = split_.getStart();
    length_ = split_.getLength();
    end_ = start_ + length_;
    splitName_ = split_.getPath().getName();
    this.context_ = context;
    conf_ = conf;
    fs_ = fs;

    statusMaxRecordChars_ = conf.getInt(CONF_NS + "statuschars", 200);
  }

  /// RecordReader API

  /** The input stream, split and context are already supplied through the
   *  constructor, so there is nothing left to do here. */
  @Override
  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
  }

  /** Read a record. Implementation should call numRecStats at the end.
   */
  public abstract boolean next(Text key, Text value) throws IOException;

  // current key/value, as required by the new RecordReader API
  private Text key_;
  private Text value_;

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    key_ = createKey();
    value_ = createValue();
    return next(key_, value_);
  }

  @Override
  public Text getCurrentKey() {
    return key_;
  }

  @Override
  public Text getCurrentValue() {
    return value_;
  }

  /** Returns the current position in the input. */
  public synchronized long getPos() throws IOException {
    return in_.getPos();
  }

  /** Close this to future operations. */
  @Override
  public synchronized void close() throws IOException {
    in_.close();
  }

  @Override
  public float getProgress() throws IOException {
    if (end_ == start_) {
      return 1.0f;
    } else {
      return ((float)(in_.getPos() - start_)) / ((float)(end_ - start_));
    }
  }

  public Text createKey() {
    return new Text();
  }

  public Text createValue() {
    return new Text();
  }

  /// StreamBaseRecordReader API

  /** Implementation should seek forward in_ to the first byte of the next record.
   *  The initial byte offset in the stream is arbitrary.
   */
  public abstract void seekNextRecordBoundary() throws IOException;

  void numRecStats(byte[] record, int start, int len) throws IOException {
    numRec_++;
    if (numRec_ == nextStatusRec_) {
      String recordStr = new String(record, start,
          Math.min(len, statusMaxRecordChars_), "UTF-8");
      nextStatusRec_ += 100;//*= 10;
      String status = getStatus(recordStr);
      LOG.info(status);
      context_.setStatus(status);
    }
  }

  long lastMem = 0;

  String getStatus(CharSequence record) {
    long pos = -1;
    try {
      pos = getPos();
    } catch (IOException io) {
    }
    String recStr;
    if (record.length() > statusMaxRecordChars_) {
      recStr = record.subSequence(0, statusMaxRecordChars_) + "...";
    } else {
      recStr = record.toString();
    }
    String unqualSplit = split_.getPath().getName() + ":" +
                         split_.getStart() + "+" + split_.getLength();
    String status = "HSTR " + StreamUtil.HOST + " " + numRec_ + ". pos=" +
pos + " " + unqualSplit
      + " Processing record=" + recStr;
    status += " " + splitName_;
    return status;
  }

  FSDataInputStream in_;
  FileSplit split_;
  long start_;
  long end_;
  long length_;
  String splitName_;
  TaskAttemptContext context_;
  Configuration conf_;
  FileSystem fs_;
  int numRec_ = 0;
  int nextStatusRec_ = 1;
  int statusMaxRecordChars_;

}

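To wire these classes into a job with the new API, a driver would look roughly
like the sketch below. Like the classes above, this is untested; the input and
output paths and the <info>/</info> tags are just placeholders taken from your
earlier mail. Note that the Configuration carrying the stream.recordreader.*
properties has to be the one the Job is constructed from, otherwise the job
never sees those properties.

Driver sketch (new API)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.streaming.StreamInputFormat;

public class XmlDriver {

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // StreamInputFormat and StreamXmlRecordReader read these properties.
    conf.set("stream.recordreader.class",
        "org.apache.hadoop.streaming.StreamXmlRecordReader");
    conf.set("stream.recordreader.begin", "<info>");
    conf.set("stream.recordreader.end", "</info>");

    // Pass the configured conf into the Job; a Job created with new Job()
    // gets its own Configuration and will not see the properties above.
    Job job = new Job(conf, "xml demo");

    job.setInputFormatClass(StreamInputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    // job.setMapperClass(MyXmlMapper.class); // see the mapper sketch below

    FileInputFormat.addInputPath(job, new Path("/mapin/demo.xml"));
    FileOutputFormat.setOutputPath(job, new Path("/mapout/demo"));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}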

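In the mapper, the whole XML record still arrives as the key (a Text) and the
value is empty, just as with the old API; the difference is that the mapper
now extends org.apache.hadoop.mapreduce.Mapper instead of implementing the old
interface. Again only an untested sketch; MyXmlMapper and the output types are
placeholders.

Mapper sketch (new API)

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyXmlMapper extends Mapper<Text, Text, Text, IntWritable> {

  @Override
  protected void map(Text key, Text value, Context context)
      throws IOException, InterruptedException {
    // key holds one complete <info>...</info> fragment; value is empty.
    // Parse the fragment and emit whatever pairs you need; as a placeholder,
    // just count each record once.
    context.write(key, new IntWritable(1));
  }
}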

On Tue, Jun 19, 2012 at 5:58 PM, Mohammad Tariq <dontariq@gmail.com> wrote:

> But I have downloaded "hadoop-streaming-0.20.205.0.jar" and it
> contains the StreamXmlRecordReader.class file. This means it should
> support StreamInputFormat.
>
> Regards,
>     Mohammad Tariq
>
>
> On Tue, Jun 19, 2012 at 5:54 PM, Mohammad Tariq <dontariq@gmail.com>
> wrote:
> > Thanks Madhu. I'll do that.
> >
> > Regards,
> >     Mohammad Tariq
> >
> >
> > On Tue, Jun 19, 2012 at 5:43 PM, madhu phatak <phatak.dev@gmail.com>
> wrote:
> >> Seems like StreamInputFormat is not yet ported to the new API. That's
> >> why you are not able to set it as the InputFormatClass. You can file a
> >> jira for this issue.
> >>
> >>
> >> On Tue, Jun 19, 2012 at 4:49 PM, Mohammad Tariq <dontariq@gmail.com>
> wrote:
> >>>
> >>> My driver function looks like this -
> >>>
> >>> public static void main(String[] args) throws IOException,
> >>> InterruptedException, ClassNotFoundException {
> >>>                // TODO Auto-generated method stub
> >>>
> >>>                Configuration conf = new Configuration();
> >>>                Job job = new Job();
> >>>                conf.set("stream.recordreader.class",
> >>> "org.apache.hadoop.streaming.StreamXmlRecordReader");
> >>>                conf.set("stream.recordreader.begin", "<info>");
> >>>                conf.set("stream.recordreader.end", "</info>");
> >>>                job.setInputFormatClass(StreamInputFormat.class);
> >>>                job.setOutputKeyClass(Text.class);
> >>>                job.setOutputValueClass(IntWritable.class);
> >>>                FileInputFormat.addInputPath(job, new
> >>> Path("/mapin/demo.xml"));
> >>>                FileOutputFormat.setOutputPath(job, new
> >>> Path("/mapout/demo"));
> >>>                job.waitForCompletion(true);
> >>>        }
> >>>
> >>> Could you please point out my mistake?
> >>>
> >>> Regards,
> >>>     Mohammad Tariq
> >>>
> >>>
> >>> On Tue, Jun 19, 2012 at 4:35 PM, Mohammad Tariq <dontariq@gmail.com>
> >>> wrote:
> >>> > Hello Madhu,
> >>> >
> >>> >             Thanks for the response. Actually I was trying to use the
> >>> > new API (Job). Have you tried that? I was not able to set the
> >>> > InputFormat using the Job API.
> >>> >
> >>> > Regards,
> >>> >     Mohammad Tariq
> >>> >
> >>> >
> >>> > On Tue, Jun 19, 2012 at 4:28 PM, madhu phatak <phatak.dev@gmail.com>
> >>> > wrote:
> >>> >> Hi,
> >>> >>  Set the following properties in driver class
> >>> >>
> >>> >>   jobConf.set("stream.recordreader.class",
> >>> >> "org.apache.hadoop.streaming.StreamXmlRecordReader");
> >>> >> jobConf.set("stream.recordreader.begin",
> >>> >> "start-tag");
> >>> >> jobConf.set("stream.recordreader.end",
> >>> >> "end-tag");
> >>> >>
> >>> >> jobConf.setInputFormat(StreamInputFormat.class);
> >>> >>
> >>> >>  In Mapper, the xml record will come as a key of type Text, so your
> >>> >> mapper will look like
> >>> >>
> >>> >>   public class MyMapper<K,V>  implements Mapper<Text,Text,K,V>
> >>> >>
> >>> >>
> >>> >> On Tue, Jun 19, 2012 at 2:49 AM, Mohammad Tariq <dontariq@gmail.com>
> >>> >> wrote:
> >>> >>>
> >>> >>> Hello list,
> >>> >>>
> >>> >>>        Could anyone who has written MapReduce jobs to process xml
> >>> >>> documents stored in their cluster using "StreamXmlRecordReader"
> >>> >>> share his/her experience? Or could you provide me some pointers
> >>> >>> addressing that? Many thanks.
> >>> >>>
> >>> >>> Regards,
> >>> >>>     Mohammad Tariq
> >>> >>
> >>> >>
> >>> >>
> >>> >>
> >>> >> --
> >>> >> https://github.com/zinnia-phatak-dev/Nectar
> >>> >>
> >>
> >>
> >>
> >>
> >> --
> >> https://github.com/zinnia-phatak-dev/Nectar
> >>
>



-- 
https://github.com/zinnia-phatak-dev/Nectar
