hadoop-common-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1470660 - in /hadoop/common/branches/branch-1: ./ src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/ src/test/org/apache/hadoop/hdfs/ src/test/org/apache/hadoop/hdfs/web/
Date: Mon, 22 Apr 2013 18:58:53 GMT
Author: szetszwo
Date: Mon Apr 22 18:58:52 2013
New Revision: 1470660

URL: http://svn.apache.org/r1470660
Log:
HDFS-4715. Backport HDFS-3577, HDFS-3318 and HDFS-3788: fix some WebHDFS performance issues.  Contributed by Mark Wagner

Added:
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/OpenEntity.java
Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestByteRangeInputStream.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1470660&r1=1470659&r2=1470660&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Mon Apr 22 18:58:52 2013
@@ -611,6 +611,9 @@ Release 1.2.0 - unreleased
     MAPREDUCE-5066. Added a timeout for the job.end.notification.url. (Ivan
     Mitic via acmurthy)
 
+    HDFS-4715. Backport HDFS-3577, HDFS-3318 and HDFS-3788: fix some WebHDFS
+    performance issues.  (Mark Wagner via szetszwo)
+
 Release 1.1.2 - 2013.01.30
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java?rev=1470660&r1=1470659&r2=1470660&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/ByteRangeInputStream.java Mon Apr 22 18:58:52 2013
@@ -22,7 +22,11 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
 
+import org.apache.commons.io.input.BoundedInputStream;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.hdfs.server.namenode.StreamFile;
 
@@ -60,14 +64,14 @@ public abstract class ByteRangeInputStre
   }
 
   enum StreamStatus {
-    NORMAL, SEEK
+    NORMAL, SEEK, CLOSED
   }
   protected InputStream in;
   protected URLOpener originalURL;
   protected URLOpener resolvedURL;
   protected long startPos = 0;
   protected long currentPos = 0;
-  protected long filelength;
+  protected Long fileLength = null;
 
   StreamStatus status = StreamStatus.SEEK;
 
@@ -85,63 +89,120 @@ public abstract class ByteRangeInputStre
   protected abstract URL getResolvedUrl(final HttpURLConnection connection
       ) throws IOException;
 
-  private InputStream getInputStream() throws IOException {
-    if (status != StreamStatus.NORMAL) {
-      
-      if (in != null) {
-        in.close();
-        in = null;
-      }
-      
-      // Use the original url if no resolved url exists, eg. if
-      // it's the first time a request is made.
-      final boolean resolved = resolvedURL.getURL() != null; 
-      final URLOpener opener = resolved? resolvedURL: originalURL;
-
-      final HttpURLConnection connection = opener.connect(startPos, resolved);
+  protected InputStream getInputStream() throws IOException {
+    switch (status) {
+      case NORMAL:
+        break;
+      case SEEK:
+        if (in != null) {
+          in.close();
+        }
+        in = openInputStream();
+        status = StreamStatus.NORMAL;
+        break;
+      case CLOSED:
+        throw new IOException("Stream closed");
+    }
+    return in;
+  }
+  
+  protected InputStream openInputStream() throws IOException {
+    // Use the original url if no resolved url exists, eg. if
+    // it's the first time a request is made.
+    final boolean resolved = resolvedURL.getURL() != null; 
+    final URLOpener opener = resolved? resolvedURL: originalURL;
+
+    final HttpURLConnection connection = opener.connect(startPos, resolved);
+    resolvedURL.setURL(getResolvedUrl(connection));
+
+    InputStream in = connection.getInputStream();
+    final Map<String, List<String>> headers = connection.getHeaderFields();
+    if (isChunkedTransferEncoding(headers)) {
+      // file length is not known
+      fileLength = null;
+    } else {
+      // for non-chunked transfer-encoding, get content-length
       final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
-      filelength = (cl == null) ? -1 : Long.parseLong(cl);
-      in = connection.getInputStream();
+      if (cl == null) {
+        throw new IOException(StreamFile.CONTENT_LENGTH + " is missing: "
+            + headers);
+      }
+      final long streamlength = Long.parseLong(cl);
+      fileLength = startPos + streamlength;
 
-      resolvedURL.setURL(getResolvedUrl(connection));
-      status = StreamStatus.NORMAL;
+      // Java has a bug with >2GB request streams.  It won't bounds check
+      // the reads so the transfer blocks until the server times out
+      in = new BoundedInputStream(in, streamlength);
     }
-    
+
     return in;
   }
   
-  private void update(final boolean isEOF, final int n)
-      throws IOException {
-    if (!isEOF) {
+  private static boolean isChunkedTransferEncoding(
+      final Map<String, List<String>> headers) {
+    return contains(headers, "Transfer-Encoding", "chunked")
+        || contains(headers, "TE", "chunked");
+  }
+
+  /** Does the HTTP header map contain the given key, value pair? */
+  private static boolean contains(final Map<String, List<String>> headers,
+      final String key, final String value) {
+    final List<String> values = headers.get(key);
+    if (values != null) {
+      for(String v : values) {
+        for(final StringTokenizer t = new StringTokenizer(v, ",");
+            t.hasMoreTokens(); ) {
+          if (value.equalsIgnoreCase(t.nextToken())) {
+            return true;
+          }
+        }
+      }
+    }
+    return false;
+  }
+  
+  private int update(final int n) throws IOException {
+    if (n != -1) {
       currentPos += n;
-    } else if (currentPos < filelength) {
+    } else if (fileLength != null && currentPos < fileLength) {
       throw new IOException("Got EOF but currentPos = " + currentPos
-          + " < filelength = " + filelength);
+          + " < filelength = " + fileLength);
     }
+    return n;
   }
 
+  @Override
   public int read() throws IOException {
     final int b = getInputStream().read();
-    update(b == -1, 1);
+    update((b == -1)? -1 : 1);
     return b;
   }
   
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException{
+    return update(getInputStream().read(b, off, len));
+  }
+  
   /**
    * Seek to the given offset from the start of the file.
    * The next read() will be from that location.  Can't
    * seek past the end of the file.
    */
+  @Override
   public void seek(long pos) throws IOException {
     if (pos != currentPos) {
       startPos = pos;
       currentPos = pos;
-      status = StreamStatus.SEEK;
+      if (status != StreamStatus.CLOSED) {
+        status = StreamStatus.SEEK;
+      }
     }
   }
 
   /**
    * Return the current offset from the start of the file
    */
+  @Override
   public long getPos() throws IOException {
     return currentPos;
   }
@@ -150,7 +211,17 @@ public abstract class ByteRangeInputStre
    * Seeks a different copy of the data.  Returns true if
    * found a new source, false otherwise.
    */
+  @Override
   public boolean seekToNewSource(long targetPos) throws IOException {
     return false;
   }
-}
\ No newline at end of file
+  
+  @Override
+  public void close() throws IOException {
+    if (in != null) {
+      in.close();
+      in = null;
+    }
+    status = StreamStatus.CLOSED;
+  }
+}
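
A note on the BoundedInputStream wrapper added above: HttpURLConnection's
input stream does not stop at Content-Length on its own, so (per the comment
in the diff) a read past the end of a large response can block until the
server times out. Capping the stream at the advertised length makes the
extra read return EOF instead. A minimal, self-contained sketch of the
effect (not part of this commit; the class name and byte counts are made up,
and commons-io is assumed on the classpath as it is for branch-1):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.commons.io.input.BoundedInputStream;

    public class BoundedReadSketch {
      public static void main(String[] args) throws IOException {
        // Pretend the connection delivers more bytes than Content-Length
        // promised (8192 available, 4096 advertised).
        InputStream raw = new ByteArrayInputStream(new byte[8192]);
        InputStream in = new BoundedInputStream(raw, 4096);
        byte[] buf = new byte[1024];
        int n, total = 0;
        // Reads return -1 at the 4096-byte cap instead of draining, or
        // blocking on, the underlying stream.
        while ((n = in.read(buf)) != -1) {
          total += n;
        }
        System.out.println("read " + total + " bytes");  // read 4096 bytes
      }
    }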

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java?rev=1470660&r1=1470659&r2=1470660&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java Mon Apr 22 18:58:52 2013
@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.da
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -39,7 +38,6 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-import javax.ws.rs.core.StreamingOutput;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -341,31 +339,10 @@ public class DatanodeWebHdfsMethods {
         IOUtils.cleanup(LOG, dfsclient);
         throw ioe;
       }
-      final DFSDataInputStream dis = in;
-      final StreamingOutput streaming = new StreamingOutput() {
-        @Override
-        public void write(final OutputStream out) throws IOException {
-          final Long n = length.getValue();
-          DFSDataInputStream dfsin = dis;
-          DFSClient client = dfsclient;
-          try {
-            if (n == null) {
-              IOUtils.copyBytes(dfsin, out, 4096);
-            } else {
-              IOUtils.copyBytes(dfsin, out, n, 4096, false);
-            }
-            dfsin.close();
-            dfsin = null;
-            client.close();
-            client = null;
-          } finally {
-            IOUtils.cleanup(LOG, dfsin);
-            IOUtils.cleanup(LOG, client);
-          }
-        }
-      };
-
-      return Response.ok(streaming).type(
+      
+      final long n = length.getValue() != null? length.getValue()
+          : in.getVisibleLength();
+      return Response.ok(new OpenEntity(in, n, dfsclient)).type(
           MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case GETFILECHECKSUM:

Added: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/OpenEntity.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/OpenEntity.java?rev=1470660&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/OpenEntity.java (added)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/OpenEntity.java Mon Apr 22 18:58:52 2013
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.web.resources;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.ext.Provider;
+
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
+import org.apache.hadoop.io.IOUtils;
+
+/**
+ * A response entity for a DFSDataInputStream.
+ */
+public class OpenEntity {
+  private final DFSDataInputStream in;
+  private final long length;
+  private final DFSClient dfsclient;
+  
+  OpenEntity(final DFSDataInputStream in, final long length,
+      final DFSClient dfsclient) {
+    this.in = in;
+    this.length = length;
+    this.dfsclient = dfsclient;
+  }
+  
+  /**
+   * A {@link MessageBodyWriter} for {@link OpenEntity}.
+   */
+  @Provider
+  public static class Writer implements MessageBodyWriter<OpenEntity> {
+
+    @Override
+    public boolean isWriteable(Class<?> clazz, Type genericType,
+        Annotation[] annotations, MediaType mediaType) {
+      return clazz == OpenEntity.class
+          && MediaType.APPLICATION_OCTET_STREAM_TYPE.isCompatible(mediaType);
+    }
+
+    @Override
+    public long getSize(OpenEntity e, Class<?> type, Type genericType,
+        Annotation[] annotations, MediaType mediaType) {
+      return e.length;
+    }
+
+    @Override
+    public void writeTo(OpenEntity e, Class<?> type, Type genericType,
+        Annotation[] annotations, MediaType mediaType,
+        MultivaluedMap<String, Object> httpHeaders, OutputStream out
+        ) throws IOException {
+      try {
+        IOUtils.copyBytes(e.in, out, e.length, 4096, false);
+      } finally {
+        IOUtils.cleanup(DatanodeWebHdfsMethods.LOG, e.in);
+        IOUtils.cleanup(DatanodeWebHdfsMethods.LOG, e.dfsclient);
+      }
+    }
+  }
+}
\ No newline at end of file
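
The OpenEntity/Writer pair above is what replaces the anonymous
StreamingOutput in DatanodeWebHdfsMethods: because getSize() reports the
byte count up front, the JAX-RS container can send a Content-Length header
instead of falling back to chunked Transfer-Encoding, which in turn lets the
ByteRangeInputStream changes earlier in this commit compute the file length
on the client side. A stripped-down writer with the same shape (hypothetical
EchoWriter over String, not part of this commit):

    import java.io.IOException;
    import java.io.OutputStream;
    import java.lang.annotation.Annotation;
    import java.lang.reflect.Type;
    import java.nio.charset.Charset;

    import javax.ws.rs.core.MediaType;
    import javax.ws.rs.core.MultivaluedMap;
    import javax.ws.rs.ext.MessageBodyWriter;
    import javax.ws.rs.ext.Provider;

    @Provider
    public class EchoWriter implements MessageBodyWriter<String> {
      private static final Charset UTF8 = Charset.forName("UTF-8");

      @Override
      public boolean isWriteable(Class<?> clazz, Type genericType,
          Annotation[] annotations, MediaType mediaType) {
        return clazz == String.class;
      }

      @Override
      public long getSize(String s, Class<?> type, Type genericType,
          Annotation[] annotations, MediaType mediaType) {
        // A non-negative value lets the container set Content-Length;
        // returning -1 means "unknown" and typically forces chunking.
        return s.getBytes(UTF8).length;
      }

      @Override
      public void writeTo(String s, Class<?> type, Type genericType,
          Annotation[] annotations, MediaType mediaType,
          MultivaluedMap<String, Object> httpHeaders, OutputStream out
          ) throws IOException {
        out.write(s.getBytes(UTF8));
      }
    }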

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestByteRangeInputStream.java?rev=1470660&r1=1470659&r2=1470660&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestByteRangeInputStream.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestByteRangeInputStream.java Mon Apr 22 18:58:52 2013
@@ -23,6 +23,7 @@ import java.io.InputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
 
+import org.apache.hadoop.hdfs.server.namenode.StreamFile;
 import org.junit.Test;
 
 public class TestByteRangeInputStream {
@@ -76,6 +77,11 @@ public static class MockHttpURLConnectio
   public void setResponseCode(int resCode) {
     responseCode = resCode;
   }
+  
+  @Override
+  public String getHeaderField(String field) {
+      return (field.equalsIgnoreCase(StreamFile.CONTENT_LENGTH)) ? "65535" : null;
+  }
 }
 
   @Test
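
The mocked getHeaderField above is needed because, after this commit,
openInputStream() treats a non-chunked response without Content-Length as an
error rather than as "length unknown" (the old filelength = -1 path).
Roughly, the contract the mock now satisfies (illustrative sketch only):

    import java.io.IOException;

    public class ContentLengthSketch {
      static long parseContentLength(String cl) throws IOException {
        if (cl == null) {
          throw new IOException("Content-Length is missing");
        }
        return Long.parseLong(cl);
      }

      public static void main(String[] args) throws IOException {
        // "65535" is the value the mock connection advertises.
        System.out.println(parseContentLength("65535"));
      }
    }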

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java?rev=1470660&r1=1470659&r2=1470660&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java Mon Apr 22 18:58:52 2013
@@ -25,6 +25,7 @@ import java.io.InputStreamReader;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.Map;
+import java.util.Random;
 
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.core.MediaType;
@@ -212,15 +213,20 @@ public class TestWebHdfsFileSystemContra
       assertEquals(0, count);
     }
 
+    final byte[] mydata = new byte[1 << 20];
+    new Random().nextBytes(mydata);
+    
     final Path p = new Path(dir, "file");
-    createFile(p);
+    FSDataOutputStream out = fs.create(p, false, 4096, (short)3, 1L << 17);
+    out.write(mydata);
+    out.close();
 
-    final int one_third = data.length/3;
+    final int one_third = mydata.length/3;
     final int two_third = one_third*2;
 
     { //test seek
       final int offset = one_third; 
-      final int len = data.length - offset;
+      final int len = mydata.length - offset;
       final byte[] buf = new byte[len];
 
       final FSDataInputStream in = fs.open(p);
@@ -232,13 +238,13 @@ public class TestWebHdfsFileSystemContra
   
       for (int i = 0; i < buf.length; i++) {
         assertEquals("Position " + i + ", offset=" + offset + ", length=" + len,
-            data[i + offset], buf[i]);
+            mydata[i + offset], buf[i]);
       }
     }
 
     { //test position read (read the data after the two_third location)
       final int offset = two_third; 
-      final int len = data.length - offset;
+      final int len = mydata.length - offset;
       final byte[] buf = new byte[len];
 
       final FSDataInputStream in = fs.open(p);
@@ -247,7 +253,7 @@ public class TestWebHdfsFileSystemContra
   
       for (int i = 0; i < buf.length; i++) {
         assertEquals("Position " + i + ", offset=" + offset + ", length=" + len,
-            data[i + offset], buf[i]);
+            mydata[i + offset], buf[i]);
       }
     }
   }
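
The contract test now writes its own 1 MiB of random data over a 128 KiB
block size instead of reusing the small shared test file, presumably so the
seek and position-read offsets land beyond the first block. A quick check of
the arithmetic (values taken from the diff above; class name is made up):

    public class BlockMathSketch {
      public static void main(String[] args) {
        long fileSize  = 1L << 20;  // mydata.length: 1 MiB of random bytes
        long blockSize = 1L << 17;  // block size passed to fs.create: 128 KiB
        // 8 blocks, so one_third (~341 KiB) and two_third (~683 KiB) both
        // fall well past the first block boundary.
        System.out.println(fileSize / blockSize + " blocks");
      }
    }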


