hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From whe...@apache.org
Subject [3/4] hadoop git commit: HDFS-8052. Move WebHdfsFileSystem into hadoop-hdfs-client. Contributed by Haohui Mai.
Date Fri, 24 Apr 2015 00:44:33 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64bb2b9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
new file mode 100644
index 0000000..e5d7925
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
@@ -0,0 +1,1470 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.web;
+
+import java.io.BufferedOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URL;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import javax.ws.rs.core.MediaType;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.DelegationTokenRenewer;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.XAttrCodec;
+import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.HAUtilClient;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.HdfsConstantsClient;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.web.resources.*;
+import org.apache.hadoop.hdfs.web.resources.HttpOpParam.Op;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryUtils;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.StringUtils;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/** A FileSystem for HDFS over the web. */
+public class WebHdfsFileSystem extends FileSystem
+    implements DelegationTokenRenewer.Renewable, TokenAspect.TokenManagementDelegator {
+  public static final Log LOG = LogFactory.getLog(WebHdfsFileSystem.class);
+  /** WebHdfs version. */
+  public static final int VERSION = 1;
+  /** Http URI: http://namenode:port/{PATH_PREFIX}/path/to/file */
+  public static final String PATH_PREFIX = "/" + WebHdfsConstants.WEBHDFS_SCHEME + "/v" + VERSION;
+
+  /** Default connection factory may be overridden in tests to use smaller timeout values */
+  protected URLConnectionFactory connectionFactory;
+
+  @VisibleForTesting
+  public static final String CANT_FALLBACK_TO_INSECURE_MSG =
+      "The client is configured to only allow connecting to secure cluster";
+
+  private boolean canRefreshDelegationToken;
+
+  private UserGroupInformation ugi;
+  private URI uri;
+  private Token<?> delegationToken;
+  protected Text tokenServiceName;
+  private RetryPolicy retryPolicy = null;
+  private Path workingDir;
+  private InetSocketAddress nnAddrs[];
+  private int currentNNAddrIndex;
+  private boolean disallowFallbackToInsecureCluster;
+
+  /**
+   * Reports the URI scheme that identifies this FileSystem.
+   *
+   * @return <code>webhdfs</code>
+   */
+  @Override
+  public String getScheme() {
+    final String scheme = WebHdfsConstants.WEBHDFS_SCHEME;
+    return scheme;
+  }
+
+  /**
+   * return the underlying transport protocol (http / https).
+   */
+  protected String getTransportScheme() {
+    return "http";
+  }
+
+  protected Text getTokenKind() {
+    return WebHdfsConstants.WEBHDFS_TOKEN_KIND;
+  }
+
+  /**
+   * Prepares this instance for use: stores the configuration, builds the
+   * connection factory, captures the current user, resolves the namenode
+   * addresses, derives the token service name and selects a retry policy.
+   *
+   * @param uri the file system URI; only its scheme and authority are kept
+   * @param conf the configuration to read client settings from
+   * @throws IOException if any of the setup steps fails
+   */
+  @Override
+  public synchronized void initialize(URI uri, Configuration conf
+      ) throws IOException {
+    super.initialize(uri, conf);
+    setConf(conf);
+    /** set user pattern based on configuration file */
+    UserParam.setUserPattern(conf.get(
+        HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY,
+        HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_DEFAULT));
+
+    connectionFactory = URLConnectionFactory
+        .newDefaultURLConnectionFactory(conf);
+
+    ugi = UserGroupInformation.getCurrentUser();
+    // keep only scheme://authority; any path in the supplied URI is dropped
+    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
+    this.nnAddrs = resolveNNAddr();
+
+    boolean isHA = HAUtilClient.isClientFailoverConfigured(conf, this.uri);
+    boolean isLogicalUri = isHA && HAUtilClient.isLogicalUri(conf, this.uri);
+    // In non-HA or non-logical URI case, the code needs to call
+    // getCanonicalUri() in order to handle the case where no port is
+    // specified in the URI
+    this.tokenServiceName = isLogicalUri ?
+        HAUtilClient.buildTokenServiceForLogicalUri(uri, getScheme())
+        : SecurityUtil.buildTokenService(getCanonicalUri());
+
+    if (!isHA) {
+      // non-HA: use the default retry policy built from the HttpClient
+      // retry-policy configuration keys
+      this.retryPolicy =
+          RetryUtils.getDefaultRetryPolicy(
+              conf,
+              HdfsClientConfigKeys.HttpClient.RETRY_POLICY_ENABLED_KEY,
+              HdfsClientConfigKeys.HttpClient.RETRY_POLICY_ENABLED_DEFAULT,
+              HdfsClientConfigKeys.HttpClient.RETRY_POLICY_SPEC_KEY,
+              HdfsClientConfigKeys.HttpClient.RETRY_POLICY_SPEC_DEFAULT,
+              HdfsConstantsClient.SAFEMODE_EXCEPTION_CLASS_NAME);
+    } else {
+
+      // HA: read the failover/retry limits and sleep bounds, then build a
+      // failover-on-network-exception policy from them
+      int maxFailoverAttempts = conf.getInt(
+          HdfsClientConfigKeys.HttpClient.FAILOVER_MAX_ATTEMPTS_KEY,
+          HdfsClientConfigKeys.HttpClient.FAILOVER_MAX_ATTEMPTS_DEFAULT);
+      int maxRetryAttempts = conf.getInt(
+          HdfsClientConfigKeys.HttpClient.RETRY_MAX_ATTEMPTS_KEY,
+          HdfsClientConfigKeys.HttpClient.RETRY_MAX_ATTEMPTS_DEFAULT);
+      int failoverSleepBaseMillis = conf.getInt(
+          HdfsClientConfigKeys.HttpClient.FAILOVER_SLEEPTIME_BASE_KEY,
+          HdfsClientConfigKeys.HttpClient.FAILOVER_SLEEPTIME_BASE_DEFAULT);
+      int failoverSleepMaxMillis = conf.getInt(
+          HdfsClientConfigKeys.HttpClient.FAILOVER_SLEEPTIME_MAX_KEY,
+          HdfsClientConfigKeys.HttpClient.FAILOVER_SLEEPTIME_MAX_DEFAULT);
+
+      this.retryPolicy = RetryPolicies
+          .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
+              maxFailoverAttempts, maxRetryAttempts, failoverSleepBaseMillis,
+              failoverSleepMaxMillis);
+    }
+
+    this.workingDir = getHomeDirectory();
+    // token refresh is only attempted when security is enabled
+    this.canRefreshDelegationToken = UserGroupInformation.isSecurityEnabled();
+    // note the negation: the config key says "fallback allowed", the field
+    // records "fallback disallowed"
+    this.disallowFallbackToInsecureCluster = !conf.getBoolean(
+        CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+        CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
+    this.delegationToken = null;
+  }
+
+  /**
+   * Exposes the superclass's canonical URI publicly.
+   *
+   * @return whatever the superclass implementation returns
+   */
+  @Override
+  public URI getCanonicalUri() {
+    final URI canonical = super.getCanonicalUri();
+    return canonical;
+  }
+
+  /**
+   * Tells whether WebHDFS is switched on in the given configuration.
+   *
+   * @param conf configuration to consult
+   * @param log not used by this method
+   * @return the configured flag, or its default when the key is unset
+   */
+  public static boolean isEnabled(final Configuration conf, final Log log) {
+    return conf.getBoolean(
+        HdfsClientConfigKeys.DFS_WEBHDFS_ENABLED_KEY,
+        HdfsClientConfigKeys.DFS_WEBHDFS_ENABLED_DEFAULT);
+  }
+
+  // Selector constructed with the kind reported by getTokenKind(); used to
+  // pick a matching delegation token out of the UGI's tokens.
+  // NOTE(review): getTokenKind() runs during field initialization, so a
+  // subclass override is invoked before the subclass's own fields are set --
+  // confirm overrides only return constants.
+  TokenSelector<DelegationTokenIdentifier> tokenSelector =
+      new AbstractDelegationTokenSelector<DelegationTokenIdentifier>(getTokenKind()){};
+
+  // the first getAuthParams() for a non-token op will either get the
+  // internal token from the ugi or lazy fetch one
+  /**
+   * Returns the delegation token held by this instance, resolving it first
+   * when refresh is still permitted and no token has been stored yet.
+   *
+   * A token found among the UGI's tokens is used as-is and disables further
+   * refresh; otherwise a token is fetched via getDelegationToken(null), and a
+   * null result also disables refresh.
+   *
+   * @return the stored delegation token, which may be null
+   * @throws IOException if fetching a token fails
+   */
+  protected synchronized Token<?> getDelegationToken() throws IOException {
+    if (canRefreshDelegationToken && delegationToken == null) {
+      Token<?> token = tokenSelector.selectToken(
+          new Text(getCanonicalServiceName()), ugi.getTokens());
+      // ugi tokens are usually indicative of a task which can't
+      // refetch tokens.  even if ugi has credentials, don't attempt
+      // to get another token to match hdfs/rpc behavior
+      if (token != null) {
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("Using UGI token: " + token);
+        }
+        canRefreshDelegationToken = false;
+      } else {
+        token = getDelegationToken(null);
+        if (token != null) {
+          if(LOG.isDebugEnabled()) {
+            LOG.debug("Fetched new token: " + token);
+          }
+        } else { // security is disabled
+          canRefreshDelegationToken = false;
+        }
+      }
+      // stored even when null, so later calls return without re-resolving
+      // once refresh has been disabled
+      setDelegationToken(token);
+    }
+    return delegationToken;
+  }
+
+  @VisibleForTesting
+  synchronized boolean replaceExpiredDelegationToken() throws IOException {
+    boolean replaced = false;
+    if (canRefreshDelegationToken) {
+      Token<?> token = getDelegationToken(null);
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Replaced expired token: " + token);
+      }
+      setDelegationToken(token);
+      replaced = (token != null);
+    }
+    return replaced;
+  }
+
+  /**
+   * @return the namenode HTTP port from the configuration, or the default
+   *         constant when the key is unset
+   */
+  @Override
+  @VisibleForTesting
+  public int getDefaultPort() {
+    final Configuration conf = getConf();
+    return conf.getInt(HdfsClientConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY,
+        HdfsClientConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT);
+  }
+
+  /** @return the URI stored for this file system instance. */
+  @Override
+  public URI getUri() {
+    return uri;
+  }
+
+  @Override
+  protected URI canonicalizeUri(URI uri) {
+    return NetUtils.getCanonicalUri(uri, getDefaultPort());
+  }
+
+  /**
+   * Builds the home directory path string for a user.
+   *
+   * @param ugi the user whose short name is appended
+   * @return "/user/" followed by the user's short name
+   */
+  public static String getHomeDirectoryString(final UserGroupInformation ugi) {
+    final String shortName = ugi.getShortUserName();
+    return "/user/" + shortName;
+  }
+
+  /** @return the current user's home directory, qualified by this file system. */
+  @Override
+  public Path getHomeDirectory() {
+    final String home = getHomeDirectoryString(ugi);
+    final Path unqualified = new Path(home);
+    return makeQualified(unqualified);
+  }
+
+  /** @return the working directory currently stored on this instance. */
+  @Override
+  public synchronized Path getWorkingDirectory() {
+    return this.workingDir;
+  }
+
+  /**
+   * Changes the working directory after resolving the argument against the
+   * current one and validating the resulting path name.
+   *
+   * @param dir the new working directory, absolute or relative
+   * @throws IllegalArgumentException if the resolved path is not a valid name
+   */
+  @Override
+  public synchronized void setWorkingDirectory(final Path dir) {
+    final Path absolute = makeAbsolute(dir);
+    final String result = absolute.toUri().getPath();
+    if (!DFSUtilClient.isValidName(result)) {
+      throw new IllegalArgumentException("Invalid DFS directory name " +
+                                         result);
+    }
+    workingDir = absolute;
+  }
+
+  /**
+   * Resolves a path against the working directory unless it is already
+   * absolute.
+   */
+  private Path makeAbsolute(Path f) {
+    if (f.isAbsolute()) {
+      return f;
+    }
+    return new Path(workingDir, f);
+  }
+
+  /**
+   * Reads the body of an HTTP response and parses it as a JSON map.
+   *
+   * @param c the connection whose response is read
+   * @param useErrorStream true to read the error stream, false for the input
+   *        stream
+   * @return the parsed map, or null when the reported content length is 0
+   * @throws IOException if the chosen stream is null, the content type is not
+   *         compatible with JSON, or reading fails
+   */
+  static Map<?, ?> jsonParse(final HttpURLConnection c, final boolean useErrorStream
+      ) throws IOException {
+    if (c.getContentLength() == 0) {
+      return null;
+    }
+    final InputStream body;
+    if (useErrorStream) {
+      body = c.getErrorStream();
+    } else {
+      body = c.getInputStream();
+    }
+    if (body == null) {
+      throw new IOException("The " + (useErrorStream? "error": "input") + " stream is null.");
+    }
+    try {
+      final String contentType = c.getContentType();
+      if (contentType != null) {
+        // reject anything the server did not label as JSON-compatible
+        final MediaType parsed = MediaType.valueOf(contentType);
+        if (!MediaType.APPLICATION_JSON_TYPE.isCompatible(parsed)) {
+          throw new IOException("Content-Type \"" + contentType
+              + "\" is incompatible with \"" + MediaType.APPLICATION_JSON
+              + "\" (parsed=\"" + parsed + "\")");
+        }
+      }
+      return new ObjectMapper().reader(Map.class).readValue(body);
+    } finally {
+      body.close();
+    }
+  }
+
+  /**
+   * Checks the HTTP response code of a completed request against the code
+   * the operation expects.
+   *
+   * @param op the operation that was issued
+   * @param conn the connection holding the response
+   * @param unwrapException whether a remote exception should be passed
+   *        through toIOException before being thrown
+   * @return null when the code matches; otherwise the parsed error body when
+   *         it carries no RemoteException entry
+   * @throws AccessControlException on HTTP 401
+   * @throws IOException for any other unexpected response
+   */
+  private static Map<?, ?> validateResponse(final HttpOpParam.Op op,
+      final HttpURLConnection conn, boolean unwrapException) throws IOException {
+    final int code = conn.getResponseCode();
+    // server is demanding an authentication we don't support
+    if (code == HttpURLConnection.HTTP_UNAUTHORIZED) {
+      // match hdfs/rpc exception
+      throw new AccessControlException(conn.getResponseMessage());
+    }
+    if (code != op.getExpectedHttpResponseCode()) {
+      final Map<?, ?> m;
+      try {
+        m = jsonParse(conn, true);
+      } catch(Exception e) {
+        throw new IOException("Unexpected HTTP response: code=" + code + " != "
+            + op.getExpectedHttpResponseCode() + ", " + op.toQueryString()
+            + ", message=" + conn.getResponseMessage(), e);
+      }
+
+      if (m == null) {
+        throw new IOException("Unexpected HTTP response: code=" + code + " != "
+            + op.getExpectedHttpResponseCode() + ", " + op.toQueryString()
+            + ", message=" + conn.getResponseMessage());
+      } else if (m.get(RemoteException.class.getSimpleName()) == null) {
+        return m;
+      }
+
+      IOException re = JsonUtilClient.toRemoteException(m);
+      // extract UGI-related exceptions and unwrap InvalidToken
+      // the NN mangles these exceptions but the DN does not and may need
+      // to re-fetch a token if either report the token is expired
+      final String reMessage = re.getMessage();
+      if (reMessage != null && reMessage.startsWith(
+          SecurityUtil.FAILED_TO_GET_UGI_MSG_HEADER)) {
+        String[] parts = reMessage.split(":\\s+", 3);
+        // only rebuild the exception when the message really has the
+        // "header: class: message" shape; otherwise keep the original so a
+        // short message cannot raise ArrayIndexOutOfBoundsException and hide
+        // the real remote error
+        if (parts.length == 3) {
+          re = new RemoteException(parts[1], parts[2]);
+          re = ((RemoteException)re).unwrapRemoteException(InvalidToken.class);
+        }
+      }
+      throw unwrapException? toIOException(re): re;
+    }
+    return null;
+  }
+
+  /**
+   * Convert an exception to an IOException.
+   *
+   * A RemoteException is unwrapped, any other IOException is returned
+   * unchanged, and everything else is wrapped in a new IOException.
+   */
+  private static IOException toIOException(Exception e) {
+    if (e instanceof RemoteException) {
+      return ((RemoteException)e).unwrapRemoteException();
+    }
+    if (e instanceof IOException) {
+      return (IOException)e;
+    }
+    return new IOException(e);
+  }
+
+  private synchronized InetSocketAddress getCurrentNNAddr() {
+    return nnAddrs[currentNNAddrIndex];
+  }
+
+  /**
+   * Reset the appropriate state to gracefully fail over to another name node
+   */
+  private synchronized void resetStateToFailOver() {
+    currentNNAddrIndex = (currentNNAddrIndex + 1) % nnAddrs.length;
+  }
+
+  /**
+   * Builds a URL on the currently selected namenode for a path and query.
+   *
+   * @param path to obtain the URL for
+   * @param query string to append to the path after a '?'
+   * @return namenode URL referring to the given path
+   * @throws IOException on error constructing the URL
+   */
+  private URL getNamenodeURL(String path, String query) throws IOException {
+    final InetSocketAddress target = getCurrentNNAddr();
+    final String file = path + '?' + query;
+    final URL url = new URL(getTransportScheme(), target.getHostName(),
+        target.getPort(), file);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("url=" + url);
+    }
+    return url;
+  }
+
+  Param<?,?>[] getAuthParameters(final HttpOpParam.Op op) throws IOException {
+    List<Param<?,?>> authParams = Lists.newArrayList();
+    // Skip adding delegation token for token operations because these
+    // operations require authentication.
+    Token<?> token = null;
+    if (!op.getRequireAuth()) {
+      token = getDelegationToken();
+    }
+    if (token != null) {
+      authParams.add(new DelegationParam(token.encodeToUrlString()));
+    } else {
+      UserGroupInformation userUgi = ugi;
+      UserGroupInformation realUgi = userUgi.getRealUser();
+      if (realUgi != null) { // proxy user
+        authParams.add(new DoAsParam(userUgi.getShortUserName()));
+        userUgi = realUgi;
+      }
+      authParams.add(new UserParam(userUgi.getShortUserName()));
+    }
+    return authParams.toArray(new Param<?,?>[0]);
+  }
+
+  /**
+   * Builds the namenode URL for an operation on a path.
+   *
+   * The URL path is PATH_PREFIX followed by the raw path of the qualified
+   * fspath ("/" when fspath is null); the query is the operation's query
+   * string followed by the sorted authentication parameters and the sorted
+   * caller-supplied parameters.
+   *
+   * @param op the operation to encode
+   * @param fspath the target path, or null for the root
+   * @param parameters additional query parameters
+   * @return the namenode URL for the request
+   * @throws IOException if the authentication parameters or the URL cannot
+   *         be built
+   */
+  URL toUrl(final HttpOpParam.Op op, final Path fspath,
+      final Param<?,?>... parameters) throws IOException {
+    //initialize URI path and query
+    final String path = PATH_PREFIX
+        + (fspath == null? "/": makeQualified(fspath).toUri().getRawPath());
+    final String query = op.toQueryString()
+        + Param.toSortedString("&", getAuthParameters(op))
+        + Param.toSortedString("&", parameters);
+    // getNamenodeURL() already trace-logs the URL it builds, so it is not
+    // logged a second time here
+    return getNamenodeURL(path, query);
+  }
+
+  /**
+   * This class is for initialing a HTTP connection, connecting to server,
+   * obtaining a response, and also handling retry on failures.
+   */
+  abstract class AbstractRunner<T> {
+    abstract protected URL getUrl() throws IOException;
+
+    protected final HttpOpParam.Op op;
+    private final boolean redirected;
+    protected ExcludeDatanodesParam excludeDatanodes = new ExcludeDatanodesParam("");
+
+    private boolean checkRetry;
+
+    protected AbstractRunner(final HttpOpParam.Op op, boolean redirected) {
+      this.op = op;
+      this.redirected = redirected;
+    }
+
+    T run() throws IOException {
+      UserGroupInformation connectUgi = ugi.getRealUser();
+      if (connectUgi == null) {
+        connectUgi = ugi;
+      }
+      if (op.getRequireAuth()) {
+        connectUgi.checkTGTAndReloginFromKeytab();
+      }
+      try {
+        // the entire lifecycle of the connection must be run inside the
+        // doAs to ensure authentication is performed correctly
+        return connectUgi.doAs(
+            new PrivilegedExceptionAction<T>() {
+              @Override
+              public T run() throws IOException {
+                return runWithRetry();
+              }
+            });
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+    }
+
+    /**
+     * Two-step requests redirected to a DN
+     *
+     * Create/Append:
+     * Step 1) Submit a Http request with neither auto-redirect nor data.
+     * Step 2) Submit another Http request with the URL from the Location header with data.
+     *
+     * The reason of having two-step create/append is for preventing clients to
+     * send out the data before the redirect. This issue is addressed by the
+     * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3.
+     * Unfortunately, there are software library bugs (e.g. Jetty 6 http server
+     * and Java 6 http client), which do not correctly implement "Expect:
+     * 100-continue". The two-step create/append is a temporary workaround for
+     * the software library bugs.
+     *
+     * Open/Checksum
+     * Also implements two-step connects for other operations redirected to
+     * a DN such as open and checksum
+     */
+    private HttpURLConnection connect(URL url) throws IOException {
+      //redirect hostname and port
+      String redirectHost = null;
+
+
+      // resolve redirects for a DN operation unless already resolved
+      if (op.getRedirect() && !redirected) {
+        final HttpOpParam.Op redirectOp =
+            HttpOpParam.TemporaryRedirectOp.valueOf(op);
+        final HttpURLConnection conn = connect(redirectOp, url);
+        // application level proxy like httpfs might not issue a redirect
+        if (conn.getResponseCode() == op.getExpectedHttpResponseCode()) {
+          return conn;
+        }
+        try {
+          validateResponse(redirectOp, conn, false);
+          url = new URL(conn.getHeaderField("Location"));
+          redirectHost = url.getHost() + ":" + url.getPort();
+        } finally {
+          conn.disconnect();
+        }
+      }
+      try {
+        return connect(op, url);
+      } catch (IOException ioe) {
+        if (redirectHost != null) {
+          if (excludeDatanodes.getValue() != null) {
+            excludeDatanodes = new ExcludeDatanodesParam(redirectHost + ","
+                + excludeDatanodes.getValue());
+          } else {
+            excludeDatanodes = new ExcludeDatanodesParam(redirectHost);
+          }
+        }
+        throw ioe;
+      }
+    }
+
+    private HttpURLConnection connect(final HttpOpParam.Op op, final URL url)
+        throws IOException {
+      final HttpURLConnection conn =
+          (HttpURLConnection)connectionFactory.openConnection(url);
+      final boolean doOutput = op.getDoOutput();
+      conn.setRequestMethod(op.getType().toString());
+      conn.setInstanceFollowRedirects(false);
+      switch (op.getType()) {
+        // if not sending a message body for a POST or PUT operation, need
+        // to ensure the server/proxy knows this
+        case POST:
+        case PUT: {
+          conn.setDoOutput(true);
+          if (!doOutput) {
+            // explicitly setting content-length to 0 won't do spnego!!
+            // opening and closing the stream will send "Content-Length: 0"
+            conn.getOutputStream().close();
+          } else {
+            conn.setRequestProperty("Content-Type",
+                MediaType.APPLICATION_OCTET_STREAM);
+            conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
+          }
+          break;
+        }
+        default: {
+          conn.setDoOutput(doOutput);
+          break;
+        }
+      }
+      conn.connect();
+      return conn;
+    }
+
+    private T runWithRetry() throws IOException {
+      /**
+       * Do the real work.
+       *
+       * There are three cases that the code inside the loop can throw an
+       * IOException:
+       *
+       * <ul>
+       * <li>The connection has failed (e.g., ConnectException,
+       * @see FailoverOnNetworkExceptionRetry for more details)</li>
+       * <li>The namenode enters the standby state (i.e., StandbyException).</li>
+       * <li>The server returns errors for the command (i.e., RemoteException)</li>
+       * </ul>
+       *
+       * The call to shouldRetry() will conduct the retry policy. The policy
+       * examines the exception and swallows it if it decides to rerun the work.
+       */
+      for(int retry = 0; ; retry++) {
+        checkRetry = !redirected;
+        final URL url = getUrl();
+        try {
+          final HttpURLConnection conn = connect(url);
+          // output streams will validate on close
+          if (!op.getDoOutput()) {
+            validateResponse(op, conn, false);
+          }
+          return getResponse(conn);
+        } catch (AccessControlException ace) {
+          // no retries for auth failures
+          throw ace;
+        } catch (InvalidToken it) {
+          // try to replace the expired token with a new one.  the attempt
+          // to acquire a new token must be outside this operation's retry
+          // so if it fails after its own retries, this operation fails too.
+          if (op.getRequireAuth() || !replaceExpiredDelegationToken()) {
+            throw it;
+          }
+        } catch (IOException ioe) {
+          shouldRetry(ioe, retry);
+        }
+      }
+    }
+
+    private void shouldRetry(final IOException ioe, final int retry
+        ) throws IOException {
+      InetSocketAddress nnAddr = getCurrentNNAddr();
+      if (checkRetry) {
+        try {
+          final RetryPolicy.RetryAction a = retryPolicy.shouldRetry(
+              ioe, retry, 0, true);
+
+          boolean isRetry = a.action == RetryPolicy.RetryAction.RetryDecision.RETRY;
+          boolean isFailoverAndRetry =
+              a.action == RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY;
+
+          if (isRetry || isFailoverAndRetry) {
+            LOG.info("Retrying connect to namenode: " + nnAddr
+                + ". Already tried " + retry + " time(s); retry policy is "
+                + retryPolicy + ", delay " + a.delayMillis + "ms.");
+
+            if (isFailoverAndRetry) {
+              resetStateToFailOver();
+            }
+
+            Thread.sleep(a.delayMillis);
+            return;
+          }
+        } catch(Exception e) {
+          LOG.warn("Original exception is ", ioe);
+          throw toIOException(e);
+        }
+      }
+      throw toIOException(ioe);
+    }
+
+    abstract T getResponse(HttpURLConnection conn) throws IOException;
+  }
+
+  /**
+   * Base class for runners that target a file system path and carry a set
+   * of query parameters.
+   */
+  abstract class AbstractFsPathRunner<T> extends AbstractRunner<T> {
+    private final Path fspath;
+    private final Param<?,?>[] parameters;
+
+    AbstractFsPathRunner(final HttpOpParam.Op op, final Path fspath,
+        Param<?,?>... parameters) {
+      super(op, false);
+      this.fspath = fspath;
+      this.parameters = parameters;
+    }
+
+    /** Same as the other constructor, with the argument order swapped. */
+    AbstractFsPathRunner(final HttpOpParam.Op op, Param<?,?>[] parameters,
+        final Path fspath) {
+      this(op, fspath, parameters);
+    }
+
+    /**
+     * Builds the request URL, appending the exclude-datanodes parameter
+     * whenever it currently holds a value.
+     */
+    @Override
+    protected URL getUrl() throws IOException {
+      if (excludeDatanodes.getValue() == null) {
+        return toUrl(op, fspath, parameters);
+      }
+      final int count = parameters.length;
+      final Param<?, ?>[] extended = new Param<?, ?>[count + 1];
+      System.arraycopy(parameters, 0, extended, 0, count);
+      extended[count] = excludeDatanodes;
+      return toUrl(op, fspath, extended);
+    }
+  }
+
+  /**
+   * Path-based runner for operations whose response body is ignored.
+   */
+  class FsPathRunner extends AbstractFsPathRunner<Void> {
+    FsPathRunner(HttpOpParam.Op op, Path fspath, Param<?,?>... parameters) {
+      super(op, fspath, parameters);
+    }
+
+    /** Always yields null; the connection is not read. */
+    @Override
+    Void getResponse(HttpURLConnection conn) throws IOException {
+      final Void nothing = null;
+      return nothing;
+    }
+  }
+
+  /**
+   * Path-based runner that parses the response body as JSON and hands the
+   * resulting map to {@link #decodeResponse(Map)}.
+   */
+  abstract class FsPathResponseRunner<T> extends AbstractFsPathRunner<T> {
+    FsPathResponseRunner(final HttpOpParam.Op op, final Path fspath,
+        Param<?,?>... parameters) {
+      super(op, fspath, parameters);
+    }
+
+    FsPathResponseRunner(final HttpOpParam.Op op, Param<?,?>[] parameters,
+        final Path fspath) {
+      super(op, parameters, fspath);
+    }
+
+    /** Turns the parsed JSON map into the runner's result. */
+    abstract T decodeResponse(Map<?,?> json) throws IOException;
+
+    /**
+     * Parses and decodes the response; the connection is always
+     * disconnected afterwards. IOExceptions propagate unchanged, any other
+     * exception is wrapped in an IOException.
+     */
+    @Override
+    final T getResponse(HttpURLConnection conn) throws IOException {
+      try {
+        final Map<?,?> parsed = jsonParse(conn, false);
+        if (parsed != null) {
+          return decodeResponse(parsed);
+        }
+        // match exception class thrown by parser
+        throw new IllegalStateException("Missing response");
+      } catch (IOException passThrough) {
+        throw passThrough;
+      } catch (Exception e) { // catch json parser errors
+        final IOException wrapped =
+            new IOException("Response decoding failure: "+e.toString(), e);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(wrapped);
+        }
+        throw wrapped;
+      } finally {
+        conn.disconnect();
+      }
+    }
+  }
+
+  /**
+   * Path-based runner whose JSON response carries a single "boolean" entry.
+   */
+  class FsPathBooleanRunner extends FsPathResponseRunner<Boolean> {
+    FsPathBooleanRunner(HttpOpParam.Op op, Path fspath, Param<?,?>... parameters) {
+      super(op, fspath, parameters);
+    }
+
+    /** @return the value stored under the "boolean" key, cast to Boolean */
+    @Override
+    Boolean decodeResponse(Map<?,?> json) throws IOException {
+      final Object value = json.get("boolean");
+      return (Boolean)value;
+    }
+  }
+
+  /**
+   * Handle create/append output streams.
+   *
+   * The returned stream wraps the connection's output stream in a buffer of
+   * the requested size; the HTTP response is validated only when the stream
+   * is closed.
+   */
+  class FsPathOutputStreamRunner extends AbstractFsPathRunner<FSDataOutputStream> {
+    /** size passed to the BufferedOutputStream wrapping the connection */
+    private final int bufferSize;
+
+    FsPathOutputStreamRunner(Op op, Path fspath, int bufferSize,
+        Param<?,?>... parameters) {
+      super(op, fspath, parameters);
+      this.bufferSize = bufferSize;
+    }
+
+    @Override
+    FSDataOutputStream getResponse(final HttpURLConnection conn)
+        throws IOException {
+      return new FSDataOutputStream(new BufferedOutputStream(
+          conn.getOutputStream(), bufferSize), statistics) {
+        @Override
+        public void close() throws IOException {
+          // nested finally blocks: the response is validated even if closing
+          // the stream throws, and the connection is disconnected even if
+          // validation throws
+          try {
+            super.close();
+          } finally {
+            try {
+              validateResponse(op, conn, true);
+            } finally {
+              conn.disconnect();
+            }
+          }
+        }
+      };
+    }
+  }
+
+  /**
+   * Path-based runner that hands the open connection back to the caller.
+   */
+  class FsPathConnectionRunner extends AbstractFsPathRunner<HttpURLConnection> {
+    FsPathConnectionRunner(HttpOpParam.Op op, Path fspath, Param<?,?>... parameters) {
+      super(op, fspath, parameters);
+    }
+
+    /** @return the connection itself, untouched */
+    @Override
+    HttpURLConnection getResponse(final HttpURLConnection conn)
+        throws IOException {
+      final HttpURLConnection connection = conn;
+      return connection;
+    }
+  }
+
+  /**
+   * Runner bound to a fixed URL; used by open(), which keeps track of the
+   * resolved url on its own.
+   */
+  final class URLRunner extends AbstractRunner<HttpURLConnection> {
+    private final URL url;
+
+    protected URLRunner(final HttpOpParam.Op op, final URL url, boolean redirected) {
+      super(op, redirected);
+      this.url = url;
+    }
+
+    /** @return the URL given at construction time */
+    @Override
+    protected URL getUrl() {
+      return this.url;
+    }
+
+    /** @return the connection itself, untouched */
+    @Override
+    HttpURLConnection getResponse(HttpURLConnection conn) throws IOException {
+      final HttpURLConnection connection = conn;
+      return connection;
+    }
+  }
+
+  private FsPermission applyUMask(FsPermission permission) {
+    if (permission == null) {
+      permission = FsPermission.getDefault();
+    }
+    return permission.applyUMask(FsPermission.getUMask(getConf()));
+  }
+
+  private HdfsFileStatus getHdfsFileStatus(Path f) throws IOException {
+    final HttpOpParam.Op op = GetOpParam.Op.GETFILESTATUS;
+    HdfsFileStatus status = new FsPathResponseRunner<HdfsFileStatus>(op, f) {
+      @Override
+      HdfsFileStatus decodeResponse(Map<?,?> json) {
+        return JsonUtilClient.toFileStatus(json, true);
+      }
+    }.run();
+    if (status == null) {
+      throw new FileNotFoundException("File does not exist: " + f);
+    }
+    return status;
+  }
+
+  /**
+   * Looks up the status of a path and returns it in qualified form; counts
+   * as one read operation in the statistics.
+   */
+  @Override
+  public FileStatus getFileStatus(Path f) throws IOException {
+    statistics.incrementReadOps(1);
+    final HdfsFileStatus raw = getHdfsFileStatus(f);
+    return makeQualified(raw, f);
+  }
+
+  /**
+   * Converts {@code f} into a {@link FileStatus} whose path is resolved
+   * against {@code parent} and qualified with this file system's URI and
+   * working directory. The symlink target is null unless {@code f} is a
+   * symlink.
+   */
+  private FileStatus makeQualified(HdfsFileStatus f, Path parent) {
+    final Path symlinkTarget;
+    if (f.isSymlink()) {
+      symlinkTarget = new Path(f.getSymlink());
+    } else {
+      symlinkTarget = null;
+    }
+    final Path qualifiedPath =
+        f.getFullPath(parent).makeQualified(getUri(), getWorkingDirectory());
+    return new FileStatus(
+        f.getLen(), f.isDir(), f.getReplication(), f.getBlockSize(),
+        f.getModificationTime(), f.getAccessTime(),
+        f.getPermission(), f.getOwner(), f.getGroup(),
+        symlinkTarget, qualifiedPath);
+  }
+
+  /**
+   * Fetches the ACL status of {@code f} with a GETACLSTATUS request.
+   *
+   * @throws FileNotFoundException if the decoded response is null
+   */
+  @Override
+  public AclStatus getAclStatus(Path f) throws IOException {
+    final HttpOpParam.Op aclOp = GetOpParam.Op.GETACLSTATUS;
+    final FsPathResponseRunner<AclStatus> runner =
+        new FsPathResponseRunner<AclStatus>(aclOp, f) {
+          @Override
+          AclStatus decodeResponse(Map<?,?> json) {
+            return JsonUtilClient.toAclStatus(json);
+          }
+        };
+    final AclStatus status = runner.run();
+    if (status != null) {
+      return status;
+    }
+    throw new FileNotFoundException("File does not exist: " + f);
+  }
+
+  /**
+   * Counts one write op and issues a MKDIRS request for {@code f}, sending
+   * {@code permission} after the configured umask has been applied.
+   */
+  @Override
+  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+    statistics.incrementWriteOps(1);
+    final HttpOpParam.Op mkdirsOp = PutOpParam.Op.MKDIRS;
+    final PermissionParam maskedPermission =
+        new PermissionParam(applyUMask(permission));
+    return new FsPathBooleanRunner(mkdirsOp, f, maskedPermission).run();
+  }
+
+  /**
+   * Create a symlink at {@code f} pointing to the destination path.
+   * Counts one write op and issues a CREATESYMLINK request carrying the
+   * path component of the qualified {@code destination}.
+   */
+  public void createSymlink(Path destination, Path f, boolean createParent
+      ) throws IOException {
+    statistics.incrementWriteOps(1);
+    final HttpOpParam.Op symlinkOp = PutOpParam.Op.CREATESYMLINK;
+    final String destinationPath = makeQualified(destination).toUri().getPath();
+    final DestinationParam destinationParam =
+        new DestinationParam(destinationPath);
+    final CreateParentParam createParentParam =
+        new CreateParentParam(createParent);
+    new FsPathRunner(symlinkOp, f, destinationParam, createParentParam).run();
+  }
+
+  /**
+   * Counts one write op and issues a RENAME request moving {@code src} to
+   * the path component of the qualified {@code dst}.
+   */
+  @Override
+  public boolean rename(final Path src, final Path dst) throws IOException {
+    statistics.incrementWriteOps(1);
+    final HttpOpParam.Op renameOp = PutOpParam.Op.RENAME;
+    final String dstPath = makeQualified(dst).toUri().getPath();
+    final DestinationParam destinationParam = new DestinationParam(dstPath);
+    return new FsPathBooleanRunner(renameOp, src, destinationParam).run();
+  }
+
+  /**
+   * Variant of rename that also sends the given rename options. Counts one
+   * write op and issues a RENAME request; no value is returned.
+   */
+  @SuppressWarnings("deprecation")
+  @Override
+  public void rename(final Path src, final Path dst,
+      final Options.Rename... options) throws IOException {
+    statistics.incrementWriteOps(1);
+    final HttpOpParam.Op renameOp = PutOpParam.Op.RENAME;
+    final String dstPath = makeQualified(dst).toUri().getPath();
+    final DestinationParam destinationParam = new DestinationParam(dstPath);
+    final RenameOptionSetParam optionsParam = new RenameOptionSetParam(options);
+    new FsPathRunner(renameOp, src, destinationParam, optionsParam).run();
+  }
+
+  /**
+   * Counts one write op and issues a SETXATTR request for {@code p}. The
+   * value parameter is sent hex-encoded, and is omitted entirely when
+   * {@code value} is null.
+   */
+  @Override
+  public void setXAttr(Path p, String name, byte[] value,
+      EnumSet<XAttrSetFlag> flag) throws IOException {
+    statistics.incrementWriteOps(1);
+    final HttpOpParam.Op setOp = PutOpParam.Op.SETXATTR;
+    if (value == null) {
+      // No value to send: only the name and the set flags go on the wire.
+      new FsPathRunner(setOp, p, new XAttrNameParam(name),
+          new XAttrSetFlagParam(flag)).run();
+      return;
+    }
+    new FsPathRunner(setOp, p,
+        new XAttrNameParam(name),
+        new XAttrValueParam(XAttrCodec.encodeValue(value, XAttrCodec.HEX)),
+        new XAttrSetFlagParam(flag)).run();
+  }
+
+  /**
+   * Issues a GETXATTRS request for the single attribute {@code name} of
+   * {@code p}, asking for hex encoding, and extracts that attribute from
+   * the JSON response.
+   */
+  @Override
+  public byte[] getXAttr(Path p, final String name) throws IOException {
+    final HttpOpParam.Op getOp = GetOpParam.Op.GETXATTRS;
+    final FsPathResponseRunner<byte[]> runner =
+        new FsPathResponseRunner<byte[]>(getOp, p,
+            new XAttrNameParam(name),
+            new XAttrEncodingParam(XAttrCodec.HEX)) {
+          @Override
+          byte[] decodeResponse(Map<?, ?> json) throws IOException {
+            return JsonUtilClient.getXAttr(json, name);
+          }
+        };
+    return runner.run();
+  }
+
+  /**
+   * Issues a GETXATTRS request for {@code p} without any name filter,
+   * asking for hex encoding, and decodes the response into a name-to-value
+   * map.
+   */
+  @Override
+  public Map<String, byte[]> getXAttrs(Path p) throws IOException {
+    final HttpOpParam.Op getOp = GetOpParam.Op.GETXATTRS;
+    final FsPathResponseRunner<Map<String, byte[]>> runner =
+        new FsPathResponseRunner<Map<String, byte[]>>(getOp, p,
+            new XAttrEncodingParam(XAttrCodec.HEX)) {
+          @Override
+          Map<String, byte[]> decodeResponse(Map<?, ?> json)
+              throws IOException {
+            return JsonUtilClient.toXAttrs(json);
+          }
+        };
+    return runner.run();
+  }
+
+  /**
+   * Issues a GETXATTRS request for the listed attribute names of {@code p}.
+   * One name parameter is sent per entry of {@code names}, followed by a
+   * hex encoding parameter.
+   *
+   * @throws IllegalArgumentException if {@code names} is null or empty
+   *         (raised by the Preconditions check)
+   */
+  @Override
+  public Map<String, byte[]> getXAttrs(Path p, final List<String> names)
+      throws IOException {
+    Preconditions.checkArgument(names != null && !names.isEmpty(),
+        "XAttr names cannot be null or empty.");
+    final int nameCount = names.size();
+    // One slot per name plus a trailing slot for the encoding parameter.
+    final Param<?,?>[] parameters = new Param<?,?>[nameCount + 1];
+    for (int idx = 0; idx < nameCount; idx++) {
+      parameters[idx] = new XAttrNameParam(names.get(idx));
+    }
+    parameters[nameCount] = new XAttrEncodingParam(XAttrCodec.HEX);
+
+    final HttpOpParam.Op getOp = GetOpParam.Op.GETXATTRS;
+    final FsPathResponseRunner<Map<String, byte[]>> runner =
+        new FsPathResponseRunner<Map<String, byte[]>>(getOp, parameters, p) {
+          @Override
+          Map<String, byte[]> decodeResponse(Map<?, ?> json)
+              throws IOException {
+            return JsonUtilClient.toXAttrs(json);
+          }
+        };
+    return runner.run();
+  }
+
+  /**
+   * Issues a LISTXATTRS request for {@code p} and decodes the response
+   * into the list of attribute names.
+   */
+  @Override
+  public List<String> listXAttrs(Path p) throws IOException {
+    final HttpOpParam.Op listOp = GetOpParam.Op.LISTXATTRS;
+    final FsPathResponseRunner<List<String>> runner =
+        new FsPathResponseRunner<List<String>>(listOp, p) {
+          @Override
+          List<String> decodeResponse(Map<?, ?> json) throws IOException {
+            return JsonUtilClient.toXAttrNames(json);
+          }
+        };
+    return runner.run();
+  }
+
+  /** Counts one write op and issues a REMOVEXATTR request for {@code name}. */
+  @Override
+  public void removeXAttr(Path p, String name) throws IOException {
+    statistics.incrementWriteOps(1);
+    final HttpOpParam.Op removeOp = PutOpParam.Op.REMOVEXATTR;
+    final XAttrNameParam nameParam = new XAttrNameParam(name);
+    new FsPathRunner(removeOp, p, nameParam).run();
+  }
+
+  /**
+   * Issues a SETOWNER request for {@code p} carrying both the owner and the
+   * group parameter. The argument check runs before the write op is
+   * counted.
+   *
+   * @throws IOException if both {@code owner} and {@code group} are null
+   */
+  @Override
+  public void setOwner(final Path p, final String owner, final String group
+      ) throws IOException {
+    final boolean nothingToSet = owner == null && group == null;
+    if (nothingToSet) {
+      throw new IOException("owner == null && group == null");
+    }
+
+    statistics.incrementWriteOps(1);
+    final HttpOpParam.Op setOwnerOp = PutOpParam.Op.SETOWNER;
+    final OwnerParam ownerParam = new OwnerParam(owner);
+    final GroupParam groupParam = new GroupParam(group);
+    new FsPathRunner(setOwnerOp, p, ownerParam, groupParam).run();
+  }
+
+  /**
+   * Counts one write op and issues a SETPERMISSION request for {@code p}.
+   * The permission is sent as given; no umask is applied here.
+   */
+  @Override
+  public void setPermission(final Path p, final FsPermission permission
+      ) throws IOException {
+    statistics.incrementWriteOps(1);
+    final HttpOpParam.Op setPermissionOp = PutOpParam.Op.SETPERMISSION;
+    final PermissionParam permissionParam = new PermissionParam(permission);
+    new FsPathRunner(setPermissionOp, p, permissionParam).run();
+  }
+
+  /**
+   * Counts one write op and issues a MODIFYACLENTRIES request for
+   * {@code path} carrying {@code aclSpec}.
+   */
+  @Override
+  public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
+      throws IOException {
+    statistics.incrementWriteOps(1);
+    final HttpOpParam.Op modifyOp = PutOpParam.Op.MODIFYACLENTRIES;
+    final AclPermissionParam aclParam = new AclPermissionParam(aclSpec);
+    new FsPathRunner(modifyOp, path, aclParam).run();
+  }
+
+  /**
+   * Counts one write op and issues a REMOVEACLENTRIES request for
+   * {@code path} carrying {@code aclSpec}.
+   */
+  @Override
+  public void removeAclEntries(Path path, List<AclEntry> aclSpec)
+      throws IOException {
+    statistics.incrementWriteOps(1);
+    final HttpOpParam.Op removeOp = PutOpParam.Op.REMOVEACLENTRIES;
+    final AclPermissionParam aclParam = new AclPermissionParam(aclSpec);
+    new FsPathRunner(removeOp, path, aclParam).run();
+  }
+
+  /** Counts one write op and issues a REMOVEDEFAULTACL request for {@code path}. */
+  @Override
+  public void removeDefaultAcl(Path path) throws IOException {
+    statistics.incrementWriteOps(1);
+    final HttpOpParam.Op removeDefaultOp = PutOpParam.Op.REMOVEDEFAULTACL;
+    final FsPathRunner runner = new FsPathRunner(removeDefaultOp, path);
+    runner.run();
+  }
+
+  /** Counts one write op and issues a REMOVEACL request for {@code path}. */
+  @Override
+  public void removeAcl(Path path) throws IOException {
+    statistics.incrementWriteOps(1);
+    final HttpOpParam.Op removeAclOp = PutOpParam.Op.REMOVEACL;
+    final FsPathRunner runner = new FsPathRunner(removeAclOp, path);
+    runner.run();
+  }
+
+  /**
+   * Counts one write op and issues a SETACL request for {@code p} carrying
+   * {@code aclSpec}.
+   */
+  @Override
+  public void setAcl(final Path p, final List<AclEntry> aclSpec)
+      throws IOException {
+    statistics.incrementWriteOps(1);
+    final HttpOpParam.Op setAclOp = PutOpParam.Op.SETACL;
+    final AclPermissionParam aclParam = new AclPermissionParam(aclSpec);
+    new FsPathRunner(setAclOp, p, aclParam).run();
+  }
+
+  /**
+   * Counts one write op and issues a CREATESNAPSHOT request for
+   * {@code path}.
+   *
+   * @return a Path built from the string stored in the JSON response under
+   *         the simple class name of {@link Path}
+   */
+  @Override
+  public Path createSnapshot(final Path path, final String snapshotName)
+      throws IOException {
+    statistics.incrementWriteOps(1);
+    final HttpOpParam.Op snapshotOp = PutOpParam.Op.CREATESNAPSHOT;
+    final FsPathResponseRunner<Path> runner =
+        new FsPathResponseRunner<Path>(snapshotOp, path,
+            new SnapshotNameParam(snapshotName)) {
+          @Override
+          Path decodeResponse(Map<?,?> json) {
+            final String key = Path.class.getSimpleName();
+            return new Path((String) json.get(key));
+          }
+        };
+    return runner.run();
+  }
+
+  /**
+   * Counts one write op and issues a DELETESNAPSHOT request for the
+   * snapshot {@code snapshotName} of {@code path}.
+   */
+  @Override
+  public void deleteSnapshot(final Path path, final String snapshotName)
+      throws IOException {
+    statistics.incrementWriteOps(1);
+    final HttpOpParam.Op deleteOp = DeleteOpParam.Op.DELETESNAPSHOT;
+    final SnapshotNameParam nameParam = new SnapshotNameParam(snapshotName);
+    new FsPathRunner(deleteOp, path, nameParam).run();
+  }
+
+  /**
+   * Counts one write op and issues a RENAMESNAPSHOT request for
+   * {@code path}, sending the old name followed by the new name.
+   */
+  @Override
+  public void renameSnapshot(final Path path, final String snapshotOldName,
+      final String snapshotNewName) throws IOException {
+    statistics.incrementWriteOps(1);
+    final HttpOpParam.Op renameOp = PutOpParam.Op.RENAMESNAPSHOT;
+    final OldSnapshotNameParam oldName =
+        new OldSnapshotNameParam(snapshotOldName);
+    final SnapshotNameParam newName = new SnapshotNameParam(snapshotNewName);
+    new FsPathRunner(renameOp, path, oldName, newName).run();
+  }
+
+  /**
+   * Counts one write op and issues a SETREPLICATION request for {@code p}.
+   *
+   * @return the boolean result produced by the runner
+   */
+  @Override
+  public boolean setReplication(final Path p, final short replication
+     ) throws IOException {
+    statistics.incrementWriteOps(1);
+    final HttpOpParam.Op replicationOp = PutOpParam.Op.SETREPLICATION;
+    final ReplicationParam replicationParam = new ReplicationParam(replication);
+    return new FsPathBooleanRunner(replicationOp, p, replicationParam).run();
+  }
+
+  /**
+   * Counts one write op and issues a SETTIMES request for {@code p} with
+   * the modification time followed by the access time.
+   */
+  @Override
+  public void setTimes(final Path p, final long mtime, final long atime
+      ) throws IOException {
+    statistics.incrementWriteOps(1);
+    final HttpOpParam.Op setTimesOp = PutOpParam.Op.SETTIMES;
+    final ModificationTimeParam mtimeParam = new ModificationTimeParam(mtime);
+    final AccessTimeParam atimeParam = new AccessTimeParam(atime);
+    new FsPathRunner(setTimesOp, p, mtimeParam, atimeParam).run();
+  }
+
+  /**
+   * @return the block size read from the configuration under
+   *         {@code DFS_BLOCK_SIZE_KEY}, or {@code DFS_BLOCK_SIZE_DEFAULT}
+   *         when it is not set
+   */
+  @Override
+  public long getDefaultBlockSize() {
+    final Configuration conf = getConf();
+    return conf.getLongBytes(
+        HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY,
+        HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
+  }
+
+  /**
+   * @return the replication factor read from the configuration under
+   *         {@code DFS_REPLICATION_KEY} (default
+   *         {@code DFS_REPLICATION_DEFAULT}), narrowed to a short
+   */
+  @Override
+  public short getDefaultReplication() {
+    final int replication = getConf().getInt(
+        HdfsClientConfigKeys.DFS_REPLICATION_KEY,
+        HdfsClientConfigKeys.DFS_REPLICATION_DEFAULT);
+    return (short) replication;
+  }
+
+  /**
+   * Counts one write op and issues a CONCAT request on {@code trg} with
+   * {@code srcs} as the sources parameter.
+   */
+  @Override
+  public void concat(final Path trg, final Path [] srcs) throws IOException {
+    statistics.incrementWriteOps(1);
+    final HttpOpParam.Op concatOp = PostOpParam.Op.CONCAT;
+    final ConcatSourcesParam sources = new ConcatSourcesParam(srcs);
+    new FsPathRunner(concatOp, trg, sources).run();
+  }
+
+  /**
+   * Counts one write op and issues a CREATE request for {@code f}. The
+   * permission is sent after the configured umask has been applied.
+   * {@code progress} is not used by this implementation.
+   *
+   * @return the output stream produced by the runner
+   */
+  @Override
+  public FSDataOutputStream create(final Path f, final FsPermission permission,
+      final boolean overwrite, final int bufferSize, final short replication,
+      final long blockSize, final Progressable progress) throws IOException {
+    statistics.incrementWriteOps(1);
+
+    final HttpOpParam.Op createOp = PutOpParam.Op.CREATE;
+    final PermissionParam permissionParam =
+        new PermissionParam(applyUMask(permission));
+    final OverwriteParam overwriteParam = new OverwriteParam(overwrite);
+    final BufferSizeParam bufferSizeParam = new BufferSizeParam(bufferSize);
+    final ReplicationParam replicationParam = new ReplicationParam(replication);
+    final BlockSizeParam blockSizeParam = new BlockSizeParam(blockSize);
+    return new FsPathOutputStreamRunner(createOp, f, bufferSize,
+        permissionParam, overwriteParam, bufferSizeParam,
+        replicationParam, blockSizeParam).run();
+  }
+
+  /**
+   * Counts one write op and issues an APPEND request for {@code f}.
+   * {@code progress} is not used by this implementation.
+   *
+   * @return the output stream produced by the runner
+   */
+  @Override
+  public FSDataOutputStream append(final Path f, final int bufferSize,
+      final Progressable progress) throws IOException {
+    statistics.incrementWriteOps(1);
+
+    final HttpOpParam.Op appendOp = PostOpParam.Op.APPEND;
+    final BufferSizeParam bufferSizeParam = new BufferSizeParam(bufferSize);
+    return new FsPathOutputStreamRunner(
+        appendOp, f, bufferSize, bufferSizeParam).run();
+  }
+
+  /**
+   * Counts one write op and issues a TRUNCATE request for {@code f} with
+   * {@code newLength} as the new length parameter.
+   *
+   * @return the boolean result produced by the runner
+   */
+  @Override
+  public boolean truncate(Path f, long newLength) throws IOException {
+    statistics.incrementWriteOps(1);
+
+    final HttpOpParam.Op truncateOp = PostOpParam.Op.TRUNCATE;
+    final NewLengthParam lengthParam = new NewLengthParam(newLength);
+    return new FsPathBooleanRunner(truncateOp, f, lengthParam).run();
+  }
+
+  /**
+   * Issues a DELETE request for {@code f}.
+   *
+   * @param f the path to delete
+   * @param recursive sent as the recursive parameter of the request
+   * @return the boolean result produced by the runner
+   * @throws IOException if the request fails
+   */
+  @Override
+  public boolean delete(Path f, boolean recursive) throws IOException {
+    // Count the delete as a write op, as every other mutating operation in
+    // this class (mkdirs, rename, truncate, concat, ...) already does.
+    statistics.incrementWriteOps(1);
+    final HttpOpParam.Op op = DeleteOpParam.Op.DELETE;
+    return new FsPathBooleanRunner(op, f,
+        new RecursiveParam(recursive)
+    ).run();
+  }
+
+  /**
+   * Counts one read op and returns an input stream for {@code f}. The
+   * stream is backed by an OffsetUrlInputStream built from an opener that
+   * wraps the OPEN runner and an offset opener that starts with a null URL.
+   */
+  @Override
+  public FSDataInputStream open(final Path f, final int buffersize
+      ) throws IOException {
+    statistics.incrementReadOps(1);
+    final HttpOpParam.Op openOp = GetOpParam.Op.OPEN;
+    // use a runner so the open can recover from an invalid token
+    final FsPathConnectionRunner connectionRunner =
+        new FsPathConnectionRunner(openOp, f, new BufferSizeParam(buffersize));
+    final UnresolvedUrlOpener unresolvedOpener =
+        new UnresolvedUrlOpener(connectionRunner);
+    final OffsetUrlOpener resolvedOpener = new OffsetUrlOpener(null);
+    return new FSDataInputStream(
+        new OffsetUrlInputStream(unresolvedOpener, resolvedOpener));
+  }
+
+  /**
+   * Closes this file system. If {@code canRefreshDelegationToken} is set and
+   * a delegation token is held, the token is cancelled first. A failure to
+   * cancel is only logged at debug level and never propagates;
+   * {@code super.close()} runs in all cases.
+   */
+  @Override
+  public synchronized void close() throws IOException {
+    try {
+      if (canRefreshDelegationToken && delegationToken != null) {
+        cancelDelegationToken(delegationToken);
+      }
+    } catch (IOException ioe) {
+      // Best effort: a failed cancel must not prevent the close below.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Token cancel failed: " + ioe);
+      }
+    } finally {
+      super.close();
+    }
+  }
+
+  /**
+   * Opener for the initial request, before a resolved URL is known.
+   * Uses FsPathConnectionRunner to ensure retries for InvalidTokens.
+   */
+  class UnresolvedUrlOpener extends ByteRangeInputStream.URLOpener {
+    private final FsPathConnectionRunner runner;
+
+    UnresolvedUrlOpener(FsPathConnectionRunner runner) {
+      super(null);
+      this.runner = runner;
+    }
+
+    /**
+     * Runs the wrapped runner and records the URL of the connection it
+     * returns. Asserts that {@code offset} is zero; {@code resolved} is
+     * not used.
+     */
+    @Override
+    protected HttpURLConnection connect(long offset, boolean resolved)
+        throws IOException {
+      assert offset == 0;
+      final HttpURLConnection connection = runner.run();
+      setURL(connection.getURL());
+      return connection;
+    }
+  }
+
+  /** Opener that connects to its URL, adding an offset parameter if needed. */
+  class OffsetUrlOpener extends ByteRangeInputStream.URLOpener {
+    OffsetUrlOpener(final URL url) {
+      super(url);
+    }
+
+    /**
+     * Setup offset url and connect. The URL is used unchanged for a zero
+     * offset; otherwise an offset parameter is appended after an "&".
+     */
+    @Override
+    protected HttpURLConnection connect(final long offset,
+        final boolean resolved) throws IOException {
+      final URL target;
+      if (offset == 0L) {
+        target = url;
+      } else {
+        target = new URL(url + "&" + new OffsetParam(offset));
+      }
+      final URLRunner urlRunner =
+          new URLRunner(GetOpParam.Op.OPEN, target, resolved);
+      return urlRunner.run();
+    }
+  }
+
+  private static final String OFFSET_PARAM_PREFIX = OffsetParam.NAME + "=";
+
+  /**
+   * Remove offset parameter, if there is any, from the url.
+   * The parameter name is matched case-insensitively. The input url is
+   * returned as-is when it has no query or no offset parameter.
+   *
+   * @param url the url to strip
+   * @return the url without any offset parameter in its query
+   * @throws MalformedURLException if the rebuilt url string cannot be parsed
+   */
+  static URL removeOffsetParam(final URL url) throws MalformedURLException {
+    String query = url.getQuery();
+    if (query == null) {
+      return url;
+    }
+    // Fast path: the offset parameter is neither first nor after an '&'.
+    final String lower = StringUtils.toLowerCase(query);
+    if (!lower.startsWith(OFFSET_PARAM_PREFIX)
+        && !lower.contains("&" + OFFSET_PARAM_PREFIX)) {
+      return url;
+    }
+
+    //rebuild query
+    // b stays null until the first kept token, so a query made only of
+    // offset parameters yields an empty query without a leading '?'.
+    StringBuilder b = null;
+    for(final StringTokenizer st = new StringTokenizer(query, "&");
+        st.hasMoreTokens();) {
+      final String token = st.nextToken();
+      if (!StringUtils.toLowerCase(token).startsWith(OFFSET_PARAM_PREFIX)) {
+        if (b == null) {
+          b = new StringBuilder("?").append(token);
+        } else {
+          b.append('&').append(token);
+        }
+      }
+    }
+    query = b == null? "": b.toString();
+
+    // Keep everything before the first '?' and append the rebuilt query.
+    final String urlStr = url.toString();
+    return new URL(urlStr.substring(0, urlStr.indexOf('?')) + query);
+  }
+
+  /**
+   * Byte-range input stream whose resolved URL has the offset parameter
+   * stripped.
+   */
+  static class OffsetUrlInputStream extends ByteRangeInputStream {
+    OffsetUrlInputStream(UnresolvedUrlOpener o, OffsetUrlOpener r)
+        throws IOException {
+      super(o, r);
+    }
+
+    /** Remove offset parameter before returning the resolved url. */
+    @Override
+    protected URL getResolvedUrl(final HttpURLConnection connection
+        ) throws MalformedURLException {
+      final URL connectedUrl = connection.getURL();
+      return removeOffsetParam(connectedUrl);
+    }
+  }
+
+  /**
+   * Counts one read op and issues a LISTSTATUS request for {@code f}.
+   * The JSON response is read as a map stored under "FileStatuses" that
+   * holds a list under "FileStatus". Each list entry is converted and
+   * qualified with {@code f} as its parent.
+   */
+  @Override
+  public FileStatus[] listStatus(final Path f) throws IOException {
+    statistics.incrementReadOps(1);
+
+    final HttpOpParam.Op listOp = GetOpParam.Op.LISTSTATUS;
+    final FsPathResponseRunner<FileStatus[]> runner =
+        new FsPathResponseRunner<FileStatus[]>(listOp, f) {
+          @Override
+          FileStatus[] decodeResponse(Map<?,?> json) {
+            final String statusKey = FileStatus.class.getSimpleName();
+            final Map<?, ?> rootmap = (Map<?, ?>) json.get(statusKey + "es");
+            final List<?> entries = JsonUtilClient.getList(rootmap, statusKey);
+
+            //convert FileStatus
+            final FileStatus[] statuses = new FileStatus[entries.size()];
+            int next = 0;
+            for (Object entry : entries) {
+              final HdfsFileStatus hdfsStatus =
+                  JsonUtilClient.toFileStatus((Map<?, ?>) entry, false);
+              statuses[next] = makeQualified(hdfsStatus, f);
+              next++;
+            }
+            return statuses;
+          }
+        };
+    return runner.run();
+  }
+
+  /**
+   * Issues a GETDELEGATIONTOKEN request for {@code renewer}, with no path.
+   * A non-null token gets its service set to {@code tokenServiceName}.
+   *
+   * @return the decoded token, or null when none was decoded and fallback
+   *         to an insecure cluster is allowed
+   * @throws AccessControlException if no token was decoded and
+   *         {@code disallowFallbackToInsecureCluster} is set
+   */
+  @Override
+  public Token<DelegationTokenIdentifier> getDelegationToken(
+      final String renewer) throws IOException {
+    final HttpOpParam.Op tokenOp = GetOpParam.Op.GETDELEGATIONTOKEN;
+    final FsPathResponseRunner<Token<DelegationTokenIdentifier>> runner =
+        new FsPathResponseRunner<Token<DelegationTokenIdentifier>>(
+            tokenOp, null, new RenewerParam(renewer)) {
+          @Override
+          Token<DelegationTokenIdentifier> decodeResponse(Map<?,?> json)
+              throws IOException {
+            return JsonUtilClient.toDelegationToken(json);
+          }
+        };
+    final Token<DelegationTokenIdentifier> token = runner.run();
+    if (token == null) {
+      if (disallowFallbackToInsecureCluster) {
+        throw new AccessControlException(CANT_FALLBACK_TO_INSECURE_MSG);
+      }
+      return null;
+    }
+    token.setService(tokenServiceName);
+    return token;
+  }
+
+  /** @return the delegation token currently held; read under this object's lock. */
+  @Override
+  public synchronized Token<?> getRenewToken() {
+    return delegationToken;
+  }
+
+  /**
+   * Replaces the delegation token held by this file system. The assignment
+   * is made while holding this object's lock.
+   */
+  @Override
+  public <T extends TokenIdentifier> void setDelegationToken(
+      final Token<T> token) {
+    synchronized (this) {
+      delegationToken = token;
+    }
+  }
+
+  /**
+   * Issues a RENEWDELEGATIONTOKEN request, with no path, carrying the
+   * URL-encoded form of {@code token}.
+   *
+   * @return the numeric value stored in the JSON response under "long"
+   */
+  @Override
+  public synchronized long renewDelegationToken(final Token<?> token
+      ) throws IOException {
+    final HttpOpParam.Op renewOp = PutOpParam.Op.RENEWDELEGATIONTOKEN;
+    final FsPathResponseRunner<Long> runner =
+        new FsPathResponseRunner<Long>(renewOp, null,
+            new TokenArgumentParam(token.encodeToUrlString())) {
+          @Override
+          Long decodeResponse(Map<?,?> json) throws IOException {
+            final Number value = (Number) json.get("long");
+            return value.longValue();
+          }
+        };
+    return runner.run();
+  }
+
+  /**
+   * Issues a CANCELDELEGATIONTOKEN request, with no path, carrying the
+   * URL-encoded form of {@code token}.
+   */
+  @Override
+  public synchronized void cancelDelegationToken(final Token<?> token
+      ) throws IOException {
+    final HttpOpParam.Op cancelOp = PutOpParam.Op.CANCELDELEGATIONTOKEN;
+    final TokenArgumentParam tokenParam =
+        new TokenArgumentParam(token.encodeToUrlString());
+    new FsPathRunner(cancelOp, null, tokenParam).run();
+  }
+
+  /**
+   * Delegates to the path-based overload using the path of {@code status}.
+   *
+   * @return null when {@code status} is null
+   */
+  @Override
+  public BlockLocation[] getFileBlockLocations(final FileStatus status,
+      final long offset, final long length) throws IOException {
+    return status == null
+        ? null
+        : getFileBlockLocations(status.getPath(), offset, length);
+  }
+
+  /**
+   * Counts one read op and issues a GET_BLOCK_LOCATIONS request for
+   * {@code p} with the given offset and length. The JSON response is
+   * decoded into located blocks and then converted to block locations.
+   */
+  @Override
+  public BlockLocation[] getFileBlockLocations(final Path p,
+      final long offset, final long length) throws IOException {
+    statistics.incrementReadOps(1);
+
+    final HttpOpParam.Op locationsOp = GetOpParam.Op.GET_BLOCK_LOCATIONS;
+    final FsPathResponseRunner<BlockLocation[]> runner =
+        new FsPathResponseRunner<BlockLocation[]>(locationsOp, p,
+            new OffsetParam(offset), new LengthParam(length)) {
+          @Override
+          BlockLocation[] decodeResponse(Map<?,?> json) throws IOException {
+            return DFSUtilClient.locatedBlocks2Locations(
+                JsonUtilClient.toLocatedBlocks(json));
+          }
+        };
+    return runner.run();
+  }
+
+  /**
+   * Issues a CHECKACCESS request for {@code path} with {@code mode} as the
+   * action parameter. Nothing is returned; failures surface as exceptions
+   * from the runner.
+   */
+  @Override
+  public void access(final Path path, final FsAction mode) throws IOException {
+    final HttpOpParam.Op accessOp = GetOpParam.Op.CHECKACCESS;
+    final FsActionParam actionParam = new FsActionParam(mode);
+    new FsPathRunner(accessOp, path, actionParam).run();
+  }
+
+  /**
+   * Counts one read op and issues a GETCONTENTSUMMARY request for
+   * {@code p}, decoding the JSON response into a ContentSummary.
+   */
+  @Override
+  public ContentSummary getContentSummary(final Path p) throws IOException {
+    statistics.incrementReadOps(1);
+
+    final HttpOpParam.Op summaryOp = GetOpParam.Op.GETCONTENTSUMMARY;
+    final FsPathResponseRunner<ContentSummary> runner =
+        new FsPathResponseRunner<ContentSummary>(summaryOp, p) {
+          @Override
+          ContentSummary decodeResponse(Map<?,?> json) {
+            return JsonUtilClient.toContentSummary(json);
+          }
+        };
+    return runner.run();
+  }
+
+  /**
+   * Counts one read op and issues a GETFILECHECKSUM request for {@code p},
+   * decoding the JSON response into an MD5MD5CRC32FileChecksum.
+   */
+  @Override
+  public MD5MD5CRC32FileChecksum getFileChecksum(final Path p
+      ) throws IOException {
+    statistics.incrementReadOps(1);
+
+    final HttpOpParam.Op checksumOp = GetOpParam.Op.GETFILECHECKSUM;
+    final FsPathResponseRunner<MD5MD5CRC32FileChecksum> runner =
+        new FsPathResponseRunner<MD5MD5CRC32FileChecksum>(checksumOp, p) {
+          @Override
+          MD5MD5CRC32FileChecksum decodeResponse(Map<?,?> json)
+              throws IOException {
+            return JsonUtilClient.toMD5MD5CRC32FileChecksum(json);
+          }
+        };
+    return runner.run();
+  }
+
+  /**
+   * Resolve an HDFS URL into real InetSocketAddress. It works like a DNS
+   * resolver when the URL points to a non-HA cluster. When the URL points to
+   * an HA cluster with its logical name, the resolver further resolves the
+   * logical name (i.e., the authority in the URL) into real namenode
+   * addresses.
+   *
+   * @return the resolved namenode addresses
+   * @throws IOException if the URI is logical but no namenode addresses are
+   *         configured for its host
+   */
+  private InetSocketAddress[] resolveNNAddr() throws IOException {
+    Configuration conf = getConf();
+    final String scheme = uri.getScheme();
+
+    List<InetSocketAddress> ret = new ArrayList<InetSocketAddress>();
+
+    if (!HAUtilClient.isLogicalUri(conf, uri)) {
+      InetSocketAddress addr = NetUtils.createSocketAddr(uri.getAuthority(),
+          getDefaultPort());
+      ret.add(addr);
+
+    } else {
+      Map<String, Map<String, InetSocketAddress>> addresses = DFSUtilClient
+          .getHaNnWebHdfsAddresses(conf, scheme);
+
+      // Extract the entry corresponding to the logical name.
+      Map<String, InetSocketAddress> addrs = addresses.get(uri.getHost());
+      if (addrs == null) {
+        // Fail with a descriptive error instead of a NullPointerException
+        // when the logical name has no configured namenode addresses.
+        throw new IOException(
+            "Unable to resolve namenode addresses for logical URI " + uri);
+      }
+      ret.addAll(addrs.values());
+    }
+
+    return ret.toArray(new InetSocketAddress[ret.size()]);
+  }
+
+  /**
+   * @return the string form of {@code tokenServiceName} when it is set,
+   *         otherwise the superclass's canonical service name
+   */
+  @Override
+  public String getCanonicalServiceName() {
+    if (tokenServiceName != null) {
+      return tokenServiceName.toString();
+    }
+    return super.getCanonicalServiceName();
+  }
+
+  /** @return the {@code nnAddrs} array held by this instance (not a copy). */
+  @VisibleForTesting
+  InetSocketAddress[] getResolvedNNAddr() {
+    return nnAddrs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64bb2b9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/BufferSizeParam.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/BufferSizeParam.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/BufferSizeParam.java
new file mode 100644
index 0000000..376d7d8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/BufferSizeParam.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.resources;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+
+/** Buffer size parameter, sent under the name "buffersize". */
+public class BufferSizeParam extends IntegerParam {
+  /** Parameter name. */
+  public static final String NAME = "buffersize";
+  /** Default parameter value. */
+  public static final String DEFAULT = NULL;
+
+  private static final Domain DOMAIN = new Domain(NAME);
+
+  /**
+   * Constructor.
+   * @param value the parameter value.
+   */
+  public BufferSizeParam(final Integer value) {
+    // 1 and null are presumably the lower and upper bounds -- confirm
+    // against IntegerParam.
+    super(DOMAIN, value, 1, null);
+  }
+
+  /**
+   * Constructor.
+   * @param str a string representation of the parameter value,
+   *            parsed through this parameter's domain.
+   */
+  public BufferSizeParam(final String str) {
+    this(DOMAIN.parse(str));
+  }
+
+  @Override
+  public String getName() {
+    return NAME;
+  }
+
+  /** @return the value or, if it is null, return the default from conf. */
+  public int getValue(final Configuration conf) {
+    final Integer explicit = getValue();
+    if (explicit != null) {
+      return explicit;
+    }
+    return conf.getInt(
+        CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
+        CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64bb2b9/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java
index 4a3d2b1..cc2912c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.lib.wsrs.EnumSetParam;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -369,7 +370,7 @@ public class HttpFSFileSystem extends FileSystem
    */
   @Override
   protected int getDefaultPort() {
-    return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY,
+    return getConf().getInt(HdfsClientConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY,
         DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64bb2b9/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 813ce78..ecc10be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -149,6 +149,8 @@ Release 2.8.0 - UNRELEASED
     HDFS-8215. Refactor NamenodeFsck#check method.  (Takanobu Asanuma
     via szetszwo)
 
+    HDFS-8052. Move WebHdfsFileSystem into hadoop-hdfs-client. (wheat9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64bb2b9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java
index c809017..a1cd555 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.token.Token;
-import org.apache.htrace.Sampler;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
@@ -296,7 +295,7 @@ class BlockStorageLocationUtil {
       List<LocatedBlock> blocks, 
       Map<LocatedBlock, List<VolumeId>> blockVolumeIds) throws IOException {
     // Construct the final return value of VolumeBlockLocation[]
-    BlockLocation[] locations = DFSUtil.locatedBlocks2Locations(blocks);
+    BlockLocation[] locations = DFSUtilClient.locatedBlocks2Locations(blocks);
     List<BlockStorageLocation> volumeBlockLocs = 
         new ArrayList<BlockStorageLocation>(locations.length);
     for (int i = 0; i < locations.length; i++) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64bb2b9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 351c7ea..b241815 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -919,7 +919,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     TraceScope scope = getPathTraceScope("getBlockLocations", src);
     try {
       LocatedBlocks blocks = getLocatedBlocks(src, start, length);
-      BlockLocation[] locations =  DFSUtil.locatedBlocks2Locations(blocks);
+      BlockLocation[] locations =  DFSUtilClient.locatedBlocks2Locations(blocks);
       HdfsBlockLocation[] hdfsLocations = new HdfsBlockLocation[locations.length];
       for (int i = 0; i < locations.length; i++) {
         hdfsLocations[i] = new HdfsBlockLocation(locations[i], blocks.get(i));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64bb2b9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 0075bc2..18b0c1c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -102,7 +102,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
     "dfs.namenode.path.based.cache.block.map.allocation.percent";
   public static final float    DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT = 0.25f;
 
-  public static final String  DFS_NAMENODE_HTTP_PORT_KEY = "dfs.http.port";
   public static final int     DFS_NAMENODE_HTTP_PORT_DEFAULT =
       HdfsClientConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT;
   public static final String  DFS_NAMENODE_HTTP_ADDRESS_KEY =
@@ -163,9 +162,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
    */
   public static final String  DFS_WEBHDFS_AUTHENTICATION_FILTER_DEFAULT =
       "org.apache.hadoop.hdfs.web.AuthFilter".toString();
-  public static final String  DFS_WEBHDFS_ENABLED_KEY = "dfs.webhdfs.enabled";
-  public static final boolean DFS_WEBHDFS_ENABLED_DEFAULT = true;
-  public static final String  DFS_WEBHDFS_USER_PATTERN_KEY = "dfs.webhdfs.user.provider.user.pattern";
+  @Deprecated
+  public static final String  DFS_WEBHDFS_USER_PATTERN_KEY =
+      HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY;
+  @Deprecated
   public static final String  DFS_WEBHDFS_USER_PATTERN_DEFAULT =
       HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_DEFAULT;
   public static final String  DFS_PERMISSIONS_ENABLED_KEY = "dfs.permissions.enabled";
@@ -305,7 +305,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
 
   //Following keys have no defaults
   public static final String  DFS_DATANODE_DATA_DIR_KEY = "dfs.datanode.data.dir";
-  public static final String  DFS_NAMENODE_HTTPS_PORT_KEY = "dfs.https.port";
   public static final int     DFS_NAMENODE_HTTPS_PORT_DEFAULT =
       HdfsClientConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT;
   public static final String  DFS_NAMENODE_HTTPS_ADDRESS_KEY =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64bb2b9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index 600f15f..12d5ad0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -229,37 +229,7 @@ public class DFSUtil {
    * names which contain a ":" or "//", or other non-canonical paths.
    */
   public static boolean isValidName(String src) {
-    // Path must be absolute.
-    if (!src.startsWith(Path.SEPARATOR)) {
-      return false;
-    }
-      
-    // Check for ".." "." ":" "/"
-    String[] components = StringUtils.split(src, '/');
-    for (int i = 0; i < components.length; i++) {
-      String element = components[i];
-      if (element.equals(".")  ||
-          (element.indexOf(":") >= 0)  ||
-          (element.indexOf("/") >= 0)) {
-        return false;
-      }
-      // ".." is allowed in path starting with /.reserved/.inodes
-      if (element.equals("..")) {
-        if (components.length > 4
-            && components[1].equals(FSDirectory.DOT_RESERVED_STRING)
-            && components[2].equals(FSDirectory.DOT_INODES_STRING)) {
-          continue;
-        }
-        return false;
-      }
-      // The string may start or end with a /, but not have
-      // "//" in the middle.
-      if (element.isEmpty() && i != components.length - 1 &&
-          i != 0) {
-        return false;
-      }
-    }
-    return true;
+    return DFSUtilClient.isValidName(src);
   }
 
   /**
@@ -330,7 +300,7 @@ public class DFSUtil {
    * Converts a string to a byte array using UTF8 encoding.
    */
   public static byte[] string2Bytes(String str) {
-    return str.getBytes(Charsets.UTF_8);
+    return DFSUtilClient.string2Bytes(str);
   }
 
   /**
@@ -476,61 +446,6 @@ public class DFSUtil {
     }
     return result;
   }
-  
-  /**
-   * Convert a LocatedBlocks to BlockLocations[]
-   * @param blocks a LocatedBlocks
-   * @return an array of BlockLocations
-   */
-  public static BlockLocation[] locatedBlocks2Locations(LocatedBlocks blocks) {
-    if (blocks == null) {
-      return new BlockLocation[0];
-    }
-    return locatedBlocks2Locations(blocks.getLocatedBlocks());
-  }
-  
-  /**
-   * Convert a List<LocatedBlock> to BlockLocation[]
-   * @param blocks A List<LocatedBlock> to be converted
-   * @return converted array of BlockLocation
-   */
-  public static BlockLocation[] locatedBlocks2Locations(List<LocatedBlock> blocks) {
-    if (blocks == null) {
-      return new BlockLocation[0];
-    }
-    int nrBlocks = blocks.size();
-    BlockLocation[] blkLocations = new BlockLocation[nrBlocks];
-    if (nrBlocks == 0) {
-      return blkLocations;
-    }
-    int idx = 0;
-    for (LocatedBlock blk : blocks) {
-      assert idx < nrBlocks : "Incorrect index";
-      DatanodeInfo[] locations = blk.getLocations();
-      String[] hosts = new String[locations.length];
-      String[] xferAddrs = new String[locations.length];
-      String[] racks = new String[locations.length];
-      for (int hCnt = 0; hCnt < locations.length; hCnt++) {
-        hosts[hCnt] = locations[hCnt].getHostName();
-        xferAddrs[hCnt] = locations[hCnt].getXferAddr();
-        NodeBase node = new NodeBase(xferAddrs[hCnt],
-                                     locations[hCnt].getNetworkLocation());
-        racks[hCnt] = node.toString();
-      }
-      DatanodeInfo[] cachedLocations = blk.getCachedLocations();
-      String[] cachedHosts = new String[cachedLocations.length];
-      for (int i=0; i<cachedLocations.length; i++) {
-        cachedHosts[i] = cachedLocations[i].getHostName();
-      }
-      blkLocations[idx] = new BlockLocation(xferAddrs, hosts, cachedHosts,
-                                            racks,
-                                            blk.getStartOffset(),
-                                            blk.getBlockSize(),
-                                            blk.isCorrupt());
-      idx++;
-    }
-    return blkLocations;
-  }
 
   /**
    * Return configuration key of format key.suffix1.suffix2...suffixN

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64bb2b9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
index bc559f8..913baea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
@@ -404,7 +404,7 @@ public class NameNodeProxies {
             HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT, 
             HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY,
             HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT,
-            SafeModeException.class);
+            SafeModeException.class.getName());
     
     final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
     ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64bb2b9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java
index 7e602bf..23e8f57 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DFSUtilClient;
 
 /** 
  * Interface that represents the over the wire information
@@ -78,6 +78,6 @@ public class HdfsLocatedFileStatus extends HdfsFileStatus {
         isSymlink() ? new Path(getSymlink()) : null,
         (getFullPath(path)).makeQualified(
             defaultUri, null), // fully-qualify path
-        DFSUtil.locatedBlocks2Locations(getBlockLocations()));
+        DFSUtilClient.locatedBlocks2Locations(getBlockLocations()));
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64bb2b9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
index 9a46811..72f1a35 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
 import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64bb2b9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
index 4264472..40daa24 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
@@ -46,8 +46,6 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretMan
 import org.apache.hadoop.hdfs.server.namenode.CancelDelegationTokenServlet;
 import org.apache.hadoop.hdfs.server.namenode.GetDelegationTokenServlet;
 import org.apache.hadoop.hdfs.server.namenode.RenewDelegationTokenServlet;
-import org.apache.hadoop.hdfs.web.HftpFileSystem;
-import org.apache.hadoop.hdfs.web.HsftpFileSystem;
 import org.apache.hadoop.hdfs.web.URLConnectionFactory;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
@@ -243,7 +241,7 @@ public class DelegationTokenFetcher {
       dis = new DataInputStream(in);
       ts.readFields(dis);
       for (Token<?> token : ts.getAllTokens()) {
-        token.setKind(isHttps ? HsftpFileSystem.TOKEN_KIND : HftpFileSystem.TOKEN_KIND);
+        token.setKind(isHttps ? WebHdfsConstants.HSFTP_TOKEN_KIND : WebHdfsConstants.HFTP_TOKEN_KIND);
         SecurityUtil.setTokenService(token, serviceAddr);
       }
       return ts;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64bb2b9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java
deleted file mode 100644
index 9e3b29a..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.web;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.List;
-import java.util.Map;
-import java.util.StringTokenizer;
-
-import org.apache.commons.io.input.BoundedInputStream;
-import org.apache.hadoop.fs.FSInputStream;
-import org.apache.http.HttpStatus;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.net.HttpHeaders;
-
-/**
- * To support HTTP byte streams, a new connection to an HTTP server needs to be
- * created each time. This class hides the complexity of those multiple
- * connections from the client. Whenever seek() is called, a new connection
- * is made on the successive read(). The normal input stream functions are
- * connected to the currently active input stream.
- */
-public abstract class ByteRangeInputStream extends FSInputStream {
-
-  /**
-   * This class wraps a URL and provides method to open connection.
-   * It can be overridden to change how a connection is opened.
-   */
-  public static abstract class URLOpener {
-    protected URL url;
-
-    public URLOpener(URL u) {
-      url = u;
-    }
-
-    public void setURL(URL u) {
-      url = u;
-    }
-
-    public URL getURL() {
-      return url;
-    }
-
-    /** Connect to server with a data offset. */
-    protected abstract HttpURLConnection connect(final long offset,
-        final boolean resolved) throws IOException;
-  }
-
-  enum StreamStatus {
-    NORMAL, SEEK, CLOSED
-  }
-  protected InputStream in;
-  protected final URLOpener originalURL;
-  protected final URLOpener resolvedURL;
-  protected long startPos = 0;
-  protected long currentPos = 0;
-  protected Long fileLength = null;
-
-  StreamStatus status = StreamStatus.SEEK;
-
-  /**
-   * Create with the specified URLOpeners. Original url is used to open the
-   * stream for the first time. Resolved url is used in subsequent requests.
-   * @param o Original url
-   * @param r Resolved url
-   */
-  public ByteRangeInputStream(URLOpener o, URLOpener r) throws IOException {
-    this.originalURL = o;
-    this.resolvedURL = r;
-    getInputStream();
-  }
-
-  protected abstract URL getResolvedUrl(final HttpURLConnection connection
-      ) throws IOException;
-
-  @VisibleForTesting
-  protected InputStream getInputStream() throws IOException {
-    switch (status) {
-      case NORMAL:
-        break;
-      case SEEK:
-        if (in != null) {
-          in.close();
-        }
-        in = openInputStream();
-        status = StreamStatus.NORMAL;
-        break;
-      case CLOSED:
-        throw new IOException("Stream closed");
-    }
-    return in;
-  }
-
-  @VisibleForTesting
-  protected InputStream openInputStream() throws IOException {
-    // Use the original url if no resolved url exists, eg. if
-    // it's the first time a request is made.
-    final boolean resolved = resolvedURL.getURL() != null;
-    final URLOpener opener = resolved? resolvedURL: originalURL;
-
-    final HttpURLConnection connection = opener.connect(startPos, resolved);
-    resolvedURL.setURL(getResolvedUrl(connection));
-
-    InputStream in = connection.getInputStream();
-    final Map<String, List<String>> headers = connection.getHeaderFields();
-    if (isChunkedTransferEncoding(headers)) {
-      // file length is not known
-      fileLength = null;
-    } else {
-      // for non-chunked transfer-encoding, get content-length
-      long streamlength = getStreamLength(connection, headers);
-      fileLength = startPos + streamlength;
-
-      // Java has a bug with >2GB request streams.  It won't bounds check
-      // the reads so the transfer blocks until the server times out
-      in = new BoundedInputStream(in, streamlength);
-    }
-
-    return in;
-  }
-
-  private static long getStreamLength(HttpURLConnection connection,
-      Map<String, List<String>> headers) throws IOException {
-    String cl = connection.getHeaderField(HttpHeaders.CONTENT_LENGTH);
-    if (cl == null) {
-      // Try to get the content length by parsing the content range
-      // because HftpFileSystem does not return the content length
-      // if the content is partial.
-      if (connection.getResponseCode() == HttpStatus.SC_PARTIAL_CONTENT) {
-        cl = connection.getHeaderField(HttpHeaders.CONTENT_RANGE);
-        return getLengthFromRange(cl);
-      } else {
-        throw new IOException(HttpHeaders.CONTENT_LENGTH + " is missing: "
-            + headers);
-      }
-    }
-    return Long.parseLong(cl);
-  }
-
-  private static long getLengthFromRange(String cl) throws IOException {
-    try {
-
-      String[] str = cl.substring(6).split("[-/]");
-      return Long.parseLong(str[1]) - Long.parseLong(str[0]) + 1;
-    } catch (Exception e) {
-      throw new IOException(
-          "failed to get content length by parsing the content range: " + cl
-              + " " + e.getMessage());
-    }
-  }
-
-  private static boolean isChunkedTransferEncoding(
-      final Map<String, List<String>> headers) {
-    return contains(headers, HttpHeaders.TRANSFER_ENCODING, "chunked")
-        || contains(headers, HttpHeaders.TE, "chunked");
-  }
-
-  /** Does the HTTP header map contain the given key, value pair? */
-  private static boolean contains(final Map<String, List<String>> headers,
-      final String key, final String value) {
-    final List<String> values = headers.get(key);
-    if (values != null) {
-      for(String v : values) {
-        for(final StringTokenizer t = new StringTokenizer(v, ",");
-            t.hasMoreTokens(); ) {
-          if (value.equalsIgnoreCase(t.nextToken())) {
-            return true;
-          }
-        }
-      }
-    }
-    return false;
-  }
-
-  private int update(final int n) throws IOException {
-    if (n != -1) {
-      currentPos += n;
-    } else if (fileLength != null && currentPos < fileLength) {
-      throw new IOException("Got EOF but currentPos = " + currentPos
-          + " < filelength = " + fileLength);
-    }
-    return n;
-  }
-
-  @Override
-  public int read() throws IOException {
-    final int b = getInputStream().read();
-    update((b == -1) ? -1 : 1);
-    return b;
-  }
-
-  @Override
-  public int read(byte b[], int off, int len) throws IOException {
-    return update(getInputStream().read(b, off, len));
-  }
-
-  /**
-   * Seek to the given offset from the start of the file.
-   * The next read() will be from that location.  Can't
-   * seek past the end of the file.
-   */
-  @Override
-  public void seek(long pos) throws IOException {
-    if (pos != currentPos) {
-      startPos = pos;
-      currentPos = pos;
-      if (status != StreamStatus.CLOSED) {
-        status = StreamStatus.SEEK;
-      }
-    }
-  }
-
-  /**
-   * Return the current offset from the start of the file
-   */
-  @Override
-  public long getPos() throws IOException {
-    return currentPos;
-  }
-
-  /**
-   * Seeks a different copy of the data.  Returns true if
-   * found a new source, false otherwise.
-   */
-  @Override
-  public boolean seekToNewSource(long targetPos) throws IOException {
-    return false;
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (in != null) {
-      in.close();
-      in = null;
-    }
-    status = StreamStatus.CLOSED;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64bb2b9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java
index 581f088..14c619d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
@@ -79,7 +80,6 @@ import org.xml.sax.helpers.XMLReaderFactory;
 @InterfaceStability.Evolving
 public class HftpFileSystem extends FileSystem
     implements DelegationTokenRenewer.Renewable, TokenAspect.TokenManagementDelegator {
-  public static final String SCHEME = "hftp";
 
   static {
     HttpURLConnection.setFollowRedirects(true);
@@ -87,8 +87,6 @@ public class HftpFileSystem extends FileSystem
 
   URLConnectionFactory connectionFactory;
 
-  public static final Text TOKEN_KIND = new Text("HFTP delegation");
-
   protected UserGroupInformation ugi;
   private URI hftpURI;
 
@@ -123,7 +121,7 @@ public class HftpFileSystem extends FileSystem
 
   @Override
   protected int getDefaultPort() {
-    return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY,
+    return getConf().getInt(HdfsClientConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY,
         DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT);
   }
 
@@ -168,7 +166,7 @@ public class HftpFileSystem extends FileSystem
    */
   @Override
   public String getScheme() {
-    return SCHEME;
+    return WebHdfsConstants.HFTP_SCHEME;
   }
 
   /**
@@ -176,7 +174,7 @@ public class HftpFileSystem extends FileSystem
    * be overridden by HsFtpFileSystem.
    */
   protected void initTokenAspect() {
-    tokenAspect = new TokenAspect<HftpFileSystem>(this, tokenServiceName, TOKEN_KIND);
+    tokenAspect = new TokenAspect<HftpFileSystem>(this, tokenServiceName, WebHdfsConstants.HFTP_TOKEN_KIND);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b64bb2b9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HsftpFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HsftpFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HsftpFileSystem.java
index b232f5b..1e9e96a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HsftpFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HsftpFileSystem.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.hdfs.web;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 
 /**
  * An implementation of a protocol for accessing filesystems over HTTPS. The
@@ -34,8 +34,6 @@ import org.apache.hadoop.io.Text;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class HsftpFileSystem extends HftpFileSystem {
-  public static final Text TOKEN_KIND = new Text("HSFTP delegation");
-  public static final String SCHEME = "hsftp";
 
   /**
    * Return the protocol scheme for the FileSystem.
@@ -45,7 +43,7 @@ public class HsftpFileSystem extends HftpFileSystem {
    */
   @Override
   public String getScheme() {
-    return SCHEME;
+    return WebHdfsConstants.HSFTP_SCHEME;
   }
 
   /**
@@ -59,12 +57,12 @@ public class HsftpFileSystem extends HftpFileSystem {
   @Override
   protected void initTokenAspect() {
     tokenAspect = new TokenAspect<HsftpFileSystem>(this, tokenServiceName,
-        TOKEN_KIND);
+        WebHdfsConstants.HSFTP_TOKEN_KIND);
   }
 
   @Override
   protected int getDefaultPort() {
-    return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY,
+    return getConf().getInt(HdfsClientConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY,
                             DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT);
   }
 }


Mime
View raw message