hadoop-common-commits mailing list archives

From: acmur...@apache.org
Subject: svn commit: r592724 - in /lucene/hadoop/trunk: CHANGES.txt src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java
Date: Wed, 07 Nov 2007 12:08:02 GMT
Author: acmurthy
Date: Wed Nov  7 04:08:01 2007
New Revision: 592724

URL: http://svn.apache.org/viewvc?rev=592724&view=rev
Log:
HADOOP-2071.  Fix StreamXmlRecordReader to use a BufferedInputStream wrapped over the DFSInputStream
since mark/reset aren't supported by DFSInputStream anymore. Contributed by Lohit Vijayarenu.
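
For context, here is a minimal standalone sketch of the pattern this patch applies (not the committed Hadoop code; NoMarkStream below is a hypothetical stand-in for DFSInputStream): wrap the stream that no longer supports mark/reset in a java.io.BufferedInputStream, use the buffer's mark/reset for look-ahead, and track the logical position yourself, because the buffer reads ahead of the underlying stream.

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

public class MarkResetSketch {

  // Hypothetical stand-in for DFSInputStream: mark/reset are not supported.
  static class NoMarkStream extends FilterInputStream {
    NoMarkStream(InputStream in) { super(in); }
    @Override public boolean markSupported() { return false; }
    @Override public synchronized void mark(int readlimit) { /* no-op */ }
    @Override public synchronized void reset() throws IOException {
      throw new IOException("mark/reset not supported");
    }
  }

  public static void main(String[] args) throws IOException {
    InputStream raw = new NoMarkStream(
        new ByteArrayInputStream("<doc>hello</doc>".getBytes("UTF-8")));

    // Wrap the raw stream so mark/reset work again, and keep a logical
    // position ourselves since the buffer may read ahead of what we consume.
    BufferedInputStream bin = new BufferedInputStream(raw);
    long pos = 0;

    bin.mark(1024);                      // mark stays valid for up to 1024 bytes
    byte[] lookAhead = new byte[5];
    int read = bin.read(lookAhead);      // peek at the next few bytes
    System.out.println("peeked: " + new String(lookAhead, 0, read, "UTF-8"));

    bin.reset();                         // rewind to the mark instead of seeking
    pos += bin.skip(2);                  // consume only the bytes we actually want
    System.out.println("logical position: " + pos);
  }
}

In the patch this shows up as the new bin_ and pos_ fields: bin_.reset() followed by bin_.skip(endPos) replaces the in_.seek(inStart + endPos) the old code needed, and pos_ replaces in_.getPos(), which no longer reflects the reader's logical position once the buffer reads ahead.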

Added:
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=592724&r1=592723&r2=592724&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Nov  7 04:08:01 2007
@@ -72,6 +72,10 @@
     HADOOP-2089.  Fixes the command line argument handling to handle multiple
     -cacheArchive in Hadoop streaming.  (Lohit Vijayarenu via ddas)
 
+    HADOOP-2071.  Fix StreamXmlRecordReader to use a BufferedInputStream
+    wrapped over the DFSInputStream since mark/reset aren't supported by
+    DFSInputStream anymore. (Lohit Vijayarenu via acmurthy)
+
 
 Release 0.15.1 -
 

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java?rev=592724&r1=592723&r2=592724&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java Wed Nov  7 04:08:01 2007
@@ -71,15 +71,16 @@
     if (start_ > in_.getPos()) {
       in_.seek(start_);
     }
+    pos_ = start_;
+    bin_ = new BufferedInputStream(in_);
     seekNextRecordBoundary();
   }
   
   int numNext = 0;
 
   public synchronized boolean next(Text key, Text value) throws IOException {
-    long pos = in_.getPos();
     numNext++;
-    if (pos >= end_) {
+    if (pos_ >= end_) {
       return false;
     }
 
@@ -100,11 +101,6 @@
     key.set(record);
     value.set("");
 
-    /*if (numNext < 5) {
-      System.out.println("@@@ " + numNext + ". true next k=|" + key.toString().replaceAll("[\\r\\n]",
" ")
-      + "|, len=" + buf.length() + " v=|" + value.toString().replaceAll("[\\r\\n]", " ")
+ "|");
-      }*/
-
     return true;
   }
 
@@ -130,77 +126,64 @@
 
   private boolean slowReadUntilMatch(Pattern markPattern, boolean includePat,
                                      DataOutputBuffer outBufOrNull) throws IOException {
-    try {
-      long inStart = in_.getPos();
-      byte[] buf = new byte[Math.max(lookAhead_, maxRecSize_)];
-      int read = 0;
-      boolean success = true;
-      in_.mark(lookAhead_ + 2);
-      read = in_.read(buf);
-      if (read == -1) return false;
-
-      String sbuf = new String(buf, 0, read, "UTF-8");
-      Matcher match = markPattern.matcher(sbuf);
-
-      firstMatchStart_ = NA;
-      firstMatchEnd_ = NA;
-      int bufPos = 0;
-      int state = synched_ ? CDATA_OUT : CDATA_UNK;
-      int s = 0;
-      int matchLen = 0;
-      while (match.find(bufPos)) {
-        int input;
-        matchLen = match.group(0).length();
-        if (match.group(1) != null) {
-          input = CDATA_BEGIN;
-        } else if (match.group(2) != null) {
-          input = CDATA_END;
-          firstMatchStart_ = NA; // |<DOC CDATA[ </DOC> ]]> should keep it
-        } else {
-          input = RECORD_MAYBE;
-        }
-        if (input == RECORD_MAYBE) {
-          if (firstMatchStart_ == NA) {
-            firstMatchStart_ = match.start();
-            firstMatchEnd_ = match.end();
-          }
-        }
-        state = nextState(state, input, match.start());
-        /*System.out.println("@@@" +
-          s + ". Match " + match.start() + " " + match.groupCount() +
-          " state=" + state + " input=" + input + 
-          " firstMatchStart_=" + firstMatchStart_ + " startinstream=" + (inStart+firstMatchStart_)
+ 
-          " match=" + match.group(0) + " in=" + in_.getPos());*/
-        if (state == RECORD_ACCEPT) {
-          break;
+    byte[] buf = new byte[Math.max(lookAhead_, maxRecSize_)];
+    int read = 0;
+    boolean success = true;
+    long skippedBytes = 0;
+    bin_.mark(Math.max(lookAhead_, maxRecSize_) + 2); //mark to invalidate if we read more
+    read = bin_.read(buf);
+    if (read == -1) return false;
+
+    String sbuf = new String(buf, 0, read, "UTF-8");
+    Matcher match = markPattern.matcher(sbuf);
+
+    firstMatchStart_ = NA;
+    firstMatchEnd_ = NA;
+    int bufPos = 0;
+    int state = synched_ ? CDATA_OUT : CDATA_UNK;
+    int s = 0;
+    int matchLen = 0;
+    int LL = 120000 * 10;
+
+    while (match.find(bufPos)) {
+      int input;
+      matchLen = match.group(0).length();
+      if (match.group(1) != null) {
+        input = CDATA_BEGIN;
+      } else if (match.group(2) != null) {
+        input = CDATA_END;
+        firstMatchStart_ = NA; // |<DOC CDATA[ </DOC> ]]> should keep it
+      } else {
+        input = RECORD_MAYBE;
+      }
+      if (input == RECORD_MAYBE) {
+        if (firstMatchStart_ == NA) {
+          firstMatchStart_ = match.start();
+          firstMatchEnd_ = match.end();
         }
-        bufPos = match.end();
-        s++;
       }
-      if (state != CDATA_UNK) {
-        synched_ = true;
+      state = nextState(state, input, match.start());
+      if (state == RECORD_ACCEPT) {
+        bufPos = match.end();
+        break;
       }
-      boolean matched = (firstMatchStart_ != NA) && (state == RECORD_ACCEPT || state == CDATA_UNK);
-      if (matched) {
-        int endPos = includePat ? firstMatchEnd_ : firstMatchStart_;
-        //System.out.println("firstMatchStart_=" + firstMatchStart_ + " firstMatchEnd_="
+ firstMatchEnd_);
-        //String snip = sbuf.substring(firstMatchStart_, firstMatchEnd_);
-        //System.out.println(" match snip=|" + snip + "| markPattern=" + markPattern);
-        if (outBufOrNull != null) {
-          in_.reset();
-          outBufOrNull.write(in_, endPos);
-        } else {
-          //System.out.println("Skip to " + (inStart + endPos));
-          in_.seek(inStart + endPos);
-        }
+      bufPos = match.end();
+      s++;
+    }
+    if (state != CDATA_UNK) {
+      synched_ = true;
+    }
+    boolean matched = (firstMatchStart_ != NA) && (state == RECORD_ACCEPT || state == CDATA_UNK);
+    if (matched) {
+      int endPos = includePat ? firstMatchEnd_ : firstMatchStart_;
+      bin_.reset();
+      skippedBytes = bin_.skip(endPos); //Skip succeeds as we have already read this into the buffer
+      pos_ += endPos;
+      if (outBufOrNull != null) {
+        outBufOrNull.writeBytes(sbuf.substring(0,endPos));
       }
-      return matched;
-    } catch (Exception e) {
-      e.printStackTrace();
-    } finally {
-      // in_ ?
     }
-    return false;
+    return matched;
   }
 
   // states
@@ -254,19 +237,15 @@
   }
 
   boolean fastReadUntilMatch(String textPat, boolean includePat, DataOutputBuffer outBufOrNull) throws IOException {
-    //System.out.println("@@@BEGIN readUntilMatch inPos=" + in_.getPos());  
     byte[] cpat = textPat.getBytes("UTF-8");
     int m = 0;
     boolean match = false;
-    long markPos = -1;
     int msup = cpat.length;
-    if (!includePat) {
-      int LL = 120000 * 10;
-      markPos = in_.getPos();
-      in_.mark(LL); // lookAhead_
-    }
+    int LL = 120000 * 10;
+
+    bin_.mark(LL); // large read limit so the mark stays valid
     while (true) {
-      int b = in_.read();
+      int b = bin_.read();
       if (b == -1) break;
 
       byte c = (byte) b; // this assumes eight-bit matching. OK with UTF-8
@@ -277,22 +256,21 @@
           break;
         }
       } else {
+        bin_.mark(LL); // reset mark so we can jump back if we find a match
         if (outBufOrNull != null) {
           outBufOrNull.write(cpat, 0, m);
           outBufOrNull.write(c);
+          pos_ += m;
         }
-
         m = 0;
       }
     }
     if (!includePat && match) {
-      long pos = in_.getPos() - textPat.length();
-      in_.reset();
-      in_.seek(pos);
+      bin_.reset();
     } else if (outBufOrNull != null) {
       outBufOrNull.write(cpat);
+      pos_ += msup;
     }
-    //System.out.println("@@@DONE  readUntilMatch inPos=" + in_.getPos() + " includePat="
+ includePat + " pat=" + textPat + ", buf=|" + outBufOrNull + "|");
     return match;
   }
 
@@ -313,6 +291,9 @@
   boolean slowMatch_;
   int lookAhead_; // bytes to read to try to synch CDATA/non-CDATA. Should be more than max record size
   int maxRecSize_;
+
+  BufferedInputStream bin_; // Wrap FSDataInputStream for efficient backward seeks
+  long pos_; // Keep track of position with respect to the encapsulated FSDataInputStream
 
   final static int NA = -1;
   int firstMatchStart_ = 0; // candidate record boundary. Might just be CDATA.

Added: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java?rev=592724&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java Wed Nov  7 04:08:01 2007
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+/**
+ * This class tests StreamXmlRecordReader.
+ * The test creates an XML file, uses StreamXmlRecordReader, and compares
+ * the expected output against the generated output.
+ */
+public class TestStreamXmlRecordReader extends TestStreaming
+{
+
+  private StreamJob job;
+
+  public TestStreamXmlRecordReader() throws IOException {
+    INPUT_FILE = new File("input.xml");
+    input = "<xmltag>\t\nroses.are.red\t\nviolets.are.blue\t\nbunnies.are.pink\t\n</xmltag>\t\n";
+  }
+  
+  protected void createInput() throws IOException
+  {
+    FileOutputStream out = new FileOutputStream(INPUT_FILE.getAbsoluteFile());
+    String dummyXmlStartTag = "<PATTERN>\n";
+    String dummyXmlEndTag = "</PATTERN>\n";
+    out.write(dummyXmlStartTag.getBytes("UTF-8"));
+    out.write(input.getBytes("UTF-8"));
+    out.write(dummyXmlEndTag.getBytes("UTF-8"));
+    out.close();
+  }
+
+  protected String[] genArgs() {
+    return new String[] {
+      "-input", INPUT_FILE.getAbsolutePath(),
+      "-output", OUTPUT_DIR.getAbsolutePath(),
+      "-mapper","cat", 
+      "-reducer", "NONE", 
+      "-inputreader", "StreamXmlRecordReader,begin=<xmltag>,end=</xmltag>"
+    };
+  }
+
+  public void testCommandLine() {
+    try {
+      try {
+        OUTPUT_DIR.getAbsoluteFile().delete();
+      } catch (Exception e) {
+      }
+      createInput();
+      job = new StreamJob(genArgs(), false);
+      job.go();
+      File outFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
+      String output = StreamUtil.slurp(outFile);
+      outFile.delete();
+      assertEquals(input, output);
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      INPUT_FILE.delete();
+      File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
+      outFileCRC.delete();
+      OUTPUT_DIR.getAbsoluteFile().delete();
+    }
+  }
+
+  public static void main(String[]args) throws Exception
+  {
+    new TestStreamXmlRecordReader().testCommandLine();
+  }
+}


