hadoop-hdfs-commits mailing list archives

From sur...@apache.org
Subject svn commit: r812701 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/ src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/
Date Tue, 08 Sep 2009 21:35:25 GMT
Author: suresh
Date: Tue Sep  8 21:35:24 2009
New Revision: 812701

URL: http://svn.apache.org/viewvc?rev=812701&view=rev
Log:
HDFS-235. Add support for byte ranges in HftpFileSystem to serve range of bytes from a file.
Contributed by Bill Zeller.
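
For orientation, a minimal client-side sketch of what this enables (the hftp URI and path below are hypothetical, not part of this commit): seek() on the returned stream only records the new offset, and the next read() reconnects with an HTTP "Range: bytes=<offset>-" header.

    // Hedged usage sketch; URI and path are placeholders.
    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HftpSeekSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(
            URI.create("hftp://namenode.example.com:50070/"), new Configuration());
        FSDataInputStream in = fs.open(new Path("/user/alice/data.bin"));
        try {
          in.seek(1024L);     // records the offset; no HTTP request is made yet
          int b = in.read();  // reconnects with "Range: bytes=1024-" and reads one byte
        } finally {
          in.close();
        }
      }
    }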

Added:
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestByteRangeInputStream.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStreamFile.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=812701&r1=812700&r2=812701&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Sep  8 21:35:24 2009
@@ -31,6 +31,9 @@
     HADOOP-6234. Updated hadoop-core and test jars to propagate new option 
     dfs.umaskmode in configuration. (Jakob Homan via suresh)
 
+    HDFS-235. Add support for byte ranges in HftpFileSystem to serve
+    range of bytes from a file. (Bill Zeller via suresh)
+
   IMPROVEMENTS
 
     HDFS-381. Remove blocks from DataNode maps when corresponding file

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java?rev=812701&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java Tue Sep  8 21:35:24 2009
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import org.apache.hadoop.fs.FSInputStream;
+
+
+/**
+ * To support HTTP byte streams, a new connection to an HTTP server needs to
+ * be created each time a different offset is requested. This class hides the
+ * complexity of those multiple connections from the client. Whenever seek()
+ * is called, a new connection is made on the subsequent read(). The normal
+ * input stream functions are connected to the currently active input stream.
+ */
+class ByteRangeInputStream extends FSInputStream {
+  
+  /**
+   * This class wraps a URL to allow easy mocking when testing. The URL class
+   * cannot be easily mocked because it is final.
+   */
+  static class URLOpener {
+    protected URL url;
+  
+    public URLOpener(URL u) {
+      url = u;
+    }
+  
+    public void setURL(URL u) {
+      url = u;
+    }
+  
+    public URL getURL() {
+      return url;
+    }
+  
+    public HttpURLConnection openConnection() throws IOException {
+      return (HttpURLConnection)url.openConnection();
+    }  
+  }
+  
+  
+  protected InputStream in;
+  protected URLOpener originalURL;
+  protected URLOpener resolvedURL;
+  protected long startPos = 0;
+  protected long currentPos = 0;
+  protected int status = STATUS_SEEK;
+  protected static final int STATUS_NORMAL = 0;
+  protected static final int STATUS_SEEK = 1;
+
+  ByteRangeInputStream(final URL url) {
+    this(new URLOpener(url), new URLOpener(null));
+  }
+  
+  ByteRangeInputStream(URLOpener o, URLOpener r) {
+    this.originalURL = o;
+    this.resolvedURL = r;
+  }
+  
+  private InputStream getInputStream() throws IOException {
+    if (status != STATUS_NORMAL) {
+      
+      if (in != null) {
+        in.close();
+        in = null;
+      }
+      
+      final URLOpener o;
+      
+      // use the original url if no resolved url exists (e.g., if it's
+      // the first time a request is made)
+      if (resolvedURL.getURL() == null) {
+        o = originalURL;
+      } else {
+        o = resolvedURL;
+      }
+        
+      final HttpURLConnection connection = o.openConnection();
+      connection.setRequestMethod("GET");
+      if (startPos != 0) {
+        connection.setRequestProperty("Range", "bytes="+startPos+"-");
+      }
+      connection.connect();
+      in = connection.getInputStream();
+      
+      if (startPos != 0 && connection.getResponseCode() != 206) {
+        // we asked for a byte range but did not receive a partial content
+        // response...
+        throw new IOException("206 expected, but received "
+                              + connection.getResponseCode());
+      } else if(startPos == 0 && connection.getResponseCode() != 200) {
+        // we asked for all bytes from the beginning but didn't receive a 200
+        // response (none of the other 2xx codes are valid here)
+        throw new IOException("200 expected, but received "
+                              + connection.getResponseCode());
+      }
+      
+      resolvedURL.setURL(connection.getURL());
+      status = STATUS_NORMAL;
+    }
+    
+    return in;
+  }
+  
+  public int read() throws IOException {
+    int ret = getInputStream().read();
+    if (ret != -1) {
+     currentPos++;
+    }
+    return ret;
+  }
+  
+  /**
+   * Seek to the given offset from the start of the file.
+   * The next read() will be from that location.  Can't
+   * seek past the end of the file.
+   */
+  public void seek(long pos) throws IOException {
+    if (pos != currentPos) {
+      startPos = pos;
+      currentPos = pos;
+      status = STATUS_SEEK;
+    }
+  }
+
+  /**
+   * Return the current offset from the start of the file
+   */
+  public long getPos() throws IOException {
+    return currentPos; // keep total count?
+  }
+
+  /**
+   * Seeks a different copy of the data.  Returns true if
+   * found a new source, false otherwise.
+   */
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    return false;
+  }
+
+}
+
+

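A standalone sketch of the ranged-GET handshake performed by getInputStream() above, assuming only java.net (this class is not part of the commit): the Range header is set only when starting mid-file, and the matching status code is required (206 for a partial read, 200 for a full one).

    import java.io.IOException;
    import java.io.InputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class RangeGetSketch {
      // Mirrors ByteRangeInputStream.getInputStream().
      static InputStream openAt(URL url, long startPos) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        if (startPos != 0) {
          conn.setRequestProperty("Range", "bytes=" + startPos + "-");
        }
        conn.connect();
        int expected = (startPos != 0) ? 206 : 200;
        if (conn.getResponseCode() != expected) {
          throw new IOException(expected + " expected, but received "
                                + conn.getResponseCode());
        }
        return conn.getInputStream();
      }
    }
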
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java?rev=812701&r1=812700&r2=812701&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java Tue Sep  8 21:35:24 2009
@@ -59,6 +59,9 @@
 import org.xml.sax.XMLReader;
 import org.xml.sax.helpers.DefaultHandler;
 import org.xml.sax.helpers.XMLReaderFactory;
+import org.apache.hadoop.hdfs.ByteRangeInputStream;
 
 /** An implementation of a protocol for accessing filesystems over HTTP.
  * The following implementation provides a limited, read-only interface
@@ -115,55 +118,48 @@
     } 
   }
 
-  /**
-   * Open an HTTP connection to the namenode to read file data and metadata.
-   * @param path The path component of the URL
-   * @param query The query component of the URL
-   */
-  protected HttpURLConnection openConnection(String path, String query)
-      throws IOException {
+
+  /**
+   * Construct a URL pointing to a file on the namenode.
+   */
+  URL getNamenodeFileURL(Path f) throws IOException {
+    return getNamenodeURL("/data" + f.toUri().getPath(), "ugi=" + ugi);
+  }
+
+  /**
+   * Construct a URL pointing to the namenode.
+   */
+  URL getNamenodeURL(String path, String query) throws IOException {
     try {
       final URL url = new URI("http", null, nnAddr.getHostName(),
           nnAddr.getPort(), path, query, null).toURL();
       if (LOG.isTraceEnabled()) {
         LOG.trace("url=" + url);
       }
-      HttpURLConnection connection = (HttpURLConnection)url.openConnection();
-      connection.setRequestMethod("GET");
-      connection.connect();
-      return connection;
+      return url;
     } catch (URISyntaxException e) {
-      throw (IOException)new IOException().initCause(e);
+      throw new IOException(e);
     }
   }
 
+  /**
+   * Open an HTTP connection to the namenode to read file data and metadata.
+   * @param path The path component of the URL
+   * @param query The query component of the URL
+   */
+  protected HttpURLConnection openConnection(String path, String query)
+      throws IOException {
+    final URL url = getNamenodeURL(path, query);
+    HttpURLConnection connection = (HttpURLConnection)url.openConnection();
+    connection.setRequestMethod("GET");
+    connection.connect();
+    return connection;
+  }
+
   @Override
   public FSDataInputStream open(Path f, int buffersize) throws IOException {
-    HttpURLConnection connection = null;
-    connection = openConnection("/data" + f.toUri().getPath(), "ugi=" + ugi);
-    final InputStream in = connection.getInputStream();
-    return new FSDataInputStream(new FSInputStream() {
-        public int read() throws IOException {
-          return in.read();
-        }
-        public int read(byte[] b, int off, int len) throws IOException {
-          return in.read(b, off, len);
-        }
-
-        public void close() throws IOException {
-          in.close();
-        }
-
-        public void seek(long pos) throws IOException {
-          throw new IOException("Can't seek!");
-        }
-        public long getPos() throws IOException {
-          throw new IOException("Position unknown!");
-        }
-        public boolean seekToNewSource(long targetPos) throws IOException {
-          return false;
-        }
-      });
+    URL u = getNamenodeURL("/data" + f.toUri().getPath(), "ugi=" + ugi);
+    return new FSDataInputStream(new ByteRangeInputStream(u));
   }
 
   /** Class to parse and store a listing reply from the server. */

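For illustration, the URL shape the refactored getNamenodeURL() produces (host, port, path, and ugi value below are hypothetical):

    import java.net.URI;
    import java.net.URL;

    public class NamenodeUrlSketch {
      public static void main(String[] args) throws Exception {
        // Same construction as getNamenodeURL("/data" + path, "ugi=" + ugi).
        URL url = new URI("http", null, "namenode.example.com", 50070,
                          "/data/user/alice/data.bin", "ugi=alice,users", null).toURL();
        // Prints: http://namenode.example.com:50070/data/user/alice/data.bin?ugi=alice,users
        System.out.println(url);
      }
    }
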
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java?rev=812701&r1=812700&r2=812701&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/StreamFile.java Tue Sep  8 21:35:24 2009
@@ -21,17 +21,18 @@
 import java.io.OutputStream;
 import java.io.PrintWriter;
 import java.net.InetSocketAddress;
-
+import java.util.Enumeration;
+import java.util.List;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.mortbay.jetty.InclusiveByteRange;
 
 public class StreamFile extends DfsServlet {
   /** for java.io.Serializable */
@@ -65,22 +66,104 @@
       out.print("Invalid input");
       return;
     }
-    DFSClient dfs = getDFSClient(request);
+    
+    Enumeration reqRanges = request.getHeaders("Range");
+    if (reqRanges != null && !reqRanges.hasMoreElements())
+      reqRanges = null;
+
+    DFSClient dfs = getDFSClient(request);  
+    long fileLen = dfs.getFileInfo(filename).getLen();
     FSInputStream in = dfs.open(filename);
     OutputStream os = response.getOutputStream();
-    response.setHeader("Content-Disposition", "attachment; filename=\"" + 
-                       filename + "\"");
-    response.setContentType("application/octet-stream");
-    byte buf[] = new byte[4096];
+
     try {
-      int bytesRead;
-      while ((bytesRead = in.read(buf)) != -1) {
-        os.write(buf, 0, bytesRead);
+      if (reqRanges != null) {
+        List ranges = InclusiveByteRange.satisfiableRanges(reqRanges,
+                                                           fileLen);
+        StreamFile.sendPartialData(in, os, response, fileLen, ranges);
+      } else {
+        // No ranges, so send entire file
+        response.setHeader("Content-Disposition", "attachment; filename=\"" + 
+                           filename + "\"");
+        response.setContentType("application/octet-stream");
+        StreamFile.writeTo(in, os, 0L, fileLen);
       }
     } finally {
       in.close();
       os.close();
       dfs.close();
+    }      
+  }
+  
+  static void sendPartialData(FSInputStream in,
+                              OutputStream os,
+                              HttpServletResponse response,
+                              long contentLength,
+                              List ranges)
+  throws IOException {
+
+    if (ranges == null || ranges.size() != 1) {
+      // if there are no satisfiable ranges, or if multiple ranges are
+      // requested (we don't support multiple range requests), send a 416 response
+      response.setContentLength(0);
+      int status = HttpServletResponse.SC_REQUESTED_RANGE_NOT_SATISFIABLE;
+      response.setStatus(status);
+      response.setHeader("Content-Range", 
+                InclusiveByteRange.to416HeaderRangeString(contentLength));
+    } else {
+      // if there is only a single valid range (it must be satisfiable,
+      // since we got here), send that range with a 206 response
+      InclusiveByteRange singleSatisfiableRange =
+        (InclusiveByteRange)ranges.get(0);
+      long singleLength = singleSatisfiableRange.getSize(contentLength);
+      response.setStatus(HttpServletResponse.SC_PARTIAL_CONTENT);
+      response.setHeader("Content-Range", 
+        singleSatisfiableRange.toHeaderRangeString(contentLength));
+      System.out.println("first: "+singleSatisfiableRange.getFirst(contentLength));
+      System.out.println("singleLength: "+singleLength);
+      
+      StreamFile.writeTo(in,
+                         os,
+                         singleSatisfiableRange.getFirst(contentLength),
+                         singleLength);
     }
   }
+  
+  static void writeTo(FSInputStream in,
+                      OutputStream os,
+                      long start,
+                      long count) 
+  throws IOException {
+    byte buf[] = new byte[4096];
+    long bytesRemaining = count;
+    int bytesRead;
+    int bytesToRead;
+
+    in.seek(start);
+
+    while (true) {
+      // number of bytes to read this iteration
+      bytesToRead = (int) (bytesRemaining < buf.length ? bytesRemaining
+                                                        : buf.length);
+      
+      // number of bytes actually read this iteration
+      bytesRead = in.read(buf, 0, bytesToRead);
+
+      // if we can't read anymore, break
+      if (bytesRead == -1) {
+        break;
+      } 
+      
+      os.write(buf, 0, bytesRead);
+
+      bytesRemaining -= bytesRead;
+
+      // if we don't need to read anymore, break
+      if (bytesRemaining <= 0) {
+        break;
+      }
+
+    } 
+  }
 }

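As a worked example of the range arithmetic (numbers chosen to match the TestStreamFile cases further below): "Range: bytes=100-300" against a 500-byte file is satisfiable, so sendPartialData() answers 206 with a Content-Range of the standard "bytes 100-300/500" form and writeTo() streams 300 - 100 + 1 = 201 bytes, while "bytes=600-800" is unsatisfiable and yields 416. A self-contained sketch of the bounded copy loop, using a plain InputStream in place of FSInputStream (not part of the commit):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    public class BoundedCopySketch {
      // Same loop as StreamFile.writeTo() after the initial seek(start).
      static void copy(InputStream in, OutputStream os, long count)
          throws IOException {
        byte[] buf = new byte[4096];
        long remaining = count;
        while (remaining > 0) {
          int toRead = (int) Math.min(remaining, buf.length);
          int read = in.read(buf, 0, toRead);
          if (read == -1) {
            break;                     // source ended before the range did
          }
          os.write(buf, 0, read);
          remaining -= read;
        }
      }

      public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream(new byte[500]);
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        in.skip(100);                  // stands in for FSInputStream.seek(100)
        copy(in, os, 201);             // bytes 100-300 inclusive
        System.out.println(os.size()); // 201
      }
    }
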
Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestByteRangeInputStream.java?rev=812701&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestByteRangeInputStream.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestByteRangeInputStream.java Tue Sep  8 21:35:24 2009
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.ByteRangeInputStream;
+import org.apache.hadoop.hdfs.ByteRangeInputStream.URLOpener;
+
+class MockHttpURLConnection extends HttpURLConnection {
+  MockURL m;
+  
+  public MockHttpURLConnection(URL u, MockURL m) {
+    super(u); 
+    this.m = m;
+  }
+  
+  public boolean usingProxy(){
+    return false;
+  }
+  
+  public void disconnect() {
+  }
+  
+  public void connect() throws IOException {
+    m.setMsg("Connect: "+url+", Range: "+getRequestProperty("Range"));
+  }
+  
+  public InputStream getInputStream() throws IOException {
+    return new ByteArrayInputStream("asdf".getBytes());
+  } 
+
+  public URL getURL() {
+    URL u = null;
+    try {
+      u = new URL("http://resolvedurl/");
+    } catch (Exception e) {
+      System.out.println(e.getMessage());
+    }
+    return u;
+  }
+  
+  public int getResponseCode() {
+    if (m.responseCode != -1) {
+      return m.responseCode;
+    } else {
+      if (getRequestProperty("Range") == null) {
+        return 200;
+      } else {
+        return 206;
+      }
+    }
+  }
+  
+}
+
+class MockURL extends URLOpener {
+  String msg;
+  public int responseCode = -1;
+  
+  public MockURL(URL u) {
+    super(u);
+  }
+
+  public MockURL(String s) throws MalformedURLException {
+    this(new URL(s));
+  }
+
+  public HttpURLConnection openConnection() throws IOException {
+    return new MockHttpURLConnection(url, this);
+  }    
+
+  public void setMsg(String s) {
+    msg = s;
+  }
+  
+  public String getMsg() {
+    return msg;
+  }
+}
+
+
+
+public class TestByteRangeInputStream extends TestCase {
+  
+  private static final Log LOG = 
+                           LogFactory.getLog(TestByteRangeInputStream.class);
+  
+  public void testByteRange() throws IOException, InterruptedException {
+    MockURL o = new MockURL("http://test/");
+    MockURL r = new MockURL((URL)null);
+    ByteRangeInputStream is = new ByteRangeInputStream(o, r);
+
+    assertEquals("getPos wrong", 0, is.getPos());
+
+    is.read();
+
+    assertEquals("Initial call made incorrectly", 
+                 "Connect: http://test/, Range: null",
+                 o.getMsg());
+
+    assertEquals("getPos should be 1 after reading one byte", 1, is.getPos());
+
+    o.setMsg(null);
+
+    is.read();
+
+    assertEquals("getPos should be 2 after reading two bytes", 2, is.getPos());
+
+    assertNull("No additional connections should have been made (no seek)",
+               o.getMsg());
+
+    r.setMsg(null);
+    r.setURL(new URL("http://resolvedurl/"));
+    
+    is.seek(100);
+    is.read();
+
+    assertEquals("Seek to 100 bytes made incorrectly", 
+                 "Connect: http://resolvedurl/, Range: bytes=100-",
+                 r.getMsg());
+
+    assertEquals("getPos should be 101 after reading one byte", 101, is.getPos());
+
+    r.setMsg(null);
+
+    is.seek(101);
+    is.read();
+
+    assertNull("Seek to 101 should not result in another request", null);
+
+    r.setMsg(null);
+    is.seek(2500);
+    is.read();
+
+    assertEquals("Seek to 2500 bytes made incorrectly", 
+                 "Connect: http://resolvedurl/, Range: bytes=2500-",
+                 r.getMsg());
+
+    r.responseCode = 200;
+    is.seek(500);
+    
+    try {
+      is.read();
+      fail("Exception should be thrown when 200 response is given "
+           + "but 206 is expected");
+    } catch (IOException e) {
+      assertEquals("Should fail because incorrect response code was sent",
+                   "206 expected, but received 200", e.getMessage());
+    }
+
+    r.responseCode = 206;
+    is.seek(0);
+
+    try {
+      is.read();
+      fail("Exception should be thrown when 206 response is given "
+           + "but 200 is expected");
+    } catch (IOException e) {
+      assertEquals("Should fail because incorrect response code was sent",
+                   "200 expected, but received 206", e.getMessage());
+    }
+  }
+}

Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStreamFile.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStreamFile.java?rev=812701&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStreamFile.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStreamFile.java Tue Sep  8 21:35:24 2009
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Vector;
+import javax.servlet.http.HttpServletResponse;
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.hdfs.server.namenode.StreamFile;
+import org.mortbay.jetty.InclusiveByteRange;
+import static org.junit.Assert.*;
+
+/**
+ * Mock input stream that returns its current position from each read().
+ */
+class MockFSInputStream extends FSInputStream {
+  long currentPos = 0;
+  public int read() throws IOException {
+    return (int)(currentPos++);
+  }
+
+  public void close() throws IOException {
+  }
+
+  public void seek(long pos) throws IOException {
+    currentPos = pos;
+  }
+  
+  public long getPos() throws IOException {
+    return currentPos;
+  }
+  
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    return false;
+  }
+}
+
+
+class MockHttpServletResponse implements HttpServletResponse {
+
+  private int status = -1;
+  
+  public MockHttpServletResponse() {
+  }
+  
+  public int getStatus() {
+    return status;
+  }
+  
+  
+  public void setStatus(int sc) {
+    status = sc;
+  }
+  
+  @SuppressWarnings("deprecation")
+  public void setStatus(int sc, java.lang.String sm) {
+  }
+  
+  public void addIntHeader(String name, int value) {
+  }
+
+  public void setIntHeader(String name, int value) { 
+  }
+  
+  public void addHeader(String name, String value) {
+  }
+
+  public void setHeader(String name, String value) {
+  }
+  
+  public void addDateHeader(java.lang.String name, long date) {
+  }
+  
+  public void setDateHeader(java.lang.String name, long date) {
+  }
+
+  public void sendRedirect(java.lang.String location) { 
+  }
+  
+  public void sendError(int e) {
+  }
+  
+  public void sendError(int a, java.lang.String b) {
+  }
+  
+  public String encodeRedirectUrl(java.lang.String a) {
+    return null;
+  }
+  
+  public String encodeUrl(java.lang.String url) {
+    return null;
+  }
+  
+  public String encodeRedirectURL(java.lang.String url) {
+    return null;
+  }
+  
+  public String encodeURL(java.lang.String url) {
+    return null;
+  }
+  
+  public boolean containsHeader(java.lang.String name) {
+    return false;
+  }
+  
+  public void addCookie(javax.servlet.http.Cookie cookie) {
+  }
+  
+  public java.util.Locale getLocale() {
+    return null;
+  }
+  
+  public void setLocale(java.util.Locale loc) {
+  }
+  
+  public void reset() {
+  }
+  
+  public boolean isCommitted() {
+    return false;
+  }
+  
+  public void resetBuffer() {
+  }
+  
+  public void flushBuffer() {
+  }
+  
+  public int getBufferSize() {
+    return 0;
+  }
+  
+  public void setBufferSize(int size) {
+  }
+  
+  public void setContentType(java.lang.String type) {
+  }
+  
+  public void setContentLength(int len) {
+  }
+  
+  public void setCharacterEncoding(java.lang.String charset) {
+  }
+  
+  public java.io.PrintWriter getWriter() {
+    return null;
+  }
+  
+  public javax.servlet.ServletOutputStream getOutputStream() {
+    return null;
+  }
+  
+  public java.lang.String getContentType() {
+    return null;
+  }
+  
+  public java.lang.String getCharacterEncoding() {
+    return null;
+  }
+}
+
+
+
+public class TestStreamFile extends TestCase {
+  
+  private static final Log LOG =  LogFactory.getLog(TestStreamFile.class);
+
+  // return an array matching the output of MockFSInputStream
+  private static byte[] getOutputArray(int start, int count) {
+    byte[] a = new byte[count];
+    
+    for (int i = 0; i < count; i++) {
+      a[i] = (byte)(start+i);
+    }
+
+    return a;
+  }
+  
+  public void testWriteTo() throws IOException, InterruptedException {
+
+    FSInputStream fsin = new MockFSInputStream();
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+
+    // new int[]{s_1, c_1, s_2, c_2, ..., s_n, c_n} means to test
+    // reading c_i bytes starting at s_i
+    int[] pairs = new int[]{ 0, 10000,
+                             50, 100,
+                             50, 6000,
+                             1000, 2000,
+                             0, 1,
+                             0, 0,
+                             5000, 0,
+                            };
+                            
+    assertTrue("Pairs array must be even", pairs.length % 2 == 0);
+    
+    for (int i = 0; i < pairs.length; i+=2) {
+      StreamFile.writeTo(fsin, os, pairs[i], pairs[i+1]);
+      assertArrayEquals("Reading " + pairs[i+1]
+                        + " bytes from offset " + pairs[i],
+                        getOutputArray(pairs[i], pairs[i+1]),
+                        os.toByteArray());
+      os.reset();
+    }
+    
+  }
+  
+  private List strToRanges(String s, int contentLength) {
+    List<String> l = Arrays.asList(new String[]{"bytes="+s});
+    Enumeration e = (new Vector<String>(l)).elements();
+    return InclusiveByteRange.satisfiableRanges(e, contentLength);
+  }
+  
+  public void testSendPartialData() throws IOException, InterruptedException {
+    FSInputStream in = new MockFSInputStream();
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+
+    // test if multiple ranges, then 416
+    { 
+      List ranges = strToRanges("0-,10-300", 500);
+      MockHttpServletResponse response = new MockHttpServletResponse();
+      StreamFile.sendPartialData(in, os, response, 500, ranges);
+      assertEquals("Multiple ranges should result in a 416 error",
+                   416, response.getStatus());
+    }
+                              
+    // test if no ranges, then 416
+    { 
+      os.reset();
+      MockHttpServletResponse response = new MockHttpServletResponse();
+      StreamFile.sendPartialData(in, os, response, 500, null);
+      assertEquals("No ranges should result in a 416 error",
+                   416, response.getStatus());
+    }
+
+    // test if invalid single range (out of bounds), then 416
+    { 
+      List ranges = strToRanges("600-800", 500);
+      MockHttpServletResponse response = new MockHttpServletResponse();
+      StreamFile.sendPartialData(in, os, response, 500, ranges);
+      assertEquals("Single (but invalid) range should result in a 416",
+                   416, response.getStatus());
+    }
+
+      
+    // test if one (valid) range, then 206
+    { 
+      List ranges = strToRanges("100-300", 500);
+      MockHttpServletResponse response = new MockHttpServletResponse();
+      StreamFile.sendPartialData(in, os, response, 500, ranges);
+      assertEquals("Single (valid) range should result in a 206",
+                   206, response.getStatus());
+      assertArrayEquals("Byte range from 100-300",
+                        getOutputArray(100, 201),
+                        os.toByteArray());
+    }
+    
+  }
+}


