incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [1/2] git commit: Adding the ability to trace allocations during search in a test env. This adds no runtime libs and it's coded to be disabled during runtime. A special minicluster test will be written to take advantage of this feature to report on memory allocation during searching.
Date Sun, 08 Mar 2015 15:25:44 GMT
Repository: incubator-blur
Updated Branches:
  refs/heads/master f4ac13f9a -> 2ca85fa2b


Adding the ability to trace allocations during search in a test env. This adds no runtime
libs and it's coded to be disabled during runtime.  A special minicluster test will be written
to take advantage of this feature to report on memory allocation during searching.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/c58742e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/c58742e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/c58742e5

Branch: refs/heads/master
Commit: c58742e5e8f4e261a64617c0e1350eed3330f873
Parents: 55a5d1a
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sun Mar 8 11:23:05 2015 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sun Mar 8 11:25:27 2015 -0400

----------------------------------------------------------------------
 blur-core/pom.xml                               |   5 +
 .../org/apache/blur/manager/IndexManager.java   |  30 +++--
 .../blur/memory/MemoryAllocationWatcher.java    |  23 ++++
 .../java/org/apache/blur/memory/Watcher.java    |  23 ++++
 .../blur/thrift/ThriftBlurShardServer.java      |  11 +-
 .../apache/blur/manager/IndexManagerTest.java   |  10 +-
 .../apache/blur/memory/AllocationWatcher.java   | 115 +++++++++++++++++++
 .../apache/blur/memory/MemoryLeakDetector.java  |   3 +-
 pom.xml                                         |  10 +-
 9 files changed, 217 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c58742e5/blur-core/pom.xml
----------------------------------------------------------------------
diff --git a/blur-core/pom.xml b/blur-core/pom.xml
index c017572..090bed6 100644
--- a/blur-core/pom.xml
+++ b/blur-core/pom.xml
@@ -98,6 +98,11 @@ under the License.
             <groupId>org.eclipse.jetty</groupId>
             <artifactId>jetty-webapp</artifactId>
         </dependency>
+		<dependency>
+			<groupId>com.google.code.java-allocation-instrumenter</groupId>
+			<artifactId>java-allocation-instrumenter</artifactId>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 
 	<repositories>

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c58742e5/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
index 56b00b0..d42218d 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/IndexManager.java
@@ -68,6 +68,8 @@ import org.apache.blur.manager.status.QueryStatus;
 import org.apache.blur.manager.status.QueryStatusManager;
 import org.apache.blur.manager.writer.BlurIndex;
 import org.apache.blur.manager.writer.MutatableAction;
+import org.apache.blur.memory.MemoryAllocationWatcher;
+import org.apache.blur.memory.Watcher;
 import org.apache.blur.server.ShardContext;
 import org.apache.blur.server.ShardServerContext;
 import org.apache.blur.server.TableContext;
@@ -177,12 +179,14 @@ public class IndexManager {
   private final int _threadCount;
   private final int _mutateThreadCount;
   private final DeepPagingCache _deepPagingCache;
+  private final MemoryAllocationWatcher _memoryAllocationWatcher;
 
   public static AtomicBoolean DEBUG_RUN_SLOW = new AtomicBoolean(false);
 
   public IndexManager(IndexServer indexServer, ClusterStatus clusterStatus, BlurFilterCache
filterCache,
       int maxHeapPerRowFetch, int fetchCount, int threadCount, int mutateThreadCount, long
statusCleanupTimerDelay,
-      int facetThreadCount, DeepPagingCache deepPagingCache) {
+      int facetThreadCount, DeepPagingCache deepPagingCache, MemoryAllocationWatcher memoryAllocationWatcher)
{
+    _memoryAllocationWatcher = memoryAllocationWatcher;
     _deepPagingCache = deepPagingCache;
     _indexServer = indexServer;
     _clusterStatus = clusterStatus;
@@ -529,7 +533,7 @@ public class IndexManager {
       Sort sort = getSort(blurQuery, fieldManager);
       call = new SimpleQueryParallelCall(running, table, status, facetedQuery, blurQuery.selector,
           _queriesInternalMeter, shardServerContext, runSlow, _fetchCount, _maxHeapPerRowFetch,
-          context.getSimilarity(), context, sort, _deepPagingCache);
+          context.getSimilarity(), context, sort, _deepPagingCache, _memoryAllocationWatcher);
       trace.done();
       MergerBlurResultIterable merger = new MergerBlurResultIterable(blurQuery);
       BlurResultIterable merge = ForkJoin.execute(_executor, blurIndexes.entrySet(), call,
new Cancel() {
@@ -1191,11 +1195,12 @@ public class IndexManager {
     private final TableContext _context;
     private final Sort _sort;
     private final DeepPagingCache _deepPagingCache;
+    private final MemoryAllocationWatcher _memoryAllocationWatcher;
 
     public SimpleQueryParallelCall(AtomicBoolean running, String table, QueryStatus status,
Query query,
         Selector selector, Meter queriesInternalMeter, ShardServerContext shardServerContext,
boolean runSlow,
         int fetchCount, int maxHeapPerRowFetch, Similarity similarity, TableContext context,
Sort sort,
-        DeepPagingCache deepPagingCache) {
+        DeepPagingCache deepPagingCache, MemoryAllocationWatcher memoryAllocationWatcher)
{
       _running = running;
       _table = table;
       _status = status;
@@ -1210,14 +1215,15 @@ public class IndexManager {
       _context = context;
       _sort = sort;
       _deepPagingCache = deepPagingCache;
+      _memoryAllocationWatcher = memoryAllocationWatcher;
     }
 
     @Override
     public BlurResultIterable call(Entry<String, BlurIndex> entry) throws Exception
{
-      String shard = entry.getKey();
+      final String shard = entry.getKey();
       _status.attachThread(shard);
       BlurIndex index = entry.getValue();
-      IndexSearcherCloseable searcher = index.getIndexSearcher();
+      final IndexSearcherCloseable searcher = index.getIndexSearcher();
       Tracer trace2 = null;
       try {
         IndexReader indexReader = searcher.getIndexReader();
@@ -1229,7 +1235,7 @@ public class IndexManager {
         }
         searcher.setSimilarity(_similarity);
         Tracer trace1 = Trace.trace("query rewrite", Trace.param("table", _table));
-        Query rewrite;
+        final Query rewrite;
         try {
           rewrite = searcher.rewrite((Query) _query.clone());
         } catch (ExitingReaderException e) {
@@ -1242,8 +1248,16 @@ public class IndexManager {
         // BlurResultIterableSearcher will close searcher, if shard server
         // context is null.
         trace2 = Trace.trace("query initial search");
-        return new BlurResultIterableSearcher(_running, rewrite, _table, shard, searcher,
_selector,
-            _shardServerContext == null, _runSlow, _fetchCount, _maxHeapPerRowFetch, _context,
_sort, _deepPagingCache);
+        BlurResultIterableSearcher iterableSearcher = _memoryAllocationWatcher
+            .run(new Watcher<BlurResultIterableSearcher, BlurException>() {
+              @Override
+              public BlurResultIterableSearcher run() throws BlurException {
+                return new BlurResultIterableSearcher(_running, rewrite, _table, shard, searcher,
_selector,
+                    _shardServerContext == null, _runSlow, _fetchCount, _maxHeapPerRowFetch,
_context, _sort,
+                    _deepPagingCache);
+              }
+            });
+        return iterableSearcher;
       } catch (BlurException e) {
         switch (_status.getQueryStatus().getState()) {
         case INTERRUPTED:

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c58742e5/blur-core/src/main/java/org/apache/blur/memory/MemoryAllocationWatcher.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/memory/MemoryAllocationWatcher.java b/blur-core/src/main/java/org/apache/blur/memory/MemoryAllocationWatcher.java
new file mode 100644
index 0000000..3899c3d
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/memory/MemoryAllocationWatcher.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.memory;
+
+public interface MemoryAllocationWatcher {
+
+  <T, E extends Exception> T run(Watcher<T, E> w) throws E;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c58742e5/blur-core/src/main/java/org/apache/blur/memory/Watcher.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/memory/Watcher.java b/blur-core/src/main/java/org/apache/blur/memory/Watcher.java
new file mode 100644
index 0000000..649e84a
--- /dev/null
+++ b/blur-core/src/main/java/org/apache/blur/memory/Watcher.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.memory;
+
+public interface Watcher<T, E extends Exception> {
+
+  T run() throws E;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c58742e5/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
index 5cdbe77..ddbd1d1 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -86,6 +86,8 @@ import org.apache.blur.manager.indexserver.BlurServerShutDown.BlurShutdown;
 import org.apache.blur.manager.indexserver.DistributedIndexServer;
 import org.apache.blur.manager.indexserver.DistributedLayoutFactory;
 import org.apache.blur.manager.indexserver.DistributedLayoutFactoryImpl;
+import org.apache.blur.memory.MemoryAllocationWatcher;
+import org.apache.blur.memory.Watcher;
 import org.apache.blur.metrics.JSONReporter;
 import org.apache.blur.metrics.ReporterSetup;
 import org.apache.blur.server.ServerSecurityFilter;
@@ -242,9 +244,16 @@ public class ThriftBlurShardServer extends ThriftServer {
     int cacheSize = configuration.getInt(BLUR_SHARD_DEEP_PAGING_CACHE_SIZE, 1000);
     DeepPagingCache deepPagingCache = new DeepPagingCache(cacheSize);
 
+    MemoryAllocationWatcher memoryAllocationWatcher = new MemoryAllocationWatcher() {
+      @Override
+      public <T, E extends Exception> T run(Watcher<T, E> w) throws E {
+        return w.run();
+      }
+    };
+
     final IndexManager indexManager = new IndexManager(indexServer, clusterStatus, filterCache,
maxHeapPerRowFetch,
         fetchCount, indexManagerThreadCount, mutateThreadCount, statusCleanupTimerDelay,
facetThreadCount,
-        deepPagingCache);
+        deepPagingCache, memoryAllocationWatcher);
 
     File tmpPath = getTmpPath(configuration);
     int numberOfShardWorkerCommandThreads = configuration.getInt(BLUR_SHARD_COMMAND_WORKER_THREADS,
16);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c58742e5/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java b/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
index 78fe48e..d51f8cd 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/IndexManagerTest.java
@@ -51,6 +51,8 @@ import org.apache.blur.lucene.search.DeepPagingCache;
 import org.apache.blur.manager.clusterstatus.ClusterStatus;
 import org.apache.blur.manager.indexserver.LocalIndexServer;
 import org.apache.blur.manager.results.BlurResultIterable;
+import org.apache.blur.memory.MemoryAllocationWatcher;
+import org.apache.blur.memory.Watcher;
 import org.apache.blur.server.TableContext;
 import org.apache.blur.thrift.generated.BlurException;
 import org.apache.blur.thrift.generated.BlurQuery;
@@ -94,6 +96,12 @@ public class IndexManagerTest {
   private static final String TABLE = "table";
   private static final String FAMILY = "test-family";
   private static final String FAMILY2 = "test-family2";
+  private static final MemoryAllocationWatcher NOTHING = new MemoryAllocationWatcher() {
+    @Override
+    public <T, E extends Exception> T run(Watcher<T, E> w) throws E {
+      return w.run();
+    }
+  };
   private LocalIndexServer server;
   private IndexManager indexManager;
   private File base;
@@ -121,7 +129,7 @@ public class IndexManagerTest {
     BlurFilterCache filterCache = new DefaultBlurFilterCache(new BlurConfiguration());
     long statusCleanupTimerDelay = 1000;
     indexManager = new IndexManager(server, getClusterStatus(tableDescriptor), filterCache,
10000000, 100, 1, 1,
-        statusCleanupTimerDelay, 0, new DeepPagingCache());
+        statusCleanupTimerDelay, 0, new DeepPagingCache(), NOTHING);
     setupData();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c58742e5/blur-core/src/test/java/org/apache/blur/memory/AllocationWatcher.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/memory/AllocationWatcher.java b/blur-core/src/test/java/org/apache/blur/memory/AllocationWatcher.java
new file mode 100644
index 0000000..c42fd5a
--- /dev/null
+++ b/blur-core/src/test/java/org/apache/blur/memory/AllocationWatcher.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.blur.memory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.blur.log.Log;
+import org.apache.blur.log.LogFactory;
+
+import com.google.monitoring.runtime.instrumentation.Sampler;
+
+public class AllocationWatcher implements MemoryAllocationWatcher, Sampler {
+
+  private static final Log LOG = LogFactory.getLog(AllocationWatcher.class);
+
+  public <T, E extends Exception> T run(Watcher<T, E> w) throws E {
+    Thread t = Thread.currentThread();
+    Info info = _threadMap.get(t);
+    if (info == null) {
+      info = new Info();
+      info._enabled = true;
+      _threadMap.put(t, info);
+    }
+    return w.run();
+  }
+
+  static class Info {
+    boolean _enabled;
+    Map<String, Long> _count = new HashMap<String, Long>();
+    Map<String, Long> _sizes = new HashMap<String, Long>();
+    Map<String, Map<StackTraceElement, Long>> _stackElements = new HashMap<String,
Map<StackTraceElement, Long>>();
+  }
+
+  private final Map<Thread, Info> _threadMap = new ConcurrentHashMap<Thread, Info>();
+
+  @Override
+  public void sampleAllocation(int count, String desc, Object newObj, long size) {
+    Info info = _threadMap.get(Thread.currentThread());
+    if (info != null && info._enabled) {
+      StackTraceElement[] stackTrace = new Throwable().getStackTrace();
+      StackTraceElement stackTraceElement = stackTrace[2];
+      add(desc, info._count, 1L);
+      add(desc, info._sizes, size);
+      add(desc, info._stackElements, stackTraceElement);
+    }
+  }
+
+  private void add(String desc, Map<String, Map<StackTraceElement, Long>> stackElements,
+      StackTraceElement stackTraceElement) {
+    Map<StackTraceElement, Long> map = stackElements.get(desc);
+    if (map == null) {
+      stackElements.put(desc, map = new HashMap<StackTraceElement, Long>());
+    }
+    add(stackTraceElement, map, 1L);
+  }
+
+  private <K> void add(K key, Map<K, Long> countMap, Long amount) {
+    Long c = countMap.get(key);
+    if (c == null) {
+      countMap.put(key, amount);
+    } else {
+      countMap.put(key, c + amount);
+    }
+  }
+
+  public void reset() {
+    _threadMap.clear();
+  }
+
+  public void dump() {
+    Map<Thread, Info> threadMap = _threadMap;
+    for (Entry<Thread, Info> entry : threadMap.entrySet()) {
+      Thread thread = entry.getKey();
+      Info info = entry.getValue();
+      Map<String, Long> map = info._count;
+      List<Entry<String, Long>> elements = new ArrayList<Map.Entry<String,
Long>>(map.entrySet());
+      Collections.sort(elements, new Comparator<Entry<String, Long>>() {
+        @Override
+        public int compare(Entry<String, Long> o1, Entry<String, Long> o2) {
+          return o1.getValue().compareTo(o2.getValue());
+        }
+      });
+      for (Entry<String, Long> e : elements) {
+        String desc = e.getKey();
+        LOG.info(thread.getName() + " " + desc + "=>" + e.getValue() + " [" + info._sizes.get(desc)
+ "]");
+        Map<StackTraceElement, Long> stackMap = info._stackElements.get(desc);
+        for (Entry<StackTraceElement, Long> stackEntry : stackMap.entrySet()) {
+          LOG.info("\t" + stackEntry.getKey() + " " + stackEntry.getValue());
+        }
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c58742e5/blur-util/src/main/java/org/apache/blur/memory/MemoryLeakDetector.java
----------------------------------------------------------------------
diff --git a/blur-util/src/main/java/org/apache/blur/memory/MemoryLeakDetector.java b/blur-util/src/main/java/org/apache/blur/memory/MemoryLeakDetector.java
index 6db6df8..d5b1066 100644
--- a/blur-util/src/main/java/org/apache/blur/memory/MemoryLeakDetector.java
+++ b/blur-util/src/main/java/org/apache/blur/memory/MemoryLeakDetector.java
@@ -33,7 +33,8 @@ import com.google.common.collect.MapMaker;
 public class MemoryLeakDetector {
 
   private static final Log LOG = LogFactory.getLog(MemoryLeakDetector.class);
-  private static boolean _enabled = true;
+  
+  private static boolean _enabled = false;
   private static final ConcurrentMap<Object, Info> _map;
   private static Timer _timer;
 

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/c58742e5/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index edd7fc7..af3172f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -172,8 +172,8 @@ under the License.
 	</scm>
 
 	<properties>
-                <commons-logging.version>1.1.3</commons-logging.version>
-                <commons-lang.version>2.4</commons-lang.version>
+		<commons-logging.version>1.1.3</commons-logging.version>
+		<commons-lang.version>2.4</commons-lang.version>
 		<zookeeper.version>3.4.5</zookeeper.version>
 		<log4j.version>1.2.15</log4j.version>
 		<jersey.version>1.14</jersey.version>
@@ -193,6 +193,7 @@ under the License.
 		<httpclient.version>4.1.3</httpclient.version>
 		<servlet-api.version>3.0.1</servlet-api.version>
 		<hive.version>0.13.1</hive.version>
+		<java-allocation-instrumenter.version>3.0</java-allocation-instrumenter.version>
 	</properties>
 
     <dependencyManagement>
@@ -232,6 +233,11 @@ under the License.
                 <artifactId>hadoop-minicluster</artifactId>
                 <version>${hadoop.version}</version>
             </dependency>
+			<dependency>
+				<groupId>com.google.code.java-allocation-instrumenter</groupId>
+				<artifactId>java-allocation-instrumenter</artifactId>
+				<version>${java-allocation-instrumenter.version}</version>
+			</dependency>
             <dependency>
                 <groupId>log4j</groupId>
                 <artifactId>log4j</artifactId>


Mime
View raw message