hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From whe...@apache.org
Subject [4/4] hadoop git commit: HDFS-8052. Move WebHdfsFileSystem into hadoop-hdfs-client. Contributed by Haohui Mai.
Date Fri, 24 Apr 2015 00:44:34 GMT
HDFS-8052. Move WebHdfsFileSystem into hadoop-hdfs-client. Contributed by Haohui Mai.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b64bb2b9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b64bb2b9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b64bb2b9

Branch: refs/heads/branch-2
Commit: b64bb2b9b4f01f9c267393f625bec41b86041ccb
Parents: 8f6053a
Author: Haohui Mai <wheat9@apache.org>
Authored: Thu Apr 23 17:33:05 2015 -0700
Committer: Haohui Mai <wheat9@apache.org>
Committed: Thu Apr 23 17:44:21 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/io/retry/RetryUtils.java  |    4 +-
 .../org/apache/hadoop/hdfs/DFSUtilClient.java   |  109 ++
 .../hdfs/client/HdfsClientConfigKeys.java       |    5 +
 .../hdfs/protocol/HdfsConstantsClient.java      |    4 +
 .../hadoop/hdfs/web/ByteRangeInputStream.java   |  258 +++
 .../apache/hadoop/hdfs/web/JsonUtilClient.java  |  485 ++++++
 .../hdfs/web/KerberosUgiAuthenticator.java      |   45 +
 .../hadoop/hdfs/web/SWebHdfsFileSystem.java     |   48 +
 .../org/apache/hadoop/hdfs/web/TokenAspect.java |  185 +++
 .../hadoop/hdfs/web/URLConnectionFactory.java   |  187 +++
 .../hadoop/hdfs/web/WebHdfsConstants.java       |    4 +
 .../hadoop/hdfs/web/WebHdfsFileSystem.java      | 1470 +++++++++++++++++
 .../hdfs/web/resources/BufferSizeParam.java     |   60 +
 .../hadoop/fs/http/client/HttpFSFileSystem.java |    3 +-
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |    2 +
 .../hadoop/hdfs/BlockStorageLocationUtil.java   |    3 +-
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |    2 +-
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |    9 +-
 .../java/org/apache/hadoop/hdfs/DFSUtil.java    |   89 +-
 .../org/apache/hadoop/hdfs/NameNodeProxies.java |    2 +-
 .../hdfs/protocol/HdfsLocatedFileStatus.java    |    4 +-
 .../server/namenode/NameNodeHttpServer.java     |    1 +
 .../hdfs/tools/DelegationTokenFetcher.java      |    4 +-
 .../hadoop/hdfs/web/ByteRangeInputStream.java   |  258 ---
 .../apache/hadoop/hdfs/web/HftpFileSystem.java  |   10 +-
 .../apache/hadoop/hdfs/web/HsftpFileSystem.java |   10 +-
 .../apache/hadoop/hdfs/web/JsonUtilClient.java  |  484 ------
 .../hdfs/web/KerberosUgiAuthenticator.java      |   45 -
 .../hadoop/hdfs/web/SWebHdfsFileSystem.java     |   48 -
 .../org/apache/hadoop/hdfs/web/TokenAspect.java |  185 ---
 .../hadoop/hdfs/web/URLConnectionFactory.java   |  187 ---
 .../hadoop/hdfs/web/WebHdfsFileSystem.java      | 1471 ------------------
 .../hdfs/web/resources/BufferSizeParam.java     |   60 -
 .../org/apache/hadoop/fs/TestSymlinkHdfs.java   |    3 +-
 .../org/apache/hadoop/hdfs/TestDFSUtil.java     |    4 +-
 .../hadoop/hdfs/TestDistributedFileSystem.java  |    3 +-
 .../java/org/apache/hadoop/hdfs/TestQuota.java  |    5 +-
 .../hdfs/security/TestDelegationToken.java      |    3 +-
 .../TestDelegationTokenForProxyUser.java        |    3 +-
 .../hdfs/server/namenode/TestAuditLogs.java     |    3 +-
 .../TestNameNodeRespectsBindHostKeys.java       |    3 +-
 .../hdfs/web/TestFSMainOperationsWebHdfs.java   |    3 +-
 .../hdfs/web/TestHftpDelegationToken.java       |    6 +-
 .../hadoop/hdfs/web/TestHftpFileSystem.java     |   13 +-
 .../apache/hadoop/hdfs/web/TestHttpFSPorts.java |    8 +-
 .../hadoop/hdfs/web/TestHttpsFileSystem.java    |    3 +-
 .../org/apache/hadoop/hdfs/web/TestWebHDFS.java |    6 +-
 .../hdfs/web/TestWebHdfsFileSystemContract.java |    4 +-
 .../hadoop/hdfs/web/TestWebHdfsTokens.java      |    3 +-
 .../web/TestWebHdfsWithMultipleNameNodes.java   |    4 +-
 .../apache/hadoop/hdfs/web/WebHdfsTestUtil.java |    3 +-
 .../tools/TestDelegationTokenRemoteFetcher.java |    4 +-
 52 files changed, 2935 insertions(+), 2890 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64bb2b9/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryUtils.java
index e6f4519..b2e115f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryUtils.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryUtils.java
@@ -60,7 +60,7 @@ public class RetryUtils {
       boolean defaultRetryPolicyEnabled,
       String retryPolicySpecKey,
       String defaultRetryPolicySpec,
-      final Class<? extends Exception> remoteExceptionToRetry
+      final String remoteExceptionToRetry
       ) {
     
     final RetryPolicy multipleLinearRandomRetry = 
@@ -94,7 +94,7 @@ public class RetryUtils {
           final RetryPolicy p;
           if (e instanceof RemoteException) {
             final RemoteException re = (RemoteException)e;
-            p = remoteExceptionToRetry.getName().equals(re.getClassName())?
+            p = remoteExceptionToRetry.equals(re.getClassName())?
                 multipleLinearRandomRetry: RetryPolicies.TRY_ONCE_THEN_FAIL;
           } else if (e instanceof IOException || e instanceof ServiceException) {
             p = multipleLinearRandomRetry;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64bb2b9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index 84fb12c..97d3408 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@ -19,10 +19,17 @@ package org.apache.hadoop.hdfs;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.Maps;
+import org.apache.commons.io.Charsets;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.web.WebHdfsConstants;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,6 +38,7 @@ import java.io.UnsupportedEncodingException;
 import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
@@ -40,6 +48,13 @@ public class DFSUtilClient {
   private static final Logger LOG = LoggerFactory.getLogger(
       DFSUtilClient.class);
   /**
+   * Converts a string to a byte array using UTF8 encoding.
+   */
+  public static byte[] string2Bytes(String str) {
+    return str.getBytes(Charsets.UTF_8);
+  }
+
+  /**
    * Converts a byte array to a string using UTF8 encoding.
    */
   public static String bytes2String(byte[] bytes) {
@@ -114,6 +129,62 @@ public class DFSUtilClient {
   }
 
   /**
+   * Convert a LocatedBlocks to BlockLocations[]
+   * @param blocks a LocatedBlocks
+   * @return an array of BlockLocations
+   */
+  public static BlockLocation[] locatedBlocks2Locations(LocatedBlocks blocks) {
+    if (blocks == null) {
+      return new BlockLocation[0];
+    }
+    return locatedBlocks2Locations(blocks.getLocatedBlocks());
+  }
+
+  /**
+   * Convert a List<LocatedBlock> to BlockLocation[]
+   * @param blocks A List<LocatedBlock> to be converted
+   * @return converted array of BlockLocation
+   */
+  public static BlockLocation[] locatedBlocks2Locations(
+      List<LocatedBlock> blocks) {
+    if (blocks == null) {
+      return new BlockLocation[0];
+    }
+    int nrBlocks = blocks.size();
+    BlockLocation[] blkLocations = new BlockLocation[nrBlocks];
+    if (nrBlocks == 0) {
+      return blkLocations;
+    }
+    int idx = 0;
+    for (LocatedBlock blk : blocks) {
+      assert idx < nrBlocks : "Incorrect index";
+      DatanodeInfo[] locations = blk.getLocations();
+      String[] hosts = new String[locations.length];
+      String[] xferAddrs = new String[locations.length];
+      String[] racks = new String[locations.length];
+      for (int hCnt = 0; hCnt < locations.length; hCnt++) {
+        hosts[hCnt] = locations[hCnt].getHostName();
+        xferAddrs[hCnt] = locations[hCnt].getXferAddr();
+        NodeBase node = new NodeBase(xferAddrs[hCnt],
+                                     locations[hCnt].getNetworkLocation());
+        racks[hCnt] = node.toString();
+      }
+      DatanodeInfo[] cachedLocations = blk.getCachedLocations();
+      String[] cachedHosts = new String[cachedLocations.length];
+      for (int i=0; i<cachedLocations.length; i++) {
+        cachedHosts[i] = cachedLocations[i].getHostName();
+      }
+      blkLocations[idx] = new BlockLocation(xferAddrs, hosts, cachedHosts,
+                                            racks,
+                                            blk.getStartOffset(),
+                                            blk.getBlockSize(),
+                                            blk.isCorrupt());
+      idx++;
+    }
+    return blkLocations;
+  }
+
+  /**
    * Decode a specific range of bytes of the given byte array to a string
    * using UTF8.
    *
@@ -234,4 +305,42 @@ public class DFSUtilClient {
     }
     return value;
   }
+
+  /**
+   * Whether the pathname is valid.  Currently prohibits relative paths,
+   * names which contain a ":" or "//", or other non-canonical paths.
+   */
+  public static boolean isValidName(String src) {
+    // Path must be absolute.
+    if (!src.startsWith(Path.SEPARATOR)) {
+      return false;
+    }
+
+    // Check for ".." "." ":" "/"
+    String[] components = StringUtils.split(src, '/');
+    for (int i = 0; i < components.length; i++) {
+      String element = components[i];
+      if (element.equals(".")  ||
+          (element.contains(":"))  ||
+          (element.contains("/"))) {
+        return false;
+      }
+      // ".." is allowed in path starting with /.reserved/.inodes
+      if (element.equals("..")) {
+        if (components.length > 4
+            && components[1].equals(".reserved")
+            && components[2].equals(".inodes")) {
+          continue;
+        }
+        return false;
+      }
+      // The string may start or end with a /, but not have
+      // "//" in the middle.
+      if (element.isEmpty() && i != components.length - 1 &&
+          i != 0) {
+        return false;
+      }
+    }
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64bb2b9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index d11922d..3a9a967 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -26,6 +26,7 @@ public interface HdfsClientConfigKeys {
   long    DFS_BLOCK_SIZE_DEFAULT = 128*1024*1024;
   String  DFS_REPLICATION_KEY = "dfs.replication";
   short   DFS_REPLICATION_DEFAULT = 3;
+  String  DFS_WEBHDFS_USER_PATTERN_KEY = "dfs.webhdfs.user.provider.user.pattern";
   String  DFS_WEBHDFS_USER_PATTERN_DEFAULT = "^[A-Za-z_][A-Za-z0-9._-]*[$]?$";
   String DFS_WEBHDFS_ACL_PERMISSION_PATTERN_DEFAULT =
       "^(default:)?(user|group|mask|other):[[A-Za-z_][A-Za-z0-9._-]]*:([rwx-]{3})?(,(default:)?(user|group|mask|other):[[A-Za-z_][A-Za-z0-9._-]]*:([rwx-]{3})?)*$";
@@ -37,6 +38,10 @@ public interface HdfsClientConfigKeys {
   int     DFS_NAMENODE_HTTPS_PORT_DEFAULT = 50470;
   String  DFS_NAMENODE_HTTPS_ADDRESS_KEY = "dfs.namenode.https-address";
   String DFS_HA_NAMENODES_KEY_PREFIX = "dfs.ha.namenodes";
+  String  DFS_WEBHDFS_ENABLED_KEY = "dfs.webhdfs.enabled";
+  boolean DFS_WEBHDFS_ENABLED_DEFAULT = true;
+  String  DFS_NAMENODE_HTTP_PORT_KEY = "dfs.http.port";
+  String  DFS_NAMENODE_HTTPS_PORT_KEY = "dfs.https.port";
 
   /** dfs.client.retry configuration properties */
   interface Retry {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64bb2b9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstantsClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstantsClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstantsClient.java
index ab4310e..00f07e8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstantsClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstantsClient.java
@@ -38,4 +38,8 @@ public interface HdfsConstantsClient {
    * URI.
    */
   String HA_DT_SERVICE_PREFIX = "ha-";
+  // The name of the SafeModeException. FileSystem should retry if it sees
+  // the below exception in RPC
+  String SAFEMODE_EXCEPTION_CLASS_NAME = "org.apache.hadoop.hdfs.server" +
+      ".namenode.SafeModeException";
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64bb2b9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java
new file mode 100644
index 0000000..9e3b29a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.web;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import org.apache.commons.io.input.BoundedInputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.http.HttpStatus;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.net.HttpHeaders;
+
+/**
+ * To support HTTP byte streams, a new connection to an HTTP server needs to be
+ * created each time. This class hides the complexity of those multiple
+ * connections from the client. Whenever seek() is called, a new connection
+ * is made on the successive read(). The normal input stream functions are
+ * connected to the currently active input stream.
+ */
+public abstract class ByteRangeInputStream extends FSInputStream {
+
+  /**
+   * This class wraps a URL and provides method to open connection.
+   * It can be overridden to change how a connection is opened.
+   */
+  public static abstract class URLOpener {
+    protected URL url;
+
+    public URLOpener(URL u) {
+      url = u;
+    }
+
+    public void setURL(URL u) {
+      url = u;
+    }
+
+    public URL getURL() {
+      return url;
+    }
+
+    /** Connect to server with a data offset. */
+    protected abstract HttpURLConnection connect(final long offset,
+        final boolean resolved) throws IOException;
+  }
+
+  enum StreamStatus {
+    NORMAL, SEEK, CLOSED
+  }
+  protected InputStream in;
+  protected final URLOpener originalURL;
+  protected final URLOpener resolvedURL;
+  protected long startPos = 0;
+  protected long currentPos = 0;
+  protected Long fileLength = null;
+
+  StreamStatus status = StreamStatus.SEEK;
+
+  /**
+   * Create with the specified URLOpeners. Original url is used to open the
+   * stream for the first time. Resolved url is used in subsequent requests.
+   * @param o Original url
+   * @param r Resolved url
+   */
+  public ByteRangeInputStream(URLOpener o, URLOpener r) throws IOException {
+    this.originalURL = o;
+    this.resolvedURL = r;
+    getInputStream();
+  }
+
+  protected abstract URL getResolvedUrl(final HttpURLConnection connection
+      ) throws IOException;
+
+  @VisibleForTesting
+  protected InputStream getInputStream() throws IOException {
+    switch (status) {
+      case NORMAL:
+        break;
+      case SEEK:
+        if (in != null) {
+          in.close();
+        }
+        in = openInputStream();
+        status = StreamStatus.NORMAL;
+        break;
+      case CLOSED:
+        throw new IOException("Stream closed");
+    }
+    return in;
+  }
+
+  @VisibleForTesting
+  protected InputStream openInputStream() throws IOException {
+    // Use the original url if no resolved url exists, eg. if
+    // it's the first time a request is made.
+    final boolean resolved = resolvedURL.getURL() != null;
+    final URLOpener opener = resolved? resolvedURL: originalURL;
+
+    final HttpURLConnection connection = opener.connect(startPos, resolved);
+    resolvedURL.setURL(getResolvedUrl(connection));
+
+    InputStream in = connection.getInputStream();
+    final Map<String, List<String>> headers = connection.getHeaderFields();
+    if (isChunkedTransferEncoding(headers)) {
+      // file length is not known
+      fileLength = null;
+    } else {
+      // for non-chunked transfer-encoding, get content-length
+      long streamlength = getStreamLength(connection, headers);
+      fileLength = startPos + streamlength;
+
+      // Java has a bug with >2GB request streams.  It won't bounds check
+      // the reads so the transfer blocks until the server times out
+      in = new BoundedInputStream(in, streamlength);
+    }
+
+    return in;
+  }
+
+  private static long getStreamLength(HttpURLConnection connection,
+      Map<String, List<String>> headers) throws IOException {
+    String cl = connection.getHeaderField(HttpHeaders.CONTENT_LENGTH);
+    if (cl == null) {
+      // Try to get the content length by parsing the content range
+      // because HftpFileSystem does not return the content length
+      // if the content is partial.
+      if (connection.getResponseCode() == HttpStatus.SC_PARTIAL_CONTENT) {
+        cl = connection.getHeaderField(HttpHeaders.CONTENT_RANGE);
+        return getLengthFromRange(cl);
+      } else {
+        throw new IOException(HttpHeaders.CONTENT_LENGTH + " is missing: "
+            + headers);
+      }
+    }
+    return Long.parseLong(cl);
+  }
+
+  private static long getLengthFromRange(String cl) throws IOException {
+    try {
+
+      String[] str = cl.substring(6).split("[-/]");
+      return Long.parseLong(str[1]) - Long.parseLong(str[0]) + 1;
+    } catch (Exception e) {
+      throw new IOException(
+          "failed to get content length by parsing the content range: " + cl
+              + " " + e.getMessage());
+    }
+  }
+
+  private static boolean isChunkedTransferEncoding(
+      final Map<String, List<String>> headers) {
+    return contains(headers, HttpHeaders.TRANSFER_ENCODING, "chunked")
+        || contains(headers, HttpHeaders.TE, "chunked");
+  }
+
+  /** Does the HTTP header map contain the given key, value pair? */
+  private static boolean contains(final Map<String, List<String>> headers,
+      final String key, final String value) {
+    final List<String> values = headers.get(key);
+    if (values != null) {
+      for(String v : values) {
+        for(final StringTokenizer t = new StringTokenizer(v, ",");
+            t.hasMoreTokens(); ) {
+          if (value.equalsIgnoreCase(t.nextToken())) {
+            return true;
+          }
+        }
+      }
+    }
+    return false;
+  }
+
+  private int update(final int n) throws IOException {
+    if (n != -1) {
+      currentPos += n;
+    } else if (fileLength != null && currentPos < fileLength) {
+      throw new IOException("Got EOF but currentPos = " + currentPos
+          + " < filelength = " + fileLength);
+    }
+    return n;
+  }
+
+  @Override
+  public int read() throws IOException {
+    final int b = getInputStream().read();
+    update((b == -1) ? -1 : 1);
+    return b;
+  }
+
+  @Override
+  public int read(byte b[], int off, int len) throws IOException {
+    return update(getInputStream().read(b, off, len));
+  }
+
+  /**
+   * Seek to the given offset from the start of the file.
+   * The next read() will be from that location.  Can't
+   * seek past the end of the file.
+   */
+  @Override
+  public void seek(long pos) throws IOException {
+    if (pos != currentPos) {
+      startPos = pos;
+      currentPos = pos;
+      if (status != StreamStatus.CLOSED) {
+        status = StreamStatus.SEEK;
+      }
+    }
+  }
+
+  /**
+   * Return the current offset from the start of the file
+   */
+  @Override
+  public long getPos() throws IOException {
+    return currentPos;
+  }
+
+  /**
+   * Seeks a different copy of the data.  Returns true if
+   * found a new source, false otherwise.
+   */
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    return false;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (in != null) {
+      in.close();
+      in = null;
+    }
+    status = StreamStatus.CLOSED;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64bb2b9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java
new file mode 100644
index 0000000..e263a0a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java
@@ -0,0 +1,485 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum;
+import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
+import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
+import org.apache.hadoop.fs.XAttrCodec;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
+import org.apache.hadoop.hdfs.protocol.HdfsConstantsClient;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.StringUtils;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+class JsonUtilClient {
+  static final DatanodeInfo[] EMPTY_DATANODE_INFO_ARRAY = {};
+
+  /** Convert a Json map to a RemoteException. */
+  static RemoteException toRemoteException(final Map<?, ?> json) {
+    final Map<?, ?> m = (Map<?, ?>)json.get(RemoteException.class.getSimpleName());
+    final String message = (String)m.get("message");
+    final String javaClassName = (String)m.get("javaClassName");
+    return new RemoteException(javaClassName, message);
+  }
+
+  /** Convert a Json map to a Token. */
+  static Token<? extends TokenIdentifier> toToken(
+      final Map<?, ?> m) throws IOException {
+    if (m == null) {
+      return null;
+    }
+
+    final Token<DelegationTokenIdentifier> token
+        = new Token<>();
+    token.decodeFromUrlString((String)m.get("urlString"));
+    return token;
+  }
+
+  /** Convert a Json map to a Token of BlockTokenIdentifier. */
+  @SuppressWarnings("unchecked")
+  static Token<BlockTokenIdentifier> toBlockToken(
+      final Map<?, ?> m) throws IOException {
+    return (Token<BlockTokenIdentifier>)toToken(m);
+  }
+
+  /** Convert a string to a FsPermission object. */
+  static FsPermission toFsPermission(
+      final String s, Boolean aclBit, Boolean encBit) {
+    FsPermission perm = new FsPermission(Short.parseShort(s, 8));
+    final boolean aBit = (aclBit != null) ? aclBit : false;
+    final boolean eBit = (encBit != null) ? encBit : false;
+    if (aBit || eBit) {
+      return new FsPermissionExtension(perm, aBit, eBit);
+    } else {
+      return perm;
+    }
+  }
+
+  /** Convert a Json map to a HdfsFileStatus object. */
+  static HdfsFileStatus toFileStatus(final Map<?, ?> json, boolean includesType) {
+    if (json == null) {
+      return null;
+    }
+
+    final Map<?, ?> m = includesType ?
+        (Map<?, ?>)json.get(FileStatus.class.getSimpleName()) : json;
+    final String localName = (String) m.get("pathSuffix");
+    final WebHdfsConstants.PathType type = WebHdfsConstants.PathType.valueOf((String) m.get("type"));
+    final byte[] symlink = type != WebHdfsConstants.PathType.SYMLINK? null
+        : DFSUtilClient.string2Bytes((String) m.get("symlink"));
+
+    final long len = ((Number) m.get("length")).longValue();
+    final String owner = (String) m.get("owner");
+    final String group = (String) m.get("group");
+    final FsPermission permission = toFsPermission((String) m.get("permission"),
+                                                   (Boolean) m.get("aclBit"),
+                                                   (Boolean) m.get("encBit"));
+    final long aTime = ((Number) m.get("accessTime")).longValue();
+    final long mTime = ((Number) m.get("modificationTime")).longValue();
+    final long blockSize = ((Number) m.get("blockSize")).longValue();
+    final short replication = ((Number) m.get("replication")).shortValue();
+    final long fileId = m.containsKey("fileId") ?
+        ((Number) m.get("fileId")).longValue() : HdfsConstantsClient.GRANDFATHER_INODE_ID;
+    final int childrenNum = getInt(m, "childrenNum", -1);
+    final byte storagePolicy = m.containsKey("storagePolicy") ?
+        (byte) ((Number) m.get("storagePolicy")).longValue() :
+        HdfsConstantsClient.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
+    return new HdfsFileStatus(len, type == WebHdfsConstants.PathType.DIRECTORY, replication,
+        blockSize, mTime, aTime, permission, owner, group,
+        symlink, DFSUtilClient.string2Bytes(localName),
+        fileId, childrenNum, null,
+        storagePolicy);
+  }
+
+  /** Convert a Json map to an ExtendedBlock object. */
+  static ExtendedBlock toExtendedBlock(final Map<?, ?> m) {
+    if (m == null) {
+      return null;
+    }
+
+    final String blockPoolId = (String)m.get("blockPoolId");
+    final long blockId = ((Number) m.get("blockId")).longValue();
+    final long numBytes = ((Number) m.get("numBytes")).longValue();
+    final long generationStamp =
+        ((Number) m.get("generationStamp")).longValue();
+    return new ExtendedBlock(blockPoolId, blockId, numBytes, generationStamp);
+  }
+
+  static int getInt(Map<?, ?> m, String key, final int defaultValue) {
+    Object value = m.get(key);
+    if (value == null) {
+      return defaultValue;
+    }
+    return ((Number) value).intValue();
+  }
+
+  static long getLong(Map<?, ?> m, String key, final long defaultValue) {
+    Object value = m.get(key);
+    if (value == null) {
+      return defaultValue;
+    }
+    return ((Number) value).longValue();
+  }
+
+  static String getString(
+      Map<?, ?> m, String key, final String defaultValue) {
+    Object value = m.get(key);
+    if (value == null) {
+      return defaultValue;
+    }
+    return (String) value;
+  }
+
+  static List<?> getList(Map<?, ?> m, String key) {
+    Object list = m.get(key);
+    if (list instanceof List<?>) {
+      return (List<?>) list;
+    } else {
+      return null;
+    }
+  }
+
+  /** Convert a Json map to an DatanodeInfo object. */
+  static DatanodeInfo toDatanodeInfo(final Map<?, ?> m)
+    throws IOException {
+    if (m == null) {
+      return null;
+    }
+
+    // ipAddr and xferPort are the critical fields for accessing data.
+    // If any one of the two is missing, an exception needs to be thrown.
+
+    // Handle the case of old servers (1.x, 0.23.x) sending 'name' instead
+    //  of ipAddr and xferPort.
+    String ipAddr = getString(m, "ipAddr", null);
+    int xferPort = getInt(m, "xferPort", -1);
+    if (ipAddr == null) {
+      String name = getString(m, "name", null);
+      if (name != null) {
+        int colonIdx = name.indexOf(':');
+        if (colonIdx > 0) {
+          ipAddr = name.substring(0, colonIdx);
+          xferPort = Integer.parseInt(name.substring(colonIdx +1));
+        } else {
+          throw new IOException(
+              "Invalid value in server response: name=[" + name + "]");
+        }
+      } else {
+        throw new IOException(
+            "Missing both 'ipAddr' and 'name' in server response.");
+      }
+      // ipAddr is non-null & non-empty string at this point.
+    }
+
+    // Check the validity of xferPort.
+    if (xferPort == -1) {
+      throw new IOException(
+          "Invalid or missing 'xferPort' in server response.");
+    }
+
+    // TODO: Fix storageID
+    return new DatanodeInfo(
+        ipAddr,
+        (String)m.get("hostName"),
+        (String)m.get("storageID"),
+        xferPort,
+        ((Number) m.get("infoPort")).intValue(),
+        getInt(m, "infoSecurePort", 0),
+        ((Number) m.get("ipcPort")).intValue(),
+
+        getLong(m, "capacity", 0l),
+        getLong(m, "dfsUsed", 0l),
+        getLong(m, "remaining", 0l),
+        getLong(m, "blockPoolUsed", 0l),
+        getLong(m, "cacheCapacity", 0l),
+        getLong(m, "cacheUsed", 0l),
+        getLong(m, "lastUpdate", 0l),
+        getLong(m, "lastUpdateMonotonic", 0l),
+        getInt(m, "xceiverCount", 0),
+        getString(m, "networkLocation", ""),
+        DatanodeInfo.AdminStates.valueOf(getString(m, "adminState", "NORMAL")));
+  }
+
+  /** Convert an Object[] to a DatanodeInfo[]. */
+  static DatanodeInfo[] toDatanodeInfoArray(final List<?> objects)
+      throws IOException {
+    if (objects == null) {
+      return null;
+    } else if (objects.isEmpty()) {
+      return EMPTY_DATANODE_INFO_ARRAY;
+    } else {
+      final DatanodeInfo[] array = new DatanodeInfo[objects.size()];
+      int i = 0;
+      for (Object object : objects) {
+        array[i++] = toDatanodeInfo((Map<?, ?>) object);
+      }
+      return array;
+    }
+  }
+
+  /** Convert a Json map to LocatedBlock. */
+  static LocatedBlock toLocatedBlock(final Map<?, ?> m) throws IOException {
+    if (m == null) {
+      return null;
+    }
+
+    final ExtendedBlock b = toExtendedBlock((Map<?, ?>)m.get("block"));
+    final DatanodeInfo[] locations = toDatanodeInfoArray(
+        getList(m, "locations"));
+    final long startOffset = ((Number) m.get("startOffset")).longValue();
+    final boolean isCorrupt = (Boolean)m.get("isCorrupt");
+    final DatanodeInfo[] cachedLocations = toDatanodeInfoArray(
+        getList(m, "cachedLocations"));
+
+    final LocatedBlock locatedblock = new LocatedBlock(b, locations,
+        null, null, startOffset, isCorrupt, cachedLocations);
+    locatedblock.setBlockToken(toBlockToken((Map<?, ?>)m.get("blockToken")));
+    return locatedblock;
+  }
+
+  /** Convert an List of Object to a List of LocatedBlock. */
+  static List<LocatedBlock> toLocatedBlockList(
+      final List<?> objects) throws IOException {
+    if (objects == null) {
+      return null;
+    } else if (objects.isEmpty()) {
+      return Collections.emptyList();
+    } else {
+      final List<LocatedBlock> list = new ArrayList<>(objects.size());
+      for (Object object : objects) {
+        list.add(toLocatedBlock((Map<?, ?>) object));
+      }
+      return list;
+    }
+  }
+
+  /** Convert a Json map to a ContentSummary. */
+  static ContentSummary toContentSummary(final Map<?, ?> json) {
+    if (json == null) {
+      return null;
+    }
+
+    final Map<?, ?> m = (Map<?, ?>)json.get(ContentSummary.class.getSimpleName());
+    final long length = ((Number) m.get("length")).longValue();
+    final long fileCount = ((Number) m.get("fileCount")).longValue();
+    final long directoryCount = ((Number) m.get("directoryCount")).longValue();
+    final long quota = ((Number) m.get("quota")).longValue();
+    final long spaceConsumed = ((Number) m.get("spaceConsumed")).longValue();
+    final long spaceQuota = ((Number) m.get("spaceQuota")).longValue();
+
+    return new ContentSummary.Builder().length(length).fileCount(fileCount).
+        directoryCount(directoryCount).quota(quota).spaceConsumed(spaceConsumed).
+        spaceQuota(spaceQuota).build();
+  }
+
+  /** Convert a Json map to a MD5MD5CRC32FileChecksum. */
+  static MD5MD5CRC32FileChecksum toMD5MD5CRC32FileChecksum(
+      final Map<?, ?> json) throws IOException {
+    if (json == null) {
+      return null;
+    }
+
+    final Map<?, ?> m = (Map<?, ?>)json.get(FileChecksum.class.getSimpleName());
+    final String algorithm = (String)m.get("algorithm");
+    final int length = ((Number) m.get("length")).intValue();
+    final byte[] bytes = StringUtils.hexStringToByte((String) m.get("bytes"));
+
+    final DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
+    final DataChecksum.Type crcType =
+        MD5MD5CRC32FileChecksum.getCrcTypeFromAlgorithmName(algorithm);
+    final MD5MD5CRC32FileChecksum checksum;
+
+    // Recreate what DFSClient would have returned.
+    switch(crcType) {
+      case CRC32:
+        checksum = new MD5MD5CRC32GzipFileChecksum();
+        break;
+      case CRC32C:
+        checksum = new MD5MD5CRC32CastagnoliFileChecksum();
+        break;
+      default:
+        throw new IOException("Unknown algorithm: " + algorithm);
+    }
+    checksum.readFields(in);
+
+    //check algorithm name
+    if (!checksum.getAlgorithmName().equals(algorithm)) {
+      throw new IOException("Algorithm not matched. Expected " + algorithm
+          + ", Received " + checksum.getAlgorithmName());
+    }
+    //check length
+    if (length != checksum.getLength()) {
+      throw new IOException("Length not matched: length=" + length
+          + ", checksum.getLength()=" + checksum.getLength());
+    }
+
+    return checksum;
+  }
+
+  /** Convert a Json map to a AclStatus object. */
+  static AclStatus toAclStatus(final Map<?, ?> json) {
+    if (json == null) {
+      return null;
+    }
+
+    final Map<?, ?> m = (Map<?, ?>) json.get(AclStatus.class.getSimpleName());
+
+    AclStatus.Builder aclStatusBuilder = new AclStatus.Builder();
+    aclStatusBuilder.owner((String) m.get("owner"));
+    aclStatusBuilder.group((String) m.get("group"));
+    aclStatusBuilder.stickyBit((Boolean) m.get("stickyBit"));
+    String permString = (String) m.get("permission");
+    if (permString != null) {
+      final FsPermission permission = toFsPermission(permString,
+          (Boolean) m.get("aclBit"), (Boolean) m.get("encBit"));
+      aclStatusBuilder.setPermission(permission);
+    }
+    final List<?> entries = (List<?>) m.get("entries");
+
+    List<AclEntry> aclEntryList = new ArrayList<>();
+    for (Object entry : entries) {
+      AclEntry aclEntry = AclEntry.parseAclEntry((String) entry, true);
+      aclEntryList.add(aclEntry);
+    }
+    aclStatusBuilder.addEntries(aclEntryList);
+    return aclStatusBuilder.build();
+  }
+
+  static byte[] getXAttr(final Map<?, ?> json, final String name)
+      throws IOException {
+    if (json == null) {
+      return null;
+    }
+
+    Map<String, byte[]> xAttrs = toXAttrs(json);
+    if (xAttrs != null) {
+      return xAttrs.get(name);
+    }
+
+    return null;
+  }
+
+  static Map<String, byte[]> toXAttrs(final Map<?, ?> json)
+      throws IOException {
+    if (json == null) {
+      return null;
+    }
+    return toXAttrMap(getList(json, "XAttrs"));
+  }
+
+  static List<String> toXAttrNames(final Map<?, ?> json)
+      throws IOException {
+    if (json == null) {
+      return null;
+    }
+
+    final String namesInJson = (String) json.get("XAttrNames");
+    ObjectReader reader = new ObjectMapper().reader(List.class);
+    final List<Object> xattrs = reader.readValue(namesInJson);
+    final List<String> names =
+      Lists.newArrayListWithCapacity(json.keySet().size());
+
+    for (Object xattr : xattrs) {
+      names.add((String) xattr);
+    }
+    return names;
+  }
+
+  static Map<String, byte[]> toXAttrMap(final List<?> objects)
+      throws IOException {
+    if (objects == null) {
+      return null;
+    } else if (objects.isEmpty()) {
+      return Maps.newHashMap();
+    } else {
+      final Map<String, byte[]> xAttrs = Maps.newHashMap();
+      for (Object object : objects) {
+        Map<?, ?> m = (Map<?, ?>) object;
+        String name = (String) m.get("name");
+        String value = (String) m.get("value");
+        xAttrs.put(name, decodeXAttrValue(value));
+      }
+      return xAttrs;
+    }
+  }
+
+  static byte[] decodeXAttrValue(String value) throws IOException {
+    if (value != null) {
+      return XAttrCodec.decodeValue(value);
+    } else {
+      return new byte[0];
+    }
+  }
+
+  /** Convert a Json map to a Token of DelegationTokenIdentifier. */
+  @SuppressWarnings("unchecked")
+  static Token<DelegationTokenIdentifier> toDelegationToken(
+      final Map<?, ?> json) throws IOException {
+    final Map<?, ?> m = (Map<?, ?>)json.get(Token.class.getSimpleName());
+    return (Token<DelegationTokenIdentifier>) toToken(m);
+  }
+
+  /** Convert a Json map to LocatedBlock. */
+  static LocatedBlocks toLocatedBlocks(
+      final Map<?, ?> json) throws IOException {
+    if (json == null) {
+      return null;
+    }
+
+    final Map<?, ?> m = (Map<?, ?>)json.get(LocatedBlocks.class.getSimpleName());
+    final long fileLength = ((Number) m.get("fileLength")).longValue();
+    final boolean isUnderConstruction = (Boolean)m.get("isUnderConstruction");
+    final List<LocatedBlock> locatedBlocks = toLocatedBlockList(
+        getList(m, "locatedBlocks"));
+    final LocatedBlock lastLocatedBlock = toLocatedBlock(
+        (Map<?, ?>) m.get("lastLocatedBlock"));
+    final boolean isLastBlockComplete = (Boolean)m.get("isLastBlockComplete");
+    return new LocatedBlocks(fileLength, isUnderConstruction, locatedBlocks,
+        lastLocatedBlock, isLastBlockComplete, null);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64bb2b9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/KerberosUgiAuthenticator.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/KerberosUgiAuthenticator.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/KerberosUgiAuthenticator.java
new file mode 100644
index 0000000..b8ea951
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/KerberosUgiAuthenticator.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web;
+
+import java.io.IOException;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.Authenticator;
+import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
+import org.apache.hadoop.security.authentication.client.PseudoAuthenticator;
+
+/**
+ * Use UserGroupInformation as a fallback authenticator
+ * if the server does not use Kerberos SPNEGO HTTP authentication.
+ */
+public class KerberosUgiAuthenticator extends KerberosAuthenticator {
+  @Override
+  protected Authenticator getFallBackAuthenticator() {
+    return new PseudoAuthenticator() {
+      @Override
+      protected String getUserName() {
+        try {
+          return UserGroupInformation.getLoginUser().getUserName();
+        } catch (IOException e) {
+          throw new SecurityException("Failed to obtain current username", e);
+        }
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64bb2b9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/SWebHdfsFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/SWebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/SWebHdfsFileSystem.java
new file mode 100644
index 0000000..4021b3f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/SWebHdfsFileSystem.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web;
+
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.io.Text;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class SWebHdfsFileSystem extends WebHdfsFileSystem {
+
+  @Override
+  public String getScheme() {
+    return WebHdfsConstants.SWEBHDFS_SCHEME;
+  }
+
+  @Override
+  protected String getTransportScheme() {
+    return "https";
+  }
+
+  @Override
+  protected Text getTokenKind() {
+    return WebHdfsConstants.SWEBHDFS_TOKEN_KIND;
+  }
+
+  @VisibleForTesting
+  @Override
+  public int getDefaultPort() {
+    return getConf().getInt(HdfsClientConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY,
+        HdfsClientConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64bb2b9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/TokenAspect.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/TokenAspect.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/TokenAspect.java
new file mode 100644
index 0000000..705ab4c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/TokenAspect.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.DelegationTokenRenewer;
+import org.apache.hadoop.fs.DelegationTokenRenewer.Renewable;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.HAUtilClient;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenRenewer;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This class implements the aspects that relate to delegation tokens for all
+ * HTTP-based file system.
+ */
+final class TokenAspect<T extends FileSystem & Renewable> {
+  @InterfaceAudience.Private
+  public static class TokenManager extends TokenRenewer {
+
+    @Override
+    public void cancel(Token<?> token, Configuration conf) throws IOException {
+      getInstance(token, conf).cancelDelegationToken(token);
+    }
+
+    @Override
+    public boolean handleKind(Text kind) {
+      return kind.equals(WebHdfsConstants.HFTP_TOKEN_KIND)
+          || kind.equals(WebHdfsConstants.HSFTP_TOKEN_KIND)
+          || kind.equals(WebHdfsConstants.WEBHDFS_TOKEN_KIND)
+          || kind.equals(WebHdfsConstants.SWEBHDFS_TOKEN_KIND);
+    }
+
+    @Override
+    public boolean isManaged(Token<?> token) throws IOException {
+      return true;
+    }
+
+    @Override
+    public long renew(Token<?> token, Configuration conf) throws IOException {
+      return getInstance(token, conf).renewDelegationToken(token);
+    }
+
+    private TokenManagementDelegator getInstance(Token<?> token,
+                                                 Configuration conf)
+            throws IOException {
+      final URI uri;
+      final String scheme = getSchemeByKind(token.getKind());
+      if (HAUtilClient.isTokenForLogicalUri(token)) {
+        uri = HAUtilClient.getServiceUriFromToken(scheme, token);
+      } else {
+        final InetSocketAddress address = SecurityUtil.getTokenServiceAddr
+                (token);
+        uri = URI.create(scheme + "://" + NetUtils.getHostPortString(address));
+      }
+      return (TokenManagementDelegator) FileSystem.get(uri, conf);
+    }
+
+    private static String getSchemeByKind(Text kind) {
+      if (kind.equals(WebHdfsConstants.HFTP_TOKEN_KIND)) {
+        return WebHdfsConstants.HFTP_SCHEME;
+      } else if (kind.equals(WebHdfsConstants.HSFTP_TOKEN_KIND)) {
+        return WebHdfsConstants.HSFTP_SCHEME;
+      } else if (kind.equals(WebHdfsConstants.WEBHDFS_TOKEN_KIND)) {
+        return WebHdfsConstants.WEBHDFS_SCHEME;
+      } else if (kind.equals(WebHdfsConstants.SWEBHDFS_TOKEN_KIND)) {
+        return WebHdfsConstants.SWEBHDFS_SCHEME;
+      } else {
+        throw new IllegalArgumentException("Unsupported scheme");
+      }
+    }
+  }
+
+  private static class DTSelecorByKind extends
+      AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
+    public DTSelecorByKind(final Text kind) {
+      super(kind);
+    }
+  }
+
+  /**
+   * Callbacks for token management
+   */
+  interface TokenManagementDelegator {
+    void cancelDelegationToken(final Token<?> token) throws IOException;
+    long renewDelegationToken(final Token<?> token) throws IOException;
+  }
+
+  private DelegationTokenRenewer.RenewAction<?> action;
+  private DelegationTokenRenewer dtRenewer = null;
+  private final DTSelecorByKind dtSelector;
+  private final T fs;
+  private boolean hasInitedToken;
+  private final Log LOG;
+  private final Text serviceName;
+
+  TokenAspect(T fs, final Text serviceName, final Text kind) {
+    this.LOG = LogFactory.getLog(fs.getClass());
+    this.fs = fs;
+    this.dtSelector = new DTSelecorByKind(kind);
+    this.serviceName = serviceName;
+  }
+
+  synchronized void ensureTokenInitialized() throws IOException {
+    // we haven't inited yet, or we used to have a token but it expired
+    if (!hasInitedToken || (action != null && !action.isValid())) {
+      //since we don't already have a token, go get one
+      Token<?> token = fs.getDelegationToken(null);
+      // security might be disabled
+      if (token != null) {
+        fs.setDelegationToken(token);
+        addRenewAction(fs);
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("Created new DT for " + token.getService());
+        }
+      }
+      hasInitedToken = true;
+    }
+  }
+
+  public synchronized void reset() {
+    hasInitedToken = false;
+  }
+
+  synchronized void initDelegationToken(UserGroupInformation ugi) {
+    Token<?> token = selectDelegationToken(ugi);
+    if (token != null) {
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Found existing DT for " + token.getService());
+      }
+      fs.setDelegationToken(token);
+      hasInitedToken = true;
+    }
+  }
+
+  synchronized void removeRenewAction() throws IOException {
+    if (dtRenewer != null) {
+      dtRenewer.removeRenewAction(fs);
+    }
+  }
+
+  @VisibleForTesting
+  Token<DelegationTokenIdentifier> selectDelegationToken(
+      UserGroupInformation ugi) {
+    return dtSelector.selectToken(serviceName, ugi.getTokens());
+  }
+
+  private synchronized void addRenewAction(final T webhdfs) {
+    if (dtRenewer == null) {
+      dtRenewer = DelegationTokenRenewer.getInstance();
+    }
+
+    action = dtRenewer.addRenewAction(webhdfs);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64bb2b9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java
new file mode 100644
index 0000000..e330adf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.web;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLConnection;
+import java.security.GeneralSecurityException;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLSocketFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
+import org.apache.hadoop.security.ssl.SSLFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Utilities for handling URLs
+ */
+@InterfaceAudience.LimitedPrivate({ "HDFS" })
+@InterfaceStability.Unstable
+public class URLConnectionFactory {
+  private static final Log LOG = LogFactory.getLog(URLConnectionFactory.class);
+
+  /**
+   * Timeout for socket connects and reads
+   */
+  public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute
+  private final ConnectionConfigurator connConfigurator;
+
+  private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR = new ConnectionConfigurator() {
+    @Override
+    public HttpURLConnection configure(HttpURLConnection conn)
+        throws IOException {
+      URLConnectionFactory.setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT);
+      return conn;
+    }
+  };
+
+  /**
+   * The URLConnectionFactory that sets the default timeout and it only trusts
+   * Java's SSL certificates.
+   */
+  public static final URLConnectionFactory DEFAULT_SYSTEM_CONNECTION_FACTORY = new URLConnectionFactory(
+      DEFAULT_TIMEOUT_CONN_CONFIGURATOR);
+
+  /**
+   * Construct a new URLConnectionFactory based on the configuration. It will
+   * try to load SSL certificates when it is specified.
+   */
+  public static URLConnectionFactory newDefaultURLConnectionFactory(Configuration conf) {
+    ConnectionConfigurator conn = null;
+    try {
+      conn = newSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf);
+    } catch (Exception e) {
+      LOG.debug(
+          "Cannot load customized ssl related configuration. Fallback to system-generic settings.",
+          e);
+      conn = DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
+    }
+    return new URLConnectionFactory(conn);
+  }
+
+  @VisibleForTesting
+  URLConnectionFactory(ConnectionConfigurator connConfigurator) {
+    this.connConfigurator = connConfigurator;
+  }
+
+  /**
+   * Create a new ConnectionConfigurator for SSL connections
+   */
+  private static ConnectionConfigurator newSslConnConfigurator(final int timeout,
+      Configuration conf) throws IOException, GeneralSecurityException {
+    final SSLFactory factory;
+    final SSLSocketFactory sf;
+    final HostnameVerifier hv;
+
+    factory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
+    factory.init();
+    sf = factory.createSSLSocketFactory();
+    hv = factory.getHostnameVerifier();
+
+    return new ConnectionConfigurator() {
+      @Override
+      public HttpURLConnection configure(HttpURLConnection conn)
+          throws IOException {
+        if (conn instanceof HttpsURLConnection) {
+          HttpsURLConnection c = (HttpsURLConnection) conn;
+          c.setSSLSocketFactory(sf);
+          c.setHostnameVerifier(hv);
+        }
+        URLConnectionFactory.setTimeouts(conn, timeout);
+        return conn;
+      }
+    };
+  }
+
+  /**
+   * Opens a url with read and connect timeouts
+   *
+   * @param url
+   *          to open
+   * @return URLConnection
+   * @throws IOException
+   */
+  public URLConnection openConnection(URL url) throws IOException {
+    try {
+      return openConnection(url, false);
+    } catch (AuthenticationException e) {
+      // Unreachable
+      return null;
+    }
+  }
+
+  /**
+   * Opens a url with read and connect timeouts
+   *
+   * @param url
+   *          URL to open
+   * @param isSpnego
+   *          whether the url should be authenticated via SPNEGO
+   * @return URLConnection
+   * @throws IOException
+   * @throws AuthenticationException
+   */
+  public URLConnection openConnection(URL url, boolean isSpnego)
+      throws IOException, AuthenticationException {
+    if (isSpnego) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("open AuthenticatedURL connection" + url);
+      }
+      UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
+      final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token();
+      return new AuthenticatedURL(new KerberosUgiAuthenticator(),
+          connConfigurator).openConnection(url, authToken);
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("open URL connection");
+      }
+      URLConnection connection = url.openConnection();
+      if (connection instanceof HttpURLConnection) {
+        connConfigurator.configure((HttpURLConnection) connection);
+      }
+      return connection;
+    }
+  }
+
+  /**
+   * Sets timeout parameters on the given URLConnection.
+   *
+   * @param connection
+   *          URLConnection to set
+   * @param socketTimeout
+   *          the connection and read timeout of the connection.
+   */
+  private static void setTimeouts(URLConnection connection, int socketTimeout) {
+    connection.setConnectTimeout(socketTimeout);
+    connection.setReadTimeout(socketTimeout);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64bb2b9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsConstants.java
index 50da899..e5c529e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsConstants.java
@@ -23,6 +23,10 @@ import org.apache.hadoop.io.Text;
 
 @InterfaceAudience.Private
 public class WebHdfsConstants {
+  public static final String HFTP_SCHEME = "hftp";
+  public static final Text HFTP_TOKEN_KIND = new Text("HFTP delegation");
+  public static final Text HSFTP_TOKEN_KIND = new Text("HSFTP delegation");
+  public static final String HSFTP_SCHEME = "hsftp";
   public static final String WEBHDFS_SCHEME = "webhdfs";
   public static final String SWEBHDFS_SCHEME = "swebhdfs";
   public static final Text WEBHDFS_TOKEN_KIND = new Text("WEBHDFS delegation");


Mime
View raw message