Reply-To: hdfs-dev@hadoop.apache.org
Subject: svn commit: r1567994 [3/3] - in /hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/client/ src/main/java/org/apache/hadoop/hdfs/protocolPB/ src...
Date: Thu, 13 Feb 2014 18:21:48 -0000
To: hdfs-commits@hadoop.apache.org
From: cnauroth@apache.org
X-Mailer: svnmailer-1.0.9
Message-Id: <20140213182150.80F812388AA9@eris.apache.org>

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java?rev=1567994&r1=1567993&r2=1567994&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java Thu Feb 13 18:21:46 2014
@@ -109,6 +109,7 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
 
 import com.sun.jersey.spi.container.ResourceFilters;
@@ -162,9 +163,10 @@ public class NamenodeWebHdfsMethods {
     response.setContentType(null);
   }
 
+  @VisibleForTesting
   static DatanodeInfo chooseDatanode(final NameNode namenode,
       final String path, final HttpOpParam.Op op, final long openOffset,
-      final long blocksize, final Configuration conf) throws IOException {
+      final long blocksize) throws IOException {
     final BlockManager bm = namenode.getNamesystem().getBlockManager();
     if (op == PutOpParam.Op.CREATE) {
@@ -203,7 +205,7 @@ public class NamenodeWebHdfsMethods {
       final LocatedBlocks locations = np.getBlockLocations(path, offset, 1);
       final int count = locations.locatedBlockCount();
       if (count > 0) {
-        return JspHelper.bestNode(locations.get(0).getLocations(), false, conf);
+        return bestNode(locations.get(0).getLocations());
       }
     }
   }
@@ -212,13 +214,26 @@ public class NamenodeWebHdfsMethods {
         ).chooseRandom(NodeBase.ROOT);
   }
 
+  /**
+   * Choose the datanode to redirect the request. Note that the nodes have been
+   * sorted based on availability and network distances, thus it is sufficient
+   * to return the first element of the node here.
+   */
+  private static DatanodeInfo bestNode(DatanodeInfo[] nodes) throws IOException {
+    if (nodes.length == 0 || nodes[0].isDecommissioned()) {
+      throw new IOException("No active nodes contain this block");
+    }
+    return nodes[0];
+  }
+
   private Token generateDelegationToken(
       final NameNode namenode, final UserGroupInformation ugi,
       final String renewer) throws IOException {
     final Credentials c = DelegationTokenSecretManager.createCredentials(
         namenode, ugi, renewer != null? renewer: ugi.getShortUserName());
     final Token t = c.getAllTokens().iterator().next();
-    Text kind = request.getScheme().equals("http") ? WebHdfsFileSystem.TOKEN_KIND : SWebHdfsFileSystem.TOKEN_KIND;
+    Text kind = request.getScheme().equals("http") ? WebHdfsFileSystem.TOKEN_KIND
+        : SWebHdfsFileSystem.TOKEN_KIND;
     t.setKind(kind);
     return t;
   }
@@ -229,9 +244,8 @@ public class NamenodeWebHdfsMethods {
       final String path, final HttpOpParam.Op op, final long openOffset,
       final long blocksize, final Param... parameters
       ) throws URISyntaxException, IOException {
-    final Configuration conf = (Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
     final DatanodeInfo dn = chooseDatanode(namenode, path, op, openOffset,
-        blocksize, conf);
+        blocksize);
     final String delegationQuery;
     if (!UserGroupInformation.isSecurityEnabled()) {

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/LsrPBImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/LsrPBImage.java?rev=1567994&r1=1567993&r2=1567994&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/LsrPBImage.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/LsrPBImage.java Thu Feb 13 18:21:46 2014
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.FSImageUtil;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeDirectorySection;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeReferenceSection;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.INode;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.INodeDirectory;
@@ -79,6 +80,7 @@ final class LsrPBImage {
   private String[] stringTable;
   private HashMap inodes = Maps.newHashMap();
   private HashMap dirmap = Maps.newHashMap();
+  private ArrayList refList = Lists.newArrayList();
 
   public LsrPBImage(Configuration conf, PrintWriter out) {
     this.conf = conf;
@@ -125,6 +127,9 @@ final class LsrPBImage {
       case INODE:
         loadINodeSection(is);
         break;
+      case INODE_REFRENCE:
+        loadINodeReferenceSection(is);
+        break;
       case INODE_DIR:
         loadINodeDirectorySection(is);
         break;
@@ -202,14 +207,26 @@ final class LsrPBImage {
       if (e == null) {
         break;
       }
-      long[] l = new long[e.getChildrenCount()];
-      for (int i = 0; i < l.length; ++i) {
+      long[] l = new long[e.getChildrenCount() + e.getRefChildrenCount()];
+      for (int i = 0; i < e.getChildrenCount(); ++i) {
         l[i] = e.getChildren(i);
       }
+      for (int i = e.getChildrenCount(); i < l.length; i++) {
+        int refId = e.getRefChildren(i - e.getChildrenCount());
+        l[i] = refList.get(refId).getReferredId();
+      }
       dirmap.put(e.getParent(), l);
-      for (int i = 0; i < e.getNumOfRef(); i++) {
-        INodeSection.INodeReference.parseDelimitedFrom(in);
+    }
+  }
+
+  private void loadINodeReferenceSection(InputStream in) throws IOException {
+    while (true) {
+      INodeReferenceSection.INodeReference e = INodeReferenceSection
+          .INodeReference.parseDelimitedFrom(in);
+      if (e == null) {
+        break;
       }
+      refList.add(e);
     }
   }

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageXmlWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageXmlWriter.java?rev=1567994&r1=1567993&r2=1567994&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageXmlWriter.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageXmlWriter.java Thu Feb 13 18:21:46 2014
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.INodeSymlink;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeReferenceSection;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.NameSystemSection;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SecretManagerSection;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SnapshotDiffSection;
@@ -132,6 +133,9 @@ public final class PBImageXmlWriter {
       case INODE:
         dumpINodeSection(is);
         break;
+      case INODE_REFRENCE:
+        dumpINodeReferenceSection(is);
+        break;
       case INODE_DIR:
         dumpINodeDirectorySection(is);
         break;
@@ -227,18 +231,27 @@ public final class PBImageXmlWriter {
       for (long id : e.getChildrenList()) {
         o("inode", id);
       }
-      for (int i = 0; i < e.getNumOfRef(); i++) {
-        INodeSection.INodeReference r = INodeSection.INodeReference
-            .parseDelimitedFrom(in);
-        dumpINodeReference(r);
-
+      for (int refId : e.getRefChildrenList()) {
+        o("inodereference-index", refId);
       }
       out.print("\n");
     }
     out.print("\n");
   }
 
-  private void dumpINodeReference(INodeSection.INodeReference r) {
+  private void dumpINodeReferenceSection(InputStream in) throws IOException {
+    out.print("");
+    while (true) {
+      INodeReferenceSection.INodeReference e = INodeReferenceSection
+          .INodeReference.parseDelimitedFrom(in);
+      if (e == null) {
+        break;
      }
+      dumpINodeReference(e);
+    }
+  }
+
+  private void dumpINodeReference(INodeReferenceSection.INodeReference r) {
     out.print("");
     o("referredId", r.getReferredId()).o("name", r.getName().toStringUtf8())
         .o("dstSnapshotId", r.getDstSnapshotId())
@@ -362,10 +375,15 @@ public final class PBImageXmlWriter {
       o("name", ce.getName().toStringUtf8());
       out.print("\n");
     }
-    for (int j = 0; j < d.getNumOfDeletedRef(); ++j) {
-      INodeSection.INodeReference r = INodeSection.INodeReference
-          .parseDelimitedFrom(in);
-      dumpINodeReference(r);
+    for (long did : d.getDeletedINodeList()) {
+      out.print("");
+      o("inode", did);
+      out.print("\n");
+    }
+    for (int dRefid : d.getDeletedINodeRefList()) {
+      out.print("");
+      o("inodereference-index", dRefid);
+      out.print("\n");
     }
     out.print("\n");
   }

Propchange: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/native/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1567238-1567993

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto?rev=1567994&r1=1567993&r2=1567994&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto Thu Feb 13 18:21:46 2014
@@ -132,17 +132,6 @@ message INodeSection {
     optional bytes target = 2;
   }
 
-  message INodeReference {
-    // id of the referred inode
-    optional uint64 referredId = 1;
-    // local name recorded in WithName
-    optional bytes name = 2;
-    // recorded in DstReference
-    optional uint32 dstSnapshotId = 3;
-    // recorded in WithName
-    optional uint32 lastSnapshotId = 4;
-  }
-
   message INode {
     enum Type {
       FILE = 1;
@@ -183,13 +172,28 @@ message FilesUnderConstructionSection {
 message INodeDirectorySection {
   message DirEntry {
     optional uint64 parent = 1;
+    // children that are not reference nodes
     repeated uint64 children = 2 [packed = true];
-    optional uint64 numOfRef = 3;
-    // repeated INodeReference...
+    // children that are reference nodes, each element is a reference node id
+    repeated uint32 refChildren = 3 [packed = true];
   }
   // repeated DirEntry, ended at the boundary of the section.
 }
 
+message INodeReferenceSection {
+  message INodeReference {
+    // id of the referred inode
+    optional uint64 referredId = 1;
+    // local name recorded in WithName
+    optional bytes name = 2;
+    // recorded in DstReference
+    optional uint32 dstSnapshotId = 3;
+    // recorded in WithName
+    optional uint32 lastSnapshotId = 4;
+  }
+  // repeated INodeReference...
+} + /** * This section records the information about snapshot * NAME: SNAPSHOT @@ -224,10 +228,10 @@ message SnapshotDiffSection { optional bytes name = 4; optional INodeSection.INodeDirectory snapshotCopy = 5; optional uint32 createdListSize = 6; - optional uint32 numOfDeletedRef = 7; // number of reference nodes in deleted list - repeated uint64 deletedINode = 8 [packed = true]; // id of deleted inode + repeated uint64 deletedINode = 7 [packed = true]; // id of deleted inodes + // id of reference nodes in the deleted list + repeated uint32 deletedINodeRef = 8 [packed = true]; // repeated CreatedListEntry (size is specified by createdListSize) - // repeated INodeReference (reference inodes in deleted list) } message FileDiff { Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1567994&r1=1567993&r2=1567994&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Thu Feb 13 18:21:46 2014 @@ -162,6 +162,16 @@ + dfs.client.cached.conn.retry + 3 + The number of times the HDFS client will pull a socket from the + cache. Once this number is exceeded, the client will try to create a new + socket. + + + + + dfs.https.server.keystore.resource ssl-server.xml Resource file from which ssl server keystore @@ -1500,6 +1510,26 @@ + dfs.client.mmap.retry.timeout.ms + 300000 + + The minimum amount of time that we will wait before retrying a failed mmap + operation. + + + + + dfs.client.short.circuit.replica.stale.threshold.ms + 3000000 + + The maximum amount of time that we will consider a short-circuit replica to + be valid, if there is no communication from the DataNode. After this time + has elapsed, we will re-fetch the short-circuit replica even if it is in + the cache. + + + + dfs.namenode.path.based.cache.block.map.allocation.percent 0.25 @@ -1628,4 +1658,15 @@ + + dfs.client.context + default + + The name of the DFSClient context that we should use. Clients that share + a context share a socket cache and short-circuit cache, among other things. + You should only change this if you don't want to share with another set of + threads. 
+ + + Propchange: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode:r1567238-1567993 Propchange: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs:r1567238-1567993 Propchange: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary:r1567238-1567993 Propchange: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs:r1567238-1567993 Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java?rev=1567994&r1=1567993&r2=1567994&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java Thu Feb 13 18:21:46 2014 @@ -28,32 +28,39 @@ import java.util.EnumSet; import java.util.Random; import org.apache.commons.lang.SystemUtils; +import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.ExtendedBlockId; +import org.apache.hadoop.hdfs.ClientContext; +import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.client.ClientMmap; -import org.apache.hadoop.hdfs.client.ClientMmapManager; import org.apache.hadoop.hdfs.client.HdfsDataInputStream; +import org.apache.hadoop.hdfs.client.ShortCircuitCache; +import org.apache.hadoop.hdfs.client.ShortCircuitCache.CacheVisitor; +import org.apache.hadoop.hdfs.client.ShortCircuitReplica; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.io.ByteBufferPool; -import org.apache.hadoop.io.ElasticByteBufferPool; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.TemporarySocketDirectory; +import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.test.GenericTestUtils; import org.junit.Assert; import org.junit.Assume; import org.junit.BeforeClass; import org.junit.Test; +import java.util.Map; + 
import com.google.common.base.Preconditions; import com.google.common.base.Supplier; @@ -250,17 +257,39 @@ public class TestEnhancedByteBufferAcces } } - private static class CountingVisitor - implements ClientMmapManager.ClientMmapVisitor { - int count = 0; - - @Override - public void accept(ClientMmap mmap) { - count++; + private static class CountingVisitor implements CacheVisitor { + private final int expectedNumOutstandingMmaps; + private final int expectedNumReplicas; + private final int expectedNumEvictable; + private final int expectedNumMmapedEvictable; + + CountingVisitor(int expectedNumOutstandingMmaps, + int expectedNumReplicas, int expectedNumEvictable, + int expectedNumMmapedEvictable) { + this.expectedNumOutstandingMmaps = expectedNumOutstandingMmaps; + this.expectedNumReplicas = expectedNumReplicas; + this.expectedNumEvictable = expectedNumEvictable; + this.expectedNumMmapedEvictable = expectedNumMmapedEvictable; } - public void reset() { - count = 0; + @Override + public void visit(int numOutstandingMmaps, + Map replicas, + Map failedLoads, + Map evictable, + Map evictableMmapped) { + if (expectedNumOutstandingMmaps >= 0) { + Assert.assertEquals(expectedNumOutstandingMmaps, numOutstandingMmaps); + } + if (expectedNumReplicas >= 0) { + Assert.assertEquals(expectedNumReplicas, replicas.size()); + } + if (expectedNumEvictable >= 0) { + Assert.assertEquals(expectedNumEvictable, evictable.size()); + } + if (expectedNumMmapedEvictable >= 0) { + Assert.assertEquals(expectedNumMmapedEvictable, evictableMmapped.size()); + } } } @@ -271,105 +300,98 @@ public class TestEnhancedByteBufferAcces final Path TEST_PATH = new Path("/a"); final int TEST_FILE_LENGTH = 16385; final int RANDOM_SEED = 23453; + final String CONTEXT = "testZeroCopyMmapCacheContext"; FSDataInputStream fsIn = null; - ByteBuffer results[] = { null, null, null, null, null }; - + ByteBuffer results[] = { null, null, null, null }; + DistributedFileSystem fs = null; + conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + DFSTestUtil.createFile(fs, TEST_PATH, + TEST_FILE_LENGTH, (short)1, RANDOM_SEED); try { - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); - cluster.waitActive(); - fs = cluster.getFileSystem(); - DFSTestUtil.createFile(fs, TEST_PATH, - TEST_FILE_LENGTH, (short)1, RANDOM_SEED); - try { - DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); - } catch (InterruptedException e) { - Assert.fail("unexpected InterruptedException during " + - "waitReplication: " + e); - } catch (TimeoutException e) { - Assert.fail("unexpected TimeoutException during " + - "waitReplication: " + e); + DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); + } catch (InterruptedException e) { + Assert.fail("unexpected InterruptedException during " + + "waitReplication: " + e); + } catch (TimeoutException e) { + Assert.fail("unexpected TimeoutException during " + + "waitReplication: " + e); + } + fsIn = fs.open(TEST_PATH); + byte original[] = new byte[TEST_FILE_LENGTH]; + IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH); + fsIn.close(); + fsIn = fs.open(TEST_PATH); + final ShortCircuitCache cache = ClientContext.get( + CONTEXT, new DFSClient.Conf(conf)). 
getShortCircuitCache(); + cache.accept(new CountingVisitor(0, 5, 5, 0)); + results[0] = fsIn.read(null, 4096, + EnumSet.of(ReadOption.SKIP_CHECKSUMS)); + fsIn.seek(0); + results[1] = fsIn.read(null, 4096, + EnumSet.of(ReadOption.SKIP_CHECKSUMS)); + + // The mmap should be of the first block of the file. + final ExtendedBlock firstBlock = + DFSTestUtil.getFirstBlock(fs, TEST_PATH); + cache.accept(new CacheVisitor() { + @Override + public void visit(int numOutstandingMmaps, + Map replicas, + Map failedLoads, + Map evictable, + Map evictableMmapped) { + ShortCircuitReplica replica = replicas.get( + new ExtendedBlockId(firstBlock.getBlockId(), firstBlock.getBlockPoolId())); + Assert.assertNotNull(replica); + Assert.assertTrue(replica.hasMmap()); + // The replica should not yet be evictable, since we have it open. + Assert.assertNull(replica.getEvictableTimeNs()); } - fsIn = fs.open(TEST_PATH); - byte original[] = new byte[TEST_FILE_LENGTH]; - IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH); - fsIn.close(); - fsIn = fs.open(TEST_PATH); - final ClientMmapManager mmapManager = fs.getClient().getMmapManager(); - final CountingVisitor countingVisitor = new CountingVisitor(); - mmapManager.visitMmaps(countingVisitor); - Assert.assertEquals(0, countingVisitor.count); - mmapManager.visitEvictable(countingVisitor); - Assert.assertEquals(0, countingVisitor.count); - results[0] = fsIn.read(null, 4096, - EnumSet.of(ReadOption.SKIP_CHECKSUMS)); - fsIn.seek(0); - results[1] = fsIn.read(null, 4096, - EnumSet.of(ReadOption.SKIP_CHECKSUMS)); - mmapManager.visitMmaps(countingVisitor); - Assert.assertEquals(1, countingVisitor.count); - countingVisitor.reset(); - mmapManager.visitEvictable(countingVisitor); - Assert.assertEquals(0, countingVisitor.count); - countingVisitor.reset(); - - // The mmaps should be of the first block of the file. - final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fs, TEST_PATH); - mmapManager.visitMmaps(new ClientMmapManager.ClientMmapVisitor() { - @Override - public void accept(ClientMmap mmap) { - Assert.assertEquals(firstBlock, mmap.getBlock()); - } - }); + }); - // Read more blocks. - results[2] = fsIn.read(null, 4096, - EnumSet.of(ReadOption.SKIP_CHECKSUMS)); - results[3] = fsIn.read(null, 4096, - EnumSet.of(ReadOption.SKIP_CHECKSUMS)); - try { - results[4] = fsIn.read(null, 4096, - EnumSet.of(ReadOption.SKIP_CHECKSUMS)); - Assert.fail("expected UnsupportedOperationException"); - } catch (UnsupportedOperationException e) { - // expected + // Read more blocks. + results[2] = fsIn.read(null, 4096, + EnumSet.of(ReadOption.SKIP_CHECKSUMS)); + results[3] = fsIn.read(null, 4096, + EnumSet.of(ReadOption.SKIP_CHECKSUMS)); + + // we should have 3 mmaps, 1 evictable + cache.accept(new CountingVisitor(3, 5, 2, 0)); + + // After we close the cursors, the mmaps should be evictable for + // a brief period of time. Then, they should be closed (we're + // using a very quick timeout) + for (ByteBuffer buffer : results) { + if (buffer != null) { + fsIn.releaseBuffer(buffer); } - - // we should have 3 mmaps, 0 evictable - mmapManager.visitMmaps(countingVisitor); - Assert.assertEquals(3, countingVisitor.count); - countingVisitor.reset(); - mmapManager.visitEvictable(countingVisitor); - Assert.assertEquals(0, countingVisitor.count); - - // After we close the cursors, the mmaps should be evictable for - // a brief period of time. 
Then, they should be closed (we're - // using a very quick timeout) - for (ByteBuffer buffer : results) { - if (buffer != null) { - fsIn.releaseBuffer(buffer); - } - } - GenericTestUtils.waitFor(new Supplier() { - public Boolean get() { - countingVisitor.reset(); - try { - mmapManager.visitEvictable(countingVisitor); - } catch (InterruptedException e) { - e.printStackTrace(); - return false; - } - return (0 == countingVisitor.count); - } - }, 10, 10000); - countingVisitor.reset(); - mmapManager.visitMmaps(countingVisitor); - Assert.assertEquals(0, countingVisitor.count); - } finally { - if (fsIn != null) fsIn.close(); - if (fs != null) fs.close(); - if (cluster != null) cluster.shutdown(); } + fsIn.close(); + GenericTestUtils.waitFor(new Supplier() { + public Boolean get() { + final MutableBoolean finished = new MutableBoolean(false); + cache.accept(new CacheVisitor() { + @Override + public void visit(int numOutstandingMmaps, + Map replicas, + Map failedLoads, + Map evictable, + Map evictableMmapped) { + finished.setValue(evictableMmapped.isEmpty()); + } + }); + return finished.booleanValue(); + } + }, 10, 60000); + + cache.accept(new CountingVisitor(0, -1, -1, -1)); + + fs.close(); + cluster.shutdown(); } /** Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestGlobPaths.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestGlobPaths.java?rev=1567994&r1=1567993&r2=1567994&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestGlobPaths.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestGlobPaths.java Thu Feb 13 18:21:46 2014 @@ -21,6 +21,7 @@ import static org.junit.Assert.*; import java.io.IOException; import java.security.PrivilegedExceptionAction; +import java.util.UUID; import java.util.regex.Pattern; import org.apache.commons.lang.StringUtils; @@ -1175,4 +1176,32 @@ public class TestGlobPaths { public void testReservedHdfsPathsOnFC() throws Exception { testOnFileContext(new TestReservedHdfsPaths()); } + + /** + * Test trying to glob the root. Regression test for HDFS-5888. 
+ **/ + private static class TestGlobRoot implements FSTestWrapperGlobTest { + public void run(FSTestWrapper wrap, FSTestWrapper unprivilegedWrap, + FileSystem fs, FileContext fc) throws Exception { + final Path rootPath = new Path("/"); + FileStatus oldRootStatus = wrap.getFileStatus(rootPath); + String newOwner = UUID.randomUUID().toString(); + wrap.setOwner(new Path("/"), newOwner, null); + FileStatus[] status = + wrap.globStatus(rootPath, new AcceptAllPathFilter()); + Assert.assertEquals(1, status.length); + Assert.assertEquals(newOwner, status[0].getOwner()); + wrap.setOwner(new Path("/"), oldRootStatus.getOwner(), null); + } + } + + @Test + public void testGlobRootOnFS() throws Exception { + testOnFileSystem(new TestGlobRoot()); + } + + @Test + public void testGlobRootOnFC() throws Exception { + testOnFileContext(new TestGlobRoot()); + } } Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java?rev=1567994&r1=1567993&r2=1567994&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java Thu Feb 13 18:21:46 2014 @@ -28,8 +28,12 @@ import java.net.Socket; import java.util.List; import java.util.Random; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.client.ShortCircuitCache; +import org.apache.hadoop.hdfs.client.ShortCircuitReplica; +import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.TcpPeerServer; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -38,6 +42,8 @@ import org.apache.hadoop.hdfs.server.com import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.net.NetUtils; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; /** * A helper class to setup the cluster, and get to BlockReader and DataNode for a block. @@ -141,22 +147,54 @@ public class BlockReaderTestUtil { */ public BlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToRead) throws IOException { + return getBlockReader(cluster, testBlock, offset, lenToRead); + } + + /** + * Get a BlockReader for the given block. 
+ */ + public static BlockReader getBlockReader(MiniDFSCluster cluster, + LocatedBlock testBlock, int offset, int lenToRead) throws IOException { InetSocketAddress targetAddr = null; - Socket sock = null; ExtendedBlock block = testBlock.getBlock(); DatanodeInfo[] nodes = testBlock.getLocations(); targetAddr = NetUtils.createSocketAddr(nodes[0].getXferAddr()); - sock = NetUtils.getDefaultSocketFactory(conf).createSocket(); - sock.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT); - sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT); - - return BlockReaderFactory.newBlockReader( - new DFSClient.Conf(conf), - targetAddr.toString()+ ":" + block.getBlockId(), block, - testBlock.getBlockToken(), - offset, lenToRead, - true, "BlockReaderTestUtil", TcpPeerServer.peerFromSocket(sock), - nodes[0], null, null, null, false, CachingStrategy.newDefaultStrategy()); + + final DistributedFileSystem fs = cluster.getFileSystem(); + return new BlockReaderFactory(fs.getClient().getConf()). + setInetSocketAddress(targetAddr). + setBlock(block). + setFileName(targetAddr.toString()+ ":" + block.getBlockId()). + setBlockToken(testBlock.getBlockToken()). + setStartOffset(offset). + setLength(lenToRead). + setVerifyChecksum(true). + setClientName("BlockReaderTestUtil"). + setDatanodeInfo(nodes[0]). + setClientCacheContext(ClientContext.getFromConf(fs.getConf())). + setCachingStrategy(CachingStrategy.newDefaultStrategy()). + setConfiguration(fs.getConf()). + setAllowShortCircuitLocalReads(true). + setRemotePeerFactory(new RemotePeerFactory() { + @Override + public Peer newConnectedPeer(InetSocketAddress addr) + throws IOException { + Peer peer = null; + Socket sock = NetUtils. + getDefaultSocketFactory(fs.getConf()).createSocket(); + try { + sock.connect(addr, HdfsServerConstants.READ_TIMEOUT); + sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT); + peer = TcpPeerServer.peerFromSocket(sock); + } finally { + if (peer == null) { + IOUtils.closeQuietly(sock); + } + } + return peer; + } + }). 
+ build(); } /** @@ -167,4 +205,13 @@ public class BlockReaderTestUtil { int ipcport = nodes[0].getIpcPort(); return cluster.getDataNode(ipcport); } -} + + public static void enableBlockReaderFactoryTracing() { + LogManager.getLogger(BlockReaderFactory.class.getName()).setLevel( + Level.TRACE); + LogManager.getLogger(ShortCircuitCache.class.getName()).setLevel( + Level.TRACE); + LogManager.getLogger(ShortCircuitReplica.class.getName()).setLevel( + Level.TRACE); + } +} \ No newline at end of file Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1567994&r1=1567993&r2=1567994&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java Thu Feb 13 18:21:46 2014 @@ -187,10 +187,26 @@ public class DFSTestUtil { } } - public static String readFile(FileSystem fs, Path fileName) throws IOException { + public static String readFile(FileSystem fs, Path fileName) + throws IOException { + byte buf[] = readFileBuffer(fs, fileName); + return new String(buf, 0, buf.length); + } + + public static byte[] readFileBuffer(FileSystem fs, Path fileName) + throws IOException { ByteArrayOutputStream os = new ByteArrayOutputStream(); - IOUtils.copyBytes(fs.open(fileName), os, 1024, true); - return os.toString(); + try { + FSDataInputStream in = fs.open(fileName); + try { + IOUtils.copyBytes(fs.open(fileName), os, 1024, true); + return os.toByteArray(); + } finally { + in.close(); + } + } finally { + os.close(); + } } public static void createFile(FileSystem fs, Path fileName, long fileLen, @@ -232,6 +248,13 @@ public class DFSTestUtil { } } + public static byte[] calculateFileContentsFromSeed(long seed, int length) { + Random rb = new Random(seed); + byte val[] = new byte[length]; + rb.nextBytes(val); + return val; + } + /** check if the files have been copied correctly. 
*/ public boolean checkFiles(FileSystem fs, String topdir) throws IOException { Path root = new Path(topdir); @@ -551,8 +574,12 @@ public class DFSTestUtil { public static ExtendedBlock getFirstBlock(FileSystem fs, Path path) throws IOException { HdfsDataInputStream in = (HdfsDataInputStream) fs.open(path); - in.readByte(); - return in.getCurrentBlock(); + try { + in.readByte(); + return in.getCurrentBlock(); + } finally { + in.close(); + } } public static List getAllBlocks(FSDataInputStream in) Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java?rev=1567994&r1=1567993&r2=1567994&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java Thu Feb 13 18:21:46 2014 @@ -30,13 +30,15 @@ import org.apache.hadoop.fs.FSDataInputS import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.client.HdfsDataInputStream; +import org.apache.hadoop.hdfs.client.ShortCircuitCache; +import org.apache.hadoop.hdfs.client.ShortCircuitReplica; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.TemporarySocketDirectory; +import org.apache.hadoop.util.Time; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Assume; @@ -155,6 +157,8 @@ public class TestBlockReaderLocal { File metaFile = MiniDFSCluster.getBlockMetadataFile(0, block); DatanodeID datanodeID = cluster.getDataNodes().get(0).getDatanodeId(); + ShortCircuitCache shortCircuitCache = + ClientContext.getFromConf(conf).getShortCircuitCache(); cluster.shutdown(); cluster = null; test.setup(dataFile, checksum); @@ -164,16 +168,17 @@ public class TestBlockReaderLocal { }; dataIn = streams[0]; metaIn = streams[1]; + ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); + ShortCircuitReplica replica = new ShortCircuitReplica( + key, dataIn, metaIn, shortCircuitCache, Time.now()); blockReaderLocal = new BlockReaderLocal.Builder( new DFSClient.Conf(conf)). setFilename(TEST_PATH.getName()). setBlock(block). - setStreams(streams). + setShortCircuitReplica(replica). setDatanodeID(datanodeID). setCachingStrategy(new CachingStrategy(false, readahead)). setVerifyChecksum(checksum). - setBlockMetadataHeader(BlockMetadataHeader.preadHeader( - metaIn.getChannel())). 
build(); dataIn = null; metaIn = null; Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java?rev=1567994&r1=1567993&r2=1567994&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java Thu Feb 13 18:21:46 2014 @@ -25,18 +25,8 @@ import java.net.InetSocketAddress; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; -import org.apache.hadoop.hdfs.net.Peer; -import org.apache.hadoop.security.token.Token; import org.junit.Assert; import org.junit.Test; -import org.mockito.Matchers; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; /** * This class tests the client connection caching in a single node @@ -49,30 +39,6 @@ public class TestConnCache { static final int FILE_SIZE = 3 * BLOCK_SIZE; /** - * A mock Answer to remember the BlockReader used. - * - * It verifies that all invocation to DFSInputStream.getBlockReader() - * use the same peer. - */ - private class MockGetBlockReader implements Answer { - public RemoteBlockReader2 reader = null; - private Peer peer = null; - - @Override - public RemoteBlockReader2 answer(InvocationOnMock invocation) throws Throwable { - RemoteBlockReader2 prevReader = reader; - reader = (RemoteBlockReader2) invocation.callRealMethod(); - if (peer == null) { - peer = reader.getPeer(); - } else if (prevReader != null) { - Assert.assertSame("DFSInputStream should use the same peer", - peer, reader.getPeer()); - } - return reader; - } - } - - /** * (Optionally) seek to position, read and verify data. * * Seek to specified position if pos is non-negative. @@ -115,33 +81,29 @@ public class TestConnCache { * @throws Exception */ @Test - @SuppressWarnings("unchecked") public void testReadFromOneDN() throws Exception { - BlockReaderTestUtil util = new BlockReaderTestUtil(1, - new HdfsConfiguration()); + HdfsConfiguration configuration = new HdfsConfiguration(); + // One of the goals of this test is to verify that we don't open more + // than one socket. So use a different client context, so that we + // get our own socket cache, rather than sharing with the other test + // instances. Also use a really long socket timeout so that nothing + // gets closed before we get around to checking the cache size at the end. 
+ final String contextName = "testReadFromOneDNContext"; + configuration.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, contextName); + configuration.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, + 100000000L); + BlockReaderTestUtil util = new BlockReaderTestUtil(1, configuration); final Path testFile = new Path("/testConnCache.dat"); byte authenticData[] = util.writeFile(testFile, FILE_SIZE / 1024); DFSClient client = new DFSClient( new InetSocketAddress("localhost", util.getCluster().getNameNodePort()), util.getConf()); - DFSInputStream in = Mockito.spy(client.open(testFile.toString())); + ClientContext cacheContext = + ClientContext.get(contextName, client.getConf()); + DFSInputStream in = client.open(testFile.toString()); LOG.info("opened " + testFile.toString()); byte[] dataBuf = new byte[BLOCK_SIZE]; - MockGetBlockReader answer = new MockGetBlockReader(); - Mockito.doAnswer(answer).when(in).getBlockReader( - (InetSocketAddress) Matchers.anyObject(), - (DatanodeInfo) Matchers.anyObject(), - Matchers.anyString(), - (ExtendedBlock) Matchers.anyObject(), - (Token) Matchers.anyObject(), - Matchers.anyLong(), - Matchers.anyLong(), - Matchers.anyInt(), - Matchers.anyBoolean(), - Matchers.anyString(), - (CachingStrategy)Matchers.anyObject()); - // Initial read pread(in, 0, dataBuf, 0, dataBuf.length, authenticData); // Read again and verify that the socket is the same @@ -153,5 +115,8 @@ public class TestConnCache { pread(in, 64, dataBuf, 0, dataBuf.length / 2, authenticData); in.close(); + client.close(); + Assert.assertEquals(1, + ClientContext.getFromConf(configuration).getPeerCache().size()); } } Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java?rev=1567994&r1=1567993&r2=1567994&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java Thu Feb 13 18:21:46 2014 @@ -22,7 +22,7 @@ import static org.apache.hadoop.hdfs.DFS import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -86,21 +86,22 @@ public class TestDataTransferKeepalive { // the datanode-side expiration time. 
final long CLIENT_EXPIRY_MS = 60000L; clientConf.setLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, CLIENT_EXPIRY_MS); - PeerCache.setInstance(DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT, CLIENT_EXPIRY_MS); + clientConf.set(DFS_CLIENT_CONTEXT, "testDatanodeRespectsKeepAliveTimeout"); DistributedFileSystem fs = (DistributedFileSystem)FileSystem.get(cluster.getURI(), clientConf); + PeerCache peerCache = ClientContext.getFromConf(clientConf).getPeerCache(); DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L); // Clients that write aren't currently re-used. - assertEquals(0, fs.dfs.peerCache.size()); + assertEquals(0, peerCache.size()); assertXceiverCount(0); // Reads the file, so we should get a // cached socket, and should have an xceiver on the other side. DFSTestUtil.readFile(fs, TEST_FILE); - assertEquals(1, fs.dfs.peerCache.size()); + assertEquals(1, peerCache.size()); assertXceiverCount(1); // Sleep for a bit longer than the keepalive timeout @@ -111,15 +112,13 @@ public class TestDataTransferKeepalive { // The socket is still in the cache, because we don't // notice that it's closed until we try to read // from it again. - assertEquals(1, fs.dfs.peerCache.size()); + assertEquals(1, peerCache.size()); // Take it out of the cache - reading should // give an EOF. - Peer peer = fs.dfs.peerCache.get(dn.getDatanodeId(), false); + Peer peer = peerCache.get(dn.getDatanodeId(), false); assertNotNull(peer); assertEquals(-1, peer.getInputStream().read()); - PeerCache.setInstance(DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT, - DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT); } /** @@ -132,34 +131,33 @@ public class TestDataTransferKeepalive { // the datanode-side expiration time. final long CLIENT_EXPIRY_MS = 10L; clientConf.setLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, CLIENT_EXPIRY_MS); - PeerCache.setInstance(DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT, CLIENT_EXPIRY_MS); + clientConf.set(DFS_CLIENT_CONTEXT, "testClientResponsesKeepAliveTimeout"); DistributedFileSystem fs = (DistributedFileSystem)FileSystem.get(cluster.getURI(), clientConf); + PeerCache peerCache = ClientContext.getFromConf(clientConf).getPeerCache(); DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L); // Clients that write aren't currently re-used. - assertEquals(0, fs.dfs.peerCache.size()); + assertEquals(0, peerCache.size()); assertXceiverCount(0); // Reads the file, so we should get a // cached socket, and should have an xceiver on the other side. DFSTestUtil.readFile(fs, TEST_FILE); - assertEquals(1, fs.dfs.peerCache.size()); + assertEquals(1, peerCache.size()); assertXceiverCount(1); // Sleep for a bit longer than the client keepalive timeout. Thread.sleep(CLIENT_EXPIRY_MS + 1); // Taking out a peer which is expired should give a null. - Peer peer = fs.dfs.peerCache.get(dn.getDatanodeId(), false); + Peer peer = peerCache.get(dn.getDatanodeId(), false); assertTrue(peer == null); // The socket cache is now empty. 
- assertEquals(0, fs.dfs.peerCache.size()); - PeerCache.setInstance(DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT, - DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT); + assertEquals(0, peerCache.size()); } /** @@ -174,7 +172,7 @@ public class TestDataTransferKeepalive { final long CLIENT_EXPIRY_MS = 600000L; Configuration clientConf = new Configuration(conf); clientConf.setLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, CLIENT_EXPIRY_MS); - PeerCache.setInstance(DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT, CLIENT_EXPIRY_MS); + clientConf.set(DFS_CLIENT_CONTEXT, "testSlowReader"); DistributedFileSystem fs = (DistributedFileSystem)FileSystem.get(cluster.getURI(), clientConf); @@ -209,7 +207,12 @@ public class TestDataTransferKeepalive { @Test(timeout=30000) public void testManyClosedSocketsInCache() throws Exception { // Make a small file - DistributedFileSystem fs = cluster.getFileSystem(); + Configuration clientConf = new Configuration(conf); + clientConf.set(DFS_CLIENT_CONTEXT, "testManyClosedSocketsInCache"); + DistributedFileSystem fs = + (DistributedFileSystem)FileSystem.get(cluster.getURI(), + clientConf); + PeerCache peerCache = ClientContext.getFromConf(clientConf).getPeerCache(); DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L); // Insert a bunch of dead sockets in the cache, by opening @@ -227,15 +230,14 @@ public class TestDataTransferKeepalive { IOUtils.cleanup(null, stms); } - DFSClient client = ((DistributedFileSystem)fs).dfs; - assertEquals(5, client.peerCache.size()); + assertEquals(5, peerCache.size()); // Let all the xceivers timeout Thread.sleep(1500); assertXceiverCount(0); // Client side still has the sockets cached - assertEquals(5, client.peerCache.size()); + assertEquals(5, peerCache.size()); // Reading should not throw an exception. DFSTestUtil.readFile(fs, TEST_FILE); Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java?rev=1567994&r1=1567993&r2=1567994&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java Thu Feb 13 18:21:46 2014 @@ -53,7 +53,8 @@ public class TestDisableConnCache { FileSystem fsWithoutCache = FileSystem.newInstance(util.getConf()); try { DFSTestUtil.readFile(fsWithoutCache, testFile); - assertEquals(0, ((DistributedFileSystem)fsWithoutCache).dfs.peerCache.size()); + assertEquals(0, ((DistributedFileSystem)fsWithoutCache). 
+ dfs.getClientContext().getPeerCache().size()); } finally { fsWithoutCache.close(); util.shutdown(); Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java?rev=1567994&r1=1567993&r2=1567994&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java Thu Feb 13 18:21:46 2014 @@ -303,5 +303,6 @@ public class TestFileStatus { FileSystem.LOG.info("GOOD: getting an exception", ioe); } } + fs.delete(dir, true); } } Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java?rev=1567994&r1=1567993&r2=1567994&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java Thu Feb 13 18:21:46 2014 @@ -27,6 +27,7 @@ import java.io.RandomAccessFile; import java.net.URI; import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; +import java.util.UUID; import java.util.concurrent.TimeoutException; import org.apache.hadoop.conf.Configuration; @@ -35,8 +36,9 @@ import org.apache.hadoop.fs.FSDataOutput import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.client.HdfsDataInputStream; +import org.apache.hadoop.hdfs.client.ShortCircuitCache; +import org.apache.hadoop.hdfs.client.ShortCircuitReplica; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; -import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; @@ -125,8 +127,9 @@ public class TestShortCircuitLocalRead { throws IOException, InterruptedException { // Ensure short circuit is enabled DistributedFileSystem fs = getFileSystem(readingUser, uri, conf); + ClientContext getClientContext = ClientContext.getFromConf(conf); if (legacyShortCircuitFails) { - assertTrue(fs.getClient().useLegacyBlockReaderLocal()); + assertFalse(getClientContext.getDisableLegacyBlockReaderLocal()); } FSDataInputStream stm = fs.open(name); @@ -155,7 +158,7 @@ public class TestShortCircuitLocalRead { checkData(actual, readOffset, expected, "Read 3"); if (legacyShortCircuitFails) { - assertFalse(fs.getClient().useLegacyBlockReaderLocal()); + assertTrue(getClientContext.getDisableLegacyBlockReaderLocal()); } stm.close(); } @@ -175,8 +178,9 @@ public class TestShortCircuitLocalRead { throws IOException, InterruptedException { // Ensure short circuit is enabled DistributedFileSystem fs = getFileSystem(readingUser, uri, conf); + ClientContext clientContext = ClientContext.getFromConf(conf); 
     if (legacyShortCircuitFails) {
-      assertTrue(fs.getClient().useLegacyBlockReaderLocal());
+      assertTrue(clientContext.getDisableLegacyBlockReaderLocal());
     }
     HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(name);
@@ -209,7 +213,7 @@ public class TestShortCircuitLocalRead {
     }
     checkData(arrayFromByteBuffer(actual), readOffset, expected, "Read 3");
     if (legacyShortCircuitFails) {
-      assertFalse(fs.getClient().useLegacyBlockReaderLocal());
+      assertTrue(clientContext.getDisableLegacyBlockReaderLocal());
     }
     stm.close();
   }
@@ -223,7 +227,6 @@ public class TestShortCircuitLocalRead {
   public void doTestShortCircuitRead(boolean ignoreChecksum, int size,
       int readOffset) throws IOException, InterruptedException {
-    String shortCircuitUser = getCurrentUser();
     doTestShortCircuitReadImpl(ignoreChecksum, size, readOffset, null,
         getCurrentUser(), false);
   }
@@ -239,6 +242,10 @@ public class TestShortCircuitLocalRead {
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
         ignoreChecksum);
+    // Set a random client context name so that we don't share a cache with
+    // other invocations of this function.
+    conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT,
+        UUID.randomUUID().toString());
     conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
         new File(sockDir.getDir(),
           "TestShortCircuitLocalRead._PORT.sock").getAbsolutePath());
@@ -322,18 +329,6 @@ public class TestShortCircuitLocalRead {
     doTestShortCircuitRead(true, 10*blockSize+100, 777);
   }

-  private ClientDatanodeProtocol getProxy(UserGroupInformation ugi,
-      final DatanodeID dnInfo, final Configuration conf) throws IOException,
-      InterruptedException {
-    return ugi.doAs(new PrivilegedExceptionAction() {
-      @Override
-      public ClientDatanodeProtocol run() throws Exception {
-        return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf, 60000,
-            false);
-      }
-    });
-  }
-
   private static DistributedFileSystem getFileSystem(String user, final URI uri,
       final Configuration conf) throws InterruptedException, IOException {
     UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
@@ -555,8 +550,7 @@ public class TestShortCircuitLocalRead {
     for (int i = 0; i < iteration; i++) {
       try {
         String user = getCurrentUser();
-        checkFileContent(fs.getUri(), file1, dataToWrite, 0, user, conf,
-            true);
+        checkFileContent(fs.getUri(), file1, dataToWrite, 0, user, conf, true);
       } catch (IOException e) {
         e.printStackTrace();
       } catch (InterruptedException e) {
@@ -608,7 +602,8 @@ public class TestShortCircuitLocalRead {
     stm.write(fileData);
     stm.close();
     try {
-      checkFileContent(uri, file1, fileData, readOffset, shortCircuitUser, conf, shortCircuitFails);
+      checkFileContent(uri, file1, fileData, readOffset, shortCircuitUser,
+          conf, shortCircuitFails);
       //RemoteBlockReader have unsupported method read(ByteBuffer bf)
       assertTrue("RemoteBlockReader unsupported method read(ByteBuffer bf) error",
           checkUnsupportedMethod(fs, file1, fileData, readOffset));

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java?rev=1567994&r1=1567993&r2=1567994&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java Thu Feb 13 18:21:46 2014
@@ -38,10 +38,16 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.BlockReaderFactory;
+import org.apache.hadoop.hdfs.ClientContext;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.DFSClient.Conf;
+import org.apache.hadoop.hdfs.RemotePeerFactory;
+import org.apache.hadoop.hdfs.client.ShortCircuitCache;
+import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
+import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -55,10 +61,13 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Level;
+import org.junit.Assert;
 import org.junit.Test;

 public class TestBlockTokenWithDFS {
@@ -131,50 +140,70 @@ public class TestBlockTokenWithDFS {
   }

   // try reading a block using a BlockReader directly
-  private static void tryRead(Configuration conf, LocatedBlock lblock,
+  private static void tryRead(final Configuration conf, LocatedBlock lblock,
       boolean shouldSucceed) {
     InetSocketAddress targetAddr = null;
-    Socket s = null;
+    IOException ioe = null;
     BlockReader blockReader = null;
     ExtendedBlock block = lblock.getBlock();
     try {
       DatanodeInfo[] nodes = lblock.getLocations();
       targetAddr = NetUtils.createSocketAddr(nodes[0].getXferAddr());
-      s = NetUtils.getDefaultSocketFactory(conf).createSocket();
-      s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
-      s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
-
-      String file = BlockReaderFactory.getFileName(targetAddr,
-          "test-blockpoolid", block.getBlockId());
-      blockReader = BlockReaderFactory.newBlockReader(
-          new DFSClient.Conf(conf), file, block, lblock.getBlockToken(), 0, -1,
-          true, "TestBlockTokenWithDFS", TcpPeerServer.peerFromSocket(s),
-          nodes[0], null, null, null, false,
-          CachingStrategy.newDefaultStrategy());
+      blockReader = new BlockReaderFactory(new DFSClient.Conf(conf)).
+          setFileName(BlockReaderFactory.getFileName(targetAddr,
+              "test-blockpoolid", block.getBlockId())).
+          setBlock(block).
+          setBlockToken(lblock.getBlockToken()).
+          setInetSocketAddress(targetAddr).
+          setStartOffset(0).
+          setLength(-1).
+          setVerifyChecksum(true).
+          setClientName("TestBlockTokenWithDFS").
+          setDatanodeInfo(nodes[0]).
+          setCachingStrategy(CachingStrategy.newDefaultStrategy()).
+          setClientCacheContext(ClientContext.getFromConf(conf)).
+          setConfiguration(conf).
+          setRemotePeerFactory(new RemotePeerFactory() {
+            @Override
+            public Peer newConnectedPeer(InetSocketAddress addr)
+                throws IOException {
+              Peer peer = null;
+              Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
+              try {
+                sock.connect(addr, HdfsServerConstants.READ_TIMEOUT);
+                sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
+                peer = TcpPeerServer.peerFromSocket(sock);
+              } finally {
+                if (peer == null) {
+                  IOUtils.closeSocket(sock);
+                }
+              }
+              return peer;
+            }
+          }).
+          build();
     } catch (IOException ex) {
-      if (ex instanceof InvalidBlockTokenException) {
-        assertFalse("OP_READ_BLOCK: access token is invalid, "
-            + "when it is expected to be valid", shouldSucceed);
-        return;
-      }
-      fail("OP_READ_BLOCK failed due to reasons other than access token: "
-          + StringUtils.stringifyException(ex));
+      ioe = ex;
     } finally {
-      if (s != null) {
+      if (blockReader != null) {
         try {
-          s.close();
-        } catch (IOException iex) {
-        } finally {
-          s = null;
+          blockReader.close();
+        } catch (IOException e) {
+          throw new RuntimeException(e);
         }
       }
     }
-    if (blockReader == null) {
-      fail("OP_READ_BLOCK failed due to reasons other than access token");
+    if (shouldSucceed) {
+      Assert.assertNotNull("OP_READ_BLOCK: access token is invalid, "
+            + "when it is expected to be valid", blockReader);
+    } else {
+      Assert.assertNotNull("OP_READ_BLOCK: access token is valid, "
+            + "when it is expected to be invalid", ioe);
+      Assert.assertTrue(
+          "OP_READ_BLOCK failed due to reasons other than access token: ",
+          ioe instanceof InvalidBlockTokenException);
     }
-    assertTrue("OP_READ_BLOCK: access token is valid, "
-        + "when it is expected to be invalid", shouldSucceed);
   }

   // get a conf for testing
@@ -347,9 +376,13 @@ public class TestBlockTokenWithDFS {
       /*
        * testing READ interface on DN using a BlockReader
        */
-
-      new DFSClient(new InetSocketAddress("localhost",
+      DFSClient client = null;
+      try {
+        client = new DFSClient(new InetSocketAddress("localhost",
           cluster.getNameNodePort()), conf);
+      } finally {
+        if (client != null) client.close();
+      }
       List locatedBlocks = nnProto.getBlockLocations(
           FILE_TO_READ, 0, FILE_SIZE).getLocatedBlocks();
       LocatedBlock lblock = locatedBlocks.get(0); // first block

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1567994&r1=1567993&r2=1567994&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Thu Feb 13 18:21:46 2014
@@ -35,11 +35,14 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.BlockReaderFactory;
+import org.apache.hadoop.hdfs.ClientContext;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.RemotePeerFactory;
+import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -48,13 +51,14 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -284,23 +288,43 @@ public class TestDataNodeVolumeFailure {
   private void accessBlock(DatanodeInfo datanode, LocatedBlock lblock)
       throws IOException {
     InetSocketAddress targetAddr = null;
-    Socket s = null;
     ExtendedBlock block = lblock.getBlock();

     targetAddr = NetUtils.createSocketAddr(datanode.getXferAddr());
-
-    s = NetUtils.getDefaultSocketFactory(conf).createSocket();
-    s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
-    s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
-
-    String file = BlockReaderFactory.getFileName(targetAddr,
-        "test-blockpoolid",
-        block.getBlockId());
-    BlockReader blockReader =
-      BlockReaderFactory.newBlockReader(new DFSClient.Conf(conf), file, block,
-        lblock.getBlockToken(), 0, -1, true, "TestDataNodeVolumeFailure",
-        TcpPeerServer.peerFromSocket(s), datanode, null, null, null, false,
-        CachingStrategy.newDefaultStrategy());
+
+    BlockReader blockReader = new BlockReaderFactory(new DFSClient.Conf(conf)).
+      setInetSocketAddress(targetAddr).
+      setBlock(block).
+      setFileName(BlockReaderFactory.getFileName(targetAddr,
+          "test-blockpoolid", block.getBlockId())).
+      setBlockToken(lblock.getBlockToken()).
+      setStartOffset(0).
+      setLength(-1).
+      setVerifyChecksum(true).
+      setClientName("TestDataNodeVolumeFailure").
+      setDatanodeInfo(datanode).
+      setCachingStrategy(CachingStrategy.newDefaultStrategy()).
+      setClientCacheContext(ClientContext.getFromConf(conf)).
+      setConfiguration(conf).
+      setRemotePeerFactory(new RemotePeerFactory() {
+        @Override
+        public Peer newConnectedPeer(InetSocketAddress addr)
+            throws IOException {
+          Peer peer = null;
+          Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
+          try {
+            sock.connect(addr, HdfsServerConstants.READ_TIMEOUT);
+            sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
+            peer = TcpPeerServer.peerFromSocket(sock);
+          } finally {
+            if (peer == null) {
+              IOUtils.closeSocket(sock);
+            }
+          }
+          return peer;
+        }
+      }).
+      build();
     blockReader.close();
   }

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java?rev=1567994&r1=1567993&r2=1567994&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java Thu Feb 13 18:21:46 2014
@@ -92,7 +92,7 @@ public class TestWebHdfsDataLocality {
         //The chosen datanode must be the same as the client address
         final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
-            namenode, f, PutOpParam.Op.CREATE, -1L, blocksize, conf);
+            namenode, f, PutOpParam.Op.CREATE, -1L, blocksize);
         Assert.assertEquals(ipAddr, chosen.getIpAddr());
       }
     }
@@ -117,19 +117,19 @@ public class TestWebHdfsDataLocality {
       { //test GETFILECHECKSUM
         final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
-            namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize, conf);
+            namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize);
         Assert.assertEquals(expected, chosen);
       }

       { //test OPEN
         final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
-            namenode, f, GetOpParam.Op.OPEN, 0, blocksize, conf);
+            namenode, f, GetOpParam.Op.OPEN, 0, blocksize);
         Assert.assertEquals(expected, chosen);
       }

       { //test APPEND
         final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
-            namenode, f, PostOpParam.Op.APPEND, -1L, blocksize, conf);
+            namenode, f, PostOpParam.Op.APPEND, -1L, blocksize);
         Assert.assertEquals(expected, chosen);
       }
     } finally {

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpFileSystem.java?rev=1567994&r1=1567993&r2=1567994&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpFileSystem.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpFileSystem.java Thu Feb 13 18:21:46 2014
@@ -136,6 +136,7 @@ public class TestHftpFileSystem {
     out.close();
     FSDataInputStream in = hftpFs.open(p);
     assertEquals('0', in.read());
+    in.close();

     // Check the file status matches the path. Hftp returns a FileStatus
     // with the entire URI, extract the path part.
@@ -250,6 +251,7 @@ public class TestHftpFileSystem {
     FSDataInputStream in = hftpFs.open(testFile);
     in.seek(7);
     assertEquals('7', in.read());
+    in.close();
   }

   @Test

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHttpsFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHttpsFileSystem.java?rev=1567994&r1=1567993&r2=1567994&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHttpsFileSystem.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHttpsFileSystem.java Thu Feb 13 18:21:46 2014
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.web;

 import java.io.File;
+import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.net.URI;

@@ -92,6 +93,9 @@ public class TestHttpsFileSystem {
     os.write(23);
     os.close();
     Assert.assertTrue(fs.exists(f));
+    InputStream is = fs.open(f);
+    Assert.assertEquals(23, is.read());
+    is.close();
     fs.close();
   }
 }
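
The RemotePeerFactory.newConnectedPeer() implementations added above in TestBlockTokenWithDFS and TestDataNodeVolumeFailure both use a close-on-failure idiom: the raw socket is closed only when wrapping it in a Peer fails; on success, ownership passes to the returned object. The following is a minimal, JDK-only sketch of that idiom, not code from this commit; the class name ConnectOnce, the method openStream, and the timeout constant are illustrative.

    import java.io.IOException;
    import java.io.InputStream;
    import java.net.InetSocketAddress;
    import java.net.Socket;

    public final class ConnectOnce {
      // Assumed timeout value, analogous to the READ_TIMEOUT used above.
      private static final int READ_TIMEOUT_MS = 60000;

      private ConnectOnce() {}

      /**
       * Connect a socket and hand its stream to the caller. The socket is
       * closed here only if the handoff fails; on success the caller closes
       * the stream, which also closes the underlying socket.
       */
      public static InputStream openStream(InetSocketAddress addr) throws IOException {
        InputStream in = null;
        Socket sock = new Socket();
        try {
          sock.connect(addr, READ_TIMEOUT_MS);
          sock.setSoTimeout(READ_TIMEOUT_MS);
          in = sock.getInputStream();
        } finally {
          if (in == null) {
            sock.close(); // connect or wrap failed: do not leak the socket
          }
        }
        return in;
      }
    }

The design choice mirrors the hunks above: exactly one owner is responsible for the socket at any time, so neither a leak nor a double close can occur.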
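
Several hunks above (TestHftpFileSystem and TestHttpsFileSystem) add explicit close() calls on streams opened during the tests. A small sketch of the same cleanup using try-with-resources, which also closes the stream when a read or a later check throws; this is a generic illustration, and the file path argument is not from the commit.

    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    public class ReadFirstByte {
      public static void main(String[] args) throws IOException {
        // try-with-resources closes the stream even if read() or a later check throws
        try (InputStream in = Files.newInputStream(Paths.get(args[0]))) {
          System.out.println("first byte: " + in.read());
        }
      }
    }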
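
The TestShortCircuitLocalRead hunk above sets DFSConfigKeys.DFS_CLIENT_CONTEXT to a random UUID so each invocation gets its own client cache. A hedged sketch of that idea as a reusable test helper follows; the class and method names are hypothetical, and only the configuration key and the UUID call come from the commit.

    import java.util.UUID;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public final class TestConfUtil {
      private TestConfUtil() {}

      /** Copy conf and give it a unique client context so caches are not shared across tests. */
      public static Configuration withIsolatedClientContext(Configuration conf) {
        Configuration copy = new Configuration(conf);
        copy.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, UUID.randomUUID().toString());
        return copy;
      }
    }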