eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From qingwz...@apache.org
Subject eagle git commit: [EAGLE-940] HDFS traffic monitor by the namenode jmx data
Date Wed, 15 Mar 2017 10:27:32 GMT
Repository: eagle
Updated Branches:
  refs/heads/master ab5bf1076 -> 0071c79ab


[EAGLE-940] HDFS traffic monitor by the namenode jmx data

https://issues.apache.org/jira/browse/EAGLE-940

Author: Zhao, Qingwen <qingwzhao@apache.org>

Closes #862 from qingwen220/EAGLE-940.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/0071c79a
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/0071c79a
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/0071c79a

Branch: refs/heads/master
Commit: 0071c79abfd9d6535fe726721c0a17286b29af43
Parents: ab5bf10
Author: Zhao, Qingwen <qingwzhao@apache.org>
Authored: Wed Mar 15 18:27:17 2017 +0800
Committer: Zhao, Qingwen <qingwzhao@apache.org>
Committed: Wed Mar 15 18:27:17 2017 +0800

----------------------------------------------------------------------
 .../environment/builder/MetricDescriptor.java   |  30 ++--
 .../app/messaging/KafkaStreamProvider.java      |  21 ++-
 .../hadoop_jmx_collector/hadoop_jmx_config.json |  62 ++++++++
 .../hadoop_jmx_collector/hadoop_jmx_kafka.py    |  15 +-
 .../hadoop_jmx_collector/metric_collector.py    | 101 ++++++------
 .../util/resourcefetch/RMResourceFetcher.java   |   4 +-
 .../auditlog/HdfsAuditLogApplication.java       |  62 ++++++++
 .../security/auditlog/TopWindowResult.java      | 126 +++++++++++++++
 .../security/auditlog/TrafficParserBolt.java    | 157 +++++++++++++++++++
 ...ecurity.auditlog.HdfsAuditLogAppProvider.xml |  97 +++++++++---
 .../security/traffic/TopWindowResultTest.java   |  61 +++++++
 11 files changed, 646 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/0071c79a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java
index e79e4d7..c33a92d 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java
@@ -29,6 +29,18 @@ public class MetricDescriptor implements Serializable {
      * Support simple and complex name format, by default using "metric" field.
      */
     private MetricNameSelector metricNameSelector = new FieldMetricNameSelector("metric");
+    private MetricGroupSelector metricGroupSelector = new FixedMetricGroupSelector(DEFAULT_METRIC_GROUP_NAME);
+    private SiteIdSelector siteIdSelector = new FieldSiteIdSelector("site");
+
+    private static final String DEFAULT_METRIC_GROUP_NAME = "Default";
+
+    public MetricNameSelector getMetricNameSelector() {
+        return metricNameSelector;
+    }
+
+    public void setMetricNameSelector(MetricNameSelector metricNameSelector) {
+        this.metricNameSelector = metricNameSelector;
+    }
 
     public MetricGroupSelector getMetricGroupSelector() {
         return metricGroupSelector;
@@ -38,12 +50,6 @@ public class MetricDescriptor implements Serializable {
         this.metricGroupSelector = metricGroupSelector;
     }
 
-
-    private static final String DEFAULT_METRIC_GROUP_NAME = "Default";
-
-    private MetricGroupSelector metricGroupSelector = new FixedMetricGroupSelector(DEFAULT_METRIC_GROUP_NAME);
-    private SiteIdSelector siteIdSelector = new FieldSiteIdSelector("site");
-
     /**
      * Support event/system time, by default using system time.
      */
@@ -59,17 +65,15 @@ public class MetricDescriptor implements Serializable {
      */
     private int granularity = Calendar.MINUTE;
 
-    /**
-     * Metric value field name.
-     */
     private String valueField = "value";
+    private String resourceField = "resource";
 
-    public MetricNameSelector getMetricNameSelector() {
-        return metricNameSelector;
+    public String getResourceField() {
+        return resourceField;
     }
 
-    public void setMetricNameSelector(MetricNameSelector metricNameSelector) {
-        this.metricNameSelector = metricNameSelector;
+    public void setResourceField(String resourceField) {
+        this.resourceField = resourceField;
     }
 
     public String getValueField() {

http://git-wip-us.apache.org/repos/asf/eagle/blob/0071c79a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java
index eaa9ea0..8038d42 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java
@@ -27,6 +27,7 @@ public class KafkaStreamProvider implements StreamProvider<KafkaStreamSink, Kafk
     private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamProvider.class);
     private static final String DEFAULT_SHARED_SINK_TOPIC_CONF_KEY = "dataSinkConfig.topic";
     private static final String DEFAULT_SHARED_SOURCE_TOPIC_CONF_KEY = "dataSourceConfig.topic";
+    private static final String DEFAULT_SHARED_SOURCE_SCHEME_CLS_KEY = "dataSourceConfig.schemeCls";
 
     private String getSinkTopicName(String streamId, Config config) {
         String streamSpecificTopicConfigKey = String.format("dataSinkConfig.%s.topic",streamId);
@@ -43,7 +44,7 @@ public class KafkaStreamProvider implements StreamProvider<KafkaStreamSink, Kafk
     }
 
     private String getSourceTopicName(String streamId, Config config) {
-        String streamSpecificTopicConfigKey = String.format("dataSourceConfig.%s.topic",streamId);;
+        String streamSpecificTopicConfigKey = String.format("dataSourceConfig.%s.topic",streamId);
         if (config.hasPath(streamSpecificTopicConfigKey)) {
             return config.getString(streamSpecificTopicConfigKey);
         } else if (config.hasPath(DEFAULT_SHARED_SOURCE_TOPIC_CONF_KEY)) {
@@ -55,6 +56,17 @@ public class KafkaStreamProvider implements StreamProvider<KafkaStreamSink, Kafk
         }
     }
 
+    private String getSourceSchemeCls(String streamId, Config config) {
+        String streamSpecificSchemeClsKey = String.format("dataSourceConfig.%s.schemeCls", streamId);
+        if (config.hasPath(streamSpecificSchemeClsKey) ) {
+            return config.getString(streamSpecificSchemeClsKey);
+        } else if (config.hasPath(DEFAULT_SHARED_SOURCE_SCHEME_CLS_KEY)) {
+            LOG.warn("Using default shared source topic {}: {}", DEFAULT_SHARED_SOURCE_SCHEME_CLS_KEY, config.getString(DEFAULT_SHARED_SOURCE_SCHEME_CLS_KEY));
+            return config.getString(DEFAULT_SHARED_SOURCE_SCHEME_CLS_KEY);
+        }
+        return null;
+    }
+
     @Override
     public KafkaStreamSinkConfig getSinkConfig(String streamId, Config config) {
         KafkaStreamSinkConfig sinkConfig = new KafkaStreamSinkConfig();
@@ -118,11 +130,12 @@ public class KafkaStreamProvider implements StreamProvider<KafkaStreamSink, Kafk
         if (hasNonBlankConfigPath(config, "dataSourceConfig.forceFromStart")) {
             sourceConfig.setForceFromStart(config.getBoolean("dataSourceConfig.forceFromStart"));
         }
-        if (hasNonBlankConfigPath(config, "dataSourceConfig.schemeCls")) {
+        String schemeCls = getSourceSchemeCls(streamId, config);
+        if (schemeCls != null && StringUtils.isNotBlank(schemeCls)) {
             try {
-                sourceConfig.setSchemaClass((Class<? extends Scheme>) Class.forName(config.getString("dataSourceConfig.schemeCls")));
+                sourceConfig.setSchemaClass((Class<? extends Scheme>) Class.forName(schemeCls));
             } catch (ClassNotFoundException e) {
-                LOG.error("Class not found error, dataSourceConfig.schemeCls = {}",config.getString("dataSourceConfig.schemeCls"),e);
+                LOG.error("Class not found error, dataSourceConfig.schemeCls = {}", schemeCls, e);
             }
         }
         return sourceConfig;

http://git-wip-us.apache.org/repos/asf/eagle/blob/0071c79a/eagle-external/hadoop_jmx_collector/hadoop_jmx_config.json
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/hadoop_jmx_config.json b/eagle-external/hadoop_jmx_collector/hadoop_jmx_config.json
new file mode 100755
index 0000000..23c89b3
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/hadoop_jmx_config.json
@@ -0,0 +1,62 @@
+{
+  "env": {
+    "site": "sandbox",
+    "metric_prefix": "hadoop.",
+    "log_file": "/tmp/hadoop-jmx-collector.log"
+  },
+  "input": [
+    {
+      "component": "namenode",
+      "host": "sandbox.hortonworks.com",
+      "port": "50070",
+      "https": true
+    }
+  ],
+  "filter": {
+    "bean_group_filter": ["hadoop","java.lang"],
+    "metric_name_filter": [
+      "hadoop.memory.heapmemoryusage.used",
+      "hadoop.memory.nonheapmemoryusage.used",
+      "hadoop.namenode.fsnamesystemstate.capacitytotal",
+      "hadoop.namenode.fsnamesystemstate.topuseropcounts",
+      "hadoop.namenode.namenodeinfo.corruptfiles",
+      "hadoop.namenode.dfs.capacityused",
+      "hadoop.namenode.dfs.capacityremaining",
+      "hadoop.namenode.dfs.blockstotal",
+      "hadoop.namenode.dfs.filestotal",
+      "hadoop.namenode.dfs.underreplicatedblocks",
+      "hadoop.namenode.dfs.missingblocks",
+      "hadoop.namenode.dfs.corruptblocks",
+      "hadoop.namenode.dfs.lastcheckpointtime",
+      "hadoop.namenode.dfs.transactionssincelastcheckpoint",
+      "hadoop.namenode.dfs.lastwrittentransactionid",
+      "hadoop.namenode.dfs.snapshottabledirectories",
+      "hadoop.namenode.dfs.snapshots",
+      "hadoop.namenode.rpc.rpcqueuetimeavgtime",
+      "hadoop.namenode.rpc.rpcprocessingtimeavgtime",
+      "hadoop.namenode.rpc.numopenconnections",
+      "hadoop.namenode.rpc.callqueuelength",
+
+      "hadoop.datanode.fsdatasetstate.capacity",
+      "hadoop.datanode.fsdatasetstate.dfsused",
+      "hadoop.datanode.datanodeinfo.xceivercount",
+      "hadoop.datanode.rpc.rpcqueuetimeavgtime",
+      "hadoop.datanode.rpc.rpcprocessingtimeavgtime",
+      "hadoop.datanode.rpc.numopenconnections",
+      "hadoop.datanode.rpc.callqueuelength"
+    ]
+  },
+  "output": {
+    "kafka": {
+      "debug": false,
+      "default_topic": "hadoop_jmx_metric_sandbox",
+      "metric_topic_mapping": {
+        "hadoop.namenode.namenodeinfo.corruptfiles": "hadoop_jmx_resource_sandbox",
+        "hadoop.namenode.fsnamesystemstate.topuseropcounts" : "hadoop_jmx_resource_sandbox"
+      },
+      "broker_list": [
+        "sandbox.hortonworks.com:6667"
+      ]
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/0071c79a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
index 1b036cd..60c6367 100644
--- a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
+++ b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
@@ -16,7 +16,7 @@
 # limitations under the License.
 #
 
-from metric_collector import JmxMetricCollector,JmxMetricListener,Runner
+from metric_collector import JmxMetricCollector,JmxMetricListener,Runner,MetricNameConverter
 import json, logging, fnmatch, sys
 
 class NNSafeModeMetric(JmxMetricListener):
@@ -38,10 +38,16 @@ class NNHAMetric(JmxMetricListener):
             else:
                 self.collector.on_bean_kv(self.PREFIX, component, "hastate", 1)
 
-class NameNodeInfo(JmxMetricListener):
+class corruptfilesMetric(JmxMetricListener):
     def on_metric(self, metric):
         if metric["metric"] == "hadoop.namenode.namenodeinfo.corruptfiles":
-            self.collector.collect(metric, "string")
+            self.collector.collect(metric, "string", MetricNameConverter())
+
+class TopUserOpCountsMetric(JmxMetricListener):
+    def on_metric(self, metric):
+        if metric["metric"] == "hadoop.namenode.fsnamesystemstate.topuseropcounts":
+            self.collector.collect(metric, "string", MetricNameConverter())
+
 
 class MemoryUsageMetric(JmxMetricListener):
     PREFIX = "hadoop.namenode.jvm"
@@ -107,6 +113,7 @@ if __name__ == '__main__':
         JournalTransactionInfoMetric(),
         DatanodeFSDatasetState(),
         HBaseRegionServerMetric(),
-        NameNodeInfo()
+        corruptfilesMetric(),
+        TopUserOpCountsMetric()
     )
     Runner.run(collector)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/0071c79a/eagle-external/hadoop_jmx_collector/metric_collector.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/metric_collector.py b/eagle-external/hadoop_jmx_collector/metric_collector.py
index c3fdb43..c83fe6b 100644
--- a/eagle-external/hadoop_jmx_collector/metric_collector.py
+++ b/eagle-external/hadoop_jmx_collector/metric_collector.py
@@ -66,8 +66,8 @@ class Helper:
                                 datefmt='%m-%d %H:%M')
         else:
             logging.basicConfig(level=logging.INFO,
-                            format='%(asctime)s %(name)s %(threadName)s %(levelname)s %(message)s',
-                            datefmt='%m-%d %H:%M')
+                                format='%(asctime)s %(name)s %(threadName)s %(levelname)s %(message)s',
+                                datefmt='%m-%d %H:%M')
 
         logging.info("Loaded config from %s", abs_file_path)
         return config
@@ -192,7 +192,7 @@ class MetricSender(object):
     def open(self):
         pass
 
-    def send(self, msg):
+    def send(self, msg, converter = None):
         raise Exception("should be overrode")
 
     def close(self):
@@ -210,11 +210,14 @@ class KafkaMetricSender(MetricSender):
         self.default_topic = None
         if kafka_config.has_key("default_topic"):
             self.default_topic = kafka_config["default_topic"].encode('utf-8')
-            logging.info("Using default topic: %s" % self.default_topic)
         self.component_topic_mapping = {}
         if kafka_config.has_key("component_topic_mapping"):
             self.component_topic_mapping = kafka_config["component_topic_mapping"]
 
+        self.metric_topic_mapping = {}
+        if kafka_config.has_key("metric_topic_mapping"):
+            self.metric_topic_mapping = kafka_config["metric_topic_mapping"]
+
         if not self.default_topic and not bool(self.component_topic_mapping):
             raise Exception("both kafka config 'topic' and 'component_topic_mapping' are empty")
 
@@ -229,6 +232,11 @@ class KafkaMetricSender(MetricSender):
             logging.info("Overrode output.kafka.debug: " + str(self.debug_enabled))
 
     def get_topic_id(self, msg):
+        if msg.has_key("metric"):
+            metric = msg["metric"]
+            if self.metric_topic_mapping.has_key(metric):
+                return self.metric_topic_mapping[metric]
+
         if msg.has_key("component"):
             component = msg["component"]
             if self.component_topic_mapping.has_key(component):
@@ -247,11 +255,14 @@ class KafkaMetricSender(MetricSender):
                                              batch_send_every_t=30)
         self.start_time = time.time()
 
-    def send(self, msg):
+    def send(self, msg, converter = None):
         if self.debug_enabled:
             logging.info("Send message: " + str(msg))
         self.sent_count += 1
-        self.kafka_producer.send_messages(self.get_topic_id(msg), json.dumps(msg))
+        topic = self.get_topic_id(msg)
+        if converter is not None:
+            converter.convert_metric(msg)
+        self.kafka_producer.send_messages(topic, json.dumps(msg))
 
     def close(self):
         logging.info("Closing kafka connection and producer")
@@ -267,18 +278,12 @@ class MetricCollector(threading.Thread):
     filters = []
     config = None
     closed = False
-    collected_event_count = 0
-    ignored_event_count = 0
-    emit_event_count = 0
 
     def __init__(self, config=None):
         threading.Thread.__init__(self)
         self.config = None
         self.sender = None
         self.fqdn = socket.getfqdn()
-        self.ignored_event_count = 0
-        self.collected_event_count = 0
-        self.emit_event_count = 0
 
     def init(self, config):
         self.config = config
@@ -301,39 +306,29 @@ class MetricCollector(threading.Thread):
     def start(self):
         super(MetricCollector, self).start()
 
-    def collect(self, msg, type='float'):
-        try:
-            self.collected_event_count = self.collected_event_count + 1
-            if not msg.has_key("timestamp"):
-                msg["timestamp"] = int(round(time.time() * 1000))
-            if msg.has_key("value") and type == 'float':
-                msg["value"] = float(str(msg["value"]))
-            elif msg.has_key("value") and type == 'string':
-                msg["value"] = str(msg["value"])
-            if not msg.has_key("host") or len(msg["host"]) == 0:
-                raise Exception("host is null: " + str(msg))
-
-            if not msg.has_key("site"):
-                msg["site"] = self.config["env"]["site"]
-
-            if len(self.filters) == 0:
-                self.emit_event_count = self.emit_event_count + 1
-                self.sender.send(msg)
-                return
-            else:
-                for filter in self.filters:
-                    if filter.filter_metric(msg):
-                        self.emit_event_count = self.emit_event_count + 1
-                        self.sender.send(msg)
-                        return
-                self.ignored_event_count = self.ignored_event_count + 1
-        except Exception as e:
-            logging.error("Failed to emit metric: %s" % msg)
-            logging.exception(e)
+    def collect(self, msg, type = 'float', converter = None):
+        if not msg.has_key("timestamp"):
+            msg["timestamp"] = int(round(time.time() * 1000))
+        if msg.has_key("value") and type == 'float':
+            msg["value"] = float(str(msg["value"]))
+        elif msg.has_key("value") and type == 'string':
+            msg["value"] = str(msg["value"])
+        if not msg.has_key("host") or len(msg["host"]) == 0:
+            raise Exception("host is null: " + str(msg))
+        if not msg.has_key("site"):
+            msg["site"] = self.config["env"]["site"]
+        if len(self.filters) == 0:
+            self.sender.send(msg, converter)
+            return
+        else:
+            for filter in self.filters:
+                if filter.filter_metric(msg):
+                    self.sender.send(msg, converter)
+                    return
+        if self.sender.debug_enabled:
+            logging.info("Drop metric: " + str(msg))
 
     def close(self):
-        logging.info("Collected %s events (emitted: %s, ignored: %s)"
-                     % (self.collected_event_count, self.emit_event_count, self.ignored_event_count))
         self.sender.close()
         self.closed = True
 
@@ -451,7 +446,7 @@ class JmxMetricCollector(MetricCollector):
     def jmx_reader(self, source):
         host = source["host"]
         if source.has_key("source_host"):
-            host=source["source_host"]    
+            host=source["source_host"]
         port=source["port"]
         https=source["https"]
         protocol = "https" if https else "http"
@@ -582,3 +577,21 @@ class MetricNameFilter(MetricFilter):
                 if fnmatch.fnmatch(metric["metric"], name_filter):
                     return True
         return False
+
+
+# ========================
+#  Metric Converter
+# ========================
+
+class MetricConverter:
+    def convert_metric(self, metric):
+        """
+        Convert metric
+        """
+        return True
+
+class MetricNameConverter(MetricConverter):
+    def convert_metric(self, metric):
+        metric["resource"] = metric["metric"]
+        del metric["metric"]
+        return True
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/0071c79a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java
index e1991b0..2e967bc 100644
--- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java
+++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java
@@ -129,8 +129,6 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo> {
         Map<String, AppInfo> result = new HashMap();
         List<AppInfo> apps = new ArrayList<>();
         try {
-            selector.checkUrl();
-
             String limit = "";
             int requests = 1;
             int timeRangePerRequestInMin = 60;
@@ -191,7 +189,6 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo> {
                                                          Object... parameter) throws Exception {
         List<AppInfo> apps = new ArrayList<>();
         try {
-            selector.checkUrl();
             String url = getAcceptedAppURL();
             return doFetchApplicationsList(url, compressionType);
         } catch (Exception e) {
@@ -201,6 +198,7 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo> {
     }
 
     private List<AppInfo> getResource(Constants.ResourceType resourceType, Constants.CompressionType compressionType, Object... parameter) throws Exception {
+        selector.checkUrl();
         switch (resourceType) {
             case COMPLETE_SPARK_JOB:
                 final String urlString = sparkCompleteJobServiceURLBuilder.build(selector.getSelectedUrl(), (String) parameter[0]);

http://git-wip-us.apache.org/repos/asf/eagle/blob/0071c79a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java
index 5f300f3..6f33517 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java
@@ -21,9 +21,17 @@
 
 package org.apache.eagle.security.auditlog;
 
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.topology.base.BaseRichBolt;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.messaging.StormStreamSink;
+import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSpoutProvider;
+import org.apache.eagle.security.traffic.HadoopLogAccumulatorBolt;
 
 /**
  * Since 8/11/16.
@@ -35,6 +43,60 @@ public class HdfsAuditLogApplication extends AbstractHdfsAuditLogApplication {
     }
 
     @Override
+    public StormTopology execute(Config config, StormEnvironment environment) {
+        TopologyBuilder builder = new TopologyBuilder();
+        KafkaSpoutProvider provider = new KafkaSpoutProvider();
+        IRichSpout spout = provider.getSpout(config);
+
+        int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
+        int numOfParserTasks = config.getInt(PARSER_TASK_NUM);
+        int numOfSensitivityJoinTasks = config.getInt(SENSITIVITY_JOIN_TASK_NUM);
+        int numOfIPZoneJoinTasks = config.getInt(IPZONE_JOIN_TASK_NUM);
+        int numOfSinkTasks = config.getInt(SINK_TASK_NUM);
+
+        builder.setSpout("ingest", spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks);
+
+        BaseRichBolt parserBolt = getParserBolt(config);
+        BoltDeclarer boltDeclarer = builder.setBolt("parserBolt", parserBolt, numOfParserTasks).setNumTasks(numOfParserTasks).shuffleGrouping("ingest");
+        boltDeclarer.shuffleGrouping("ingest");
+
+        HdfsSensitivityDataEnrichBolt sensitivityDataJoinBolt = new HdfsSensitivityDataEnrichBolt(config);
+        BoltDeclarer sensitivityDataJoinBoltDeclarer = builder.setBolt("sensitivityJoin", sensitivityDataJoinBolt, numOfSensitivityJoinTasks).setNumTasks(numOfSensitivityJoinTasks);
+        // sensitivityDataJoinBoltDeclarer.fieldsGrouping("parserBolt", new Fields("f1"));
+        sensitivityDataJoinBoltDeclarer.shuffleGrouping("parserBolt");
+
+        // ------------------------------
+        // sensitivityJoin -> ipZoneJoin
+        // ------------------------------
+        IPZoneDataEnrichBolt ipZoneDataJoinBolt = new IPZoneDataEnrichBolt(config);
+        BoltDeclarer ipZoneDataJoinBoltDeclarer = builder.setBolt("ipZoneJoin", ipZoneDataJoinBolt, numOfIPZoneJoinTasks).setNumTasks(numOfIPZoneJoinTasks);
+        // ipZoneDataJoinBoltDeclarer.fieldsGrouping("sensitivityJoin", new Fields("user"));
+        ipZoneDataJoinBoltDeclarer.shuffleGrouping("sensitivityJoin");
+
+        // ------------------------
+        // ipZoneJoin -> kafkaSink
+        // ------------------------
+
+        StormStreamSink sinkBolt = environment.getStreamSink("HDFS_AUDIT_LOG_ENRICHED_STREAM", config);
+        BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", sinkBolt, numOfSinkTasks).setNumTasks(numOfSinkTasks);
+        kafkaBoltDeclarer.shuffleGrouping("ipZoneJoin");
+
+        if (config.hasPath(TRAFFIC_MONITOR_ENABLED) && config.getBoolean(TRAFFIC_MONITOR_ENABLED)) {
+            builder.setSpout("trafficSpout", environment.getStreamSource("HADOOP_JMX_RESOURCE_STREAM", config), 1)
+                    .setNumTasks(1);
+
+            builder.setBolt("trafficParserBolt", new TrafficParserBolt(config), 1)
+                    .setNumTasks(1)
+                    .shuffleGrouping("trafficSpout");
+            builder.setBolt("trafficSinkBolt", environment.getStreamSink("HDFS_AUDIT_LOG_TRAFFIC_STREAM", config), 1)
+                    .setNumTasks(1)
+                    .shuffleGrouping("trafficParserBolt");
+        }
+
+        return builder.createTopology();
+    }
+
+    @Override
     public String getSinkStreamName() {
         return "hdfs_audit_log_stream";
     }

http://git-wip-us.apache.org/repos/asf/eagle/blob/0071c79a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/TopWindowResult.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/TopWindowResult.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/TopWindowResult.java
new file mode 100644
index 0000000..a8bfa4b
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/TopWindowResult.java
@@ -0,0 +1,126 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.security.auditlog;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class TopWindowResult implements Serializable {
+    private String timestamp;
+    private List<TopWindow> windows;
+
+    public List<TopWindow> getWindows() {
+        return windows;
+    }
+
+    public void setWindows(List<TopWindow> windows) {
+        this.windows = windows;
+    }
+
+    public String getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(String timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public static class TopWindow implements Serializable {
+        private int windowLenMs;
+        private List<Op> ops;
+
+        public int getWindowLenMs() {
+            return windowLenMs;
+        }
+
+        public void setWindowLenMs(int windowLenMs) {
+            this.windowLenMs = windowLenMs;
+        }
+
+        public List<Op> getOps() {
+            return ops;
+        }
+
+        public void setOps(List<Op> ops) {
+            this.ops = ops;
+        }
+
+    }
+
+    /**
+     * Represents an operation within a TopWindow. It contains a ranked
+     * set of the top users for the operation.
+     */
+    public static class Op implements Serializable {
+        private String opType;
+        private List<User> topUsers;
+        private long totalCount;
+
+        public String getOpType() {
+            return opType;
+        }
+
+        public void setOpType(String opType) {
+            this.opType = opType;
+        }
+
+        public List<User> getTopUsers() {
+            return topUsers;
+        }
+
+        public void setTopUsers(List<User> topUsers) {
+            this.topUsers = topUsers;
+        }
+
+        public long getTotalCount() {
+            return totalCount;
+        }
+
+        public void setTotalCount(long totalCount) {
+            this.totalCount = totalCount;
+        }
+    }
+
+    /**
+     * Represents a user who called an Op within a TopWindow. Specifies the
+     * user and the number of times the user called the operation.
+     */
+    public static class User implements Serializable {
+        private String user;
+        private long count;
+
+        public String getUser() {
+            return user;
+        }
+
+        public void setUser(String user) {
+            this.user = user;
+        }
+
+        public long getCount() {
+            return count;
+        }
+
+        public void setCount(long count) {
+            this.count = count;
+        }
+
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/eagle/blob/0071c79a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/TrafficParserBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/TrafficParserBolt.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/TrafficParserBolt.java
new file mode 100644
index 0000000..6388a8f
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/TrafficParserBolt.java
@@ -0,0 +1,157 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.security.auditlog;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.environment.builder.MetricDescriptor;
+import org.apache.eagle.app.utils.StreamConvertHelper;
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.security.util.LogParseUtil;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TrafficParserBolt extends BaseRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(TrafficParserBolt.class);
+
+    private static final String TARGET_METRIC_NAME = "hadoop.namenode.fsnamesystemstate.topuseropcounts";
+    private static final String USER_METRIC_FORMAT = "hadoop.hdfs.auditlog.user.%sm.count";
+    private static final String CLUSTER_METRIC_FORMAT = "hadoop.hdfs.auditlog.cluster.%sm.count";
+
+    private static final SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ");
+
+    private OutputCollector collector;
+    private Config config;
+    private MetricDescriptor metricDescriptor;
+    private ObjectMapper objectMapper;
+    private IEagleServiceClient client;
+
+    public TrafficParserBolt(Config config) {
+        this.config = config;
+        this.metricDescriptor = new MetricDescriptor();
+        this.objectMapper = new ObjectMapper();
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+        this.client = new EagleServiceClientImpl(config);
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        Map event = null;
+        try {
+            event = StreamConvertHelper.tupleToEvent(input).f1();
+            String resource = (String) event.get(metricDescriptor.getResourceField());
+            if (resource.equalsIgnoreCase(TARGET_METRIC_NAME)) {
+                String value = (String) event.get(metricDescriptor.getValueField());
+                TopWindowResult rs = objectMapper.readValue(value, TopWindowResult.class);
+                long tm = df.parse(rs.getTimestamp()).getTime() / DateTimeUtil.ONEMINUTE * DateTimeUtil.ONEMINUTE;
+
+                for (TopWindowResult.TopWindow topWindow : rs.getWindows()) {
+                    for (TopWindowResult.Op op : topWindow.getOps()) {
+                        if (op.getOpType().equalsIgnoreCase("*")) {
+                            generateMetric(op, topWindow.getWindowLenMs(), tm);
+                        }
+                    }
+                }
+            }
+        } catch (Exception ex) {
+            LOG.error(ex.getMessage(), ex);
+            collector.reportError(ex);
+        } finally {
+            collector.ack(input);
+        }
+
+    }
+
+    private void generateMetric(TopWindowResult.Op op, int windowLen, long timestamp) {
+        List<GenericMetricEntity> metrics = new ArrayList<>();
+        GenericMetricEntity clusterMetric = buildMetricEntity(timestamp, CLUSTER_METRIC_FORMAT, null, op.getTotalCount(), windowLen);
+        metrics.add(clusterMetric);
+        collector.emit(new Values("", buildStreamEvent(clusterMetric)));
+        for (TopWindowResult.User user : op.getTopUsers()) {
+            GenericMetricEntity metric = buildMetricEntity(timestamp, USER_METRIC_FORMAT, user.getUser(), user.getCount(), windowLen);
+            metrics.add(metric);
+            collector.emit(new Values("", buildStreamEvent(metric)));
+        }
+        try {
+            client.create(metrics);
+            LOG.info("successfully create {} metrics", metrics.size());
+        } catch (Exception e) {
+            LOG.error("create {} metrics failed due to {}", metrics.size(), e.getMessage(), e);
+        }
+    }
+
+    private GenericMetricEntity buildMetricEntity(long timestamp, String metricFormat, String user, long count, int windowLen) {
+        GenericMetricEntity entity = new GenericMetricEntity();
+        entity.setTimestamp(timestamp);
+        entity.setValue(new double[]{Double.valueOf(count)});
+        entity.setPrefix(String.format(metricFormat, windowLen / 60000));
+        Map<String, String> tags = new HashMap<>();
+        tags.put("site", config.getString("siteId"));
+        tags.put("user", LogParseUtil.parseUserFromUGI(user));
+        entity.setTags(tags);
+        return entity;
+    }
+
+    private Map<String, Object> buildStreamEvent(GenericMetricEntity entity) {
+        Map<String, Object> map = new HashMap<>();
+        map.put("site", entity.getTags().get("site"));
+        map.put("user", entity.getTags().get("user"));
+        map.put("timestamp", entity.getTimestamp());
+        map.put("metric", entity.getPrefix());
+        map.put("value", entity.getValue()[0]);
+        return map;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("f1", "f2"));
+    }
+
+    @Override
+    public void cleanup() {
+        if (client != null) {
+            LOG.info("closing service client...");
+            try {
+                client.close();
+            } catch (IOException e) {
+                LOG.error("close service client failed due to {}", e.getMessage(), e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/0071c79a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
index 426a78f..d48340c 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
@@ -61,12 +61,14 @@
             <value>2</value>
             <description>number of sink tasks</description>
         </property>
+        <!--
         <property>
             <name>topology.numOfTrafficMonitorTasks</name>
             <displayName>Topology Traffic Monitor Tasks</displayName>
             <value>2</value>
             <description>number of traffic monitor tasks</description>
         </property>
+        -->
         <property>
             <name>topology.message.timeout.secs</name>
             <displayName>topology message timeout (secs)</displayName>
@@ -76,10 +78,17 @@
 
         <!-- data source configurations -->
         <property>
+            <name>dataSourceConfig.HADOOP_JMX_RESOURCE_STREAM.topic</name>
+            <displayName>Kafka Consumer Topic for HDFS Traffic Data</displayName>
+            <value>hadoop_jmx_resource_${siteId}</value>
+            <description>kafka topic for data consumption</description>
+        </property>
+        <property>
             <name>dataSourceConfig.topic</name>
-            <displayName>Kafka Topic for Data Consumption</displayName>
-            <value>hdfs_audit_log</value>
+            <displayName>Kafka Consumer Topic for HDFS Auditlog</displayName>
+            <value>hdfs_audit_log_${siteId}</value>
             <description>kafka topic for data consumption</description>
+            <required>true</required>
         </property>
         <property>
             <name>dataSourceConfig.zkConnection</name>
@@ -90,12 +99,18 @@
         </property>
         <property>
             <name>dataSourceConfig.schemeCls</name>
-            <displayName>Kafka Consumer SchemeCls</displayName>
+            <displayName>Kafka Consumer SchemeCls for Auditlog</displayName>
             <value>storm.kafka.StringScheme</value>
-            <description>scheme class</description>
+            <description>Kafka spout scheme class</description>
             <required>true</required>
         </property>
         <property>
+            <name>dataSourceConfig.HADOOP_JMX_RESOURCE_STREAM.schemeCls</name>
+            <displayName>Kafka Consumer SchemeCls for Traffic Data</displayName>
+            <value>org.apache.eagle.app.messaging.JsonSchema</value>
+            <description>Kafka spout scheme class</description>
+        </property>
+        <property>
             <name>dataSourceConfig.timeZone</name>
             <displayName>Log Time Zone</displayName>
             <description>time zone of hdfs audit log </description>
@@ -113,9 +128,16 @@
 
         <!-- data sink configurations -->
         <property>
-            <name>dataSinkConfig.topic</name>
-            <displayName>Kafka Topic for Parsed Data Sink</displayName>
-            <value>hdfs_audit_event</value>
+            <name>dataSinkConfig.HDFS_AUDIT_LOG_ENRICHED_STREAM.topic</name>
+            <displayName>Kafka Topic for Auditlog Event Sink</displayName>
+            <value>hdfs_audit_event_${site}</value>
+            <description>topic for kafka data sink</description>
+            <required>true</required>
+        </property>
+        <property>
+            <name>dataSinkConfig.HDFS_AUDIT_LOG_TRAFFIC_STREAM.topic</name>
+            <displayName>Kafka Topic for Traffic Data Sink</displayName>
+            <value>hdfs_traffic_event_${site}</value>
             <description>topic for kafka data sink</description>
         </property>
         <property>
@@ -162,25 +184,28 @@
             <value>0</value>
             <description>value controls when a produce request is considered completed</description>
         </property>
+
         <property>
             <name>dataSinkConfig.trafficMonitorEnabled</name>
-            <displayName>Log Traffic Monitor Enabled</displayName>
+            <displayName>Auditlog Traffic Monitor Enabled</displayName>
             <value>false</value>
-            <description>enable the log throughput calculation</description>
+            <description>enable the log throughput calculation with the source data generated by Eagle metric collector scripts</description>
             <required>true</required>
         </property>
-        <property>
-            <name>dataSinkConfig.metricWindowSize</name>
-            <displayName>Window Size for Traffic Counting</displayName>
-            <value>10</value>
-            <description>window size to calculate the throughput</description>
-        </property>
-        <property>
-            <name>dataSinkConfig.metricSinkBatchSize</name>
-            <displayName>Batch Size for Flushing Traffic Metrics</displayName>
-            <value>10</value>
-            <description>batch size of flushing metrics</description>
-        </property>
+        <!--
+       <property>
+           <name>dataSinkConfig.metricWindowSize</name>
+           <displayName>Window Size for Traffic Counting</displayName>
+           <value>10</value>
+           <description>window size to calculate the throughput</description>
+       </property>
+       <property>
+           <name>dataSinkConfig.metricSinkBatchSize</name>
+           <displayName>Batch Size for Flushing Traffic Metrics</displayName>
+           <value>10</value>
+           <description>batch size of flushing metrics</description>
+       </property>
+       -->
 
         <!-- web app related configurations -->
         <property>
@@ -192,7 +217,7 @@
     </configuration>
     <streams>
         <stream>
-            <streamId>hdfs_audit_log_enriched_stream</streamId>
+            <streamId>HDFS_AUDIT_LOG_ENRICHED_STREAM</streamId>
             <group>Hadoop Log</group>
             <description>Hdfs Audit Log Enriched Stream</description>
             <validate>true</validate>
@@ -236,6 +261,34 @@
                 </column>
             </columns>
         </stream>
+        <stream>
+            <streamId>HDFS_AUDIT_LOG_TRAFFIC_STREAM</streamId>
+            <group>Hadoop Log</group>
+            <description>Hadoop JMX Metric Stream including name node, resource manager, etc.</description>
+            <columns>
+                <column>
+                    <name>timestamp</name>
+                    <type>long</type>
+                </column>
+                <column>
+                    <name>metric</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>site</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>user</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>value</name>
+                    <type>double</type>
+                    <defaultValue>0.0</defaultValue>
+                </column>
+            </columns>
+        </stream>
     </streams>
     <docs>
         <install>

http://git-wip-us.apache.org/repos/asf/eagle/blob/0071c79a/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/traffic/TopWindowResultTest.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/traffic/TopWindowResultTest.java b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/traffic/TopWindowResultTest.java
new file mode 100644
index 0000000..f04dfcc
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/traffic/TopWindowResultTest.java
@@ -0,0 +1,61 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.security.traffic;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.security.auditlog.TopWindowResult;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.TimeZone;
+
+public class TopWindowResultTest {
+
+    @Test
+    public void test() {
+        ObjectMapper objectMapper = new ObjectMapper();
+        TopWindowResult result = null;
+        String data2 = "{\"timestamp\":\"2017-03-08T00:29:33-0700\",\"windows\":[{\"windowLenMs\":60000,\"ops\":[]},{\"windowLenMs\":300000,\"ops\":[]},{\"windowLenMs\":1500000,\"ops\":[]}]}";
+        try {
+            result = objectMapper.readValue(data2, TopWindowResult.class);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        Assert.assertTrue(result != null);
+        Assert.assertTrue(result.getWindows().size() == 3);
+    }
+
+    @Test
+    public void testTime() {
+        String time = "2017-03-07T21:36:51-0700";
+        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ");
+
+        try {
+            long t1 = df.parse(time).getTime();
+            String time2 = "2017-03-07 21:36:51";
+            long t2 = DateTimeUtil.humanDateToSeconds(time2, TimeZone.getTimeZone("GMT-7")) * 1000;
+            Assert.assertTrue(t1 == t2);
+        } catch (ParseException e) {
+            e.printStackTrace();
+        }
+    }
+}


Mime
View raw message