Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C7273101E7 for ; Sat, 19 Oct 2013 00:18:01 +0000 (UTC) Received: (qmail 69930 invoked by uid 500); 19 Oct 2013 00:18:01 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 69805 invoked by uid 500); 19 Oct 2013 00:18:00 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 69798 invoked by uid 99); 19 Oct 2013 00:18:00 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 19 Oct 2013 00:18:00 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 19 Oct 2013 00:17:58 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 9035223888E4; Sat, 19 Oct 2013 00:17:38 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1533668 - in /hadoop/common/trunk/hadoop-common-project/hadoop-common: CHANGES.txt src/main/java/org/apache/hadoop/fs/FileSystem.java src/test/java/org/apache/hadoop/fs/FCStatisticsBaseTest.java Date: Sat, 19 Oct 2013 00:17:38 -0000 To: common-commits@hadoop.apache.org From: cmccabe@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20131019001738.9035223888E4@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cmccabe Date: Sat Oct 19 00:17:37 2013 New Revision: 1533668 URL: 
http://svn.apache.org/r1533668 Log: HDFS-5276. FileSystem.Statistics should use thread-local counters to avoid multi-threaded performance issues on read/write. (Colin Patrick McCabe) Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FCStatisticsBaseTest.java Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1533668&r1=1533667&r2=1533668&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Sat Oct 19 00:17:37 2013 @@ -363,6 +363,9 @@ Release 2.3.0 - UNRELEASED HADOOP-9078. enhance unit-test coverage of class org.apache.hadoop.fs.FileContext (Ivan A. Veselovsky via jeagles) + HDFS-5276. FileSystem.Statistics should use thread-local counters to avoid + multi-threaded performance issues on read/write. (Colin Patrick McCabe) + OPTIMIZATIONS HADOOP-9748. 
Reduce blocking on UGI.ensureInitialized (daryn) Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java?rev=1533668&r1=1533667&r2=1533668&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java (original) +++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java Sat Oct 19 00:17:37 2013 @@ -20,6 +20,7 @@ package org.apache.hadoop.fs; import java.io.Closeable; import java.io.FileNotFoundException; import java.io.IOException; +import java.lang.ref.WeakReference; import java.net.URI; import java.net.URISyntaxException; import java.security.PrivilegedExceptionAction; @@ -31,6 +32,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.IdentityHashMap; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -2501,28 +2503,149 @@ public abstract class FileSystem extends } } + /** + * Tracks statistics about how many reads, writes, and so forth have been + * done in a FileSystem. + * + * Since there is only one of these objects per FileSystem, there will + * typically be many threads writing to this object. Almost every operation + * on an open file will involve a write to this object. In contrast, reading + * statistics is done infrequently by most programs, and not at all by others. + * Hence, this is optimized for writes. + * + * Each thread writes to its own thread-local area of memory. This removes + * contention and allows us to scale up to many, many threads. To read + * statistics, the reader thread totals up the contents of all of the + * thread-local data areas. 
+ */ public static final class Statistics { + /** + * Statistics data. + * + * There is only a single writer to thread-local StatisticsData objects. + * Hence, volatile is adequate here-- we do not need AtomicLong or similar + * to prevent lost updates. + * The Java specification guarantees that updates to volatile longs will + * be perceived as atomic with respect to other threads, which is all we + * need. + */ + private static class StatisticsData { + volatile long bytesRead; + volatile long bytesWritten; + volatile int readOps; + volatile int largeReadOps; + volatile int writeOps; + /** + * Stores a weak reference to the thread owning this StatisticsData. + * This allows us to remove StatisticsData objects that pertain to + * threads that no longer exist. + */ + final WeakReference owner; + + StatisticsData(WeakReference owner) { + this.owner = owner; + } + + /** + * Add another StatisticsData object to this one. + */ + void add(StatisticsData other) { + this.bytesRead += other.bytesRead; + this.bytesWritten += other.bytesWritten; + this.readOps += other.readOps; + this.largeReadOps += other.largeReadOps; + this.writeOps += other.writeOps; + } + + /** + * Negate the values of all statistics. 
+ */ + void negate() { + this.bytesRead = -this.bytesRead; + this.bytesWritten = -this.bytesWritten; + this.readOps = -this.readOps; + this.largeReadOps = -this.largeReadOps; + this.writeOps = -this.writeOps; + } + + @Override + public String toString() { + return bytesRead + " bytes read, " + bytesWritten + " bytes written, " + + readOps + " read ops, " + largeReadOps + " large read ops, " + + writeOps + " write ops"; + } + } + + private interface StatisticsAggregator { + void accept(StatisticsData data); + T aggregate(); + } + private final String scheme; - private AtomicLong bytesRead = new AtomicLong(); - private AtomicLong bytesWritten = new AtomicLong(); - private AtomicInteger readOps = new AtomicInteger(); - private AtomicInteger largeReadOps = new AtomicInteger(); - private AtomicInteger writeOps = new AtomicInteger(); + + /** + * rootData is data that doesn't belong to any thread, but will be added + * to the totals. This is useful for making copies of Statistics objects, + * and for storing data that pertains to threads that have been garbage + * collected. Protected by the Statistics lock. + */ + private final StatisticsData rootData; + + /** + * Thread-local data. + */ + private final ThreadLocal threadData; + /** + * List of all thread-local data areas. Protected by the Statistics lock. + */ + private LinkedList allData; + public Statistics(String scheme) { this.scheme = scheme; + this.rootData = new StatisticsData(null); + this.threadData = new ThreadLocal(); + this.allData = null; } /** * Copy constructor. * - * @param st - * The input Statistics object which is cloned. + * @param other The input Statistics object which is cloned. 
*/ - public Statistics(Statistics st) { - this.scheme = st.scheme; - this.bytesRead = new AtomicLong(st.bytesRead.longValue()); - this.bytesWritten = new AtomicLong(st.bytesWritten.longValue()); + public Statistics(Statistics other) { + this.scheme = other.scheme; + this.rootData = new StatisticsData(null); + other.visitAll(new StatisticsAggregator() { + @Override + public void accept(StatisticsData data) { + rootData.add(data); + } + + public Void aggregate() { + return null; + } + }); + this.threadData = new ThreadLocal(); + } + + /** + * Get or create the thread-local data associated with the current thread. + */ + private StatisticsData getThreadData() { + StatisticsData data = threadData.get(); + if (data == null) { + data = new StatisticsData( + new WeakReference(Thread.currentThread())); + threadData.set(data); + synchronized(this) { + if (allData == null) { + allData = new LinkedList(); + } + allData.add(data); + } + } + return data; } /** @@ -2530,7 +2653,7 @@ public abstract class FileSystem extends * @param newBytes the additional bytes read */ public void incrementBytesRead(long newBytes) { - bytesRead.getAndAdd(newBytes); + getThreadData().bytesRead += newBytes; } /** @@ -2538,7 +2661,7 @@ public abstract class FileSystem extends * @param newBytes the additional bytes written */ public void incrementBytesWritten(long newBytes) { - bytesWritten.getAndAdd(newBytes); + getThreadData().bytesWritten += newBytes; } /** @@ -2546,7 +2669,7 @@ public abstract class FileSystem extends * @param count number of read operations */ public void incrementReadOps(int count) { - readOps.getAndAdd(count); + getThreadData().readOps += count; } /** @@ -2554,7 +2677,7 @@ public abstract class FileSystem extends * @param count number of large read operations */ public void incrementLargeReadOps(int count) { - largeReadOps.getAndAdd(count); + getThreadData().largeReadOps += count; } /** @@ -2562,7 +2685,38 @@ public abstract class FileSystem extends * @param count number of 
write operations */ public void incrementWriteOps(int count) { - writeOps.getAndAdd(count); + getThreadData().writeOps += count; + } + + /** + * Apply the given aggregator to all StatisticsData objects associated with + * this Statistics object. + * + * For each StatisticsData object, we will call accept on the visitor. + * Finally, at the end, we will call aggregate to get the final total. + * + * @param The visitor to use. + * @return The total. + */ + private synchronized T visitAll(StatisticsAggregator visitor) { + visitor.accept(rootData); + if (allData != null) { + for (Iterator iter = allData.iterator(); + iter.hasNext(); ) { + StatisticsData data = iter.next(); + visitor.accept(data); + if (data.owner.get() == null) { + /* + * If the thread that created this thread-local data no + * longer exists, remove the StatisticsData from our list + * and fold the values into rootData. + */ + rootData.add(data); + iter.remove(); + } + } + } + return visitor.aggregate(); } /** @@ -2570,7 +2724,18 @@ public abstract class FileSystem extends * @return the number of bytes */ public long getBytesRead() { - return bytesRead.get(); + return visitAll(new StatisticsAggregator() { + private long bytesRead = 0; + + @Override + public void accept(StatisticsData data) { + bytesRead += data.bytesRead; + } + + public Long aggregate() { + return bytesRead; + } + }); } /** @@ -2578,7 +2743,18 @@ public abstract class FileSystem extends * @return the number of bytes */ public long getBytesWritten() { - return bytesWritten.get(); + return visitAll(new StatisticsAggregator() { + private long bytesWritten = 0; + + @Override + public void accept(StatisticsData data) { + bytesWritten += data.bytesWritten; + } + + public Long aggregate() { + return bytesWritten; + } + }); } /** @@ -2586,7 +2762,19 @@ public abstract class FileSystem extends * @return number of read operations */ public int getReadOps() { - return readOps.get() + largeReadOps.get(); + return visitAll(new 
StatisticsAggregator() { + private int readOps = 0; + + @Override + public void accept(StatisticsData data) { + readOps += data.readOps; + readOps += data.largeReadOps; + } + + public Integer aggregate() { + return readOps; + } + }); } /** @@ -2595,7 +2783,18 @@ public abstract class FileSystem extends * @return number of large read operations */ public int getLargeReadOps() { - return largeReadOps.get(); + return visitAll(new StatisticsAggregator() { + private int largeReadOps = 0; + + @Override + public void accept(StatisticsData data) { + largeReadOps += data.largeReadOps; + } + + public Integer aggregate() { + return largeReadOps; + } + }); } /** @@ -2604,22 +2803,70 @@ public abstract class FileSystem extends * @return number of write operations */ public int getWriteOps() { - return writeOps.get(); + return visitAll(new StatisticsAggregator() { + private int writeOps = 0; + + @Override + public void accept(StatisticsData data) { + writeOps += data.writeOps; + } + + public Integer aggregate() { + return writeOps; + } + }); } + @Override public String toString() { - return bytesRead + " bytes read, " + bytesWritten + " bytes written, " - + readOps + " read ops, " + largeReadOps + " large read ops, " - + writeOps + " write ops"; + return visitAll(new StatisticsAggregator() { + private StatisticsData total = new StatisticsData(null); + + @Override + public void accept(StatisticsData data) { + total.add(data); + } + + public String aggregate() { + return total.toString(); + } + }); } - + /** - * Reset the counts of bytes to 0. + * Resets all statistics to 0. + * + * In order to reset, we add up all the thread-local statistics data, and + * set rootData to the negative of that. + * + * This may seem like a counterintuitive way to reset the statistics. Why + * can't we just zero out all the thread-local data? Well, thread-local + * data can only be modified by the thread that owns it. 
If we tried to + * modify the thread-local data from this thread, our modification might get + * interleaved with a read-modify-write operation done by the thread that + * owns the data. That would result in our update getting lost. + * + * The approach used here avoids this problem because it only ever reads + * (not writes) the thread-local data. Both reads and writes to rootData + * are done under the lock, so we're free to modify rootData from any thread + * that holds the lock. */ public void reset() { - bytesWritten.set(0); - bytesRead.set(0); + visitAll(new StatisticsAggregator() { + private StatisticsData total = new StatisticsData(null); + + @Override + public void accept(StatisticsData data) { + total.add(data); + } + + public Void aggregate() { + total.negate(); + rootData.add(total); + return null; + } + }); } /** Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FCStatisticsBaseTest.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FCStatisticsBaseTest.java?rev=1533668&r1=1533667&r2=1533668&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FCStatisticsBaseTest.java (original) +++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FCStatisticsBaseTest.java Sat Oct 19 00:17:37 2013 @@ -27,6 +27,8 @@ import org.apache.hadoop.fs.FileSystem.S import org.junit.Assert; import org.junit.Test; +import com.google.common.util.concurrent.Uninterruptibles; + import static org.apache.hadoop.fs.FileContextTestHelper.*; /** @@ -44,6 +46,38 @@ public abstract class FCStatisticsBaseTe //fc should be set appropriately by the deriving test. 
protected static FileContext fc = null; + @Test(timeout=60000) + public void testStatisticsOperations() throws Exception { + final Statistics stats = new Statistics("file"); + Assert.assertEquals(0L, stats.getBytesRead()); + Assert.assertEquals(0L, stats.getBytesWritten()); + Assert.assertEquals(0, stats.getWriteOps()); + stats.incrementBytesWritten(1000); + Assert.assertEquals(1000L, stats.getBytesWritten()); + Assert.assertEquals(0, stats.getWriteOps()); + stats.incrementWriteOps(123); + Assert.assertEquals(123, stats.getWriteOps()); + + Thread thread = new Thread() { + @Override + public void run() { + stats.incrementWriteOps(1); + } + }; + thread.start(); + Uninterruptibles.joinUninterruptibly(thread); + Assert.assertEquals(124, stats.getWriteOps()); + // Test copy constructor and reset function + Statistics stats2 = new Statistics(stats); + stats.reset(); + Assert.assertEquals(0, stats.getWriteOps()); + Assert.assertEquals(0L, stats.getBytesWritten()); + Assert.assertEquals(0L, stats.getBytesRead()); + Assert.assertEquals(124, stats2.getWriteOps()); + Assert.assertEquals(1000L, stats2.getBytesWritten()); + Assert.assertEquals(0L, stats2.getBytesRead()); + } + @Test public void testStatistics() throws IOException, URISyntaxException { URI fsUri = getFsUri();