incubator-blur-commits mailing list archives

From amccu...@apache.org
Subject git commit: Auto slab config is complete.
Date Thu, 28 Mar 2013 17:08:36 GMT
Updated Branches:
  refs/heads/0.1.5 2876f5478 -> 9dccdd9fc


Auto slab config is complete.
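With this change, leaving blur.shard.blockcache.slab.count at its new default of -1 makes the
shard server size the block cache automatically from the JVM's -XX:MaxDirectMemorySize flag
(see getSlabCount in the diff below). A worked example of the arithmetic, assuming the 256m
default that blur-env.sh now ships with; the 64 MB subtraction is the headroom the code
reserves for other direct-memory users:

    slabSize  = 16384 blocks/slab * 8192 bytes/block    = 128 MB
    usable    = 256 MB (-XX:MaxDirectMemorySize) - 64 MB = 192 MB
    slabCount = 192 MB / 128 MB                          = 1  (integer division)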


Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/9dccdd9f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/9dccdd9f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/9dccdd9f

Branch: refs/heads/0.1.5
Commit: 9dccdd9fc18b2b941a95c8af976d1cb44ca75baa
Parents: 2876f54
Author: Aaron McCurry <amccurry@gmail.com>
Authored: Thu Mar 28 11:52:12 2013 -0400
Committer: Aaron McCurry <amccurry@gmail.com>
Committed: Thu Mar 28 11:52:12 2013 -0400

----------------------------------------------------------------------
 .../apache/blur/thrift/ThriftBlurShardServer.java  |   36 ++-
 src/blur-util/pom.xml                              |    5 +
 .../java/org/apache/blur/metrics/HDFSReporter.java |  310 +++++++++++++--
 .../src/main/resources/blur-default.properties     |    6 +-
 src/distribution/src/main/scripts/conf/blur-env.sh |    2 +-
 src/pom.xml                                        |    1 +
 6 files changed, 320 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9dccdd9f/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
----------------------------------------------------------------------
diff --git a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
index 0671234..ab958bf 100644
--- a/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
+++ b/src/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -37,6 +37,9 @@ import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_CONNECTION;
 import static org.apache.blur.utils.BlurConstants.BLUR_ZOOKEEPER_SYSTEM_TIME_TOLERANCE;
 import static org.apache.blur.utils.BlurUtil.quietClose;
 
+import java.lang.management.ManagementFactory;
+import java.util.List;
+
 import org.apache.blur.BlurConfiguration;
 import org.apache.blur.concurrent.SimpleUncaughtExceptionHandler;
 import org.apache.blur.concurrent.ThreadWatcher;
@@ -88,7 +91,8 @@ public class ThriftBlurShardServer extends ThriftServer {
     // in a slab when using a block size of 8,192
     int numberOfBlocksPerSlab = 16384;
     int blockSize = BlockDirectory.BLOCK_SIZE;
-    int slabCount = configuration.getInt(BLUR_SHARD_BLOCKCACHE_SLAB_COUNT, 1);
+    int slabCount = configuration.getInt(BLUR_SHARD_BLOCKCACHE_SLAB_COUNT, -1);
+    slabCount = getSlabCount(slabCount, numberOfBlocksPerSlab, blockSize);
     Cache cache;
     Configuration config = new Configuration();
     
@@ -224,6 +228,36 @@ public class ThriftBlurShardServer extends ThriftServer {
     new BlurServerShutDown().register(shutdown, zooKeeper);
     return server;
   }
+  
+  private static int getSlabCount(int slabCount, int numberOfBlocksPerSlab, int blockSize) {
+    if (slabCount < 0) {
+      long slabSize = numberOfBlocksPerSlab * blockSize;
+      List<String> inputArguments = ManagementFactory.getRuntimeMXBean().getInputArguments();
+      for (String arg : inputArguments) {
+        if (arg.startsWith("-XX:MaxDirectMemorySize")) {
+          long maxDirectMemorySize = getMaxDirectMemorySize(arg);
+          maxDirectMemorySize -= 64 * 1024 * 1024;
+          return (int) (maxDirectMemorySize / slabSize);
+        }
+      }
+      throw new RuntimeException("Auto slab setup cannot happen, JVM option -XX:MaxDirectMemorySize
not set.");
+    }
+    return slabCount;
+  }
+  
+  private static long getMaxDirectMemorySize(String arg) {
+    int index = arg.lastIndexOf('=');
+    return parseNumber(arg.substring(index + 1).toLowerCase().replace(" ", ""));
+  }
+
+  private static long parseNumber(String number) {
+    if (number.endsWith("m")) {
+      return Long.parseLong(number.substring(0, number.length() - 1)) * 1024 * 1024;
+    } else if (number.endsWith("g")) {
+      return Long.parseLong(number.substring(0, number.length() - 1)) * 1024 * 1024 * 1024;
+    }
+    throw new RuntimeException("Cannot parse [" + number + "]");
+  }
 
   private static BlurFilterCache getFilterCache(BlurConfiguration configuration) {
     String _blurFilterCacheClass = configuration.get(BLUR_SHARD_FILTER_CACHE_CLASS);
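A quick sketch of how the new option parsing behaves (illustrative calls, not part of the
commit): the value is lower-cased before parsing, so the m and g suffixes are accepted
case-insensitively, and anything else, including a bare byte count or a k suffix, fails
fast with a RuntimeException:

    getMaxDirectMemorySize("-XX:MaxDirectMemorySize=256m"); // 268435456
    getMaxDirectMemorySize("-XX:MaxDirectMemorySize=2G");   // 2147483648
    getMaxDirectMemorySize("-XX:MaxDirectMemorySize=256k"); // throws RuntimeException("Cannot parse [256k]")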

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9dccdd9f/src/blur-util/pom.xml
----------------------------------------------------------------------
diff --git a/src/blur-util/pom.xml b/src/blur-util/pom.xml
index 710a458..7f4e27f 100644
--- a/src/blur-util/pom.xml
+++ b/src/blur-util/pom.xml
@@ -72,6 +72,11 @@
 			<scope>provided</scope>
 		</dependency>
 		<dependency>
+			<groupId>org.json</groupId>
+			<artifactId>json</artifactId>
+			<version>${json.version}</version>
+		</dependency>
+		<dependency>
 			<groupId>log4j</groupId>
 			<artifactId>log4j</artifactId>
 			<version>${log4j.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9dccdd9f/src/blur-util/src/main/java/org/apache/blur/metrics/HDFSReporter.java
----------------------------------------------------------------------
diff --git a/src/blur-util/src/main/java/org/apache/blur/metrics/HDFSReporter.java b/src/blur-util/src/main/java/org/apache/blur/metrics/HDFSReporter.java
index 9031881..757febe 100644
--- a/src/blur-util/src/main/java/org/apache/blur/metrics/HDFSReporter.java
+++ b/src/blur-util/src/main/java/org/apache/blur/metrics/HDFSReporter.java
@@ -1,73 +1,313 @@
 package org.apache.blur.metrics;
 
-import java.io.DataOutput;
-import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map.Entry;
+import java.util.SortedMap;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.json.JSONException;
+import org.json.JSONObject;
+
 import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Clock;
 import com.yammer.metrics.core.Counter;
 import com.yammer.metrics.core.Gauge;
 import com.yammer.metrics.core.Histogram;
 import com.yammer.metrics.core.Metered;
+import com.yammer.metrics.core.Metric;
 import com.yammer.metrics.core.MetricName;
 import com.yammer.metrics.core.MetricProcessor;
 import com.yammer.metrics.core.MetricsRegistry;
 import com.yammer.metrics.core.Timer;
 import com.yammer.metrics.reporting.AbstractPollingReporter;
-import com.yammer.metrics.reporting.CsvReporter;
+import com.yammer.metrics.stats.Snapshot;
 
-public class HDFSReporter extends AbstractPollingReporter implements
-MetricProcessor<DataOutput> {
-  
-  protected HDFSReporter(MetricsRegistry registry, String name) {
-    super(registry, name);
-  }
+public class HDFSReporter extends AbstractPollingReporter implements MetricProcessor<HDFSReporter.Context> {
+
+  private static Log LOG = LogFactory.getLog(HDFSReporter.class);
+
+  static class Context {
+
+    private final Path path;
+    private final SimpleDateFormat formatter;
+    private final String name;
+    private final FileSystem fileSystem;
+    private String currentOutputFilePattern;
+    private long now;
+    private PrintWriter printStream;
+    private FSDataOutputStream outputStream;
+    private Path currentOutputPath;
+    private long maxTimeToKeep;
+
+    public Context(Path path, Configuration configuration, String filePattern, String name) throws IOException {
+      this.path = path;
+      this.fileSystem = path.getFileSystem(configuration);
+      if (fileSystem.exists(path)) {
+        if (!fileSystem.getFileStatus(path).isDir()) {
+          throw new IOException("Path [" + path + "] is not a directory.");
+        }
+      } else {
+        fileSystem.mkdirs(path);
+      }
+      this.name = name;
+      this.formatter = new SimpleDateFormat(filePattern);
+      this.maxTimeToKeep = TimeUnit.MINUTES.toMillis(10);
+    }
+
+    public void open(long now) throws IOException {
+      this.now = now;
+      String outputFilePattern = formatter.format(new Date(now));
+      if (!outputFilePattern.equals(currentOutputFilePattern)) {
+        // roll file
+        rollFile(outputFilePattern);
+        cleanupOldMetrics();
+      }
+    }
+
+    private void cleanupOldMetrics() throws IOException {
+      FileStatus[] listStatus = fileSystem.listStatus(path);
+      for (FileStatus fileStatus : listStatus) {
+        Path filePath = fileStatus.getPath();
+        String fileName = filePath.getName();
+        if (fileName.startsWith(name + ".")) {
+          int sIndex = fileName.indexOf('.');
+          int eIndex = fileName.indexOf('.', sIndex + 1);
+          String pattern;
+          if (eIndex < 0) {
+            pattern = fileName.substring(sIndex + 1);
+          } else {
+            pattern = fileName.substring(sIndex + 1, eIndex);
+          }
+          Date date;
+          try {
+            date = formatter.parse(pattern);
+          } catch (ParseException e) {
+            throw new IOException(e);
+          }
+          if (date.getTime() + maxTimeToKeep < now) {
+            fileSystem.delete(filePath, false);
+          }
+        }
+      }
+    }
+
+    private void rollFile(String newOutputFilePattern) throws IOException {
+      if (printStream != null) {
+        printStream.close();
+      }
+      currentOutputPath = new Path(path, name + "." + newOutputFilePattern);
+      if (fileSystem.exists(currentOutputPath)) {
+        // try to append
+        try {
+          outputStream = fileSystem.append(currentOutputPath);
+        } catch (IOException e) {
+          currentOutputPath = new Path(path, name + "." + newOutputFilePattern + "." + now);
+          outputStream = fileSystem.create(currentOutputPath);
+        }
+      } else {
+        outputStream = fileSystem.create(currentOutputPath);
+      }
+      printStream = new PrintWriter(outputStream);
+      currentOutputFilePattern = newOutputFilePattern;
+    }
 
-  public static void main(String[] args) throws InterruptedException {
-    File file = new File("./target/metrics/");
-    file.mkdirs();
-    CsvReporter.enable(file, 1, TimeUnit.SECONDS);
-    Counter counter = Metrics.newCounter(HDFSReporter.class, "test");
-    while (true) {
-      counter.inc();
-      Thread.sleep(10);
+    public void write(JSONObject jsonObject) throws JSONException {
+      jsonObject.put("timestamp", now);
+      printStream.println(jsonObject.toString());
     }
+
+    public void flush() throws IOException {
+      printStream.flush();
+      outputStream.flush();
+      outputStream.sync();
+    }
+  }
+
+  public static void enable(Configuration configuration, Path path, String filePattern, String name, long period,
+      TimeUnit unit) throws IOException {
+    enable(Metrics.defaultRegistry(), configuration, path, filePattern, name, period, unit);
+  }
+
+  public static void enable(MetricsRegistry metricsRegistry, Configuration configuration, Path path,
+      String filePattern, String name, long period, TimeUnit unit) throws IOException {
+    final HDFSReporter reporter = new HDFSReporter(metricsRegistry, configuration, path, filePattern, name);
+    reporter.start(period, unit);
+  }
+
+  private final Context context;
+  private final Clock clock;
+
+  public HDFSReporter(Configuration configuration, Path path, String filePattern, String name) throws IOException {
+    this(Metrics.defaultRegistry(), configuration, path, filePattern, name);
+  }
+
+  public HDFSReporter(MetricsRegistry metricsRegistry, Configuration configuration, Path path, String filePattern,
+      String name) throws IOException {
+    this(metricsRegistry, configuration, path, filePattern, name, Clock.defaultClock());
+  }
+
+  public HDFSReporter(MetricsRegistry metricsRegistry, Configuration configuration, Path path, String filePattern,
+      String name, Clock clock) throws IOException {
+    super(metricsRegistry, "hdfs-reporter");
+    this.context = new Context(path, configuration, filePattern, name);
+    this.clock = clock;
   }
 
   @Override
-  public void processMeter(MetricName name, Metered meter, DataOutput context) throws Exception {
-    // TODO Auto-generated method stub
-    
+  public void run() {
+    try {
+      context.open(clock.time());
+      for (Entry<String, SortedMap<MetricName, Metric>> entry : getMetricsRegistry().groupedMetrics().entrySet())
{
+        for (Entry<MetricName, Metric> subEntry : entry.getValue().entrySet()) {
+          subEntry.getValue().processWith(this, subEntry.getKey(), context);
+        }
+      }
+      context.flush();
+    } catch (Throwable t) {
+      LOG.error("Unknown error during the processing of metrics.", t);
+    }
   }
 
   @Override
-  public void processCounter(MetricName name, Counter counter, DataOutput context) throws Exception {
-    // TODO Auto-generated method stub
-    
+  public void processGauge(MetricName name, Gauge<?> gauge, HDFSReporter.Context context) {
+    JSONObject jsonObject = new JSONObject();
+    try {
+      jsonObject.put("name", getName(name));
+      jsonObject.put("type", "gauge");
+      jsonObject.put("value", gauge.value());
+      context.write(jsonObject);
+    } catch (JSONException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
-  public void processHistogram(MetricName name, Histogram histogram, DataOutput context) throws Exception {
-    // TODO Auto-generated method stub
-    
+  public void processCounter(MetricName name, Counter counter, HDFSReporter.Context context) {
+    JSONObject jsonObject = new JSONObject();
+    try {
+      jsonObject.put("name", getName(name));
+      jsonObject.put("type", "counter");
+      jsonObject.put("value", counter.count());
+      context.write(jsonObject);
+    } catch (JSONException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
-  public void processTimer(MetricName name, Timer timer, DataOutput context) throws Exception {
-    // TODO Auto-generated method stub
-    
+  public void processMeter(MetricName name, Metered meter, HDFSReporter.Context context) {
+    JSONObject jsonObject = new JSONObject();
+    try {
+      jsonObject.put("name", getName(name));
+      jsonObject.put("type", "meter");
+      JSONObject meterJsonObject = new JSONObject();
+
+      addMeterInfo(meter, meterJsonObject);
+
+      jsonObject.put("value", meterJsonObject);
+
+      context.write(jsonObject);
+    } catch (JSONException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void addMeterInfo(Metered meter, JSONObject meterJsonObject) throws JSONException {
+    meterJsonObject.put("rateUnit", meter.rateUnit());
+    meterJsonObject.put("eventType", meter.eventType());
+    meterJsonObject.put("count", meter.count());
+    meterJsonObject.put("meanRate", meter.meanRate());
+    meterJsonObject.put("oneMinuteRate", meter.oneMinuteRate());
+    meterJsonObject.put("fiveMinuteRate", meter.fiveMinuteRate());
+    meterJsonObject.put("fifteenMinuteRate", meter.fifteenMinuteRate());
   }
 
   @Override
-  public void processGauge(MetricName name, Gauge<?> gauge, DataOutput context) throws Exception {
-    // TODO Auto-generated method stub
-    
+  public void processHistogram(MetricName name, Histogram histogram, HDFSReporter.Context context) {
+    JSONObject jsonObject = new JSONObject();
+    try {
+      jsonObject.put("name", getName(name));
+      jsonObject.put("type", "meter");
+      JSONObject histogramJsonObject = new JSONObject();
+
+      histogramJsonObject.put("min", histogram.min());
+      histogramJsonObject.put("max", histogram.max());
+      histogramJsonObject.put("mean", histogram.mean());
+      histogramJsonObject.put("stdDev", histogram.stdDev());
+
+      Snapshot snapshot = histogram.getSnapshot();
+      JSONObject snapshotJsonObject = new JSONObject();
+      snapshotJsonObject.put("median", snapshot.getMedian());
+      snapshotJsonObject.put("75%", snapshot.get75thPercentile());
+      snapshotJsonObject.put("95%", snapshot.get95thPercentile());
+      snapshotJsonObject.put("98%", snapshot.get98thPercentile());
+      snapshotJsonObject.put("99%", snapshot.get99thPercentile());
+      snapshotJsonObject.put("99.9%", snapshot.get999thPercentile());
+
+      histogramJsonObject.put("snapshot", snapshotJsonObject);
+
+      jsonObject.put("value", histogramJsonObject);
+      context.write(jsonObject);
+    } catch (JSONException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
-  public void run() {
-    // TODO Auto-generated method stub
-    
+  public void processTimer(MetricName name, Timer timer, HDFSReporter.Context context) {
+    JSONObject jsonObject = new JSONObject();
+    try {
+      jsonObject.put("name", getName(name));
+      jsonObject.put("type", "meter");
+      JSONObject timerJsonObject = new JSONObject();
+
+      timerJsonObject.put("unit", timer.durationUnit());
+      timerJsonObject.put("min", timer.min());
+      timerJsonObject.put("max", timer.max());
+      timerJsonObject.put("mean", timer.mean());
+      timerJsonObject.put("stdDev", timer.stdDev());
+      addMeterInfo(timer, timerJsonObject);
+
+      Snapshot snapshot = timer.getSnapshot();
+      JSONObject snapshotJsonObject = new JSONObject();
+      snapshotJsonObject.put("median", snapshot.getMedian());
+      snapshotJsonObject.put("75%", snapshot.get75thPercentile());
+      snapshotJsonObject.put("95%", snapshot.get95thPercentile());
+      snapshotJsonObject.put("98%", snapshot.get98thPercentile());
+      snapshotJsonObject.put("99%", snapshot.get99thPercentile());
+      snapshotJsonObject.put("99.9%", snapshot.get999thPercentile());
+
+      timerJsonObject.put("snapshot", snapshotJsonObject);
+
+      jsonObject.put("value", timerJsonObject);
+
+      context.write(jsonObject);
+    } catch (JSONException e) {
+      throw new RuntimeException(e);
+    }
   }
 
+  private JSONObject getName(MetricName metricName) throws JSONException {
+    String group = metricName.getGroup();
+    String name = metricName.getName();
+    String scope = metricName.getScope();
+    String type = metricName.getType();
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put("name", name);
+    jsonObject.put("group", group);
+    jsonObject.put("scope", scope);
+    jsonObject.put("type", type);
+    return jsonObject;
+  }
 }
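For context, a minimal usage sketch of the new reporter; the HDFS path and metric name below
are invented for illustration, while the enable signature is the one added above. The
filePattern argument is a SimpleDateFormat pattern: whenever the formatted timestamp changes,
the reporter rolls to a new file named <name>.<pattern> and writes one JSON object per metric
per poll. Note that Context currently hard-codes a ten-minute retention window for old files:

    Configuration conf = new Configuration();
    Path metricsDir = new Path("hdfs://namenode/blur/metrics");
    // roll once a minute; with the 10-minute maxTimeToKeep this keeps roughly 10 files around
    HDFSReporter.enable(conf, metricsDir, "yyyyMMddHHmm", "shard-server", 5, TimeUnit.SECONDS);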

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9dccdd9f/src/blur-util/src/main/resources/blur-default.properties
----------------------------------------------------------------------
diff --git a/src/blur-util/src/main/resources/blur-default.properties b/src/blur-util/src/main/resources/blur-default.properties
index 5090005..58dfd56 100644
--- a/src/blur-util/src/main/resources/blur-default.properties
+++ b/src/blur-util/src/main/resources/blur-default.properties
@@ -24,9 +24,9 @@ blur.shard.cache.max.timetolive=60000
 blur.shard.filter.cache.class=org.apache.blur.manager.DefaultBlurFilterCache
 blur.shard.index.warmup.class=org.apache.blur.manager.indexserver.DefaultBlurIndexWarmup
 blur.shard.blockcache.direct.memory.allocation=true
-blur.shard.blockcache.slab.count=1
-blur.shard.buffercache.1024=8192
-blur.shard.buffercache.8192=8192
+blur.shard.blockcache.slab.count=-1
+blur.shard.buffercache.1024=1024
+blur.shard.buffercache.8192=1024
 blur.shard.safemodedelay=30000
 blur.shard.time.between.commits=30000
 blur.shard.time.between.refreshs=3000
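The new -1 default for blur.shard.blockcache.slab.count is the sentinel that triggers the
auto-sizing shown in ThriftBlurShardServer above; any non-negative value still pins the count
explicitly, e.g. (illustrative override):

    # pin the block cache to 2 slabs (2 x 128 MB) instead of auto-sizing
    blur.shard.blockcache.slab.count=2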

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9dccdd9f/src/distribution/src/main/scripts/conf/blur-env.sh
----------------------------------------------------------------------
diff --git a/src/distribution/src/main/scripts/conf/blur-env.sh b/src/distribution/src/main/scripts/conf/blur-env.sh
index 4872b15..346cf3e 100755
--- a/src/distribution/src/main/scripts/conf/blur-env.sh
+++ b/src/distribution/src/main/scripts/conf/blur-env.sh
@@ -28,7 +28,7 @@
 #-XX:+UseConcMarkSweepGC -XX:+CMSIncrementalMode -XX:CMSIncrementalDutyCycleMin=10 -XX:CMSIncrementalDutyCycle=50 \
 #-XX:ParallelGCThreads=8 -XX:+UseParNewGC -XX:MaxGCPauseMillis=200 -XX:GCTimeRatio=10 -XX:+DisableExplicitGC \
 #-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:$BLUR_HOME/logs/gc-blur-shard-server_`date +%Y%m%d_%H%M%S`.log
-export BLUR_SHARD_JVM_OPTIONS="-Xmx1024m -Djava.net.preferIPv4Stack=true"
+export BLUR_SHARD_JVM_OPTIONS="-Xmx1024m -Djava.net.preferIPv4Stack=true -XX:MaxDirectMemorySize=256m"
 
 # JAVA JVM OPTIONS for the shard servers, jvm tuning parameters are placed here.
 export BLUR_CONTROLLER_JVM_OPTIONS="-Xmx1024m -Djava.net.preferIPv4Stack=true"
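Note that with the -1 default the shard server now refuses to start when
-XX:MaxDirectMemorySize is absent (the RuntimeException in getSlabCount above), which is why
the flag is added to the shipped defaults here. An illustrative variant for a larger cache:

    # ~1 GB of direct memory: (1024 MB - 64 MB) / 128 MB per slab = 7 slabs (integer division)
    export BLUR_SHARD_JVM_OPTIONS="-Xmx1024m -Djava.net.preferIPv4Stack=true -XX:MaxDirectMemorySize=1g"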

http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9dccdd9f/src/pom.xml
----------------------------------------------------------------------
diff --git a/src/pom.xml b/src/pom.xml
index d0d4581..eb01345 100644
--- a/src/pom.xml
+++ b/src/pom.xml
@@ -51,6 +51,7 @@ under the License.
 		<jline.version>2.7</jline.version>
 		<guava.version>14.0</guava.version>
 		<metrics-core.version>2.2.0</metrics-core.version>
+		<json.version>20090211</json.version>
 	</properties>
 
 	<modules>

