Author: bobby Date: Fri Sep 28 16:44:20 2012 New Revision: 1391542 URL: http://svn.apache.org/viewvc?rev=1391542&view=rev Log: HDFS-3373. FileContext HDFS implementation can leak socket caches (John George via bobby) Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1391542&r1=1391541&r2=1391542&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original) +++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Sep 28 16:44:20 2012 @@ -24,6 +24,9 @@ Release 0.23.4 - UNRELEASED HDFS-3731. Release upgrade must handle blocks being written from 1.0 (Kihwal Lee via daryn) + HDFS-3373. FileContext HDFS implementation can leak socket caches (John + George via bobby) + Release 0.23.3 - UNRELEASED INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1391542&r1=1391541&r2=1391542&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Fri Sep 28 16:44:20 2012 @@ -34,6 +34,8 @@ import static org.apache.hadoop.hdfs.DFS import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT; @@ -178,6 +180,7 @@ public class DFSClient implements java.i final int writePacketSize; final int socketTimeout; final int socketCacheCapacity; + final long socketCacheExpiry; /** Wait time window (in msec) if BlockMissingException is caught */ final int timeWindow; final int nCachedConnRetry; @@ -212,6 +215,8 @@ public class DFSClient implements java.i taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE"); socketCacheCapacity = conf.getInt(DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT); + socketCacheExpiry = conf.getLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, + DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT); prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 10 * defaultBlockSize); timeWindow = conf @@ -336,7 +341,7 @@ public class DFSClient implements java.i nameNodeAddr.getHostName() + ":" + nameNodeAddr.getPort(); this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" + DFSUtil.getRandom().nextInt() + "_" + Thread.currentThread().getId(); - this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity); + this.socketCache = SocketCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry); if (nameNodeAddr != null && rpcNamenode == null) { this.rpcNamenode = DFSUtil.createRPCNamenode(nameNodeAddr, conf, ugi); this.namenode = DFSUtil.createNamenode(this.rpcNamenode); @@ -506,7 +511,6 @@ public class DFSClient implements java.i void abort() { clientRunning = false; closeAllFilesBeingWritten(true); - socketCache.clear(); try { // remove reference to this client and stop the renewer, // if there is no more clients under the renewer. @@ -551,7 +555,6 @@ public class DFSClient implements java.i public synchronized void close() throws IOException { if(clientRunning) { closeAllFilesBeingWritten(false); - socketCache.clear(); clientRunning = false; getLeaseRenewer().closeClient(this); // close connections to the namenode Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1391542&r1=1391541&r2=1391542&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Sep 28 16:44:20 2012 @@ -49,6 +49,8 @@ public class DFSConfigKeys extends Commo public static final String DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY = "dfs.client.socketcache.capacity"; public static final int DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16; + public static final String DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY = "dfs.client.socketcache.expiryMsec"; + public static final long DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT = 2 * 60 * 1000; public static final String DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address"; public static final String DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100"; public static final String DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY = "dfs.namenode.backup.http-address"; Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java?rev=1391542&r1=1391541&r2=1391542&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java Fri Sep 28 16:44:20 2012 @@ -25,28 +25,103 @@ import java.util.Iterator; import java.util.List; import java.util.Map.Entry; +import java.io.IOException; import com.google.common.base.Preconditions; import com.google.common.collect.LinkedListMultimap; import org.apache.commons.logging.Log; +import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.StringUtils; /** - * A cache of sockets. + * A cache of input stream sockets to Data Node. */ class SocketCache { - static final Log LOG = LogFactory.getLog(SocketCache.class); + private static final Log LOG = LogFactory.getLog(SocketCache.class); + private Daemon daemon; + /** A map for per user per datanode. */ + private static LinkedListMultimap multimap = + LinkedListMultimap.create(); + private static int capacity; + private static long expiryPeriod; + private static SocketCache scInstance = new SocketCache(); - private final LinkedListMultimap multimap; - private final int capacity; + private static class SocketProp { + Socket s; + long createTime; + public SocketProp(Socket s) + { + this.s=s; + this.createTime = System.currentTimeMillis(); + } - /** - * Create a SocketCache with the given capacity. - * @param capacity Max cache size. - */ - public SocketCache(int capacity) { - multimap = LinkedListMultimap.create(); - this.capacity = capacity; + public long getCreateTime() { + return this.createTime; + } + + public Socket getSocket() { + return this.s; + } + } + + // capacity and expiryPeriod are only initialized once. + private static boolean isInitedOnce() { + if (capacity == 0 || expiryPeriod == 0) { + return false; + } + return true; + } + + public static synchronized SocketCache getInstance(int c, long e) { + + if (c == 0 || e == 0) { + throw new IllegalStateException("Cannot initialize ZERO capacity " + + "or expiryPeriod"); + } + + // capacity is only initialzied once + if (isInitedOnce() == false) { + capacity = c; + expiryPeriod = e; + } else if (capacity != c || expiryPeriod != e) { + LOG.info("capacity and expiry periods already set to " + capacity + + " and " + expiryPeriod + " respectively. Cannot set it to " + c + + " and " + e); + } + + return scInstance; + } + + private boolean isDaemonStarted() { + return (daemon == null)? false: true; + } + + private synchronized void startExpiryDaemon() { + // start daemon only if not already started + if (isDaemonStarted() == true) { + return; + } + + daemon = new Daemon(new Runnable() { + @Override + public void run() { + try { + SocketCache.this.run(); + } catch(InterruptedException e) { + //noop + } finally { + SocketCache.this.clear(); + } + } + + @Override + public String toString() { + return String.valueOf(SocketCache.this); + } + }); + daemon.start(); } /** @@ -55,14 +130,14 @@ class SocketCache { * @return A socket with unknown state, possibly closed underneath. Or null. */ public synchronized Socket get(SocketAddress remote) { - List socklist = multimap.get(remote); - if (socklist == null) { + List sockPropList = multimap.get(remote); + if (sockPropList == null) { return null; } - Iterator iter = socklist.iterator(); + Iterator iter = sockPropList.iterator(); while (iter.hasNext()) { - Socket candidate = iter.next(); + Socket candidate = iter.next().getSocket(); iter.remove(); if (!candidate.isClosed()) { return candidate; @@ -76,7 +151,9 @@ class SocketCache { * @param sock socket not used by anyone. */ public synchronized void put(Socket sock) { + Preconditions.checkNotNull(sock); + startExpiryDaemon(); SocketAddress remoteAddr = sock.getRemoteSocketAddress(); if (remoteAddr == null) { @@ -89,7 +166,7 @@ class SocketCache { if (capacity == multimap.size()) { evictOldest(); } - multimap.put(remoteAddr, sock); + multimap.put(remoteAddr, new SocketProp (sock)); } public synchronized int size() { @@ -97,32 +174,67 @@ class SocketCache { } /** + * Evict and close sockets older than expiry period from the cache. + */ + private synchronized void evictExpired(long expiryPeriod) { + while (multimap.size() != 0) { + Iterator> iter = + multimap.entries().iterator(); + Entry entry = iter.next(); + + // if oldest socket expired, remove it + if (entry == null || + System.currentTimeMillis() - entry.getValue().getCreateTime() < + expiryPeriod) { + break; + } + iter.remove(); + Socket sock = entry.getValue().getSocket(); + IOUtils.closeSocket(sock); + } + } + + /** * Evict the oldest entry in the cache. */ private synchronized void evictOldest() { - Iterator> iter = + Iterator> iter = multimap.entries().iterator(); if (!iter.hasNext()) { throw new IllegalStateException("Cannot evict from empty cache!"); } - Entry entry = iter.next(); + Entry entry = iter.next(); iter.remove(); - Socket sock = entry.getValue(); + Socket sock = entry.getValue().getSocket(); IOUtils.closeSocket(sock); } /** - * Empty the cache, and close all sockets. + * Periodically check in the cache and expire the entries + * older than expiryPeriod minutes */ - public synchronized void clear() { - for (Socket sock : multimap.values()) { - IOUtils.closeSocket(sock); + private void run() throws InterruptedException { + for(long lastExpiryTime = System.currentTimeMillis(); + !Thread.interrupted(); + Thread.sleep(expiryPeriod)) { + final long elapsed = System.currentTimeMillis() - lastExpiryTime; + if (elapsed >= expiryPeriod) { + evictExpired(expiryPeriod); + lastExpiryTime = System.currentTimeMillis(); + } } - multimap.clear(); + clear(); + throw new InterruptedException("Daemon Interrupted"); } - protected void finalize() { - clear(); + /** + * Empty the cache, and close all sockets. + */ + private synchronized void clear() { + for (SocketProp sockProp : multimap.values()) { + IOUtils.closeSocket(sockProp.getSocket()); + } + multimap.clear(); } } Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java?rev=1391542&r1=1391541&r2=1391542&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java Fri Sep 28 16:44:20 2012 @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs; import java.net.InetSocketAddress; import java.net.Socket; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -56,10 +57,12 @@ public class TestConnCache { static final int BLOCK_SIZE = 4096; static final int FILE_SIZE = 3 * BLOCK_SIZE; - + final static int CACHE_SIZE = 4; + final static long CACHE_EXPIRY_MS = 200; static Configuration conf = null; static MiniDFSCluster cluster = null; static FileSystem fs = null; + static SocketCache cache; static final Path testFile = new Path("/testConnCache.dat"); static byte authenticData[] = null; @@ -94,6 +97,9 @@ public class TestConnCache { public static void setupCluster() throws Exception { final int REPLICATION_FACTOR = 1; + /* create a socket cache. There is only one socket cache per jvm */ + cache = SocketCache.getInstance(CACHE_SIZE, CACHE_EXPIRY_MS); + util = new BlockReaderTestUtil(REPLICATION_FACTOR); cluster = util.getCluster(); conf = util.getConf(); @@ -143,10 +149,7 @@ public class TestConnCache { * Test the SocketCache itself. */ @Test - public void testSocketCache() throws IOException { - final int CACHE_SIZE = 4; - SocketCache cache = new SocketCache(CACHE_SIZE); - + public void testSocketCache() throws Exception { // Make a client InetSocketAddress nnAddr = new InetSocketAddress("localhost", cluster.getNameNodePort()); @@ -160,6 +163,7 @@ public class TestConnCache { DataNode dn = util.getDataNode(block); InetSocketAddress dnAddr = dn.getSelfAddr(); + // Make some sockets to the DN Socket[] dnSockets = new Socket[CACHE_SIZE]; for (int i = 0; i < dnSockets.length; ++i) { @@ -167,6 +171,7 @@ public class TestConnCache { dnAddr.getAddress(), dnAddr.getPort()); } + // Insert a socket to the NN Socket nnSock = new Socket(nnAddr.getAddress(), nnAddr.getPort()); cache.put(nnSock); @@ -180,7 +185,7 @@ public class TestConnCache { assertEquals("NN socket evicted", null, cache.get(nnAddr)); assertTrue("Evicted socket closed", nnSock.isClosed()); - + // Lookup the DN socks for (Socket dnSock : dnSockets) { assertEquals("Retrieve cached sockets", dnSock, cache.get(dnAddr)); @@ -190,6 +195,51 @@ public class TestConnCache { assertEquals("Cache is empty", 0, cache.size()); } + + /** + * Test the SocketCache expiry. + * Verify that socket cache entries expire after the set + * expiry time. + */ + @Test + public void testSocketCacheExpiry() throws Exception { + // Make a client + InetSocketAddress nnAddr = + new InetSocketAddress("localhost", cluster.getNameNodePort()); + DFSClient client = new DFSClient(nnAddr, conf); + + // Find out the DN addr + LocatedBlock block = + client.getNamenode().getBlockLocations( + testFile.toString(), 0, FILE_SIZE) + .getLocatedBlocks().get(0); + DataNode dn = util.getDataNode(block); + InetSocketAddress dnAddr = dn.getSelfAddr(); + + + // Make some sockets to the DN and put in cache + Socket[] dnSockets = new Socket[CACHE_SIZE]; + for (int i = 0; i < dnSockets.length; ++i) { + dnSockets[i] = client.socketFactory.createSocket( + dnAddr.getAddress(), dnAddr.getPort()); + cache.put(dnSockets[i]); + } + + // Client side still has the sockets cached + assertEquals(CACHE_SIZE, client.socketCache.size()); + + //sleep for a second and see if it expired + Thread.sleep(CACHE_EXPIRY_MS + 1000); + + // Client side has no sockets cached + assertEquals(0, client.socketCache.size()); + + //hang in for a second to ensure the thread + // does well even when cache is empty + Thread.sleep(CACHE_EXPIRY_MS + 1000); + } + + /** * Read a file served entirely from one DN. Seek around and read from * different offsets. And verify that they all use the same socket. Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java?rev=1391542&r1=1391541&r2=1391542&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java Fri Sep 28 16:44:20 2012 @@ -89,7 +89,6 @@ public class TestDistributedFileSystem { /** * Tests DFSClient.close throws no ConcurrentModificationException if * multiple files are open. - * Also tests that any cached sockets are closed. (HDFS-3359) */ @Test public void testDFSClose() throws Exception { @@ -109,12 +108,9 @@ public class TestDistributedFileSystem { DFSTestUtil.readFile(fileSys, p); DFSClient client = ((DistributedFileSystem)fileSys).dfs; - SocketCache cache = client.socketCache; - assertEquals(1, cache.size()); fileSys.close(); - assertEquals(0, cache.size()); } finally { if (cluster != null) {cluster.shutdown();} }