Return-Path: X-Original-To: apmail-drill-commits-archive@www.apache.org Delivered-To: apmail-drill-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BB4AC10A80 for ; Tue, 17 Feb 2015 23:28:27 +0000 (UTC) Received: (qmail 47555 invoked by uid 500); 17 Feb 2015 23:28:24 -0000 Delivered-To: apmail-drill-commits-archive@drill.apache.org Received: (qmail 47523 invoked by uid 500); 17 Feb 2015 23:28:24 -0000 Mailing-List: contact commits-help@drill.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: commits@drill.apache.org Delivered-To: mailing list commits@drill.apache.org Received: (qmail 47514 invoked by uid 99); 17 Feb 2015 23:28:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 Feb 2015 23:28:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 73096E07F3; Tue, 17 Feb 2015 23:28:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: venki@apache.org To: commits@drill.apache.org Date: Tue, 17 Feb 2015 23:28:24 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] drill git commit: DRILL-2251: Fix resource leaking with new DrillFileSystem Repository: drill Updated Branches: refs/heads/master d9b61fac2 -> 9c4d91d01 DRILL-2251: Fix resource leaking with new DrillFileSystem Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/ccaabdbb Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/ccaabdbb Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/ccaabdbb Branch: refs/heads/master Commit: ccaabdbbd4a1baa9245514d68e683d1409333bf6 Parents: d9b61fa Author: vkorukanti Authored: Thu Feb 12 17:27:05 2015 -0800 Committer: vkorukanti Committed: Tue 
Feb 17 11:20:46 2015 -0800 ---------------------------------------------------------------------- .../exec/store/dfs/DrillFSDataInputStream.java | 131 +++++++++++++++++-- .../drill/exec/store/dfs/DrillFileSystem.java | 93 +++++++++++-- .../drill/exec/store/dfs/OpenFileTracker.java | 36 +++++ .../exec/store/dfs/TestDrillFileSystem.java | 25 +++- 4 files changed, 258 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/ccaabdbb/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java index 44ef8a3..5be6d10 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.store.dfs; import org.apache.drill.exec.ops.OperatorStats; +import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.PositionedReadable; import org.apache.hadoop.fs.ReadOption; @@ -35,11 +36,20 @@ import java.util.EnumSet; * Wrapper around FSDataInputStream to collect IO Stats. 
*/ public class DrillFSDataInputStream extends FSDataInputStream { - private FSDataInputStream underlyingIs; + private final FSDataInputStream underlyingIs; + private final OpenFileTracker openFileTracker; + private final OperatorStats operatorStats; public DrillFSDataInputStream(FSDataInputStream in, OperatorStats operatorStats) throws IOException { + this(in, operatorStats, null); + } + + public DrillFSDataInputStream(FSDataInputStream in, OperatorStats operatorStats, + OpenFileTracker openFileTracker) throws IOException { super(new WrappedInputStream(in, operatorStats)); - this.underlyingIs = in; + underlyingIs = in; + this.openFileTracker = openFileTracker; + this.operatorStats = operatorStats; } @Override @@ -54,17 +64,32 @@ public class DrillFSDataInputStream extends FSDataInputStream { @Override public int read(long position, byte[] buffer, int offset, int length) throws IOException { - return underlyingIs.read(position, buffer, offset, length); + operatorStats.startWait(); + try { + return underlyingIs.read(position, buffer, offset, length); + } finally { + operatorStats.stopWait(); + } } @Override public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { - underlyingIs.readFully(position, buffer, offset, length); + operatorStats.startWait(); + try { + underlyingIs.readFully(position, buffer, offset, length); + } finally { + operatorStats.stopWait(); + } } @Override public void readFully(long position, byte[] buffer) throws IOException { - underlyingIs.readFully(position, buffer); + operatorStats.startWait(); + try { + underlyingIs.readFully(position, buffer); + } finally { + operatorStats.stopWait(); + } } @Override @@ -73,13 +98,19 @@ public class DrillFSDataInputStream extends FSDataInputStream { } @Override + @LimitedPrivate({"HDFS"}) public InputStream getWrappedStream() { return underlyingIs.getWrappedStream(); } @Override public int read(ByteBuffer buf) throws IOException { - return underlyingIs.read(buf); + 
operatorStats.startWait(); + try { + return underlyingIs.read(buf); + } finally { + operatorStats.stopWait(); + } } @Override @@ -99,7 +130,12 @@ public class DrillFSDataInputStream extends FSDataInputStream { @Override public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet opts) throws IOException, UnsupportedOperationException { - return underlyingIs.read(bufferPool, maxLength, opts); + operatorStats.startWait(); + try { + return underlyingIs.read(bufferPool, maxLength, opts); + } finally { + operatorStats.stopWait(); + } } @Override @@ -107,6 +143,44 @@ public class DrillFSDataInputStream extends FSDataInputStream { underlyingIs.releaseBuffer(buffer); } + @Override + public int read() throws IOException { + return underlyingIs.read(); + } + + @Override + public long skip(long n) throws IOException { + return underlyingIs.skip(n); + } + + @Override + public int available() throws IOException { + return underlyingIs.available(); + } + + @Override + public void close() throws IOException { + if (openFileTracker != null) { + openFileTracker.fileClosed(this); + } + underlyingIs.close(); + } + + @Override + public void mark(int readlimit) { + underlyingIs.mark(readlimit); + } + + @Override + public void reset() throws IOException { + underlyingIs.reset(); + } + + @Override + public boolean markSupported() { + return underlyingIs.markSupported(); + } + /** * We need to wrap the FSDataInputStream inside a InputStream, because read() method in InputStream is * overridden in FilterInputStream (super class of FSDataInputStream) as final, so we can not override in @@ -133,14 +207,51 @@ public class DrillFSDataInputStream extends FSDataInputStream { @Override public int read(byte[] b, int off, int len) throws IOException { operatorStats.startWait(); - int numBytesRead; try { - numBytesRead = is.read(b, off, len); + return is.read(b, off, len); + } finally { + operatorStats.stopWait(); + } + } + + @Override + public int read(byte[] b) throws IOException { + 
operatorStats.startWait(); + try { + return is.read(b); } finally { operatorStats.stopWait(); } + } + + @Override + public long skip(long n) throws IOException { + return is.skip(n); + } - return numBytesRead; + @Override + public int available() throws IOException { + return is.available(); + } + + @Override + public void close() throws IOException { + is.close(); + } + + @Override + public synchronized void mark(int readlimit) { + is.mark(readlimit); + } + + @Override + public synchronized void reset() throws IOException { + is.reset(); + } + + @Override + public boolean markSupported() { + return is.markSupported(); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/ccaabdbb/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java index f5730a1..2683cca 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java @@ -22,10 +22,13 @@ import java.io.IOException; import java.net.URI; import java.util.EnumSet; import java.util.List; +import java.util.concurrent.ConcurrentMap; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.drill.exec.ops.OperatorStats; +import org.apache.drill.exec.util.AssertionUtil; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; @@ -58,18 +61,42 @@ import org.apache.hadoop.util.Progressable; /** * DrillFileSystem is the wrapper around the actual FileSystem implementation. 
* - * If {@link org.apache.drill.exec.ops.OperatorStats} are provided it returns a instrumented FSDataInputStream to - * measure IO wait time. + * If {@link org.apache.drill.exec.ops.OperatorStats} are provided it returns an instrumented FSDataInputStream to + * measure IO wait time and tracking file open/close operations. */ -public class DrillFileSystem extends FileSystem { +public class DrillFileSystem extends FileSystem implements OpenFileTracker { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillFileSystem.class); + private final static boolean TRACKING_ENABLED = AssertionUtil.isAssertionsEnabled(); + private final static ConcurrentMap<DrillFSDataInputStream, DebugStackTrace> openedFiles = Maps.newConcurrentMap(); + + static { + if (TRACKING_ENABLED) { + Runtime.getRuntime().addShutdownHook(new Thread() { + public void run() { + if (openedFiles.size() != 0) { + final StringBuffer errMsgBuilder = new StringBuffer(); + + errMsgBuilder.append(String.format("Not all files opened using this FileSystem are closed. 
" + "There are" + + " still [%d] files open.\n", openedFiles.size())); + + for(DebugStackTrace stackTrace : openedFiles.values()) { + stackTrace.addToStringBuilder(errMsgBuilder); + } + + final String errMsg = errMsgBuilder.toString(); + logger.error(errMsg); + throw new IllegalStateException(errMsg); + } + } + }); + } + } private final FileSystem underlyingFs; private final OperatorStats operatorStats; public DrillFileSystem(Configuration fsConf) throws IOException { - this.underlyingFs = FileSystem.get(fsConf); - this.operatorStats = null; + this(FileSystem.get(fsConf), null); } public DrillFileSystem(FileSystem fs, OperatorStats operatorStats) { @@ -96,11 +123,17 @@ public class DrillFileSystem extends FileSystem { */ @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException { - if (operatorStats != null) { - return new DrillFSDataInputStream(underlyingFs.open(f, bufferSize), operatorStats); + if (operatorStats == null) { + return underlyingFs.open(f, bufferSize); + } + + if (TRACKING_ENABLED) { + DrillFSDataInputStream is = new DrillFSDataInputStream(underlyingFs.open(f, bufferSize), operatorStats, this); + fileOpened(f, is); + return is; } - return underlyingFs.open(f, bufferSize); + return new DrillFSDataInputStream(underlyingFs.open(f, bufferSize), operatorStats); } /** @@ -108,11 +141,17 @@ public class DrillFileSystem extends FileSystem { */ @Override public FSDataInputStream open(Path f) throws IOException { - if (operatorStats != null) { - return new DrillFSDataInputStream(underlyingFs.open(f), operatorStats); + if (operatorStats == null) { + return underlyingFs.open(f); } - return underlyingFs.open(f); + if (TRACKING_ENABLED) { + DrillFSDataInputStream is = new DrillFSDataInputStream(underlyingFs.open(f), operatorStats, this); + fileOpened(f, is); + return is; + } + + return new DrillFSDataInputStream(underlyingFs.open(f), operatorStats); } @Override @@ -678,5 +717,37 @@ public class DrillFileSystem extends FileSystem { } } + 
@Override + public void fileOpened(Path path, DrillFSDataInputStream fsDataInputStream) { + openedFiles.put(fsDataInputStream, new DebugStackTrace(path, Thread.currentThread().getStackTrace())); + } + @Override + public void fileClosed(DrillFSDataInputStream fsDataInputStream) { + openedFiles.remove(fsDataInputStream); + } + + public static class DebugStackTrace { + final private StackTraceElement[] elements; + final private Path path; + + public DebugStackTrace(Path path, StackTraceElement[] elements) { + this.path = path; + this.elements = elements; + } + + public void addToStringBuilder(StringBuffer sb) { + sb.append("File '"); + sb.append(path.toString()); + sb.append("' opened at callstack:\n"); + + // add all stack elements except the top three as they point to DrillFileSystem.open() and inner stack elements. + for (int i = 3; i < elements.length; i++) { + sb.append("\t"); + sb.append(elements[i]); + sb.append("\n"); + } + sb.append("\n"); + } + } } http://git-wip-us.apache.org/repos/asf/drill/blob/ccaabdbb/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/OpenFileTracker.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/OpenFileTracker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/OpenFileTracker.java new file mode 100644 index 0000000..f99eb8b --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/OpenFileTracker.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.dfs; + +import org.apache.hadoop.fs.Path; + +/** + * Interface to track opening and closing of files. + */ +public interface OpenFileTracker { + /** + * Add new file location and {@link org.apache.drill.exec.store.dfs.DrillFSDataInputStream} to list. + */ + public void fileOpened(Path path, DrillFSDataInputStream fsDataInputStream); + + /** + * Remove the given {@link org.apache.drill.exec.store.dfs.DrillFSDataInputStream} from opened file list. + * @param fsDataInputStream + */ + public void fileClosed(DrillFSDataInputStream fsDataInputStream); +} http://git-wip-us.apache.org/repos/asf/drill/blob/ccaabdbb/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java index 5c71d08..ab6639e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java @@ -63,23 +63,36 @@ public class TestDrillFileSystem { @Test public void testIOStats() throws Exception { + DrillFileSystem dfs = null; + InputStream is = null; Configuration conf = new Configuration(); conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///"); OpProfileDef profileDef = new OpProfileDef(0 /*operatorId*/, 0 /*operatorType*/, 0 /*inputCount*/); 
OperatorStats stats = new OperatorStats(profileDef, null /*allocator*/); + // start wait time method in OperatorStats expects the OperatorStats state to be in "processing" stats.startProcessing(); - DrillFileSystem dfs = new DrillFileSystem(FileSystem.get(conf), stats); - InputStream is = dfs.open(new Path(tempFilePath)); + try { + dfs = new DrillFileSystem(FileSystem.get(conf), stats); + is = dfs.open(new Path(tempFilePath)); - byte[] buf = new byte[8000]; - while (is.read(buf, 0, buf.length) != -1) { } + byte[] buf = new byte[8000]; + while (is.read(buf, 0, buf.length) != -1) { + } + } finally { + stats.stopProcessing(); - stats.stopProcessing(); + if (is != null) { + is.close(); + } - OperatorProfile operatorProfile = stats.getProfile(); + if (dfs != null) { + dfs.close(); + } + } + OperatorProfile operatorProfile = stats.getProfile(); assertTrue("Expected wait time is non-zero, but got zero wait time", operatorProfile.getWaitNanos() > 0); }