hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r886004 - in /hadoop/common/branches/branch-0.21: CHANGES.txt src/java/org/apache/hadoop/io/SequenceFile.java src/test/core/org/apache/hadoop/io/TestSequenceFileSync.java
Date Tue, 01 Dec 2009 23:26:16 GMT
Author: szetszwo
Date: Tue Dec  1 23:26:16 2009
New Revision: 886004

URL: http://svn.apache.org/viewvc?rev=886004&view=rev
Log:
HADOOP-6307. Add a new SequenceFile.Reader constructor in order to support reading on un-closed file.

Modified:
    hadoop/common/branches/branch-0.21/CHANGES.txt
    hadoop/common/branches/branch-0.21/src/java/org/apache/hadoop/io/SequenceFile.java
    hadoop/common/branches/branch-0.21/src/test/core/org/apache/hadoop/io/TestSequenceFileSync.java

Modified: hadoop/common/branches/branch-0.21/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.21/CHANGES.txt?rev=886004&r1=886003&r2=886004&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.21/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.21/CHANGES.txt Tue Dec  1 23:26:16 2009
@@ -636,6 +636,9 @@
     HADOOP-6271. Add recursive and non recursive create and mkdir to 
     FileContext. (Sanjay Radia via suresh)
 
+    HADOOP-6307. Add a new SequenceFile.Reader constructor in order to support
+    reading on un-closed file.  (szetszwo)
+
   BUG FIXES
     
     HADOOP-5379. CBZip2InputStream to throw IOException on data crc error.

Modified: hadoop/common/branches/branch-0.21/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.21/src/java/org/apache/hadoop/io/SequenceFile.java?rev=886004&r1=886003&r2=886004&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.21/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ hadoop/common/branches/branch-0.21/src/java/org/apache/hadoop/io/SequenceFile.java Tue Dec  1 23:26:16 2009
@@ -1435,32 +1435,71 @@
     private DeserializerBase keyDeserializer;
     private DeserializerBase valDeserializer;
 
-    /** Open the named file. */
+    /**
+     * Construct a reader by opening a file from the given file system.
+     * @param fs The file system used to open the file.
+     * @param file The file being read.
+     * @param conf Configuration
+     * @throws IOException
+     */
     public Reader(FileSystem fs, Path file, Configuration conf)
       throws IOException {
       this(fs, file, conf.getInt("io.file.buffer.size", 4096), conf, false);
     }
 
+    /**
+     * Construct a reader by the given input stream.
+     * @param in An input stream.
+     * @param buffersize The buffer size used to read the file.
+     * @param start The starting position.
+     * @param length The length being read.
+     * @param conf Configuration
+     * @throws IOException
+     */
+    public Reader(FSDataInputStream in, int buffersize,
+        long start, long length, Configuration conf) throws IOException {
+      this(null, null, in, buffersize, start, length, conf, false);
+    }
+
     private Reader(FileSystem fs, Path file, int bufferSize,
                    Configuration conf, boolean tempReader) throws IOException {
-      this(fs, file, bufferSize, 0, fs.getFileStatus(file).getLen(), conf, tempReader);
+      this(fs, file, null, bufferSize, 0, fs.getFileStatus(file).getLen(),
+          conf, tempReader);
     }
-    
-    private Reader(FileSystem fs, Path file, int bufferSize, long start,
-                   long length, Configuration conf, boolean tempReader) 
-    throws IOException {
+
+    /**
+     * Private constructor.
+     * @param fs The file system used to open the file.
+     *           It is not used if the given input stream is not null.  
+     * @param file The file being read.
+     * @param in An input stream of the file.  If it is null,
+     *           the file will be opened from the given file system.
+     * @param bufferSize The buffer size used to read the file.
+     * @param start The starting position.
+     * @param length The length being read.
+     * @param conf Configuration
+     * @param tempReader Is this temporary? 
+     * @throws IOException
+     */
+    private Reader(FileSystem fs, Path file, FSDataInputStream in,
+        int bufferSize, long start, long length, Configuration conf,
+        boolean tempReader) throws IOException {
+      if (fs == null && in == null) {
+        throw new IllegalArgumentException("fs == null && in == null");
+      }
+
       this.file = file;
-      this.in = openFile(fs, file, bufferSize, length);
+      this.in = in != null? in: openFile(fs, file, bufferSize, length);
       this.conf = conf;
       boolean succeeded = false;
       try {
         seek(start);
-        this.end = in.getPos() + length;
+        this.end = this.in.getPos() + length;
         init(tempReader);
         succeeded = true;
       } finally {
         if (!succeeded) {
-          IOUtils.cleanup(LOG, in);
+          IOUtils.cleanup(LOG, this.in);
         }
       }
     }
@@ -1468,6 +1507,13 @@
     /**
      * Override this method to specialize the type of
      * {@link FSDataInputStream} returned.
+     * @param fs The file system used to open the file.
+     * @param file The file being read.
+     * @param bufferSize The buffer size used to read the file.
+     * @param length The length being read if it is >= 0.  Otherwise,
+     *               the length is not available.
+     * @return The opened stream.
+     * @throws IOException
      */
     protected FSDataInputStream openFile(FileSystem fs, Path file,
         int bufferSize, long length) throws IOException {
@@ -1489,7 +1535,7 @@
       if ((versionBlock[0] != VERSION[0]) ||
           (versionBlock[1] != VERSION[1]) ||
           (versionBlock[2] != VERSION[2]))
-        throw new IOException(file + " not a SequenceFile");
+        throw new IOException(this + " not a SequenceFile");
 
       // Set 'version'
       version = versionBlock[3];
@@ -2251,7 +2297,7 @@
 
     /** Returns the name of the file. */
     public String toString() {
-      return file.toString();
+      return file == null? "<unknown>": file.toString();
     }
 
   }
@@ -3132,7 +3178,7 @@
           if (fs.getUri().getScheme().startsWith("ramfs")) {
             bufferSize = conf.getInt("io.bytes.per.checksum", 512);
           }
-          Reader reader = new Reader(fs, segmentPathName, 
+          Reader reader = new Reader(fs, segmentPathName, null, 
                                      bufferSize, segmentOffset, 
                                      segmentLength, conf, false);
         

Modified: hadoop/common/branches/branch-0.21/src/test/core/org/apache/hadoop/io/TestSequenceFileSync.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.21/src/test/core/org/apache/hadoop/io/TestSequenceFileSync.java?rev=886004&r1=886003&r2=886004&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.21/src/test/core/org/apache/hadoop/io/TestSequenceFileSync.java (original)
+++ hadoop/common/branches/branch-0.21/src/test/core/org/apache/hadoop/io/TestSequenceFileSync.java Tue Dec  1 23:26:16 2009
@@ -18,22 +18,17 @@
 
 package org.apache.hadoop.io;
 
-import java.io.File;
+import static org.junit.Assert.assertEquals;
+
 import java.io.IOException;
 import java.util.Random;
-import java.net.URI;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import static org.junit.Assert.*;
-
 public class TestSequenceFileSync {
   private static final int NUMRECORDS = 2000;
   private static final int RECORDSIZE = 80;
@@ -66,8 +61,18 @@
     try {
       writeSequenceFile(writer, NUMRECORDS);
       for (int i = 0; i < 5 ; i++) {
-       final SequenceFile.Reader reader =
-         new SequenceFile.Reader(fs, path, conf);
+       final SequenceFile.Reader reader;
+       
+       //try different SequenceFile.Reader constructors
+       if (i % 2 == 0) {
+         reader = new SequenceFile.Reader(fs, path, conf);
+       } else {
+         final FSDataInputStream in = fs.open(path);
+         final long length = fs.getFileStatus(path).getLen();
+         final int buffersize = conf.getInt("io.file.buffer.size", 4096);
+         reader = new SequenceFile.Reader(in, buffersize, 0L, length, conf);
+       }
+
        try {
           forOffset(reader, input, val, i, 0, 0);
           forOffset(reader, input, val, i, 65, 0);



Mime
View raw message