Return-Path: X-Original-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DAE14D366 for ; Wed, 1 Aug 2012 01:42:11 +0000 (UTC) Received: (qmail 11772 invoked by uid 500); 1 Aug 2012 01:42:11 -0000 Delivered-To: apmail-hadoop-hdfs-commits-archive@hadoop.apache.org Received: (qmail 11738 invoked by uid 500); 1 Aug 2012 01:42:11 -0000 Mailing-List: contact hdfs-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hdfs-dev@hadoop.apache.org Delivered-To: mailing list hdfs-commits@hadoop.apache.org Received: (qmail 11727 invoked by uid 99); 1 Aug 2012 01:42:11 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 01 Aug 2012 01:42:11 +0000 X-ASF-Spam-Status: No, hits=-1998.0 required=5.0 tests=ALL_TRUSTED,FB_GET_MEDS X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 01 Aug 2012 01:42:08 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 32D52238890D; Wed, 1 Aug 2012 01:41:25 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1367841 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/web/ src/main/java/org/apache/hadoop/hdfs/web/resources/ src/test/java/org/apache/hadoop/hdfs/... Date: Wed, 01 Aug 2012 01:41:24 -0000 To: hdfs-commits@hadoop.apache.org From: szetszwo@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120801014125.32D52238890D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: szetszwo Date: Wed Aug 1 01:41:23 2012 New Revision: 1367841 URL: http://svn.apache.org/viewvc?rev=1367841&view=rev Log: HDFS-3667. Add retry support to WebHdfsFileSystem. Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/DeleteOpParam.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PostOpParam.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestOffsetUrlInputStream.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1367841&r1=1367840&r2=1367841&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Aug 1 01:41:23 2012 @@ -360,6 +360,8 @@ Branch-2 ( Unreleased changes ) HDFS-3650. Use MutableQuantiles to provide latency histograms for various operations. (Andrew Wang via atm) + HDFS-3667. Add retry support to WebHdfsFileSystem. (szetszwo) + OPTIMIZATIONS HDFS-2982. Startup performance suffers when there are many edit log Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java?rev=1367841&r1=1367840&r2=1367841&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java Wed Aug 1 01:41:23 2012 @@ -57,9 +57,9 @@ public abstract class ByteRangeInputStre return url; } - protected abstract HttpURLConnection openConnection() throws IOException; - - protected abstract HttpURLConnection openConnection(final long offset) throws IOException; + /** Connect to server with a data offset. */ + protected abstract HttpURLConnection connect(final long offset, + final boolean resolved) throws IOException; } enum StreamStatus { @@ -85,9 +85,6 @@ public abstract class ByteRangeInputStre this.resolvedURL = r; } - protected abstract void checkResponseCode(final HttpURLConnection connection - ) throws IOException; - protected abstract URL getResolvedUrl(final HttpURLConnection connection ) throws IOException; @@ -113,13 +110,10 @@ public abstract class ByteRangeInputStre protected InputStream openInputStream() throws IOException { // Use the original url if no resolved url exists, eg. if // it's the first time a request is made. - final URLOpener opener = - (resolvedURL.getURL() == null) ? originalURL : resolvedURL; - - final HttpURLConnection connection = opener.openConnection(startPos); - connection.connect(); - checkResponseCode(connection); + final boolean resolved = resolvedURL.getURL() != null; + final URLOpener opener = resolved? resolvedURL: originalURL; + final HttpURLConnection connection = opener.connect(startPos, resolved); final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH); if (cl == null) { throw new IOException(StreamFile.CONTENT_LENGTH+" header is missing"); Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java?rev=1367841&r1=1367840&r2=1367841&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java Wed Aug 1 01:41:23 2012 @@ -342,19 +342,28 @@ public class HftpFileSystem extends File super(url); } - @Override protected HttpURLConnection openConnection() throws IOException { return (HttpURLConnection)URLUtils.openConnection(url); } /** Use HTTP Range header for specifying offset. */ @Override - protected HttpURLConnection openConnection(final long offset) throws IOException { + protected HttpURLConnection connect(final long offset, + final boolean resolved) throws IOException { final HttpURLConnection conn = openConnection(); conn.setRequestMethod("GET"); if (offset != 0L) { conn.setRequestProperty("Range", "bytes=" + offset + "-"); } + conn.connect(); + + //Expects HTTP_OK or HTTP_PARTIAL response codes. + final int code = conn.getResponseCode(); + if (offset != 0L && code != HttpURLConnection.HTTP_PARTIAL) { + throw new IOException("HTTP_PARTIAL expected, received " + code); + } else if (offset == 0L && code != HttpURLConnection.HTTP_OK) { + throw new IOException("HTTP_OK expected, received " + code); + } return conn; } } @@ -368,22 +377,6 @@ public class HftpFileSystem extends File this(new RangeHeaderUrlOpener(url), new RangeHeaderUrlOpener(null)); } - /** Expects HTTP_OK and HTTP_PARTIAL response codes. */ - @Override - protected void checkResponseCode(final HttpURLConnection connection - ) throws IOException { - final int code = connection.getResponseCode(); - if (startPos != 0 && code != HttpURLConnection.HTTP_PARTIAL) { - // We asked for a byte range but did not receive a partial content - // response... - throw new IOException("HTTP_PARTIAL expected, received " + code); - } else if (startPos == 0 && code != HttpURLConnection.HTTP_OK) { - // We asked for all bytes from the beginning but didn't receive a 200 - // response (none of the other 2xx codes are valid here) - throw new IOException("HTTP_OK expected, received " + code); - } - } - @Override protected URL getResolvedUrl(final HttpURLConnection connection) { return connection.getURL(); Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java?rev=1367841&r1=1367840&r2=1367841&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java Wed Aug 1 01:41:23 2012 @@ -259,7 +259,7 @@ public class NameNodeProxies { * * Note that dfs.client.retry.max < 0 is not allowed. */ - private static RetryPolicy getDefaultRpcRetryPolicy(Configuration conf) { + public static RetryPolicy getDefaultRetryPolicy(Configuration conf) { final RetryPolicy multipleLinearRandomRetry = getMultipleLinearRandomRetry(conf); if (LOG.isDebugEnabled()) { LOG.debug("multipleLinearRandomRetry = " + multipleLinearRandomRetry); @@ -300,6 +300,13 @@ public class NameNodeProxies { + p.getClass().getSimpleName() + ", exception=" + e); return p.shouldRetry(e, retries, failovers, isMethodIdempotent); } + + @Override + public String toString() { + return "RetryPolicy[" + multipleLinearRandomRetry + ", " + + RetryPolicies.TRY_ONCE_THEN_FAIL.getClass().getSimpleName() + + "]"; + } }; } } @@ -335,7 +342,7 @@ public class NameNodeProxies { boolean withRetries) throws IOException { RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class); - final RetryPolicy defaultPolicy = getDefaultRpcRetryPolicy(conf); + final RetryPolicy defaultPolicy = getDefaultRetryPolicy(conf); final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class); ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy( ClientNamenodeProtocolPB.class, version, address, ugi, conf, Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java?rev=1367841&r1=1367840&r2=1367841&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java Wed Aug 1 01:41:23 2012 @@ -55,6 +55,7 @@ import org.apache.hadoop.fs.permission.F import org.apache.hadoop.hdfs.ByteRangeInputStream; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; @@ -88,6 +89,7 @@ import org.apache.hadoop.hdfs.web.resour import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam; import org.apache.hadoop.hdfs.web.resources.UserParam; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.AccessControlException; @@ -147,6 +149,7 @@ public class WebHdfsFileSystem extends F private URI uri; private Token delegationToken; private final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token(); + private RetryPolicy retryPolicy = null; private Path workingDir; { @@ -179,6 +182,7 @@ public class WebHdfsFileSystem extends F throw new IllegalArgumentException(e); } this.nnAddr = NetUtils.createSocketAddr(uri.getAuthority(), getDefaultPort()); + this.retryPolicy = NameNodeProxies.getDefaultRetryPolicy(conf); this.workingDir = getHomeDirectory(); if (UserGroupInformation.isSecurityEnabled()) { @@ -276,39 +280,64 @@ public class WebHdfsFileSystem extends F } private static Map validateResponse(final HttpOpParam.Op op, - final HttpURLConnection conn) throws IOException { + final HttpURLConnection conn, boolean unwrapException) throws IOException { final int code = conn.getResponseCode(); if (code != op.getExpectedHttpResponseCode()) { final Map m; try { m = jsonParse(conn, true); - } catch(IOException e) { + } catch(Exception e) { throw new IOException("Unexpected HTTP response: code=" + code + " != " + op.getExpectedHttpResponseCode() + ", " + op.toQueryString() + ", message=" + conn.getResponseMessage(), e); } - if (m.get(RemoteException.class.getSimpleName()) == null) { + if (m == null) { + throw new IOException("Unexpected HTTP response: code=" + code + " != " + + op.getExpectedHttpResponseCode() + ", " + op.toQueryString() + + ", message=" + conn.getResponseMessage()); + } else if (m.get(RemoteException.class.getSimpleName()) == null) { return m; } final RemoteException re = JsonUtil.toRemoteException(m); - throw re.unwrapRemoteException(AccessControlException.class, - InvalidToken.class, - AuthenticationException.class, - AuthorizationException.class, - FileAlreadyExistsException.class, - FileNotFoundException.class, - ParentNotDirectoryException.class, - UnresolvedPathException.class, - SafeModeException.class, - DSQuotaExceededException.class, - NSQuotaExceededException.class); + throw unwrapException? toIOException(re): re; } return null; } /** + * Covert an exception to an IOException. + * + * For a non-IOException, wrap it with IOException. + * For a RemoteException, unwrap it. + * For an IOException which is not a RemoteException, return it. + */ + private static IOException toIOException(Exception e) { + if (!(e instanceof IOException)) { + return new IOException(e); + } + + final IOException ioe = (IOException)e; + if (!(ioe instanceof RemoteException)) { + return ioe; + } + + final RemoteException re = (RemoteException)ioe; + return re.unwrapRemoteException(AccessControlException.class, + InvalidToken.class, + AuthenticationException.class, + AuthorizationException.class, + FileAlreadyExistsException.class, + FileNotFoundException.class, + ParentNotDirectoryException.class, + UnresolvedPathException.class, + SafeModeException.class, + DSQuotaExceededException.class, + NSQuotaExceededException.class); + } + + /** * Return a URL pointing to given path on the namenode. * * @param path to obtain the URL for @@ -362,70 +391,15 @@ public class WebHdfsFileSystem extends F } private HttpURLConnection getHttpUrlConnection(URL url) - throws IOException { + throws IOException, AuthenticationException { final HttpURLConnection conn; - try { - if (ugi.hasKerberosCredentials()) { - conn = new AuthenticatedURL(AUTH).openConnection(url, authToken); - } else { - conn = (HttpURLConnection)url.openConnection(); - } - } catch (AuthenticationException e) { - throw new IOException("Authentication failed, url=" + url, e); + if (ugi.hasKerberosCredentials()) { + conn = new AuthenticatedURL(AUTH).openConnection(url, authToken); + } else { + conn = (HttpURLConnection)url.openConnection(); } return conn; } - - private HttpURLConnection httpConnect(final HttpOpParam.Op op, final Path fspath, - final Param... parameters) throws IOException { - final URL url = toUrl(op, fspath, parameters); - - //connect and get response - HttpURLConnection conn = getHttpUrlConnection(url); - try { - conn.setRequestMethod(op.getType().toString()); - if (op.getDoOutput()) { - conn = twoStepWrite(conn, op); - conn.setRequestProperty("Content-Type", "application/octet-stream"); - } - conn.setDoOutput(op.getDoOutput()); - conn.connect(); - return conn; - } catch (IOException e) { - conn.disconnect(); - throw e; - } - } - - /** - * Two-step Create/Append: - * Step 1) Submit a Http request with neither auto-redirect nor data. - * Step 2) Submit another Http request with the URL from the Location header with data. - * - * The reason of having two-step create/append is for preventing clients to - * send out the data before the redirect. This issue is addressed by the - * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3. - * Unfortunately, there are software library bugs (e.g. Jetty 6 http server - * and Java 6 http client), which do not correctly implement "Expect: - * 100-continue". The two-step create/append is a temporary workaround for - * the software library bugs. - */ - static HttpURLConnection twoStepWrite(HttpURLConnection conn, - final HttpOpParam.Op op) throws IOException { - //Step 1) Submit a Http request with neither auto-redirect nor data. - conn.setInstanceFollowRedirects(false); - conn.setDoOutput(false); - conn.connect(); - validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op), conn); - final String redirect = conn.getHeaderField("Location"); - conn.disconnect(); - - //Step 2) Submit another Http request with the URL from the Location header with data. - conn = (HttpURLConnection)new URL(redirect).openConnection(); - conn.setRequestMethod(op.getType().toString()); - conn.setChunkedStreamingMode(32 << 10); //32kB-chunk - return conn; - } /** * Run a http operation. @@ -439,12 +413,161 @@ public class WebHdfsFileSystem extends F */ private Map run(final HttpOpParam.Op op, final Path fspath, final Param... parameters) throws IOException { - final HttpURLConnection conn = httpConnect(op, fspath, parameters); - try { - final Map m = validateResponse(op, conn); - return m != null? m: jsonParse(conn, false); - } finally { - conn.disconnect(); + return new Runner(op, fspath, parameters).run().json; + } + + /** + * This class is for initialing a HTTP connection, connecting to server, + * obtaining a response, and also handling retry on failures. + */ + class Runner { + private final HttpOpParam.Op op; + private final URL url; + private final boolean redirected; + + private boolean checkRetry; + private HttpURLConnection conn = null; + private Map json = null; + + Runner(final HttpOpParam.Op op, final URL url, final boolean redirected) { + this.op = op; + this.url = url; + this.redirected = redirected; + } + + Runner(final HttpOpParam.Op op, final Path fspath, + final Param... parameters) throws IOException { + this(op, toUrl(op, fspath, parameters), false); + } + + Runner(final HttpOpParam.Op op, final HttpURLConnection conn) { + this(op, null, false); + this.conn = conn; + } + + private void init() throws IOException { + checkRetry = !redirected; + try { + conn = getHttpUrlConnection(url); + } catch(AuthenticationException ae) { + checkRetry = false; + throw new IOException("Authentication failed, url=" + url, ae); + } + } + + private void connect() throws IOException { + connect(op.getDoOutput()); + } + + private void connect(boolean doOutput) throws IOException { + conn.setRequestMethod(op.getType().toString()); + conn.setDoOutput(doOutput); + conn.setInstanceFollowRedirects(false); + conn.connect(); + } + + private void disconnect() { + if (conn != null) { + conn.disconnect(); + conn = null; + } + } + + Runner run() throws IOException { + for(int retry = 0; ; retry++) { + try { + init(); + if (op.getDoOutput()) { + twoStepWrite(); + } else { + getResponse(op != GetOpParam.Op.OPEN); + } + return this; + } catch(IOException ioe) { + shouldRetry(ioe, retry); + } + } + } + + private void shouldRetry(final IOException ioe, final int retry + ) throws IOException { + if (checkRetry) { + try { + final RetryPolicy.RetryAction a = retryPolicy.shouldRetry( + ioe, retry, 0, true); + if (a.action == RetryPolicy.RetryAction.RetryDecision.RETRY) { + LOG.info("Retrying connect to namenode: " + nnAddr + + ". Already tried " + retry + " time(s); retry policy is " + + retryPolicy + ", delay " + a.delayMillis + "ms."); + Thread.sleep(a.delayMillis); + return; + } + } catch(Exception e) { + LOG.warn("Original exception is ", ioe); + throw toIOException(e); + } + } + throw toIOException(ioe); + } + + /** + * Two-step Create/Append: + * Step 1) Submit a Http request with neither auto-redirect nor data. + * Step 2) Submit another Http request with the URL from the Location header with data. + * + * The reason of having two-step create/append is for preventing clients to + * send out the data before the redirect. This issue is addressed by the + * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3. + * Unfortunately, there are software library bugs (e.g. Jetty 6 http server + * and Java 6 http client), which do not correctly implement "Expect: + * 100-continue". The two-step create/append is a temporary workaround for + * the software library bugs. + */ + HttpURLConnection twoStepWrite() throws IOException { + //Step 1) Submit a Http request with neither auto-redirect nor data. + connect(false); + validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op), conn, false); + final String redirect = conn.getHeaderField("Location"); + disconnect(); + checkRetry = false; + + //Step 2) Submit another Http request with the URL from the Location header with data. + conn = (HttpURLConnection)new URL(redirect).openConnection(); + conn.setRequestProperty("Content-Type", MediaType.APPLICATION_OCTET_STREAM); + conn.setChunkedStreamingMode(32 << 10); //32kB-chunk + connect(); + return conn; + } + + FSDataOutputStream write(final int bufferSize) throws IOException { + return WebHdfsFileSystem.this.write(op, conn, bufferSize); + } + + void getResponse(boolean getJsonAndDisconnect) throws IOException { + try { + connect(); + final int code = conn.getResponseCode(); + if (!redirected && op.getRedirect() + && code != op.getExpectedHttpResponseCode()) { + final String redirect = conn.getHeaderField("Location"); + json = validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op), + conn, false); + disconnect(); + + checkRetry = false; + conn = (HttpURLConnection)new URL(redirect).openConnection(); + connect(); + } + + json = validateResponse(op, conn, false); + if (json == null && getJsonAndDisconnect) { + json = jsonParse(conn, false); + } + } finally { + if (getJsonAndDisconnect) { + disconnect(); + } + } } } @@ -578,7 +701,7 @@ public class WebHdfsFileSystem extends F super.close(); } finally { try { - validateResponse(op, conn); + validateResponse(op, conn, true); } finally { conn.disconnect(); } @@ -594,13 +717,14 @@ public class WebHdfsFileSystem extends F statistics.incrementWriteOps(1); final HttpOpParam.Op op = PutOpParam.Op.CREATE; - final HttpURLConnection conn = httpConnect(op, f, + return new Runner(op, f, new PermissionParam(applyUMask(permission)), new OverwriteParam(overwrite), new BufferSizeParam(bufferSize), new ReplicationParam(replication), - new BlockSizeParam(blockSize)); - return write(op, conn, bufferSize); + new BlockSizeParam(blockSize)) + .run() + .write(bufferSize); } @Override @@ -609,9 +733,9 @@ public class WebHdfsFileSystem extends F statistics.incrementWriteOps(1); final HttpOpParam.Op op = PostOpParam.Op.APPEND; - final HttpURLConnection conn = httpConnect(op, f, - new BufferSizeParam(bufferSize)); - return write(op, conn, bufferSize); + return new Runner(op, f, new BufferSizeParam(bufferSize)) + .run() + .write(bufferSize); } @SuppressWarnings("deprecation") @@ -638,26 +762,17 @@ public class WebHdfsFileSystem extends F } class OffsetUrlOpener extends ByteRangeInputStream.URLOpener { - /** The url with offset parameter */ - private URL offsetUrl; - OffsetUrlOpener(final URL url) { super(url); } - /** Open connection with offset url. */ - @Override - protected HttpURLConnection openConnection() throws IOException { - return getHttpUrlConnection(offsetUrl); - } - - /** Setup offset url before open connection. */ + /** Setup offset url and connect. */ @Override - protected HttpURLConnection openConnection(final long offset) throws IOException { - offsetUrl = offset == 0L? url: new URL(url + "&" + new OffsetParam(offset)); - final HttpURLConnection conn = openConnection(); - conn.setRequestMethod("GET"); - return conn; + protected HttpURLConnection connect(final long offset, + final boolean resolved) throws IOException { + final URL offsetUrl = offset == 0L? url + : new URL(url + "&" + new OffsetParam(offset)); + return new Runner(GetOpParam.Op.OPEN, offsetUrl, resolved).run().conn; } } @@ -698,12 +813,6 @@ public class WebHdfsFileSystem extends F OffsetUrlInputStream(OffsetUrlOpener o, OffsetUrlOpener r) { super(o, r); } - - @Override - protected void checkResponseCode(final HttpURLConnection connection - ) throws IOException { - validateResponse(GetOpParam.Op.OPEN, connection); - } /** Remove offset parameter before returning the resolved url. */ @Override Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/DeleteOpParam.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/DeleteOpParam.java?rev=1367841&r1=1367840&r2=1367841&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/DeleteOpParam.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/DeleteOpParam.java Wed Aug 1 01:41:23 2012 @@ -44,6 +44,11 @@ public class DeleteOpParam extends HttpO } @Override + public boolean getRedirect() { + return false; + } + + @Override public int getExpectedHttpResponseCode() { return expectedHttpResponseCode; } Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java?rev=1367841&r1=1367840&r2=1367841&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java Wed Aug 1 01:41:23 2012 @@ -23,25 +23,27 @@ import java.net.HttpURLConnection; public class GetOpParam extends HttpOpParam { /** Get operations. */ public static enum Op implements HttpOpParam.Op { - OPEN(HttpURLConnection.HTTP_OK), + OPEN(true, HttpURLConnection.HTTP_OK), - GETFILESTATUS(HttpURLConnection.HTTP_OK), - LISTSTATUS(HttpURLConnection.HTTP_OK), - GETCONTENTSUMMARY(HttpURLConnection.HTTP_OK), - GETFILECHECKSUM(HttpURLConnection.HTTP_OK), - - GETHOMEDIRECTORY(HttpURLConnection.HTTP_OK), - GETDELEGATIONTOKEN(HttpURLConnection.HTTP_OK), - GETDELEGATIONTOKENS(HttpURLConnection.HTTP_OK), + GETFILESTATUS(false, HttpURLConnection.HTTP_OK), + LISTSTATUS(false, HttpURLConnection.HTTP_OK), + GETCONTENTSUMMARY(false, HttpURLConnection.HTTP_OK), + GETFILECHECKSUM(true, HttpURLConnection.HTTP_OK), + + GETHOMEDIRECTORY(false, HttpURLConnection.HTTP_OK), + GETDELEGATIONTOKEN(false, HttpURLConnection.HTTP_OK), + GETDELEGATIONTOKENS(false, HttpURLConnection.HTTP_OK), /** GET_BLOCK_LOCATIONS is a private unstable op. */ - GET_BLOCK_LOCATIONS(HttpURLConnection.HTTP_OK), + GET_BLOCK_LOCATIONS(false, HttpURLConnection.HTTP_OK), - NULL(HttpURLConnection.HTTP_NOT_IMPLEMENTED); + NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED); + final boolean redirect; final int expectedHttpResponseCode; - Op(final int expectedHttpResponseCode) { + Op(final boolean redirect, final int expectedHttpResponseCode) { + this.redirect = redirect; this.expectedHttpResponseCode = expectedHttpResponseCode; } @@ -56,6 +58,11 @@ public class GetOpParam extends HttpOpPa } @Override + public boolean getRedirect() { + return redirect; + } + + @Override public int getExpectedHttpResponseCode() { return expectedHttpResponseCode; } Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java?rev=1367841&r1=1367840&r2=1367841&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java Wed Aug 1 01:41:23 2012 @@ -17,6 +17,10 @@ */ package org.apache.hadoop.hdfs.web.resources; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + import javax.ws.rs.core.Response; @@ -42,6 +46,9 @@ public abstract class HttpOpParam values + = Collections.unmodifiableList(Arrays.asList( + new TemporaryRedirectOp[]{CREATE, APPEND, OPEN, GETFILECHECKSUM})); + /** Get an object for the given op. */ public static TemporaryRedirectOp valueOf(final Op op) { - if (op == CREATE.op) { - return CREATE; - } else if (op == APPEND.op) { - return APPEND; + for(TemporaryRedirectOp t : values) { + if (op == t.op) { + return t; + } } throw new IllegalArgumentException(op + " not found."); } @@ -80,6 +97,11 @@ public abstract class HttpOpParam exceptions = new ArrayList(); final Path dir = new Path("/testNamenodeRestart"); - final Configuration conf = new Configuration(); conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, true); final short numDatanodes = 3; @@ -841,16 +848,18 @@ public class TestDFSClientRetries { try { cluster.waitActive(); final DistributedFileSystem dfs = cluster.getFileSystem(); + final FileSystem fs = isWebHDFS? + WebHdfsTestUtil.getWebHdfsFileSystem(conf): dfs; final URI uri = dfs.getUri(); assertTrue(HdfsUtils.isHealthy(uri)); //create a file final long length = 1L << 20; final Path file1 = new Path(dir, "foo"); - DFSTestUtil.createFile(dfs, file1, length, numDatanodes, 20120406L); + DFSTestUtil.createFile(fs, file1, length, numDatanodes, 20120406L); //get file status - final FileStatus s1 = dfs.getFileStatus(file1); + final FileStatus s1 = fs.getFileStatus(file1); assertEquals(length, s1.getLen()); //shutdown namenode @@ -858,6 +867,25 @@ public class TestDFSClientRetries { cluster.shutdownNameNode(0); assertFalse(HdfsUtils.isHealthy(uri)); + //namenode is down, read the file in a thread + final Thread reader = new Thread(new Runnable() { + @Override + public void run() { + try { + //it should retry till namenode is up. + final FileSystem fs = createFsWithDifferentUsername(conf, isWebHDFS); + final FSDataInputStream in = fs.open(file1); + int count = 0; + for(; in.read() != -1; count++); + in.close(); + assertEquals(s1.getLen(), count); + } catch (Exception e) { + exceptions.add(e); + } + } + }); + reader.start(); + //namenode is down, create another file in a thread final Path file3 = new Path(dir, "file"); final Thread thread = new Thread(new Runnable() { @@ -865,7 +893,7 @@ public class TestDFSClientRetries { public void run() { try { //it should retry till namenode is up. - final FileSystem fs = AppendTestUtil.createHdfsWithDifferentUsername(conf); + final FileSystem fs = createFsWithDifferentUsername(conf, isWebHDFS); DFSTestUtil.createFile(fs, file3, length, numDatanodes, 20120406L); } catch (Exception e) { exceptions.add(e); @@ -892,12 +920,15 @@ public class TestDFSClientRetries { }).start(); //namenode is down, it should retry until namenode is up again. - final FileStatus s2 = dfs.getFileStatus(file1); + final FileStatus s2 = fs.getFileStatus(file1); assertEquals(s1, s2); //check file1 and file3 thread.join(); - assertEquals(dfs.getFileChecksum(file1), dfs.getFileChecksum(file3)); + assertEquals(s1.getLen(), fs.getFileStatus(file3).getLen()); + assertEquals(fs.getFileChecksum(file1), fs.getFileChecksum(file3)); + + reader.join(); //enter safe mode assertTrue(HdfsUtils.isHealthy(uri)); @@ -922,8 +953,8 @@ public class TestDFSClientRetries { //namenode is in safe mode, create should retry until it leaves safe mode. final Path file2 = new Path(dir, "bar"); - DFSTestUtil.createFile(dfs, file2, length, numDatanodes, 20120406L); - assertEquals(dfs.getFileChecksum(file1), dfs.getFileChecksum(file2)); + DFSTestUtil.createFile(fs, file2, length, numDatanodes, 20120406L); + assertEquals(fs.getFileChecksum(file1), fs.getFileChecksum(file2)); assertTrue(HdfsUtils.isHealthy(uri)); @@ -931,7 +962,7 @@ public class TestDFSClientRetries { final Path nonExisting = new Path(dir, "nonExisting"); LOG.info("setPermission: " + nonExisting); try { - dfs.setPermission(nonExisting, new FsPermission((short)0)); + fs.setPermission(nonExisting, new FsPermission((short)0)); fail(); } catch(FileNotFoundException fnfe) { LOG.info("GOOD!", fnfe); @@ -949,6 +980,18 @@ public class TestDFSClientRetries { } } + private static FileSystem createFsWithDifferentUsername( + final Configuration conf, final boolean isWebHDFS + ) throws IOException, InterruptedException { + final String username = UserGroupInformation.getCurrentUser( + ).getShortUserName() + "_XXX"; + final UserGroupInformation ugi = UserGroupInformation.createUserForTesting( + username, new String[]{"supergroup"}); + + return isWebHDFS? WebHdfsTestUtil.getWebHdfsFileSystemAs(ugi, conf) + : DFSTestUtil.getFileSystemAs(ugi, conf); + } + @Test public void testMultipleLinearRandomRetry() { parseMultipleLinearRandomRetry(null, ""); Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java?rev=1367841&r1=1367840&r2=1367841&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java Wed Aug 1 01:41:23 2012 @@ -44,7 +44,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; @@ -140,9 +139,7 @@ public class TestDelegationTokenForProxy .doAs(new PrivilegedExceptionAction>() { @Override public Token run() throws IOException { - DistributedFileSystem dfs = (DistributedFileSystem) cluster - .getFileSystem(); - return dfs.getDelegationToken("RenewerUser"); + return cluster.getFileSystem().getDelegationToken("RenewerUser"); } }); DelegationTokenIdentifier identifier = new DelegationTokenIdentifier(); @@ -206,7 +203,7 @@ public class TestDelegationTokenForProxy final PutOpParam.Op op = PutOpParam.Op.CREATE; final URL url = WebHdfsTestUtil.toUrl(webhdfs, op, f, new DoAsParam(PROXY_USER)); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); - conn = WebHdfsTestUtil.twoStepWrite(conn, op); + conn = WebHdfsTestUtil.twoStepWrite(webhdfs, op, conn); final FSDataOutputStream out = WebHdfsTestUtil.write(webhdfs, op, conn, 4096); out.write("Hello, webhdfs user!".getBytes()); out.close(); @@ -221,7 +218,7 @@ public class TestDelegationTokenForProxy final PostOpParam.Op op = PostOpParam.Op.APPEND; final URL url = WebHdfsTestUtil.toUrl(webhdfs, op, f, new DoAsParam(PROXY_USER)); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); - conn = WebHdfsTestUtil.twoStepWrite(conn, op); + conn = WebHdfsTestUtil.twoStepWrite(webhdfs, op, conn); final FSDataOutputStream out = WebHdfsTestUtil.write(webhdfs, op, conn, 4096); out.write("\nHello again!".getBytes()); out.close(); Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestOffsetUrlInputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestOffsetUrlInputStream.java?rev=1367841&r1=1367840&r2=1367841&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestOffsetUrlInputStream.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestOffsetUrlInputStream.java Wed Aug 1 01:41:23 2012 @@ -18,22 +18,10 @@ package org.apache.hadoop.hdfs.web; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import java.io.IOException; -import java.net.URI; import java.net.URL; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hdfs.TestByteRangeInputStream.MockHttpURLConnection; -import org.apache.hadoop.hdfs.web.WebHdfsFileSystem.OffsetUrlInputStream; -import org.apache.hadoop.hdfs.web.WebHdfsFileSystem.OffsetUrlOpener; import org.junit.Test; public class TestOffsetUrlInputStream { @@ -73,65 +61,4 @@ public class TestOffsetUrlInputStream { WebHdfsFileSystem.removeOffsetParam(new URL(s)).toString()); } } - - @Test - public void testByteRange() throws Exception { - final Configuration conf = new Configuration(); - final String uri = WebHdfsFileSystem.SCHEME + "://localhost:50070/"; - final WebHdfsFileSystem webhdfs = (WebHdfsFileSystem)FileSystem.get(new URI(uri), conf); - - OffsetUrlOpener ospy = spy(webhdfs.new OffsetUrlOpener(new URL("http://test/"))); - doReturn(new MockHttpURLConnection(ospy.getURL())).when(ospy) - .openConnection(); - OffsetUrlOpener rspy = spy(webhdfs.new OffsetUrlOpener((URL) null)); - doReturn(new MockHttpURLConnection(rspy.getURL())).when(rspy) - .openConnection(); - final OffsetUrlInputStream is = new OffsetUrlInputStream(ospy, rspy); - - assertEquals("getPos wrong", 0, is.getPos()); - - is.read(); - - assertNull("Initial call made incorrectly (Range Check)", ospy - .openConnection().getRequestProperty("Range")); - - assertEquals("getPos should be 1 after reading one byte", 1, is.getPos()); - - is.read(); - - assertEquals("getPos should be 2 after reading two bytes", 2, is.getPos()); - - // No additional connections should have been made (no seek) - - rspy.setURL(new URL("http://resolvedurl/")); - - is.seek(100); - is.read(); - - assertEquals("getPos should be 101 after reading one byte", 101, - is.getPos()); - - verify(rspy, times(1)).openConnection(); - - is.seek(101); - is.read(); - - verify(rspy, times(1)).openConnection(); - - // Seek to 101 should not result in another request" - - is.seek(2500); - is.read(); - - ((MockHttpURLConnection) rspy.openConnection()).setResponseCode(206); - is.seek(0); - - try { - is.read(); - fail("Exception should be thrown when 206 response is given " - + "but 200 is expected"); - } catch (IOException e) { - WebHdfsFileSystem.LOG.info(e.toString()); - } - } } Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java?rev=1367841&r1=1367840&r2=1367841&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java Wed Aug 1 01:41:23 2012 @@ -23,12 +23,16 @@ import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.TestDFSClientRetries; +import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods; +import org.apache.log4j.Level; import org.junit.Assert; import org.junit.Test; @@ -196,4 +200,12 @@ public class TestWebHDFS { in.close(); t.end(checked); } + + /** Test client retry with namenode restarting. */ + @Test + public void testNamenodeRestart() throws Exception { + ((Log4JLogger)NamenodeWebHdfsMethods.LOG).getLogger().setLevel(Level.ALL); + final Configuration conf = WebHdfsTestUtil.createConf(); + TestDFSClientRetries.namenodeRestartTest(conf, true); + } } Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java?rev=1367841&r1=1367840&r2=1367841&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java Wed Aug 1 01:41:23 2012 @@ -79,13 +79,9 @@ public class WebHdfsTestUtil { return WebHdfsFileSystem.jsonParse(conn, false); } - public static HttpURLConnection twoStepWrite(HttpURLConnection conn, - final HttpOpParam.Op op) throws IOException { - conn.setRequestMethod(op.getType().toString()); - conn = WebHdfsFileSystem.twoStepWrite(conn, op); - conn.setDoOutput(true); - conn.connect(); - return conn; + public static HttpURLConnection twoStepWrite(final WebHdfsFileSystem webhdfs, + final HttpOpParam.Op op, HttpURLConnection conn) throws IOException { + return webhdfs.new Runner(op, conn).twoStepWrite(); } public static FSDataOutputStream write(final WebHdfsFileSystem webhdfs,