eagle-commits mailing list archives

From h..@apache.org
Subject incubator-eagle git commit: [EAGLE-665] Refactor kafka stream sink and hdfs audit topology using shuffle grouping
Date Sat, 22 Oct 2016 13:54:36 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/master 8d662e3a2 -> 435451eff


[EAGLE-665] Refactor kafka stream sink and hdfs audit topology using shuffle grouping

* Refactor Kafka stream sink
* Rewire HDFS audit topology to use shuffle grouping

https://issues.apache.org/jira/browse/EAGLE-665

Author: Hao Chen <hao@apache.org>

Closes #551 from haoch/refactorHdfsAuditApp.
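
For context: this change replaces the fields groupings between the topology stages (previously keyed on the message key, "f1", and "user" fields) with shuffle groupings, which spread tuples evenly across task instances instead of routing all tuples with the same key to one task. A minimal sketch of the resulting wiring, not taken from the commit (the class ShuffleGroupingSketch and its parameters are hypothetical; the spout/bolt names match the topology in the diff below):

    import backtype.storm.generated.StormTopology;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.topology.base.BaseRichSpout;

    public class ShuffleGroupingSketch {
        public static StormTopology build(BaseRichSpout spout, BaseRichBolt parserBolt,
                                          BaseRichBolt sensitivityJoinBolt, BaseRichBolt ipZoneJoinBolt,
                                          BaseRichBolt sinkBolt, int parallelism) {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("ingest", spout, parallelism);
            // Before this commit these edges used fieldsGrouping(...), which can
            // hot-spot a single task when one key dominates the log stream.
            builder.setBolt("parserBolt", parserBolt, parallelism).shuffleGrouping("ingest");
            builder.setBolt("sensitivityJoin", sensitivityJoinBolt, parallelism).shuffleGrouping("parserBolt");
            builder.setBolt("ipZoneJoin", ipZoneJoinBolt, parallelism).shuffleGrouping("sensitivityJoin");
            builder.setBolt("kafkaSink", sinkBolt, parallelism).shuffleGrouping("ipZoneJoin");
            return builder.createTopology();
        }
    }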


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/435451ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/435451ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/435451ef

Branch: refs/heads/master
Commit: 435451eff2d845b1953d99f57948ee79c70cadf3
Parents: 8d662e3
Author: Hao Chen <hao@apache.org>
Authored: Sat Oct 22 21:54:27 2016 +0800
Committer: Hao Chen <hao@apache.org>
Committed: Sat Oct 22 21:54:27 2016 +0800

----------------------------------------------------------------------
 .../apache/eagle/app/sink/KafkaStreamSink.java  |   9 +-
 .../eagle/app/sink/LoggingStreamSink.java       |   4 +-
 .../apache/eagle/app/sink/StormStreamSink.java  |  14 ++-
 .../eagle/security/hdfs/HDFSAuditLogParser.java | 113 ++++++++++---------
 .../security/hbase/HBaseAuditLogParserBolt.java |  19 ++--
 .../HbaseResourceSensitivityDataJoinBolt.java   |  12 +-
 .../AbstractHdfsAuditLogApplication.java        |  48 +++++---
 .../auditlog/HdfsAuditLogParserBolt.java        |   9 +-
 .../auditlog/HdfsSensitivityDataEnrichBolt.java |   6 +-
 .../security/auditlog/IPZoneDataEnrichBolt.java |  52 +++++----
 .../auditlog/kafka/MessageJsonScheme.java       |   2 +-
 .../auditlog/TestHdfsAuditLogApplication.java   |  86 ++++++++++++++
 .../auditlog/MapRFSAuditLogParserBolt.java      |   4 +-
 13 files changed, 246 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/435451ef/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
index e2a4b70..cf5351b 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
@@ -16,6 +16,7 @@
  */
 package org.apache.eagle.app.sink;
 
+import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.BasicOutputCollector;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -43,8 +44,8 @@ public class KafkaStreamSink extends StormStreamSink<KafkaStreamSinkConfig> {
     }
 
     @Override
-    public void prepare(Map stormConf, TopologyContext context) {
-        super.prepare(stormConf, context);
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        super.prepare(stormConf, context, collector);
         Properties properties = new Properties();
         properties.put("metadata.broker.list", config.getBrokerList());
         properties.put("serializer.class", config.getSerializerClass());
@@ -59,13 +60,13 @@ public class KafkaStreamSink extends StormStreamSink<KafkaStreamSinkConfig> {
     }
 
     @Override
-    protected void execute(Object key, Map event, BasicOutputCollector collector) {
+    protected void execute(Object key, Map event, OutputCollector collector) throws Exception {
         try {
             String output = new ObjectMapper().writeValueAsString(event);
             producer.send(new KeyedMessage(this.topicId, key, output));
         } catch (Exception ex) {
             LOG.error(ex.getMessage(), ex);
-            collector.reportError(ex);
+            throw ex;
         }
     }
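
For reference, KafkaStreamSink above targets the legacy Kafka 0.8 producer API (kafka.javaapi.producer.Producer / KeyedMessage). A minimal standalone sketch of that setup; the broker address, topic, key, and encoder below are illustrative placeholders, not values from this commit:

    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class KafkaSinkSketch {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put("metadata.broker.list", "localhost:6667");             // placeholder broker list
            properties.put("serializer.class", "kafka.serializer.StringEncoder"); // placeholder serializer
            Producer<String, String> producer = new Producer<>(new ProducerConfig(properties));
            // KafkaStreamSink serializes the event map to JSON with Jackson before sending.
            producer.send(new KeyedMessage<>("hdfs_audit_log", "user1", "{\"cmd\":\"getfileinfo\"}"));
            producer.close();
        }
    }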
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/435451ef/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
index 8256aba..3a02caf 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
@@ -16,7 +16,7 @@
  */
 package org.apache.eagle.app.sink;
 
-import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.task.OutputCollector;
 import com.typesafe.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,7 +37,7 @@ public class LoggingStreamSink extends StormStreamSink<DefaultStreamSinkConfig>
     }
 
     @Override
-    protected void execute(Object key, Map event, BasicOutputCollector collector) {
+    protected void execute(Object key, Map event, OutputCollector collector) throws Exception {
         LOGGER.info("Receiving {}", event);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/435451ef/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java
index 7ea234a..ad40772 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java
@@ -16,6 +16,8 @@
  */
 package org.apache.eagle.app.sink;
 
+import backtype.storm.task.OutputCollector;
+import backtype.storm.topology.base.BaseRichBolt;
 import org.apache.eagle.metadata.model.StreamSinkConfig;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.BasicOutputCollector;
@@ -28,9 +30,10 @@ import org.slf4j.LoggerFactory;
 import java.util.HashMap;
 import java.util.Map;
 
-public abstract class StormStreamSink<K extends StreamSinkConfig> extends BaseBasicBolt implements StreamSink<K> {
+public abstract class StormStreamSink<K extends StreamSinkConfig> extends BaseRichBolt implements StreamSink<K> {
     private static final Logger LOG = LoggerFactory.getLogger(StormStreamSink.class);
     private String streamId;
+    private OutputCollector collector;
 
     @Override
     public void init(String streamId, K config) {
@@ -38,15 +41,15 @@ public abstract class StormStreamSink<K extends StreamSinkConfig> extends BaseBa
     }
 
     @Override
-    public void prepare(Map stormConf, TopologyContext context) {
-        super.prepare(stormConf, context);
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
     }
 
     /**
      * Implicitly hides the Tuple protocol inside code as Tuple[Key,Map].
      */
     @Override
-    public void execute(Tuple input, BasicOutputCollector collector) {
+    public void execute(Tuple input) {
         try {
             Map event = null;
             Object key = input.getValue(0);
@@ -63,13 +66,14 @@ public abstract class StormStreamSink<K extends StreamSinkConfig> extends BaseBa
                 }
             }
             execute(key, event, collector);
+            collector.ack(input);
         } catch (Exception ex) {
             LOG.error(ex.getMessage(), ex);
             collector.reportError(ex);
         }
     }
 
-    protected abstract void execute(Object key, Map event, BasicOutputCollector collector);
-    protected abstract void execute(Object key, Map event, OutputCollector collector) throws Exception;
 
     private Map tupleAsMap(Tuple tuple) {
         Map values = new HashMap<>();
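
The key change above is that StormStreamSink now extends BaseRichBolt instead of BaseBasicBolt, so acking is no longer automatic: the bolt acks a tuple only after the subclass's execute() succeeds, and a thrown exception leaves the tuple un-acked (the error is reported and an anchored tuple can time out and be replayed). A minimal sketch of this manual-ack pattern, under the assumption it mirrors the refactor (ManualAckBoltSketch and process() are hypothetical names):

    import java.util.Map;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;

    public class ManualAckBoltSketch extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector; // BaseRichBolt receives the collector in prepare(), not execute()
        }

        @Override
        public void execute(Tuple input) {
            try {
                process(input);        // per-tuple work, e.g. publishing the event to Kafka
                collector.ack(input);  // ack only after the work succeeded
            } catch (Exception ex) {
                collector.reportError(ex); // no ack: the tuple can time out and be replayed
            }
        }

        protected void process(Tuple input) throws Exception {
            // placeholder for subclass logic
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // this sketch emits nothing downstream
        }
    }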

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/435451ef/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogParser.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogParser.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogParser.java
index 734cc8c..7257975 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogParser.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogParser.java
@@ -20,67 +20,78 @@ import org.apache.eagle.common.DateTimeUtil;
 import org.apache.eagle.security.util.LogParseUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import java.io.Serializable;
 
 /**
  * e.g. 2015-09-21 21:36:52,172 INFO FSNamesystem.audit: allowed=true   ugi=hadoop (auth:KERBEROS)   ip=/x.x.x.x   cmd=getfileinfo src=/tmp   dst=null        perm=null       proto=rpc
  */
 
-public final class HDFSAuditLogParser implements Serializable{
-	private final static Logger LOG = LoggerFactory.getLogger(HDFSAuditLogParser.class);
+public final class HDFSAuditLogParser implements Serializable {
+    private final static Logger LOG = LoggerFactory.getLogger(HDFSAuditLogParser.class);
 
-	public HDFSAuditLogParser(){
-	}
+    public HDFSAuditLogParser() {
+    }
 
-	public static String parseUser(String ugi) {
-		/** e.g.
-		 * .1)user@APD.xyz.com
-		 * .2)hadoop/123.dc1.xyz.com@xyz.com (auth:KERBEROS)
-		 * .3)hadoop (auth:KERBEROS)
-		 */
-		int index = ugi.indexOf("/");
-		if (index != -1) return ugi.substring(0, index).trim();
-		index = ugi.indexOf("@");
-		if (index != -1) return ugi.substring(0, index).trim();
-		index = ugi.indexOf("(");
-		return ugi.substring(0, index).trim();
-	}
+    public static String parseUser(String ugi) {
+        /** e.g.
+         * .1)user@APD.xyz.com
+         * .2)hadoop/123.dc1.xyz.com@xyz.com (auth:KERBEROS)
+         * .3)hadoop (auth:KERBEROS)
+         */
+        int index = ugi.indexOf("/");
+        if (index != -1) {
+            return ugi.substring(0, index).trim();
+        }
+        index = ugi.indexOf("@");
+        if (index != -1) {
+            return ugi.substring(0, index).trim();
+        }
+        index = ugi.indexOf("(");
+        return ugi.substring(0, index).trim();
+    }
 
-	public HDFSAuditLogObject parse(String log) throws Exception{
-		int index0 = log.indexOf(" ");
-		index0 = log.indexOf(" ",index0+1);
-		String data = log.substring(0, index0).trim();
-		int index1 = log.indexOf("allowed="); int len1 = 8;
-		int index2 = log.indexOf("ugi="); int len2 = 4;
-		int index3 = log.indexOf("ip=/"); int len3 = 4;
-		int index4 = log.indexOf("cmd="); int len4 = 4;
-		int index5 = log.indexOf("src="); int len5= 4;
-		int index6 = log.indexOf("dst="); int len6 = 4;
-		int index7 = log.indexOf("perm=");
+    public HDFSAuditLogObject parse(String log) throws Exception {
+        int index0 = log.indexOf(" ");
+        index0 = log.indexOf(" ", index0 + 1);
+        String data = log.substring(0, index0).trim();
+        int index1 = log.indexOf("allowed=");
+        int len1 = 8;
+        int index2 = log.indexOf("ugi=");
+        int len2 = 4;
+        int index3 = log.indexOf("ip=/");
+        int len3 = 4;
+        int index4 = log.indexOf("cmd=");
+        int len4 = 4;
+        int index5 = log.indexOf("src=");
+        int len5 = 4;
+        int index6 = log.indexOf("dst=");
+        int len6 = 4;
+        int index7 = log.indexOf("perm=");
 
-		String allowed = log.substring(index1 + len1, index2).trim();
-		String ugi = log.substring(index2 + len2, index3).trim();
-		String ip = log.substring(index3 + len3, index4).trim();
-		String cmd = log.substring(index4 + len4, index5).trim();
-		String src = log.substring(index5 + len5, index6).trim();
-		String dst = log.substring(index6 + len6, index7).trim();
+        String allowed = log.substring(index1 + len1, index2).trim();
+        String ugi = log.substring(index2 + len2, index3).trim();
+        String ip = log.substring(index3 + len3, index4).trim();
+        String cmd = log.substring(index4 + len4, index5).trim();
+        String src = log.substring(index5 + len5, index6).trim();
+        String dst = log.substring(index6 + len6, index7).trim();
 
-		HDFSAuditLogObject entity = new HDFSAuditLogObject();
-		String user = LogParseUtil.parseUserFromUGI(ugi);
-		if (src != null && src.equals("null")) {
-			src = null;
-		}
+        HDFSAuditLogObject entity = new HDFSAuditLogObject();
+        String user = LogParseUtil.parseUserFromUGI(ugi);
+        if (src != null && src.equals("null")) {
+            src = null;
+        }
 
-		if (dst != null && dst.equals("null")) {
-			dst = null;
-		}
-		entity.user = user;
-		entity.cmd = cmd;
-		entity.src = src;
-		entity.dst = dst;
-		entity.host = ip;
-		entity.allowed = Boolean.valueOf(allowed);
-		entity.timestamp = DateTimeUtil.humanDateToMilliseconds(data);
-		return entity;
-	}
+        if (dst != null && dst.equals("null")) {
+            dst = null;
+        }
+        entity.user = user;
+        entity.cmd = cmd;
+        entity.src = src;
+        entity.dst = dst;
+        entity.host = ip;
+        entity.allowed = Boolean.valueOf(allowed);
+        entity.timestamp = DateTimeUtil.humanDateToMilliseconds(data);
+        return entity;
+    }
 }
\ No newline at end of file
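
A short usage sketch of the parser above, fed the sample line from the class Javadoc (field accesses follow HDFSAuditLogObject as used in this diff; the printed values are what the substring logic should yield for that line):

    import org.apache.eagle.security.hdfs.HDFSAuditLogObject;
    import org.apache.eagle.security.hdfs.HDFSAuditLogParser;

    public class HdfsAuditParseSketch {
        public static void main(String[] args) throws Exception {
            HDFSAuditLogParser parser = new HDFSAuditLogParser();
            HDFSAuditLogObject entity = parser.parse(
                "2015-09-21 21:36:52,172 INFO FSNamesystem.audit: allowed=true   ugi=hadoop (auth:KERBEROS)"
                + "   ip=/x.x.x.x   cmd=getfileinfo   src=/tmp   dst=null   perm=null   proto=rpc");
            System.out.println(entity.user);    // hadoop
            System.out.println(entity.cmd);     // getfileinfo
            System.out.println(entity.src);     // /tmp
            System.out.println(entity.dst);     // null ("null" strings are normalized to Java null)
            System.out.println(entity.allowed); // true
        }
    }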

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/435451ef/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogParserBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogParserBolt.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogParserBolt.java
index 79eadd1..ffed0ef 100644
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogParserBolt.java
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogParserBolt.java
@@ -26,9 +26,7 @@ import backtype.storm.tuple.Tuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.Map;
-import java.util.TreeMap;
+import java.util.*;
 
 /**
  * Since 6/7/16.
@@ -36,6 +34,7 @@ import java.util.TreeMap;
 public class HBaseAuditLogParserBolt extends BaseRichBolt {
     private static Logger LOG = LoggerFactory.getLogger(HBaseAuditLogParserBolt.class);
     private OutputCollector collector;
+    private static final HbaseAuditLogParser parser = new HbaseAuditLogParser();
 
     @Override
     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
@@ -44,12 +43,10 @@ public class HBaseAuditLogParserBolt extends BaseRichBolt {
 
     @Override
     public void execute(Tuple input) {
-        String logLine = new String(input.getString(0));
-
-        HbaseAuditLogParser parser = new HbaseAuditLogParser();
-        try{
+        String logLine = input.getString(0);
+        try {
             HbaseAuditLogObject entity = parser.parse(logLine);
-            Map<String, Object> map = new TreeMap<String, Object>();
+            Map<String, Object> map = new TreeMap<>();
             map.put("action", entity.action);
             map.put("host", entity.host);
             map.put("status", entity.status);
@@ -57,10 +54,10 @@ public class HBaseAuditLogParserBolt extends BaseRichBolt {
             map.put("scope", entity.scope);
             map.put("user", entity.user);
             map.put("timestamp", entity.timestamp);
-            collector.emit(Arrays.asList(map));
-        }catch(Exception ex){
+            collector.emit(Collections.singletonList(map));
+        } catch (Exception ex) {
             LOG.error("Failing parse and ignore audit log {} ", logLine, ex);
-        }finally {
+        } finally {
             collector.ack(input);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/435451ef/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityDataJoinBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityDataJoinBolt.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityDataJoinBolt.java
index c1005cd..a1545c2 100644
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityDataJoinBolt.java
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityDataJoinBolt.java
@@ -30,10 +30,10 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.regex.Pattern;
 
-public class HbaseResourceSensitivityDataJoinBolt extends AbstractDataEnrichBolt<HBaseSensitivityEntity, String>  {
+public class HbaseResourceSensitivityDataJoinBolt extends AbstractDataEnrichBolt<HBaseSensitivityEntity, String> {
     private final static Logger LOG = LoggerFactory.getLogger(HbaseResourceSensitivityDataJoinBolt.class);
 
-    public HbaseResourceSensitivityDataJoinBolt(Config config){
+    public HbaseResourceSensitivityDataJoinBolt(Config config) {
         super(config, new HBaseSensitivityDataEnrichLCM(config));
     }
 
@@ -57,8 +57,8 @@ public class HbaseResourceSensitivityDataJoinBolt extends AbstractDataEnrichBolt
                 }
             }
             Map<String, Object> newEvent = new TreeMap<String, Object>(event);
-            newEvent.put("sensitivityType", sensitivityEntity == null ?
-                    "NA" : sensitivityEntity.getSensitivityType());
+            newEvent.put("sensitivityType", sensitivityEntity == null
+                ? "NA" : sensitivityEntity.getSensitivityType());
             newEvent.put("scope", resource);
             if (LOG.isDebugEnabled()) {
                 LOG.debug("After hbase resource sensitivity lookup: " + newEvent);
@@ -66,9 +66,9 @@ public class HbaseResourceSensitivityDataJoinBolt extends AbstractDataEnrichBolt
             LOG.info("After hbase resource sensitivity lookup: " + newEvent);
             // push to Kafka sink
             collector.emit(Arrays.asList(newEvent.get("user"), newEvent));
-        }catch(Exception ex){
+        } catch (Exception ex) {
             LOG.error("error joining data, ignore it", ex);
-        }finally {
+        } finally {
             collector.ack(input);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/435451ef/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
index b9f480b..a1daf89 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
@@ -61,34 +61,50 @@ public abstract class AbstractHdfsAuditLogApplication extends StormApplication {
 
         builder.setSpout("ingest", spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks);
 
+        // ---------------------
+        // ingest -> parserBolt
+        // ---------------------
 
         BaseRichBolt parserBolt = getParserBolt();
-        BoltDeclarer boltDeclarer = builder.setBolt("parserBolt", parserBolt, numOfParserTasks).setNumTasks(numOfParserTasks);
+        BoltDeclarer boltDeclarer = builder.setBolt("parserBolt", parserBolt, numOfParserTasks).setNumTasks(numOfParserTasks).shuffleGrouping("ingest");
+        boltDeclarer.shuffleGrouping("ingest");
 
-        Boolean useDefaultPartition = !config.hasPath("eagleProps.useDefaultPartition") || config.getBoolean("eagleProps.useDefaultPartition");
-        if(useDefaultPartition){
-            boltDeclarer.fieldsGrouping("ingest", new Fields(StringScheme.STRING_SCHEME_KEY));
-        }else{
-            boltDeclarer.customGrouping("ingest", new CustomPartitionGrouping(createStrategy(config)));
-        }
+        // Boolean useDefaultPartition = !config.hasPath("eagleProps.useDefaultPartition") || config.getBoolean("eagleProps.useDefaultPartition");
+        // if (useDefaultPartition) {
+        //    boltDeclarer.fieldsGrouping("ingest", new Fields(StringScheme.STRING_SCHEME_KEY));
+        // } else {
+        //    boltDeclarer.customGrouping("ingest", new CustomPartitionGrouping(createStrategy(config)));
+        // }
+
+        // ------------------------------
+        // parserBolt -> sensitivityJoin
+        // ------------------------------
 
         HdfsSensitivityDataEnrichBolt sensitivityDataJoinBolt = new HdfsSensitivityDataEnrichBolt(config);
         BoltDeclarer sensitivityDataJoinBoltDeclarer = builder.setBolt("sensitivityJoin", sensitivityDataJoinBolt, numOfSensitivityJoinTasks).setNumTasks(numOfSensitivityJoinTasks);
-        sensitivityDataJoinBoltDeclarer.fieldsGrouping("parserBolt", new Fields("f1"));
+        // sensitivityDataJoinBoltDeclarer.fieldsGrouping("parserBolt", new Fields("f1"));
+        sensitivityDataJoinBoltDeclarer.shuffleGrouping("parserBolt");
 
+        // ------------------------------
+        // sensitivityJoin -> ipZoneJoin
+        // ------------------------------
         IPZoneDataEnrichBolt ipZoneDataJoinBolt = new IPZoneDataEnrichBolt(config);
         BoltDeclarer ipZoneDataJoinBoltDeclarer = builder.setBolt("ipZoneJoin", ipZoneDataJoinBolt, numOfIPZoneJoinTasks).setNumTasks(numOfIPZoneJoinTasks);
-        ipZoneDataJoinBoltDeclarer.fieldsGrouping("sensitivityJoin", new Fields("user"));
+        // ipZoneDataJoinBoltDeclarer.fieldsGrouping("sensitivityJoin", new Fields("user"));
+        ipZoneDataJoinBoltDeclarer.shuffleGrouping("sensitivityJoin");
+
+        // ------------------------
+        // ipZoneJoin -> kafkaSink
+        // ------------------------
 
-        StormStreamSink sinkBolt = environment.getStreamSink("hdfs_audit_log_stream",config);
+        StormStreamSink sinkBolt = environment.getStreamSink("hdfs_audit_log_stream", config);
         BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", sinkBolt, numOfSinkTasks).setNumTasks(numOfSinkTasks);
-        kafkaBoltDeclarer.fieldsGrouping("ipZoneJoin", new Fields("user"));
+        kafkaBoltDeclarer.shuffleGrouping("ipZoneJoin");
         return builder.createTopology();
-
-
     }
 
     public abstract BaseRichBolt getParserBolt();
+
     public abstract String getSinkStreamName();
 
     public static PartitionStrategy createStrategy(Config config) {
@@ -103,10 +119,8 @@ public abstract class AbstractHdfsAuditLogApplication extends StormApplication {
         String key1 = EagleConfigConstants.EAGLE_PROPS + ".partitionRefreshIntervalInMin";
         Integer partitionRefreshIntervalInMin = config.hasPath(key1) ? config.getInt(key1) : 60;
         String key2 = EagleConfigConstants.EAGLE_PROPS + ".kafkaStatisticRangeInMin";
-        Integer kafkaStatisticRangeInMin =  config.hasPath(key2) ? config.getInt(key2) : 60;
+        Integer kafkaStatisticRangeInMin = config.hasPath(key2) ? config.getInt(key2) : 60;
         PartitionStrategy strategy = new PartitionStrategyImpl(dao, algorithm, partitionRefreshIntervalInMin * DateUtils.MILLIS_PER_MINUTE, kafkaStatisticRangeInMin * DateUtils.MILLIS_PER_MINUTE);
         return strategy;
     }
-
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/435451ef/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java
index 1134cb5..4590e8a 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java
@@ -30,7 +30,7 @@ import org.apache.eagle.security.hdfs.HDFSAuditLogParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -40,6 +40,7 @@ import java.util.TreeMap;
 public class HdfsAuditLogParserBolt extends BaseRichBolt {
     private static Logger LOG = LoggerFactory.getLogger(HdfsAuditLogParserBolt.class);
     private OutputCollector collector;
+    private static final HDFSAuditLogParser parser = new HDFSAuditLogParser();
 
     @Override
     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
@@ -49,12 +50,10 @@ public class HdfsAuditLogParserBolt extends BaseRichBolt {
     @Override
     public void execute(Tuple input) {
         String logLine = input.getString(0);
-
-        HDFSAuditLogParser parser = new HDFSAuditLogParser();
         HDFSAuditLogObject entity = null;
         try {
             entity = parser.parse(logLine);
-            Map<String, Object> map = new TreeMap<String, Object>();
+            Map<String, Object> map = new TreeMap<>();
             map.put("src", entity.src);
             map.put("dst", entity.dst);
             map.put("host", entity.host);
@@ -62,7 +61,7 @@ public class HdfsAuditLogParserBolt extends BaseRichBolt {
             map.put("allowed", entity.allowed);
             map.put("user", entity.user);
             map.put("cmd", entity.cmd);
-            collector.emit(Arrays.asList(map));
+            collector.emit(Collections.singletonList(map));
         } catch (Exception ex) {
             LOG.error("Failing parse audit log message {}", logLine, ex);
         } finally {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/435451ef/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsSensitivityDataEnrichBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsSensitivityDataEnrichBolt.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsSensitivityDataEnrichBolt.java
index 2031108..f8e7c4d 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsSensitivityDataEnrichBolt.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsSensitivityDataEnrichBolt.java
@@ -35,7 +35,7 @@ import java.util.regex.Pattern;
 public class HdfsSensitivityDataEnrichBolt extends AbstractDataEnrichBolt<HdfsSensitivityEntity, String> {
     private static Logger LOG = LoggerFactory.getLogger(HdfsSensitivityDataEnrichBolt.class);
 
-    public HdfsSensitivityDataEnrichBolt(Config config){
+    public HdfsSensitivityDataEnrichBolt(Config config) {
         super(config, new HdfsSensitivityDataEnrichLCM(config));
     }
 
@@ -68,9 +68,9 @@ public class HdfsSensitivityDataEnrichBolt extends AbstractDataEnrichBolt<HdfsSe
             }
             // LOG.info(">>>> After file sensitivity lookup: " + event);
             collector.emit(Arrays.asList(event.get("user"), event));
-        }catch(Exception ex){
+        } catch (Exception ex) {
             LOG.error("error joining data, ignore it", ex);
-        }finally {
+        } finally {
             collector.ack(input);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/435451ef/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataEnrichBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataEnrichBolt.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataEnrichBolt.java
index ed0e17b..faab22a 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataEnrichBolt.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataEnrichBolt.java
@@ -30,33 +30,35 @@ import java.util.Map;
 import java.util.TreeMap;
 
 public class IPZoneDataEnrichBolt extends AbstractDataEnrichBolt<IPZoneEntity, String> {
-	private static final Logger LOG = LoggerFactory.getLogger(IPZoneDataEnrichBolt.class);
+    private static final Logger LOG = LoggerFactory.getLogger(IPZoneDataEnrichBolt.class);
 
-	public IPZoneDataEnrichBolt(Config config){
-		super(config, new IPZoneDataEnrichLCM(config));
-	}
+    public IPZoneDataEnrichBolt(Config config) {
+        super(config, new IPZoneDataEnrichLCM(config));
+    }
 
-	@Override
-	public void executeWithEnrich(Tuple input, Map<String, IPZoneEntity> map) {
-		try {
-			Map<String, Object> toBeCopied = (Map<String, Object>) input.getValue(1);
-			Map<String, Object> event = new TreeMap<String, Object>(toBeCopied); // shallow copy
-			IPZoneEntity e = null;
-			if (map != null) {
-				e = map.get(event.get("host"));
-			}
-			event.put("securityZone", e == null ? "NA" : e.getSecurityZone());
-			if (LOG.isDebugEnabled()) LOG.debug("After IP zone lookup: " + event);
-			collector.emit(Arrays.asList(event.get("user"), event));
-		}catch(Exception ex){
-			LOG.error("error joining data, ignore it", ex);
-		}finally {
-			collector.ack(input);
-		}
+    @Override
+    public void executeWithEnrich(Tuple input, Map<String, IPZoneEntity> map) {
+        try {
+            Map<String, Object> toBeCopied = (Map<String, Object>) input.getValue(1);
+            Map<String, Object> event = new TreeMap<String, Object>(toBeCopied); // shallow copy
+            IPZoneEntity e = null;
+            if (map != null) {
+                e = map.get(event.get("host"));
+            }
+            event.put("securityZone", e == null ? "NA" : e.getSecurityZone());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("After IP zone lookup: " + event);
+            }
+            collector.emit(Arrays.asList(event.get("user"), event));
+        } catch (Exception ex) {
+            LOG.error("error joining data, ignore it", ex);
+        } finally {
+            collector.ack(input);
+        }
     }
 
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(new Fields("user", "message"));
-	}
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("user", "message"));
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/435451ef/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/kafka/MessageJsonScheme.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/kafka/MessageJsonScheme.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/kafka/MessageJsonScheme.java
index 9ffcaf9..1d48208 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/kafka/MessageJsonScheme.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/kafka/MessageJsonScheme.java
@@ -21,7 +21,7 @@ package org.apache.eagle.security.auditlog.kafka;
 import backtype.storm.spout.Scheme;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
-import org.codehaus.jackson.map.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import storm.kafka.StringScheme;
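
The hunk above migrates MessageJsonScheme from Jackson 1 (org.codehaus.jackson) to Jackson 2 (com.fasterxml.jackson). For the common calls, Jackson 2 keeps the same ObjectMapper surface, so only the import changes; a tiny sketch under that assumption (JacksonSketch and the sample JSON are illustrative, not from the commit):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.Map;

    public class JacksonSketch {
        public static void main(String[] args) throws Exception {
            // readValue/writeValueAsString exist with these signatures in both
            // Jackson 1 and Jackson 2; only the package differs.
            ObjectMapper mapper = new ObjectMapper();
            Map event = mapper.readValue("{\"cmd\":\"getfileinfo\"}", Map.class);
            System.out.println(mapper.writeValueAsString(event));
        }
    }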
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/435451ef/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestHdfsAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestHdfsAuditLogApplication.java b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestHdfsAuditLogApplication.java
new file mode 100644
index 0000000..e09f55d
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestHdfsAuditLogApplication.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.security.auditlog;
+
+
+import com.google.inject.Inject;
+import org.apache.eagle.app.resource.ApplicationResource;
+import org.apache.eagle.app.service.ApplicationOperations;
+import org.apache.eagle.app.test.ApplicationTestBase;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.metadata.model.SiteEntity;
+import org.apache.eagle.metadata.resource.SiteResource;
+import org.apache.eagle.metadata.service.ApplicationStatusUpdateService;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Ignore
+public class TestHdfsAuditLogApplication extends ApplicationTestBase {
+
+    @Inject
+    private SiteResource siteResource;
+    @Inject
+    private ApplicationResource applicationResource;
+    @Inject
+    ApplicationStatusUpdateService statusUpdateService;
+
+    @Test
+    public void testHdfsAuditLogApplication() {
+        // Create local site
+        SiteEntity siteEntity = new SiteEntity();
+        siteEntity.setSiteId("test_site");
+        siteEntity.setSiteName("Test Site");
+        siteEntity.setDescription("Test Site for HdfsAuditLogApplication");
+        siteResource.createSite(siteEntity);
+        Assert.assertNotNull(siteEntity.getUuid());
+
+        ApplicationOperations.InstallOperation installOperation = new ApplicationOperations.InstallOperation("test_site", "HdfsAuditLogApplication", ApplicationEntity.Mode.LOCAL);
+        installOperation.setConfiguration(getConf());
+        // Install application
+        ApplicationEntity applicationEntity = applicationResource.installApplication(installOperation).getData();
+        // Start application
+        applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid()));
+        statusUpdateService.updateApplicationEntityStatus(applicationEntity);
+        // Stop application
+        applicationResource.stopApplication(new ApplicationOperations.StopOperation(applicationEntity.getUuid()));
+        statusUpdateService.updateApplicationEntityStatus(applicationEntity);
+        // Uninstall application
+        applicationResource.uninstallApplication(new ApplicationOperations.UninstallOperation(applicationEntity.getUuid()));
+        try {
+            applicationResource.getApplicationEntityByUUID(applicationEntity.getUuid());
+            Assert.fail("Application instance (UUID: " + applicationEntity.getUuid() + ") should have been uninstalled");
+        } catch (Exception ex) {
+            // Expected exception
+        }
+    }
+
+
+    private Map<String, Object> getConf() {
+        Map<String, Object> conf = new HashMap<>();
+        conf.put("dataSinkConfig.topic", "hdfs_audit_log_test");
+        conf.put("dataSinkConfig.brokerList", "localhost:6667");
+        conf.put("dataSinkConfig.serializerClass", "serializerClass");
+        conf.put("dataSinkConfig.keySerializerClass", "keySerializerClass");
+        conf.put("spoutNum", 2);
+        conf.put("mode", "LOCAL");
+        return conf;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/435451ef/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogParserBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogParserBolt.java b/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogParserBolt.java
index 37e55c6..e2f0520 100644
--- a/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogParserBolt.java
+++ b/eagle-security/eagle-security-maprfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/MapRFSAuditLogParserBolt.java
@@ -52,7 +52,7 @@ public class MapRFSAuditLogParserBolt extends BaseRichBolt {
 
         MAPRFSAuditLogParser parser = new MAPRFSAuditLogParser();
         MAPRFSAuditLogObject entity = null;
-        try{
+        try {
             entity = parser.parse(logLine);
             Map<String, Object> map = new TreeMap<String, Object>();
             map.put("src", entity.src);
@@ -64,7 +64,7 @@ public class MapRFSAuditLogParserBolt extends BaseRichBolt {
             map.put("cmd", entity.cmd);
             map.put("volume", entity.volume);
             collector.emit(Arrays.asList(map));
-        }catch(Exception ex) {
+        } catch (Exception ex) {
             LOG.error("Failing parse audit log message", ex);
         }
     }


