incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject git commit: Fixing a few memory leaks, this would be a large problem with systems that perform a lot of mutates on the tables. Bulk loading is less of an issue.
Date Thu, 05 Mar 2015 15:18:01 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master df0e889e1 -> cc8e9806b


Fixing a few memory leaks, this would be a large problem with systems that perform a lot of
mutates on the tables.  Bulk loading is less of an issue.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/cc8e9806
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/cc8e9806
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/cc8e9806

Branch: refs/heads/master
Commit: cc8e9806b0a4624e9bd19066af784766f6cb1bac
Parents: df0e889
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Thu Mar 5 10:17:56 2015 -0500
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu Mar 5 10:17:56 2015 -0500

----------------------------------------------------------------------
 .../manager/writer/BlurIndexSimpleWriter.java   | 32 ++++++-
 .../blur/manager/writer/IndexImporter.java      |  3 +
 .../blur/lucene/search/PrimeDocCache.java       |  2 +
 .../org/apache/blur/index/ExitableReader.java   |  6 ++
 .../hdfs_v2/FastHdfsKeyValueDirectory.java      |  2 +
 .../blur/store/hdfs_v2/HdfsKeyValueStore.java   | 20 +++--
 .../apache/blur/memory/MemoryLeakDetector.java  | 87 ++++++++++++++++++++
 7 files changed, 142 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/cc8e9806/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
index 66f8b01..7ea3634 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
@@ -58,6 +58,7 @@ import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
 import org.apache.blur.lucene.codec.Blur024Codec;
 import org.apache.blur.lucene.search.IndexSearcherCloseable;
+import org.apache.blur.memory.MemoryLeakDetector;
 import org.apache.blur.server.IndexSearcherCloseableBase;
 import org.apache.blur.server.IndexSearcherCloseableSecureBase;
 import org.apache.blur.server.ShardContext;
@@ -90,6 +91,8 @@ import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.compress.SnappyCodec;
 import org.apache.hadoop.util.Progressable;
 import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.index.AtomicReader;
+import org.apache.lucene.index.AtomicReaderContext;
 import org.apache.lucene.index.BlurIndexWriter;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexReader;
@@ -200,7 +203,11 @@ public class BlurIndexSimpleWriter extends BlurIndex {
     _directory = directory;
 
     _indexCloser = indexCloser;
-    _indexReader.set(wrap(DirectoryReader.open(_directory)));
+    DirectoryReader realDirectoryReader = DirectoryReader.open(_directory);
+    DirectoryReader wrappped = wrap(realDirectoryReader);
+    String message = "BlurIndexSimpleWriter - inital open";
+    DirectoryReader directoryReader = checkForMemoryLeaks(wrappped, message);
+    _indexReader.set(directoryReader);
 
     openWriter();
     _watchForIdleBulkWriters = new TimerTask() {
@@ -227,6 +234,25 @@ public class BlurIndexSimpleWriter extends BlurIndex {
     _bulkIndexingTimer.schedule(_watchForIdleBulkWriters, delay, delay);
   }
 
+  private DirectoryReader checkForMemoryLeaks(DirectoryReader wrappped, String message) {
+    DirectoryReader directoryReader = MemoryLeakDetector.record(wrappped, message, _tableContext.getTable(),
+        _shardContext.getShard());
+    if (directoryReader instanceof ExitableReader) {
+      ExitableReader exitableReader = (ExitableReader) directoryReader;
+      checkForMemoryLeaks(exitableReader.getIn().leaves(), message);
+    } else {
+      checkForMemoryLeaks(directoryReader.leaves(), message);
+    }
+    return directoryReader;
+  }
+
+  private void checkForMemoryLeaks(List<AtomicReaderContext> leaves, String message) {
+    for (AtomicReaderContext context : leaves) {
+      AtomicReader reader = context.reader();
+      MemoryLeakDetector.record(reader, message, _tableContext.getTable(), _shardContext.getShard());
+    }
+  }
+
   private synchronized void openWriter() {
     IOUtils.cleanup(LOG, _indexImporter);
     BlurIndexWriter writer = _writer.get();
@@ -392,7 +418,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
   public void close() throws IOException {
     _isClosed.set(true);
     IOUtils.cleanup(LOG, makeCloseable(_watchForIdleBulkWriters), _indexImporter, _mutationQueueProcessor,
-        _writer.get(), _indexReader.get());
+        _writer.get(), _indexReader.get(), _directory);
   }
 
   private Closeable makeCloseable(final TimerTask timerTask) {
@@ -400,6 +426,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
       @Override
       public void close() throws IOException {
         timerTask.cancel();
+        _bulkIndexingTimer.purge();
       }
     };
   }
@@ -489,6 +516,7 @@ public class BlurIndexSimpleWriter extends BlurIndex {
           _shardContext.getShard());
     } else {
       DirectoryReader reader = wrap(newReader);
+      checkForMemoryLeaks(reader, "BlurIndexSimpleWriter - reopen table [{0}] shard [{1}]");
       _indexRefreshWriteLock.lock();
       try {
         _indexReader.set(reader);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/cc8e9806/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
index d4a539d..093c583 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
@@ -75,6 +75,7 @@ public class IndexImporter extends TimerTask implements Closeable {
   private final String _table;
   private final String _shard;
   private final long _cleanupDelay;
+  private final Timer _inindexImporterTimer;
 
   private long _lastCleanup;
   private Runnable _testError;
@@ -86,6 +87,7 @@ public class IndexImporter extends TimerTask implements Closeable {
 
     long period = refreshUnit.toMillis(refreshAmount);
     indexImporterTimer.schedule(this, period, period);
+    _inindexImporterTimer = indexImporterTimer;
     _table = _shardContext.getTableContext().getTable();
     _shard = _shardContext.getShard();
     _cleanupDelay = TimeUnit.MINUTES.toMillis(10);
@@ -94,6 +96,7 @@ public class IndexImporter extends TimerTask implements Closeable {
   @Override
   public void close() throws IOException {
     cancel();
+    _inindexImporterTimer.purge();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/cc8e9806/blur-query/src/main/java/org/apache/blur/lucene/search/PrimeDocCache.java
----------------------------------------------------------------------
diff --git a/blur-query/src/main/java/org/apache/blur/lucene/search/PrimeDocCache.java b/blur-query/src/main/java/org/apache/blur/lucene/search/PrimeDocCache.java
index 4faa954..9a1fb41 100644
--- a/blur-query/src/main/java/org/apache/blur/lucene/search/PrimeDocCache.java
+++ b/blur-query/src/main/java/org/apache/blur/lucene/search/PrimeDocCache.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.blur.index.AtomicReaderUtil;
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
+import org.apache.blur.memory.MemoryLeakDetector;
 import org.apache.lucene.index.AtomicReader;
 import org.apache.lucene.index.DocsEnum;
 import org.apache.lucene.index.Fields;
@@ -67,6 +68,7 @@ public class PrimeDocCache {
         });
         LOG.debug("Prime Doc BitSet missing for segment [" + reader + "] current size [" + primeDocMap.size() + "]");
         final OpenBitSet bs = new OpenBitSet(reader.maxDoc());
+        MemoryLeakDetector.record(bs, "PrimeDoc BitSet", key.toString());
 
         Fields fields = reader.fields();
         if (fields == null) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/cc8e9806/blur-store/src/main/java/org/apache/blur/index/ExitableReader.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/index/ExitableReader.java b/blur-store/src/main/java/org/apache/blur/index/ExitableReader.java
index 8321dd2..d1f5253 100644
--- a/blur-store/src/main/java/org/apache/blur/index/ExitableReader.java
+++ b/blur-store/src/main/java/org/apache/blur/index/ExitableReader.java
@@ -284,6 +284,7 @@ public class ExitableReader extends FilterDirectoryReader {
   }
 
   private final ExitObject _exitObject;
+  private final DirectoryReader _in;
 
   public ExitableReader(DirectoryReader in) {
     this(in, new ExitObject());
@@ -292,6 +293,11 @@ public class ExitableReader extends FilterDirectoryReader {
   public ExitableReader(DirectoryReader in, ExitObject exitObject) {
     super(in, new ExitableSubReaderWrapper(exitObject));
     _exitObject = exitObject;
+    _in = in;
+  }
+
+  public DirectoryReader getIn() {
+    return _in;
   }
 
   public AtomicBoolean getRunning() {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/cc8e9806/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
index 93e6fbb..8610c8d 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/FastHdfsKeyValueDirectory.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.blur.log.Log;
 import org.apache.blur.log.LogFactory;
+import org.apache.blur.memory.MemoryLeakDetector;
 import org.apache.blur.store.blockcache.LastModified;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -64,6 +65,7 @@ public class FastHdfsKeyValueDirectory extends Directory implements LastModified
   public FastHdfsKeyValueDirectory(Timer hdfsKeyValueTimer, Configuration configuration, Path path) throws IOException {
     _path = path;
     _store = new HdfsKeyValueStore(hdfsKeyValueTimer, configuration, path);
+    MemoryLeakDetector.record(_store, "HdfsKeyValueStore", path.toString());
     BytesRef value = new BytesRef();
     if (_store.get(FILES, value)) {
       String filesString = value.utf8ToString();

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/cc8e9806/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
----------------------------------------------------------------------
diff --git a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
index b9da75e..abbaae5 100644
--- a/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
+++ b/blur-store/src/main/java/org/apache/blur/store/hdfs_v2/HdfsKeyValueStore.java
@@ -161,6 +161,7 @@ public class HdfsKeyValueStore implements Store {
   private final TimerTask _idleLogTimerTask;
   private final TimerTask _oldFileCleanerTimerTask;
   private final AtomicLong _lastWrite = new AtomicLong();
+  private final Timer _hdfsKeyValueTimer;
 
   private FSDataOutputStream _output;
   private Path _outputPath;
@@ -188,14 +189,16 @@ public class HdfsKeyValueStore implements Store {
     cleanupOldFiles();
     _idleLogTimerTask = getIdleLogTimer();
     _oldFileCleanerTimerTask = getOldFileCleanerTimer();
-    hdfsKeyValueTimer.schedule(_idleLogTimerTask, DAEMON_POLL_TIME, DAEMON_POLL_TIME);
-    hdfsKeyValueTimer.schedule(_oldFileCleanerTimerTask, DAEMON_POLL_TIME, DAEMON_POLL_TIME);
-    Metrics.newGauge(new MetricName(ORG_APACHE_BLUR, HDFS_KV, SIZE, path.getParent().toString()), new Gauge<Long>() {
-      @Override
-      public Long value() {
-        return _size.get();
-      }
-    });
+    _hdfsKeyValueTimer = hdfsKeyValueTimer;
+    _hdfsKeyValueTimer.schedule(_idleLogTimerTask, DAEMON_POLL_TIME, DAEMON_POLL_TIME);
+    _hdfsKeyValueTimer.schedule(_oldFileCleanerTimerTask, DAEMON_POLL_TIME, DAEMON_POLL_TIME);
+    // Metrics.newGauge(new MetricName(ORG_APACHE_BLUR, HDFS_KV, SIZE,
+    // path.getParent().toString()), new Gauge<Long>() {
+    // @Override
+    // public Long value() {
+    // return _size.get();
+    // }
+    // });
   }
 
   private void removeAnyTruncatedFiles() throws IOException {
@@ -467,6 +470,7 @@ public class HdfsKeyValueStore implements Store {
       _isClosed = true;
       _idleLogTimerTask.cancel();
       _oldFileCleanerTimerTask.cancel();
+      _hdfsKeyValueTimer.purge();
       _writeLock.lock();
       try {
         if (isOpenForWriting()) {

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/cc8e9806/blur-util/src/main/java/org/apache/blur/memory/MemoryLeakDetector.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/memory/MemoryLeakDetector.java b/blur-util/src/main/java/org/apache/blur/memory/MemoryLeakDetector.java
new file mode 100644
index 0000000..6db6df8
--- /dev/null
+++ b/blur-util/src/main/java/org/apache/blur/memory/MemoryLeakDetector.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.memory;
+
+import java.text.MessageFormat;
+import java.util.Arrays;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+
+import com.google.common.collect.MapMaker;
+
+public class MemoryLeakDetector {
+
+  private static final Log LOG = LogFactory.getLog(MemoryLeakDetector.class);
+  private static boolean _enabled = true;
+  private static final ConcurrentMap<Object, Info> _map;
+  private static Timer _timer;
+
+  static class Info {
+    final Object[] _args;
+    final String _message;
+
+    Info(String message, Object[] args) {
+      _args = args;
+      _message = message;
+    }
+
+    @Override
+    public String toString() {
+      return "Info [_args=" + Arrays.toString(_args) + ", _message=" + _message + "]";
+    }
+
+  }
+
+  static {
+    _map = new MapMaker().weakKeys().makeMap();
+    _timer = new Timer("MemoryLeakDetector",true);
+    _timer.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        dump();
+      }
+    }, TimeUnit.SECONDS.toMillis(10), TimeUnit.SECONDS.toMillis(10));
+  }
+
+  public static <T> T record(T t, String message, Object... args) {
+    if (_enabled) {
+      String realMessage = MessageFormat.format(message, args);
+      Info info = new Info(realMessage, args);
+      _map.put(t, info);
+    }
+    return t;
+  }
+
+  public static void dump() {
+    Set<Entry<Object, Info>> entrySet = _map.entrySet();
+    for (Entry<Object, Info> e : entrySet) {
+      Object o = e.getKey();
+      if (o != null) {
+        Info info = e.getValue();
+        LOG.info("Object [{0}] Info [{1}]", o, info);
+      }
+    }
+  }
+
+}


Mime
View raw message