chukwa-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ey...@apache.org
Subject [21/22] chukwa git commit: CHUKWA-767. Implemented low pass filter for Charting REST API. (Eric Yang)
Date Thu, 25 Jun 2015 20:48:52 GMT
CHUKWA-767. Implemented low pass filter for Charting REST API.  (Eric Yang)


Project: http://git-wip-us.apache.org/repos/asf/chukwa/repo
Commit: http://git-wip-us.apache.org/repos/asf/chukwa/commit/f3dfc942
Tree: http://git-wip-us.apache.org/repos/asf/chukwa/tree/f3dfc942
Diff: http://git-wip-us.apache.org/repos/asf/chukwa/diff/f3dfc942

Branch: refs/heads/master
Commit: f3dfc942025d4f476815df85091d35559e13f72a
Parents: 068b454
Author: Eric Yang <eyang@apache.org>
Authored: Wed Jun 24 14:46:11 2015 -0700
Committer: Eric Yang <eyang@apache.org>
Committed: Wed Jun 24 14:46:11 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../chukwa/datastore/ChukwaHBaseStore.java      | 76 ++++++++++++++++----
 2 files changed, 63 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/chukwa/blob/f3dfc942/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8ea4d46..633a7c9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -18,6 +18,8 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
 
+    CHUKWA-767. Implemented low pass filter for Charting REST API.  (Eric Yang)
+
     CHUKWA-765. Minor stylesheets clean up.  (Eric Yang)
 
     CHUKWA-759. Configuration for Chukwa to monitor HBase.  (Eric Yang)

http://git-wip-us.apache.org/repos/asf/chukwa/blob/f3dfc942/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java b/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
index cade66a..2bef452 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java
@@ -63,11 +63,17 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.log4j.Logger;
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
+import org.mortbay.log.Log;
 
 import com.google.gson.Gson;
 
 public class ChukwaHBaseStore {
   static Logger LOG = Logger.getLogger(ChukwaHBaseStore.class);
+  static int MINUTES_IN_HOUR = 60;
+  static double RESOLUTION = 360;
+  static int MINUTE = 60000; //60 milliseconds
+  static int SECOND = 1000;
+
   static byte[] COLUMN_FAMILY = "t".getBytes();
   static byte[] ANNOTATION_FAMILY = "a".getBytes();
   static byte[] KEY_NAMES = "k".getBytes();
@@ -139,6 +145,7 @@ public class ChukwaHBaseStore {
         startTime = endTime;
         endTime = temp;
       }
+
       getHBaseConnection();
       Table table = connection.getTable(TableName.valueOf(CHUKWA));
       Scan scan = new Scan();
@@ -150,10 +157,7 @@ public class ChukwaHBaseStore {
       long currentDay = startTime;
       for (int i = startDay; i <= endDay; i++) {
         byte[] rowKey = HBaseUtil.buildKey(currentDay, metric, source);
-        // ColumnRangeFilter crf = new
-        // ColumnRangeFilter(Long.valueOf(startTime).toString().getBytes(),
-        // true, Long.valueOf(endTime).toString().getBytes(), true);
-        // scan.setFilter(crf);
+
         scan.addFamily(COLUMN_FAMILY);
         scan.setStartRow(rowKey);
         scan.setStopRow(rowKey);
@@ -162,8 +166,7 @@ public class ChukwaHBaseStore {
 
         ResultScanner results = table.getScanner(scan);
         Iterator<Result> it = results.iterator();
-        // TODO: Apply discrete wavelet transformation to limit the output
-        // size to 1000 data points for graphing optimization. (i.e jwave)
+
         while (it.hasNext()) {
           Result result = it.next();
           for (KeyValue kv : result.raw()) {
@@ -497,6 +500,23 @@ public class ChukwaHBaseStore {
         startTime = endTime;
         endTime = temp;
       }
+      // Figure out the time range and determine the best resolution
+      // to fetch the data
+      long range = Math.round((endTime - startTime)
+        / (MINUTES_IN_HOUR * MINUTE));
+      long sampleRate = 1;
+      if (range <= 1) {
+        sampleRate = 5;
+      } else if(range <= 24) {
+        sampleRate = 240;
+      } else if (range <= 720) {
+        sampleRate = 7200;
+      } else if(range >= 720) {
+        sampleRate = 87600;
+      }
+      double smoothing = (endTime - startTime)
+          / (sampleRate * SECOND ) / RESOLUTION;
+
       getHBaseConnection();
       Table table = connection.getTable(TableName.valueOf(CHUKWA));
       Scan scan = new Scan();
@@ -522,8 +542,11 @@ public class ChukwaHBaseStore {
 
           ResultScanner results = table.getScanner(scan);
           Iterator<Result> it = results.iterator();
-          // TODO: Apply discrete wavelet transformation to limit the output
-          // size to 1000 data points for graphing optimization. (i.e jwave)
+          double filteredValue = 0.0d;
+          long lastTime = startTime;
+          long totalElapsedTime = 0;
+          int initial = 0;
+          
           while (it.hasNext()) {
             Result result = it.next();
             for (KeyValue kv : result.raw()) {
@@ -531,11 +554,34 @@ public class ChukwaHBaseStore {
               long timestamp = ByteBuffer.wrap(key).getLong();
               double value = Double.parseDouble(new String(kv.getValue(),
                   "UTF-8"));
-              ArrayList<Number> points = new ArrayList<Number>();
-              points.add(timestamp);
-              points.add(value);
-              data.add(points);
+              if(initial==0) {
+                filteredValue = value;
+              }
+              long elapsedTime = (timestamp - lastTime) / SECOND;
+              lastTime = timestamp;
+              // Determine if there is any gap, if there is gap in data, reset
+              // calculation.
+              if (elapsedTime > sampleRate) {
+                filteredValue = 0.0d;
+              } else {
+                if (smoothing != 0.0d) {
+                  // Apply low pass filter to calculate
+                  filteredValue = filteredValue + (double) ((double) elapsedTime * (double)
((double) (value - filteredValue) / smoothing));
+                } else {
+                  // Use original value
+                  filteredValue = value;
+                }
+              }
+              totalElapsedTime = totalElapsedTime + elapsedTime;
+              if (totalElapsedTime >= sampleRate) {
+                ArrayList<Number> points = new ArrayList<Number>();
+                points.add(timestamp);
+                points.add(filteredValue);
+                data.add(points);
+                totalElapsedTime = 0;
+              }
             }
+            initial++;
           }
           results.close();
           currentDay = currentDay + (i * MILLISECONDS_IN_DAY);
@@ -748,15 +794,15 @@ public class ChukwaHBaseStore {
       String[] metrics = { "SystemMetrics.LoadAverage.1" };
       createChart("1", "", "System Load Average", metrics, hostname);
       String[] cpuMetrics = { "SystemMetrics.cpu.combined", "SystemMetrics.cpu.sys", "SystemMetrics.cpu.user"
};
-      createChart("2", "%", "CPU Utilization", cpuMetrics, hostname);
+      createChart("2", "percent", "CPU Utilization", cpuMetrics, hostname);
       String[] memMetrics = { "SystemMetrics.memory.FreePercent", "SystemMetrics.memory.UsedPercent"};
-      createChart("3", "%", "Memory Utilization", memMetrics, hostname);
+      createChart("3", "percent", "Memory Utilization", memMetrics, hostname);
       String[] diskMetrics = { "SystemMetrics.disk.ReadBytes", "SystemMetrics.disk.WriteBytes"
};
       createChart("4", "bytes-decimal", "Disk Utilization", diskMetrics, hostname);
       String[] netMetrics = { "SystemMetrics.network.TxBytes", "SystemMetrics.network.RxBytes"
};
       createChart("5", "bytes", "Network Utilization", netMetrics, hostname);
       String[] swapMetrics = { "SystemMetrics.swap.Total", "SystemMetrics.swap.Used", "SystemMetrics.swap.Free"
};
-      createChart("6", "", "Swap Utilization", swapMetrics, hostname);
+      createChart("6", "bytes-decimal", "Swap Utilization", swapMetrics, hostname);
       
       // Populate default widgets
       Widget widget = new Widget();


Mime
View raw message