Return-Path: X-Original-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 91CAE10C50 for ; Thu, 24 Oct 2013 23:59:13 +0000 (UTC) Received: (qmail 2101 invoked by uid 500); 24 Oct 2013 23:59:13 -0000 Delivered-To: apmail-hadoop-hdfs-commits-archive@hadoop.apache.org Received: (qmail 2060 invoked by uid 500); 24 Oct 2013 23:59:13 -0000 Mailing-List: contact hdfs-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hdfs-dev@hadoop.apache.org Delivered-To: mailing list hdfs-commits@hadoop.apache.org Received: (qmail 2052 invoked by uid 99); 24 Oct 2013 23:59:13 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 24 Oct 2013 23:59:13 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 24 Oct 2013 23:59:09 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 48A8523888E4; Thu, 24 Oct 2013 23:58:47 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1535589 - in /hadoop/common/branches/branch-2.2/hadoop-hdfs-project: hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs/ Date: Thu, 24 Oct 2013 23:58:46 -0000 To: hdfs-commits@hadoop.apache.org From: brandonli@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20131024235847.48A8523888E4@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: brandonli Date: Thu Oct 24 23:58:46 2013 New Revision: 1535589 URL: http://svn.apache.org/r1535589 Log: HDFS-5171. Merging change r1535588 from branch-2 Modified: hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Modified: hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java?rev=1535589&r1=1535588&r2=1535589&view=diff ============================================================================== --- hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java (original) +++ hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java Thu Oct 24 23:58:46 2013 @@ -20,15 +20,19 @@ package org.apache.hadoop.hdfs.nfs.nfs3; import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSInputStream; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.security.UserGroupInformation; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -41,15 +45,52 @@ import com.google.common.cache.RemovalNo class DFSClientCache { private static final Log LOG = LogFactory.getLog(DFSClientCache.class); /** - * Cache that maps User id to corresponding DFSClient. + * Cache that maps User id to the corresponding DFSClient. */ @VisibleForTesting final LoadingCache clientCache; final static int DEFAULT_DFS_CLIENT_CACHE_SIZE = 256; + /** + * Cache that maps to the corresponding + * FSDataInputStream. + */ + final LoadingCache inputstreamCache; + + /** + * Time to live for a DFSClient (in seconds) + */ + final static int DEFAULT_DFS_INPUTSTREAM_CACHE_SIZE = 1024; + final static int DEFAULT_DFS_INPUTSTREAM_CACHE_TTL = 10 * 60; + private final Configuration config; + private static class DFSInputStreamCaheKey { + final String userId; + final String inodePath; + + private DFSInputStreamCaheKey(String userId, String inodePath) { + super(); + this.userId = userId; + this.inodePath = inodePath; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof DFSInputStreamCaheKey) { + DFSInputStreamCaheKey k = (DFSInputStreamCaheKey) obj; + return userId.equals(k.userId) && inodePath.equals(k.inodePath); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hashCode(userId, inodePath); + } + } + DFSClientCache(Configuration config) { this(config, DEFAULT_DFS_CLIENT_CACHE_SIZE); } @@ -60,6 +101,12 @@ class DFSClientCache { .maximumSize(clientCache) .removalListener(clientRemovealListener()) .build(clientLoader()); + + this.inputstreamCache = CacheBuilder.newBuilder() + .maximumSize(DEFAULT_DFS_INPUTSTREAM_CACHE_SIZE) + .expireAfterAccess(DEFAULT_DFS_INPUTSTREAM_CACHE_TTL, TimeUnit.SECONDS) + .removalListener(inputStreamRemovalListener()) + .build(inputStreamLoader()); } private CacheLoader clientLoader() { @@ -95,7 +142,33 @@ class DFSClientCache { }; } - DFSClient get(String userName) { + private RemovalListener inputStreamRemovalListener() { + return new RemovalListener() { + + @Override + public void onRemoval( + RemovalNotification notification) { + try { + notification.getValue().close(); + } catch (IOException e) { + } + } + }; + } + + private CacheLoader inputStreamLoader() { + return new CacheLoader() { + + @Override + public FSDataInputStream load(DFSInputStreamCaheKey key) throws Exception { + DFSClient client = getDfsClient(key.userId); + DFSInputStream dis = client.open(key.inodePath); + return new FSDataInputStream(dis); + } + }; + } + + DFSClient getDfsClient(String userName) { DFSClient client = null; try { client = clientCache.get(userName); @@ -105,4 +178,21 @@ class DFSClientCache { } return client; } + + FSDataInputStream getDfsInputStream(String userName, String inodePath) { + DFSInputStreamCaheKey k = new DFSInputStreamCaheKey(userName, inodePath); + FSDataInputStream s = null; + try { + s = inputstreamCache.get(k); + } catch (ExecutionException e) { + LOG.warn("Failed to create DFSInputStream for user:" + userName + + " Cause:" + e); + } + return s; + } + + public void invalidateDfsInputStream(String userName, String inodePath) { + DFSInputStreamCaheKey k = new DFSInputStreamCaheKey(userName, inodePath); + inputstreamCache.invalidate(k); + } } Modified: hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java?rev=1535589&r1=1535588&r2=1535589&view=diff ============================================================================== --- hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java (original) +++ hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java Thu Oct 24 23:58:46 2013 @@ -39,7 +39,6 @@ import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DFSInputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.HdfsConstants; @@ -235,7 +234,7 @@ public class RpcProgramNfs3 extends RpcP return response; } - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -310,7 +309,7 @@ public class RpcProgramNfs3 extends RpcP public SETATTR3Response setattr(XDR xdr, SecurityHandler securityHandler, InetAddress client) { SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK); - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -392,7 +391,7 @@ public class RpcProgramNfs3 extends RpcP return response; } - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -454,7 +453,7 @@ public class RpcProgramNfs3 extends RpcP return response; } - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -502,7 +501,7 @@ public class RpcProgramNfs3 extends RpcP return response; } - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -563,13 +562,14 @@ public class RpcProgramNfs3 extends RpcP public READ3Response read(XDR xdr, SecurityHandler securityHandler, InetAddress client) { READ3Response response = new READ3Response(Nfs3Status.NFS3_OK); + final String userName = securityHandler.getUser(); if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) { response.setStatus(Nfs3Status.NFS3ERR_ACCES); return response; } - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(userName); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -628,11 +628,28 @@ public class RpcProgramNfs3 extends RpcP int buffSize = Math.min(MAX_READ_TRANSFER_SIZE, count); byte[] readbuffer = new byte[buffSize]; - DFSInputStream is = dfsClient.open(Nfs3Utils.getFileIdPath(handle)); - FSDataInputStream fis = new FSDataInputStream(is); - - int readCount = fis.read(offset, readbuffer, 0, count); - fis.close(); + int readCount = 0; + /** + * Retry exactly once because the DFSInputStream can be stale. + */ + for (int i = 0; i < 1; ++i) { + FSDataInputStream fis = clientCache.getDfsInputStream(userName, + Nfs3Utils.getFileIdPath(handle)); + + try { + readCount = fis.read(offset, readbuffer, 0, count); + } catch (IOException e) { + // TODO: A cleaner way is to throw a new type of exception + // which requires incompatible changes. + if (e.getMessage() == "Stream closed") { + clientCache.invalidateDfsInputStream(userName, + Nfs3Utils.getFileIdPath(handle)); + continue; + } else { + throw e; + } + } + } attrs = Nfs3Utils.getFileAttr(dfsClient, Nfs3Utils.getFileIdPath(handle), iug); @@ -660,7 +677,7 @@ public class RpcProgramNfs3 extends RpcP SecurityHandler securityHandler, InetAddress client) { WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK); - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -735,7 +752,7 @@ public class RpcProgramNfs3 extends RpcP public CREATE3Response create(XDR xdr, SecurityHandler securityHandler, InetAddress client) { CREATE3Response response = new CREATE3Response(Nfs3Status.NFS3_OK); - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -858,7 +875,7 @@ public class RpcProgramNfs3 extends RpcP public MKDIR3Response mkdir(XDR xdr, SecurityHandler securityHandler, InetAddress client) { MKDIR3Response response = new MKDIR3Response(Nfs3Status.NFS3_OK); - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -954,7 +971,7 @@ public class RpcProgramNfs3 extends RpcP public REMOVE3Response remove(XDR xdr, SecurityHandler securityHandler, InetAddress client) { REMOVE3Response response = new REMOVE3Response(Nfs3Status.NFS3_OK); - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -1029,7 +1046,7 @@ public class RpcProgramNfs3 extends RpcP public RMDIR3Response rmdir(XDR xdr, SecurityHandler securityHandler, InetAddress client) { RMDIR3Response response = new RMDIR3Response(Nfs3Status.NFS3_OK); - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -1111,7 +1128,7 @@ public class RpcProgramNfs3 extends RpcP public RENAME3Response rename(XDR xdr, SecurityHandler securityHandler, InetAddress client) { RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK); - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -1205,7 +1222,7 @@ public class RpcProgramNfs3 extends RpcP return response; } - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -1293,7 +1310,7 @@ public class RpcProgramNfs3 extends RpcP return response; } - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -1430,7 +1447,7 @@ public class RpcProgramNfs3 extends RpcP return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_ACCES); } - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_SERVERFAULT); } @@ -1587,7 +1604,7 @@ public class RpcProgramNfs3 extends RpcP return response; } - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -1645,7 +1662,7 @@ public class RpcProgramNfs3 extends RpcP return response; } - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -1697,7 +1714,7 @@ public class RpcProgramNfs3 extends RpcP return response; } - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; @@ -1738,7 +1755,7 @@ public class RpcProgramNfs3 extends RpcP public COMMIT3Response commit(XDR xdr, Channel channel, int xid, SecurityHandler securityHandler, InetAddress client) { COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK); - DFSClient dfsClient = clientCache.get(securityHandler.getUser()); + DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); return response; Modified: hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java?rev=1535589&r1=1535588&r2=1535589&view=diff ============================================================================== --- hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java (original) +++ hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java Thu Oct 24 23:58:46 2013 @@ -39,12 +39,12 @@ public class TestDFSClientCache { DFSClientCache cache = new DFSClientCache(conf, MAX_CACHE_SIZE); - DFSClient c1 = cache.get("test1"); - assertTrue(cache.get("test1").toString().contains("ugi=test1")); - assertEquals(c1, cache.get("test1")); + DFSClient c1 = cache.getDfsClient("test1"); + assertTrue(cache.getDfsClient("test1").toString().contains("ugi=test1")); + assertEquals(c1, cache.getDfsClient("test1")); assertFalse(isDfsClientClose(c1)); - cache.get("test2"); + cache.getDfsClient("test2"); assertTrue(isDfsClientClose(c1)); assertEquals(MAX_CACHE_SIZE - 1, cache.clientCache.size()); } Modified: hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1535589&r1=1535588&r2=1535589&view=diff ============================================================================== --- hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original) +++ hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Oct 24 23:58:46 2013 @@ -57,6 +57,9 @@ Release 2.2.1 - UNRELEASED HDFS-5403. WebHdfs client cannot communicate with older WebHdfs servers post HDFS-5306. (atm) + HDFS-5171. NFS should create input stream for a file and try to share it + with multiple read requests. (Haohui Mai via brandonli) + Release 2.2.0 - 2013-10-13 INCOMPATIBLE CHANGES