incubator-blur-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amccu...@apache.org
Subject [2/2] git commit: The server side of the back pressure feature is complete.
Date Mon, 01 Jul 2013 01:44:12 GMT
The server side of the back pressure feature is complete.


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/e0b3d74b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/e0b3d74b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/e0b3d74b

Branch: refs/heads/master
Commit: e0b3d74ba361b710440dfde49066d257aa4657a4
Parents: fcdafdb
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Sun Jun 30 21:43:32 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Sun Jun 30 21:43:32 2013 -0400

----------------------------------------------------------------------
 .../apache/blur/manager/status/QueryStatus.java | 16 +++---
 .../blur/manager/status/QueryStatusManager.java |  3 +-
 .../java/org/apache/blur/utils/GCWatcher.java   | 20 +++++--
 .../org/apache/blur/thrift/BlurClusterTest.java | 59 ++++++++++++++++++++
 4 files changed, 85 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e0b3d74b/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatus.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatus.java b/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatus.java
index 08c2b0b..0c504ef 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatus.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatus.java
@@ -30,13 +30,14 @@ import org.apache.blur.thrift.generated.CpuTime;
 import org.apache.blur.thrift.generated.QueryState;
 
 /**
- * This class is accessed by multiple threads (one for each shard) 
- * executing the query. Tracks status and collects metrics
- *
+ * This class is accessed by multiple threads (one for each shard) executing the
+ * query. Tracks status and collects metrics
+ * 
  */
 public class QueryStatus implements Comparable<QueryStatus> {
 
-  private final static boolean CPU_TIME_SUPPORTED = ManagementFactory.getThreadMXBean().isCurrentThreadCpuTimeSupported();
+  private final static boolean CPU_TIME_SUPPORTED = ManagementFactory.getThreadMXBean()
+      .isCurrentThreadCpuTimeSupported();
 
   private final BlurQuery _blurQuery;
   private final String _table;
@@ -57,6 +58,7 @@ public class QueryStatus implements Comparable<QueryStatus> {
     _blurQuery = blurQuery;
     _startingTime = System.currentTimeMillis();
     _running = running;
+    _state.set(QueryState.RUNNING);
   }
 
   public QueryStatus attachThread(String shardName) {
@@ -74,9 +76,9 @@ public class QueryStatus implements Comparable<QueryStatus> {
 
   public QueryStatus deattachThread(String shardName) {
     _completeShards.incrementAndGet();
-     CpuTime cpuTime = _cpuTimes.get(shardName);
+    CpuTime cpuTime = _cpuTimes.get(shardName);
     if (CPU_TIME_SUPPORTED) {
-    	cpuTime.cpuTime = _bean.getCurrentThreadCpuTime() - cpuTime.cpuTime;
+      cpuTime.cpuTime = _bean.getCurrentThreadCpuTime() - cpuTime.cpuTime;
     }
     cpuTime.realTime = System.nanoTime() - cpuTime.realTime;
     return this;
@@ -85,7 +87,7 @@ public class QueryStatus implements Comparable<QueryStatus> {
   public long getUserUuid() {
     return _blurQuery.uuid;
   }
-  
+
   public void stopQueryForBackPressure() {
     _state.set(QueryState.BACK_PRESSURE_INTERRUPTED);
     _running.set(false);

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e0b3d74b/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatusManager.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatusManager.java
b/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatusManager.java
index 5abc1c3..438e264 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatusManager.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/status/QueryStatusManager.java
@@ -139,7 +139,8 @@ public class QueryStatusManager {
   public void stopAllQueriesForBackPressure() {
     LOG.warn("Stopping all queries for back pressure.");
     for (QueryStatus status : currentQueryStatusCollection.keySet()) {
-      if (status.getQueryStatus().getState() == QueryState.RUNNING) {
+      QueryState state = status.getQueryStatus().getState();
+      if (state == QueryState.RUNNING) {
         status.stopQueryForBackPressure();
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e0b3d74b/blur-core/src/main/java/org/apache/blur/utils/GCWatcher.java
----------------------------------------------------------------------
diff --git a/blur-core/src/main/java/org/apache/blur/utils/GCWatcher.java b/blur-core/src/main/java/org/apache/blur/utils/GCWatcher.java
index 3837a65..6fbdc89 100644
--- a/blur-core/src/main/java/org/apache/blur/utils/GCWatcher.java
+++ b/blur-core/src/main/java/org/apache/blur/utils/GCWatcher.java
@@ -18,6 +18,7 @@ package org.apache.blur.utils;
  */
 import java.lang.management.GarbageCollectorMXBean;
 import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
 import java.lang.management.MemoryUsage;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
@@ -45,7 +46,8 @@ public class GCWatcher extends TimerTask {
   private Method _method;
   private GcInfo _gcInfo;
   private long _lastIndex;
-  private List<Action> _actions = new ArrayList<Action>();
+  private final List<Action> _actions = new ArrayList<Action>();
+  private final MemoryMXBean _memoryMXBean;
   private static GCWatcher _instance;
 
   public interface Action {
@@ -53,6 +55,7 @@ public class GCWatcher extends TimerTask {
   }
 
   private GCWatcher(double ratio) {
+    _memoryMXBean = ManagementFactory.getMemoryMXBean();
     List<GarbageCollectorMXBean> garbageCollectorMXBeans = ManagementFactory.getGarbageCollectorMXBeans();
     for (GarbageCollectorMXBean bean : garbageCollectorMXBeans) {
       String name = bean.getName();
@@ -113,13 +116,20 @@ public class GCWatcher extends TimerTask {
         }
         long startTime = _gcInfo.getStartTime();
         long endTime = _gcInfo.getEndTime();
+        Map<String, MemoryUsage> usageBeforeGc = _gcInfo.getUsageBeforeGc();
         Map<String, MemoryUsage> usageAfterGc = _gcInfo.getUsageAfterGc();
+        
+        MemoryUsage before = usageBeforeGc.get(CMS_OLD_GEN);
+        long usedBefore = before.getUsed();
+        
         MemoryUsage after = usageAfterGc.get(CMS_OLD_GEN);
-        long used = after.getUsed();
-        long max = after.getMax();
-
+        long usedAfter = after.getUsed();
+        
         long totalTime = endTime - startTime;
-        LOG.info("totalTime spent in GC [{0}] collected [{1}]", totalTime, (max - used));
+        LOG.info("totalTime spent in GC [{0}] collected [{1}]", totalTime, (usedBefore -
usedAfter));
+        MemoryUsage heapMemoryUsage = _memoryMXBean.getHeapMemoryUsage();
+        long max = heapMemoryUsage.getMax();
+        long used = heapMemoryUsage.getUsed();
         long upperLimit = (long) (max * _ratio);
         if (used > upperLimit) {
           LOG.error("----- WARNING !!!! - Heap used [{0}] over limit of [{1}], taking action
to avoid an OOM error.", used,

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/e0b3d74b/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
----------------------------------------------------------------------
diff --git a/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java b/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
index f1be626..135601b 100644
--- a/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
+++ b/blur-core/src/test/java/org/apache/blur/thrift/BlurClusterTest.java
@@ -25,6 +25,9 @@ import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -48,6 +51,7 @@ import org.apache.blur.thrift.generated.RowMutation;
 import org.apache.blur.thrift.generated.SimpleQuery;
 import org.apache.blur.thrift.generated.TableDescriptor;
 import org.apache.blur.thrift.util.BlurThriftHelper;
+import org.apache.blur.utils.GCWatcher;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
@@ -61,10 +65,12 @@ import org.junit.Test;
 
 public class BlurClusterTest {
 
+  private static final long _20MB = 20 * 1000 * 1000;
   private static final File TMPDIR = new File(System.getProperty("blur.tmp.dir", "./target/tmp_BlurClusterTest"));
 
   @BeforeClass
   public static void startCluster() throws IOException {
+    GCWatcher.init(0.80);
     LocalFileSystem localFS = FileSystem.getLocal(new Configuration());
     File testDirectory = new File(TMPDIR, "blur-cluster-test");
     testDirectory.mkdirs();
@@ -186,6 +192,59 @@ public class BlurClusterTest {
     assertEquals(blurException.getErrorType(), ErrorType.QUERY_CANCEL);
   }
 
+  @Test
+  public void testBackPressure() throws BlurException, TException, InterruptedException {
+    // This will make each collect in the collectors pause 250 ms per collect
+    // call
+    IndexManager.DEBUG_RUN_SLOW.set(true);
+
+    final Iface client = getClient();
+    final BlurQuery blurQueryRow = new BlurQuery();
+    SimpleQuery simpleQueryRow = new SimpleQuery();
+    simpleQueryRow.setQueryStr("test.test:value");
+    blurQueryRow.setSimpleQuery(simpleQueryRow);
+    blurQueryRow.setUseCacheIfPresent(false);
+    blurQueryRow.setCacheResult(false);
+    blurQueryRow.setUuid(1234l);
+
+    final AtomicReference<BlurException> error = new AtomicReference<BlurException>();
+    final AtomicBoolean fail = new AtomicBoolean();
+
+    System.gc();
+    System.gc();
+    MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+    MemoryUsage usage = memoryMXBean.getHeapMemoryUsage();
+    long max = usage.getMax();
+    long used = usage.getUsed();
+    long limit = (long) (max * 0.80);
+    long difference = limit - used;
+    byte[] bufferToFillHeap = new byte[(int) (difference - _20MB)];
+    new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          // This call will take several seconds to execute.
+          client.query("test", blurQueryRow);
+          fail.set(true);
+        } catch (BlurException e) {
+          error.set(e);
+        } catch (TException e) {
+          e.printStackTrace();
+          fail.set(true);
+        }
+      }
+    }).start();
+    Thread.sleep(500);
+    byte[] bufferToPutGcWatcherOverLimit = new byte[(int) _20MB];
+    BlurException blurException = pollForError(error, 10, TimeUnit.SECONDS);
+    if (fail.get()) {
+      fail("Unknown error, failing test.");
+    }
+    System.out.println(bufferToFillHeap.hashCode());
+    System.out.println(bufferToPutGcWatcherOverLimit.hashCode());
+    assertEquals(blurException.getErrorType(), ErrorType.BACK_PRESSURE);
+  }
+
   private BlurException pollForError(AtomicReference<BlurException> error, long period,
TimeUnit timeUnit)
       throws InterruptedException {
     long s = System.nanoTime();


Mime
View raw message