Return-Path: X-Original-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8909AD8AB for ; Fri, 17 May 2013 03:39:45 +0000 (UTC) Received: (qmail 52932 invoked by uid 500); 17 May 2013 03:39:45 -0000 Delivered-To: apmail-hadoop-hdfs-commits-archive@hadoop.apache.org Received: (qmail 52567 invoked by uid 500); 17 May 2013 03:39:41 -0000 Mailing-List: contact hdfs-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hdfs-dev@hadoop.apache.org Delivered-To: mailing list hdfs-commits@hadoop.apache.org Received: (qmail 52487 invoked by uid 99); 17 May 2013 03:39:39 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 May 2013 03:39:39 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 May 2013 03:39:38 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 90A3F23889F1; Fri, 17 May 2013 03:39:18 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1483641 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: CHANGES.txt src/main/java/org/apache/hadoop/hdfs/FileInputStreamCache.java Date: Fri, 17 May 2013 03:39:18 -0000 To: hdfs-commits@hadoop.apache.org From: todd@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130517033918.90A3F23889F1@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: todd Date: Fri May 17 03:39:18 2013 New Revision: 1483641 URL: http://svn.apache.org/r1483641 Log: HDFS-4824. FileInputStreamCache.close leaves dangling reference to FileInputStreamCache.cacheCleaner. Contributed by Colin Patrick McCabe. Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/FileInputStreamCache.java Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1483641&r1=1483640&r2=1483641&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri May 17 03:39:18 2013 @@ -951,6 +951,9 @@ Release 2.0.5-beta - UNRELEASED HDFS-4830. Typo in config settings for AvailableSpaceVolumeChoosingPolicy in hdfs-default.xml. (atm) + HDFS-4824. FileInputStreamCache.close leaves dangling reference to + FileInputStreamCache.cacheCleaner. (Colin Patrick McCabe via todd) + BREAKDOWN OF HDFS-347 SUBTASKS AND RELATED JIRAS HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes. Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/FileInputStreamCache.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/FileInputStreamCache.java?rev=1483641&r1=1483640&r2=1483641&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/FileInputStreamCache.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/FileInputStreamCache.java Fri May 17 03:39:18 2013 @@ -17,10 +17,14 @@ */ package org.apache.hadoop.hdfs; +import java.io.Closeable; import java.io.FileInputStream; +import java.io.IOException; +import java.lang.ref.WeakReference; import java.util.Iterator; import java.util.List; import java.util.Map.Entry; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -80,17 +84,26 @@ class FileInputStreamCache { * Expiry thread which makes sure that the file descriptors get closed * after a while. */ - class CacheCleaner implements Runnable { + private static class CacheCleaner implements Runnable, Closeable { + private WeakReference cacheRef; + private ScheduledFuture future; + + CacheCleaner(FileInputStreamCache cache) { + this.cacheRef = new WeakReference(cache); + } + @Override public void run() { - synchronized(FileInputStreamCache.this) { - if (closed) return; + FileInputStreamCache cache = cacheRef.get(); + if (cache == null) return; + synchronized(cache) { + if (cache.closed) return; long curTime = Time.monotonicNow(); - for (Iterator> iter = map.entries().iterator(); - iter.hasNext(); - iter = map.entries().iterator()) { + for (Iterator> iter = + cache.map.entries().iterator(); iter.hasNext(); + iter = cache.map.entries().iterator()) { Entry entry = iter.next(); - if (entry.getValue().getTime() + expiryTimeMs >= curTime) { + if (entry.getValue().getTime() + cache.expiryTimeMs >= curTime) { break; } entry.getValue().close(); @@ -98,6 +111,17 @@ class FileInputStreamCache { } } } + + @Override + public void close() throws IOException { + if (future != null) { + future.cancel(false); + } + } + + public void setFuture(ScheduledFuture future) { + this.future = future; + } } /** @@ -189,9 +213,11 @@ class FileInputStreamCache { iter.remove(); } if (cacheCleaner == null) { - cacheCleaner = new CacheCleaner(); - executor.scheduleAtFixedRate(cacheCleaner, expiryTimeMs, expiryTimeMs, - TimeUnit.MILLISECONDS); + cacheCleaner = new CacheCleaner(this); + ScheduledFuture future = + executor.scheduleAtFixedRate(cacheCleaner, expiryTimeMs, expiryTimeMs, + TimeUnit.MILLISECONDS); + cacheCleaner.setFuture(future); } map.put(new Key(datanodeID, block), new Value(fis)); inserted = true; @@ -229,9 +255,7 @@ class FileInputStreamCache { public synchronized void close() { if (closed) return; closed = true; - if (cacheCleaner != null) { - executor.remove(cacheCleaner); - } + IOUtils.cleanup(LOG, cacheCleaner); for (Iterator> iter = map.entries().iterator(); iter.hasNext();) { Entry entry = iter.next();