incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [1/3] git commit: Ensuring that resources are being cleaned up correctly in the hdfsdirectory.
Date Thu, 18 Jun 2015 13:01:29 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master 020f90ab0 -> ef53f55f7


Ensuring that resources are being cleaned up correctly in the hdfsdirectory.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/e9f38d00
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/e9f38d00
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/e9f38d00

Branch: refs/heads/master
Commit: e9f38d00c5e576a219d2156cd22406c530e4f18c
Parents: 34af045
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Thu Jun 18 08:23:41 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu Jun 18 08:23:41 2015 -0400

----------------------------------------------------------------------
 .../apache/blur/store/hdfs/HdfsDirectory.java   |  90 ++++++++----
 .../apache/blur/store/hdfs/HdfsIndexInput.java  |   5 -
 .../store/hdfs/HdfsDirectoryResourceTest.java   | 140 +++++++++++++++++++
 .../apache/blur/memory/MemoryLeakDetector.java  |  34 +++--
 4 files changed, 225 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e9f38d00/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
index f117c4d..946b16d 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsDirectory.java
@@ -47,6 +47,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.blur.BlurConfiguration;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
+import org.apache.blur.memory.MemoryLeakDetector;
 import org.apache.blur.store.blockcache.LastModified;
 import org.apache.blur.store.hdfs_v2.HdfsUtils;
 import org.apache.blur.trace.Trace;
@@ -150,6 +151,7 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
   protected final SequentialReadControl _sequentialReadControl;
   protected final String _hostname;
   protected final TimerTask _reportOnBlockLocality;
+  protected final boolean _resourceTracking;
 
   public HdfsDirectory(Configuration configuration, Path path) throws IOException {
     this(configuration, path, new SequentialReadControl(new BlurConfiguration()));
@@ -162,6 +164,12 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
 
   public HdfsDirectory(Configuration configuration, Path path, SequentialReadControl sequentialReadControl,
       Collection<String> filesToExpose) throws IOException {
+    this(configuration, path, sequentialReadControl, filesToExpose, false);
+  }
+
+  public HdfsDirectory(Configuration configuration, Path path, SequentialReadControl sequentialReadControl,
+      Collection<String> filesToExpose, boolean resourceTracking) throws IOException {
+    _resourceTracking = resourceTracking;
     if (sequentialReadControl == null) {
       _sequentialReadControl = new SequentialReadControl(new BlurConfiguration());
     } else {
@@ -318,7 +326,7 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
           if (closeable == null) {
             return;
           }
-          LOG.debug("Closing [{0}]", closeable);
+          LOG.info("Closing [{0}] [{1}]", System.identityHashCode(closeable), closeable);
           org.apache.hadoop.io.IOUtils.cleanup(LOG, closeable);
         }
       }
@@ -373,6 +381,7 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
       _fileStatusMap.put(name, new FStat(System.currentTimeMillis(), 0L));
     }
     final FSDataOutputStream outputStream = openForOutput(name);
+    trackObject(outputStream, "Outputstream", name, _path);
     return new BufferedIndexOutput() {
 
       @Override
@@ -414,6 +423,12 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
     };
   }
 
+  protected <T> void trackObject(T t, String message, Object... args) {
+    if (_resourceTracking) {
+      MemoryLeakDetector.record(t, message, args);
+    }
+  }
+
   protected FSDataOutputStream openForOutput(String name) throws IOException {
     Path path = getPath(name);
     Tracer trace = Trace.trace("filesystem - create", Trace.param("path", path));
@@ -439,14 +454,15 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
 
   protected synchronized FSDataInputRandomAccess openForInput(String name, long length) throws IOException {
     final Path path = getPath(name);
-    FSDataInputRandomAccess input = _inputMap.get(path);
+    FSDataInputRandomAccess input = _inputMap.get(name);
     if (input != null) {
       return input;
     }
     Tracer trace = Trace.trace("filesystem - open", Trace.param("path", path));
     try {
       final FSDataInputStream inputStream = _fileSystem.open(path);
-      FSDataInputRandomAccess randomInputStream = toFSDataInputRandomAccess(path, inputStream, length);
+      trackObject(inputStream, "Random Inputstream", name, path);
+      FSDataInputRandomAccess randomInputStream = new HdfsFSDataInputRandomAccess(inputStream, path, length);
       _inputMap.put(name, randomInputStream);
       return randomInputStream;
     } finally {
@@ -454,37 +470,41 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
     }
   }
 
-  protected FSDataInputRandomAccess toFSDataInputRandomAccess(final Path path, final FSDataInputStream inputStream,
-      final long length) {
-    FSDataInputRandomAccess randomInputStream = new FSDataInputRandomAccess() {
+  static class HdfsFSDataInputRandomAccess implements FSDataInputRandomAccess {
+    private final FSDataInputStream _inputStream;
+    private final Path _path;
+    private final long _length;
 
-      @Override
-      public void close() throws IOException {
-        inputStream.close();
-      }
+    HdfsFSDataInputRandomAccess(FSDataInputStream inputStream, Path path, long length) {
+      _inputStream = inputStream;
+      _path = path;
+      _length = length;
+    }
 
-      @Override
-      public int read(long filePointer, byte[] b, int offset, int length) throws IOException
{
-        return inputStream.read(filePointer, b, offset, length);
-      }
+    @Override
+    public void close() throws IOException {
+      _inputStream.close();
+    }
 
-      @Override
-      public String toString() {
-        return path.toString();
-      }
+    @Override
+    public int read(long filePointer, byte[] b, int offset, int length) throws IOException
{
+      return _inputStream.read(filePointer, b, offset, length);
+    }
 
-      @Override
-      public Path getPath() {
-        return path;
-      }
+    @Override
+    public String toString() {
+      return _path.toString();
+    }
 
-      @Override
-      public long length() {
-        return length;
-      }
+    @Override
+    public Path getPath() {
+      return _path;
+    }
 
-    };
-    return randomInputStream;
+    @Override
+    public long length() {
+      return _length;
+    }
   }
 
   @Override
@@ -560,6 +580,8 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
     Tracer trace = Trace.trace("filesystem - delete", Trace.param("path", getPath(name)));
     if (inputStream != null) {
       IOUtils.closeQuietly(inputStream);
+    } else {
+      LOG.error("Strange problem, random access input was not found for [{0}]", name);
     }
     if (_useCache) {
       _symlinkMap.remove(name);
@@ -765,7 +787,15 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
       throws IOException {
     final Path path = getPath(name);
     final FSDataInputStream input = fileSystem.open(path);
-    FSDataInputSequentialAccess sequentialAccess = new FSDataInputSequentialAccess() {
+    trackObject(input, "Sequential Inputstream", name, _path);
+    FSDataInputSequentialAccess sequentialAccess = toFSDataInputSequentialAccess(path, input);
+    WEAK_CLOSING_QUEUE.add(new WeakRef(sequentialAccess, key));
+    return sequentialAccess;
+  }
+
+  private static FSDataInputSequentialAccess toFSDataInputSequentialAccess(final Path path,
+      final FSDataInputStream input) {
+    return new FSDataInputSequentialAccess() {
 
       @Override
       public void close() throws IOException {
@@ -798,8 +828,6 @@ public class HdfsDirectory extends Directory implements LastModified, HdfsSymlin
       }
 
     };
-    WEAK_CLOSING_QUEUE.add(new WeakRef(sequentialAccess, key));
-    return sequentialAccess;
   }
 
   static class WeakRef {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e9f38d00/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
index 665a025..9f2f45e 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs/HdfsIndexInput.java
@@ -73,11 +73,6 @@ public class HdfsIndexInput extends ReusedBufferedIndexInput {
       _sequentialReadControl.incrReadDetector();
     } else {
       if (_sequentialReadControl.isEnabled()) {
-        // System.out.println("Sequential Read OFF clone [" + _isClone + "] ["
-        // + _path + "] count ["
-        // + (_sequentialReadDetectorCounter - _sequentialReadThreshold) +
-        // "]");
-
         if (_sequentialReadControl.shouldSkipInput(filePointer, _prevFilePointer)) {
           _sequentialInput.skip(filePointer - _prevFilePointer);
         } else {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e9f38d00/blur-store/src/test/java/org/apache/blur/store/hdfs/HdfsDirectoryResourceTest.java
----------------------------------------------------------------------
diff --git a/blur-store/src/test/java/org/apache/blur/store/hdfs/HdfsDirectoryResourceTest.java b/blur-store/src/test/java/org/apache/blur/store/hdfs/HdfsDirectoryResourceTest.java
new file mode 100644
index 0000000..ce00edc
--- /dev/null
+++ b/blur-store/src/test/java/org/apache/blur/store/hdfs/HdfsDirectoryResourceTest.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.store.hdfs;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.blur.HdfsMiniClusterUtil;
+import org.apache.blur.memory.MemoryLeakDetector;
+import org.apache.blur.utils.JavaHome;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class HdfsDirectoryResourceTest {
+
+  private static final int WAIT_TIME_IN_SECONDS = 10000;
+
+  private static final File TMPDIR = new File(System.getProperty("blur.tmp.dir",
+      "./target/tmp_HdfsDirectoryResourceTest"));
+
+  private static Configuration _configuration = new Configuration();
+  private static MiniDFSCluster _cluster;
+  private static Path _root;
+
+  private Random random = new Random();
+
+  @BeforeClass
+  public static void setupClass() throws IOException {
+    JavaHome.checkJavaHome();
+    _cluster = HdfsMiniClusterUtil.startDfs(_configuration, true, TMPDIR.getAbsolutePath());
+    _root = new Path(_cluster.getFileSystem().getUri() + "/");
+    MemoryLeakDetector.setEnabled(true);
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    MemoryLeakDetector.setEnabled(false);
+    HdfsMiniClusterUtil.shutdownDfs(_cluster);
+  }
+
+  @Test
+  public void testResourceTracking() throws IOException, InterruptedException {
+    Path path = new Path(_root, "testResourceTracking");
+    boolean resourceTracking = true;
+    HdfsDirectory dir = new HdfsDirectory(_configuration, path, null, null, resourceTracking);
+    try {
+      String name = "_1.file";
+      exeucteWrites(dir, name);
+      executeReads(dir, name);
+      assertTrue(waitForSeqReadsToClose());
+      dir.deleteFile(name);
+      assertTrue(waitForRandomAccessToClose());
+    } finally {
+      dir.close();
+    }
+  }
+
+  private void exeucteWrites(HdfsDirectory dir, String name) throws IOException {
+    IndexOutput output = dir.createOutput(name, IOContext.DEFAULT);
+    writeData(output, 100000000L);
+    output.close();
+  }
+
+  private void executeReads(HdfsDirectory dir, String name) throws IOException {
+    IndexInput input = dir.openInput(name, IOContext.READ);
+    input.readLong();
+    input.seek(0L);
+    for (int i = 0; i < 2; i++) {
+      readSeq(input.clone(), 20000000L);
+    }
+    input.close();
+  }
+
+  private boolean waitForRandomAccessToClose() throws InterruptedException {
+    for (int i = 0; i < WAIT_TIME_IN_SECONDS; i++) {
+      Thread.sleep(1000);
+      Runtime.getRuntime().gc();
+      Runtime.getRuntime().gc();
+      if (MemoryLeakDetector.isEmpty()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean waitForSeqReadsToClose() throws InterruptedException {
+    for (int i = 0; i < WAIT_TIME_IN_SECONDS; i++) {
+      Thread.sleep(1000);
+      Runtime.getRuntime().gc();
+      Runtime.getRuntime().gc();
+      if (MemoryLeakDetector.getCount() == 1) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private void readSeq(IndexInput input, long read) throws IOException {
+    byte[] buf = new byte[1024];
+    while (read > 0) {
+      int len = (int) Math.min(buf.length, read);
+      input.readBytes(buf, 0, len);
+      read -= len;
+    }
+  }
+
+  private void writeData(IndexOutput output, long write) throws IOException {
+    byte[] buf = new byte[1024];
+    while (write > 0) {
+      random.nextBytes(buf);
+      int length = (int) Math.min(write, buf.length);
+      output.writeBytes(buf, length);
+      write -= length;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e9f38d00/blur-util/src/main/java/org/apache/blur/memory/MemoryLeakDetector.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/memory/MemoryLeakDetector.java b/blur-util/src/main/java/org/apache/blur/memory/MemoryLeakDetector.java
index d5b1066..391a76d 100644
--- a/blur-util/src/main/java/org/apache/blur/memory/MemoryLeakDetector.java
+++ b/blur-util/src/main/java/org/apache/blur/memory/MemoryLeakDetector.java
@@ -18,24 +18,24 @@ package org.apache.blur.memory;
 
 import java.text.MessageFormat;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
-import java.util.concurrent.ConcurrentMap;
+import java.util.WeakHashMap;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 
-import com.google.common.collect.MapMaker;
-
 public class MemoryLeakDetector {
 
   private static final Log LOG = LogFactory.getLog(MemoryLeakDetector.class);
-  
+
   private static boolean _enabled = false;
-  private static final ConcurrentMap<Object, Info> _map;
+  private static final Map<Object, Info> _map;
   private static Timer _timer;
 
   static class Info {
@@ -54,9 +54,17 @@ public class MemoryLeakDetector {
 
   }
 
+  public static boolean isEnabled() {
+    return _enabled;
+  }
+
+  public static void setEnabled(boolean enabled) {
+    MemoryLeakDetector._enabled = enabled;
+  }
+
   static {
-    _map = new MapMaker().weakKeys().makeMap();
-    _timer = new Timer("MemoryLeakDetector",true);
+    _map = Collections.synchronizedMap(new WeakHashMap<Object, Info>());
+    _timer = new Timer("MemoryLeakDetector", true);
     _timer.schedule(new TimerTask() {
       @Override
       public void run() {
@@ -76,13 +84,23 @@ public class MemoryLeakDetector {
 
   public static void dump() {
     Set<Entry<Object, Info>> entrySet = _map.entrySet();
+    int count = 0;
     for (Entry<Object, Info> e : entrySet) {
       Object o = e.getKey();
       if (o != null) {
         Info info = e.getValue();
-        LOG.info("Object [{0}] Info [{1}]", o, info);
+        LOG.info("Id [{0}] Hashcode [{1}] Object [{2}] Info [{3}]", count, System.identityHashCode(o), o, info);
+        count++;
       }
     }
   }
 
+  public static boolean isEmpty() {
+    return _map.isEmpty();
+  }
+
+  public static int getCount() {
+    return _map.size();
+  }
+
 }


Mime
View raw message