hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1470661 - in /hadoop/common/branches/branch-1.2: ./ src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/ src/test/org/apache/hadoop/hdfs/ src/test/org/apache/hadoop/hdfs/web/
Date Mon, 22 Apr 2013 18:59:44 GMT
Author: szetszwo
Date: Mon Apr 22 18:59:43 2013
New Revision: 1470661

URL: http://svn.apache.org/r1470661
Log:
svn merge -c 1470660 from branch-1 for HDFS-4715. Backport HDFS-3577, HDFS-3318 and HDFS-3788:
fix some WebHDFS performance issues.

Added:
    hadoop/common/branches/branch-1.2/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/OpenEntity.java
      - copied unchanged from r1470660, hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/OpenEntity.java
Modified:
    hadoop/common/branches/branch-1.2/   (props changed)
    hadoop/common/branches/branch-1.2/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/branch-1.2/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java
    hadoop/common/branches/branch-1.2/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
    hadoop/common/branches/branch-1.2/src/test/org/apache/hadoop/hdfs/TestByteRangeInputStream.java
    hadoop/common/branches/branch-1.2/src/test/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java

Propchange: hadoop/common/branches/branch-1.2/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/branch-1:r1470660

Modified: hadoop/common/branches/branch-1.2/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.2/CHANGES.txt?rev=1470661&r1=1470660&r2=1470661&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.2/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.2/CHANGES.txt Mon Apr 22 18:59:43 2013
@@ -568,6 +568,9 @@ Release 1.2.0 - 2013.04.16
     MAPREDUCE-5066. Added a timeout for the job.end.notification.url. (Ivan
     Mitic via acmurthy)
 
+    HDFS-4715. Backport HDFS-3577, HDFS-3318 and HDFS-3788: fix some WebHDFS
+    performance issues.  (Mark Wagner via szetszwo)
+
 Release 1.1.2 - 2013.01.30
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/branch-1.2/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/branch-1/CHANGES.txt:r1470660

Modified: hadoop/common/branches/branch-1.2/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.2/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java?rev=1470661&r1=1470660&r2=1470661&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.2/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java (original)
+++ hadoop/common/branches/branch-1.2/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java Mon Apr 22 18:59:43 2013
@@ -22,7 +22,11 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
 
+import org.apache.commons.io.input.BoundedInputStream;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.hdfs.server.namenode.StreamFile;
 
@@ -60,14 +64,14 @@ public abstract class ByteRangeInputStre
   }
 
   enum StreamStatus {
-    NORMAL, SEEK
+    NORMAL, SEEK, CLOSED
   }
   protected InputStream in;
   protected URLOpener originalURL;
   protected URLOpener resolvedURL;
   protected long startPos = 0;
   protected long currentPos = 0;
-  protected long filelength;
+  protected Long fileLength = null;
 
   StreamStatus status = StreamStatus.SEEK;
 
@@ -85,63 +89,120 @@ public abstract class ByteRangeInputStre
   protected abstract URL getResolvedUrl(final HttpURLConnection connection
       ) throws IOException;
 
-  private InputStream getInputStream() throws IOException {
-    if (status != StreamStatus.NORMAL) {
-      
-      if (in != null) {
-        in.close();
-        in = null;
-      }
-      
-      // Use the original url if no resolved url exists, eg. if
-      // it's the first time a request is made.
-      final boolean resolved = resolvedURL.getURL() != null; 
-      final URLOpener opener = resolved? resolvedURL: originalURL;
-
-      final HttpURLConnection connection = opener.connect(startPos, resolved);
+  protected InputStream getInputStream() throws IOException {
+    switch (status) {
+      case NORMAL:
+        break;
+      case SEEK:
+        if (in != null) {
+          in.close();
+        }
+        in = openInputStream();
+        status = StreamStatus.NORMAL;
+        break;
+      case CLOSED:
+        throw new IOException("Stream closed");
+    }
+    return in;
+  }
+  
+  protected InputStream openInputStream() throws IOException {
+    // Use the original url if no resolved url exists, eg. if
+    // it's the first time a request is made.
+    final boolean resolved = resolvedURL.getURL() != null; 
+    final URLOpener opener = resolved? resolvedURL: originalURL;
+
+    final HttpURLConnection connection = opener.connect(startPos, resolved);
+    resolvedURL.setURL(getResolvedUrl(connection));
+
+    InputStream in = connection.getInputStream();
+    final Map<String, List<String>> headers = connection.getHeaderFields();
+    if (isChunkedTransferEncoding(headers)) {
+      // file length is not known
+      fileLength = null;
+    } else {
+      // for non-chunked transfer-encoding, get content-length
       final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
-      filelength = (cl == null) ? -1 : Long.parseLong(cl);
-      in = connection.getInputStream();
+      if (cl == null) {
+        throw new IOException(StreamFile.CONTENT_LENGTH + " is missing: "
+            + headers);
+      }
+      final long streamlength = Long.parseLong(cl);
+      fileLength = startPos + streamlength;
 
-      resolvedURL.setURL(getResolvedUrl(connection));
-      status = StreamStatus.NORMAL;
+      // Java has a bug with >2GB request streams.  It won't bounds check
+      // the reads so the transfer blocks until the server times out
+      in = new BoundedInputStream(in, streamlength);
     }
-    
+
     return in;
   }
   
-  private void update(final boolean isEOF, final int n)
-      throws IOException {
-    if (!isEOF) {
+  private static boolean isChunkedTransferEncoding(
+      final Map<String, List<String>> headers) {
+    return contains(headers, "Transfer-Encoding", "chunked")
+        || contains(headers, "TE", "chunked");
+  }
+
+  /** Does the HTTP header map contain the given key, value pair? */
+  private static boolean contains(final Map<String, List<String>> headers,
+      final String key, final String value) {
+    final List<String> values = headers.get(key);
+    if (values != null) {
+      for(String v : values) {
+        for(final StringTokenizer t = new StringTokenizer(v, ",");
+            t.hasMoreTokens(); ) {
+          if (value.equalsIgnoreCase(t.nextToken())) {
+            return true;
+          }
+        }
+      }
+    }
+    return false;
+  }
+  
+  private int update(final int n) throws IOException {
+    if (n != -1) {
       currentPos += n;
-    } else if (currentPos < filelength) {
+    } else if (fileLength != null && currentPos < fileLength) {
       throw new IOException("Got EOF but currentPos = " + currentPos
-          + " < filelength = " + filelength);
+          + " < filelength = " + fileLength);
     }
+    return n;
   }
 
+  @Override
   public int read() throws IOException {
     final int b = getInputStream().read();
-    update(b == -1, 1);
+    update((b == -1)? -1 : 1);
     return b;
   }
   
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException{
+    return update(getInputStream().read(b, off, len));
+  }
+  
   /**
    * Seek to the given offset from the start of the file.
    * The next read() will be from that location.  Can't
    * seek past the end of the file.
    */
+  @Override
   public void seek(long pos) throws IOException {
     if (pos != currentPos) {
       startPos = pos;
       currentPos = pos;
-      status = StreamStatus.SEEK;
+      if (status != StreamStatus.CLOSED) {
+        status = StreamStatus.SEEK;
+      }
     }
   }
 
   /**
    * Return the current offset from the start of the file
    */
+  @Override
   public long getPos() throws IOException {
     return currentPos;
   }
@@ -150,7 +211,17 @@ public abstract class ByteRangeInputStre
    * Seeks a different copy of the data.  Returns true if
    * found a new source, false otherwise.
    */
+  @Override
   public boolean seekToNewSource(long targetPos) throws IOException {
     return false;
   }
-}
\ No newline at end of file
+  
+  @Override
+  public void close() throws IOException {
+    if (in != null) {
+      in.close();
+      in = null;
+    }
+    status = StreamStatus.CLOSED;
+  }
+}

Modified: hadoop/common/branches/branch-1.2/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.2/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java?rev=1470661&r1=1470660&r2=1470661&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.2/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/branch-1.2/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java Mon Apr 22 18:59:43 2013
@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.da
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -39,7 +38,6 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-import javax.ws.rs.core.StreamingOutput;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -341,31 +339,10 @@ public class DatanodeWebHdfsMethods {
         IOUtils.cleanup(LOG, dfsclient);
         throw ioe;
       }
-      final DFSDataInputStream dis = in;
-      final StreamingOutput streaming = new StreamingOutput() {
-        @Override
-        public void write(final OutputStream out) throws IOException {
-          final Long n = length.getValue();
-          DFSDataInputStream dfsin = dis;
-          DFSClient client = dfsclient;
-          try {
-            if (n == null) {
-              IOUtils.copyBytes(dfsin, out, 4096);
-            } else {
-              IOUtils.copyBytes(dfsin, out, n, 4096, false);
-            }
-            dfsin.close();
-            dfsin = null;
-            client.close();
-            client = null;
-          } finally {
-            IOUtils.cleanup(LOG, dfsin);
-            IOUtils.cleanup(LOG, client);
-          }
-        }
-      };
-
-      return Response.ok(streaming).type(
+      
+      final long n = length.getValue() != null? length.getValue()
+          : in.getVisibleLength();
+      return Response.ok(new OpenEntity(in, n, dfsclient)).type(
           MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case GETFILECHECKSUM:

Modified: hadoop/common/branches/branch-1.2/src/test/org/apache/hadoop/hdfs/TestByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.2/src/test/org/apache/hadoop/hdfs/TestByteRangeInputStream.java?rev=1470661&r1=1470660&r2=1470661&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.2/src/test/org/apache/hadoop/hdfs/TestByteRangeInputStream.java (original)
+++ hadoop/common/branches/branch-1.2/src/test/org/apache/hadoop/hdfs/TestByteRangeInputStream.java Mon Apr 22 18:59:43 2013
@@ -23,6 +23,7 @@ import java.io.InputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
 
+import org.apache.hadoop.hdfs.server.namenode.StreamFile;
 import org.junit.Test;
 
 public class TestByteRangeInputStream {
@@ -76,6 +77,11 @@ public static class MockHttpURLConnectio
   public void setResponseCode(int resCode) {
     responseCode = resCode;
   }
+  
+  @Override
+  public String getHeaderField(String field) {
+      return (field.equalsIgnoreCase(StreamFile.CONTENT_LENGTH)) ? "65535" : null;
+  }
 }
 
   @Test

Modified: hadoop/common/branches/branch-1.2/src/test/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.2/src/test/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java?rev=1470661&r1=1470660&r2=1470661&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.2/src/test/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java (original)
+++ hadoop/common/branches/branch-1.2/src/test/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java Mon Apr 22 18:59:43 2013
@@ -25,6 +25,7 @@ import java.io.InputStreamReader;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.Map;
+import java.util.Random;
 
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.core.MediaType;
@@ -212,15 +213,20 @@ public class TestWebHdfsFileSystemContra
       assertEquals(0, count);
     }
 
+    final byte[] mydata = new byte[1 << 20];
+    new Random().nextBytes(mydata);
+    
     final Path p = new Path(dir, "file");
-    createFile(p);
+    FSDataOutputStream out = fs.create(p, false, 4096, (short)3, 1L << 17);
+    out.write(mydata);
+    out.close();
 
-    final int one_third = data.length/3;
+    final int one_third = mydata.length/3;
     final int two_third = one_third*2;
 
     { //test seek
       final int offset = one_third; 
-      final int len = data.length - offset;
+      final int len = mydata.length - offset;
       final byte[] buf = new byte[len];
 
       final FSDataInputStream in = fs.open(p);
@@ -232,13 +238,13 @@ public class TestWebHdfsFileSystemContra
   
       for (int i = 0; i < buf.length; i++) {
         assertEquals("Position " + i + ", offset=" + offset + ", length=" + len,
-            data[i + offset], buf[i]);
+            mydata[i + offset], buf[i]);
       }
     }
 
     { //test position read (read the data after the two_third location)
       final int offset = two_third; 
-      final int len = data.length - offset;
+      final int len = mydata.length - offset;
       final byte[] buf = new byte[len];
 
       final FSDataInputStream in = fs.open(p);
@@ -247,7 +253,7 @@ public class TestWebHdfsFileSystemContra
   
       for (int i = 0; i < buf.length; i++) {
         assertEquals("Position " + i + ", offset=" + offset + ", length=" + len,
-            data[i + offset], buf[i]);
+            mydata[i + offset], buf[i]);
       }
     }
   }



Mime
View raw message