eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yonzhang2...@apache.org
Subject [1/2] incubator-eagle git commit: EAGLE-444 convert eagle-gc app to use new app framework convert eagle-gc app to use new app framework
Date Sat, 13 Aug 2016 06:46:19 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/develop 15e1c8335 -> 984586580


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/topo/NewKafkaSourcedSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/topo/NewKafkaSourcedSpoutProvider.java
b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/topo/NewKafkaSourcedSpoutProvider.java
deleted file mode 100644
index 3401b3c..0000000
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/topo/NewKafkaSourcedSpoutProvider.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.security.topo;
-
-import backtype.storm.spout.Scheme;
-import backtype.storm.spout.SchemeAsMultiScheme;
-import backtype.storm.topology.base.BaseRichSpout;
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider;
-import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.BrokerHosts;
-import storm.kafka.KafkaSpout;
-import storm.kafka.SpoutConfig;
-import storm.kafka.ZkHosts;
-
-import java.util.Arrays;
-
-/**
- * Since 6/8/16.
- */
-public class NewKafkaSourcedSpoutProvider implements StormSpoutProvider {
-    private final static Logger LOG = LoggerFactory.getLogger(NewKafkaSourcedSpoutProvider.class);
-
-    private String configPrefix = "dataSourceConfig";
-
-    public NewKafkaSourcedSpoutProvider(){}
-
-    public NewKafkaSourcedSpoutProvider(String prefix){
-        this.configPrefix = prefix;
-    }
-
-    @Override
-    public BaseRichSpout getSpout(Config config){
-        Config context = config;
-        if(this.configPrefix!=null) context = config.getConfig(configPrefix);
-        // Kafka topic
-        String topic = context.getString("topic");
-        // Kafka consumer group id
-        String groupId = context.getString("consumerGroupId");
-        // Kafka fetch size
-        int fetchSize = context.getInt("fetchSize");
-        // Kafka broker zk connection
-        String zkConnString = context.getString("zkConnection");
-        // transaction zkRoot
-        String zkRoot = context.getString("transactionZKRoot");
-
-        LOG.info(String.format("Use topic id: %s",topic));
-
-        String brokerZkPath = null;
-        if(context.hasPath("brokerZkPath")) {
-            brokerZkPath = context.getString("brokerZkPath");
-        }
-
-        BrokerHosts hosts;
-        if(brokerZkPath == null) {
-            hosts = new ZkHosts(zkConnString);
-        } else {
-            hosts = new ZkHosts(zkConnString, brokerZkPath);
-        }
-
-        SpoutConfig spoutConfig = new SpoutConfig(hosts,
-                topic,
-                zkRoot + "/" + topic,
-                groupId);
-
-        // transaction zkServers
-        spoutConfig.zkServers = Arrays.asList(context.getString("transactionZKServers").split(","));
-        // transaction zkPort
-        spoutConfig.zkPort = context.getInt("transactionZKPort");
-        // transaction update interval
-        spoutConfig.stateUpdateIntervalMs = context.getLong("transactionStateUpdateMS");
-        // Kafka fetch size
-        spoutConfig.fetchSizeBytes = fetchSize;
-        // "startOffsetTime" is for test usage, prod should not use this
-        if (context.hasPath("startOffsetTime")) {
-            spoutConfig.startOffsetTime = context.getInt("startOffsetTime");
-        }
-        // "forceFromStart" is for test usage, prod should not use this
-        if (context.hasPath("forceFromStart")) {
-            spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
-        }
-
-        if (context.hasPath("schemeCls")) {
-            try {
-                Scheme s = (Scheme)Class.forName(context.getString("schemeCls")).newInstance();
-                spoutConfig.scheme = new SchemeAsMultiScheme(s);
-            }catch(Exception ex){
-                LOG.error("error instantiating scheme object");
-                throw new IllegalStateException(ex);
-            }
-        }else{
-            String err = "schemeCls must be present";
-            LOG.error(err);
-            throw new IllegalStateException(err);
-        }
-        return new KafkaSpout(spoutConfig);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java
b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java
index 2cfee10..d6e319a 100644
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java
@@ -19,14 +19,7 @@
 
 package org.apache.eagle.security.hbase;
 
-import com.google.inject.AbstractModule;
 import org.apache.eagle.app.spi.AbstractApplicationProvider;
-import org.apache.eagle.common.module.ModuleRegistry;
-import org.apache.eagle.metadata.service.memory.MemoryMetadataStore;
-import org.apache.eagle.metadata.store.jdbc.JDBCMetadataStore;
-import org.apache.eagle.security.service.ISecurityMetadataDAO;
-import org.apache.eagle.security.service.InMemMetadataDaoImpl;
-import org.apache.eagle.security.service.JDBCSecurityMetadataDAO;
 
 /**
  * Since 8/5/16.

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
index 030212a..432043f 100644
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
@@ -26,8 +26,7 @@ import com.typesafe.config.ConfigFactory;
 import org.apache.eagle.app.StormApplication;
 import org.apache.eagle.app.environment.impl.StormEnvironment;
 import org.apache.eagle.app.sink.StormStreamSink;
-import org.apache.eagle.security.topo.NewKafkaSourcedSpoutProvider;
-import storm.kafka.StringScheme;
+import org.apache.eagle.dataproc.impl.storm.kafka.NewKafkaSourcedSpoutProvider;
 
 /**
  * Since 7/27/16.

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
index ccbce98..a4aba68 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
@@ -26,7 +26,6 @@ import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.tuple.Fields;
 import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
 import org.apache.commons.lang3.time.DateUtils;
 import org.apache.eagle.app.StormApplication;
 import org.apache.eagle.app.environment.impl.StormEnvironment;
@@ -39,7 +38,7 @@ import org.apache.eagle.partition.PartitionStrategy;
 import org.apache.eagle.partition.PartitionStrategyImpl;
 import org.apache.eagle.security.partition.DataDistributionDaoImpl;
 import org.apache.eagle.security.partition.GreedyPartitionAlgorithm;
-import org.apache.eagle.security.topo.NewKafkaSourcedSpoutProvider;
+import org.apache.eagle.dataproc.impl.storm.kafka.NewKafkaSourcedSpoutProvider;
 import storm.kafka.StringScheme;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-security-hdfs-authlog/src/main/java/org/apache/eagle/security/securitylog/HdfsAuthLogMonitoringMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-authlog/src/main/java/org/apache/eagle/security/securitylog/HdfsAuthLogMonitoringMain.java
b/eagle-security/eagle-security-hdfs-authlog/src/main/java/org/apache/eagle/security/securitylog/HdfsAuthLogMonitoringMain.java
index c6f2c1c..2e597fb 100644
--- a/eagle-security/eagle-security-hdfs-authlog/src/main/java/org/apache/eagle/security/securitylog/HdfsAuthLogMonitoringMain.java
+++ b/eagle-security/eagle-security-hdfs-authlog/src/main/java/org/apache/eagle/security/securitylog/HdfsAuthLogMonitoringMain.java
@@ -21,10 +21,9 @@ import backtype.storm.generated.StormTopology;
 import backtype.storm.topology.BoltDeclarer;
 import backtype.storm.topology.IRichSpout;
 import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.security.topo.NewKafkaSourcedSpoutProvider;
+import org.apache.eagle.dataproc.impl.storm.kafka.NewKafkaSourcedSpoutProvider;
 import org.apache.eagle.security.topo.TopologySubmitter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-security-oozie-auditlog/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-oozie-auditlog/pom.xml b/eagle-security/eagle-security-oozie-auditlog/pom.xml
index d1557ac..bab7e18 100644
--- a/eagle-security/eagle-security-oozie-auditlog/pom.xml
+++ b/eagle-security/eagle-security-oozie-auditlog/pom.xml
@@ -34,5 +34,10 @@
             <artifactId>eagle-security-common</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-app-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogApplication.java
b/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogApplication.java
new file mode 100644
index 0000000..361f8df
--- /dev/null
+++ b/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogApplication.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.eagle.security.oozie.parse;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.app.StormApplication;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.sink.StormStreamSink;
+import org.apache.eagle.security.oozie.parse.sensitivity.OozieResourceSensitivityDataJoinBolt;
+import org.apache.eagle.dataproc.impl.storm.kafka.NewKafkaSourcedSpoutProvider;
+
+/**
+ * Since 8/12/16.
+ */
+public class OozieAuditLogApplication extends StormApplication {
+    public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
+    public final static String PARSER_TASK_NUM = "topology.numOfParserTasks";
+    public final static String JOIN_TASK_NUM = "topology.numOfJoinTasks";
+    public final static String SINK_TASK_NUM = "topology.numOfSinkTasks";
+
+    @Override
+    public StormTopology execute(Config config, StormEnvironment environment) {
+        TopologyBuilder builder = new TopologyBuilder();
+        NewKafkaSourcedSpoutProvider provider = new NewKafkaSourcedSpoutProvider();
+        IRichSpout spout = provider.getSpout(config);
+
+        int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
+        int numOfParserTask = config.getInt(PARSER_TASK_NUM);
+        int numOfJoinTasks = config.getInt(JOIN_TASK_NUM);
+        int numOfSinkTasks = config.getInt(SINK_TASK_NUM);
+
+        builder.setSpout("ingest", spout, numOfSpoutTasks);
+
+        OozieAuditLogParserBolt parserBolt = new OozieAuditLogParserBolt();
+        BoltDeclarer parserBoltDeclarer = builder.setBolt("parserBolt", parserBolt, numOfParserTask);
+        parserBoltDeclarer.fieldsGrouping("ingest", new Fields("f1"));
+
+
+        OozieResourceSensitivityDataJoinBolt joinBolt = new OozieResourceSensitivityDataJoinBolt(config);
+        BoltDeclarer boltDeclarer = builder.setBolt("joinBolt", joinBolt, numOfJoinTasks);
+        boltDeclarer.fieldsGrouping("parserBolt", new Fields("f1"));
+
+        StormStreamSink sinkBolt = environment.getStreamSink("oozie_audit_log_stream",config);
+        BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", sinkBolt, numOfSinkTasks);
+        kafkaBoltDeclarer.fieldsGrouping("joinBolt", new Fields("user"));
+        return builder.createTopology();
+    }
+
+    public static void main(String[] args){
+        Config config = ConfigFactory.load();
+        OozieAuditLogApplication app = new OozieAuditLogApplication();
+        app.run(config);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogParserBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogParserBolt.java
b/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogParserBolt.java
new file mode 100644
index 0000000..13f5e8c
--- /dev/null
+++ b/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogParserBolt.java
@@ -0,0 +1,86 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  *  Unless required by applicable law or agreed to in writing, software
+ *  *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  *  See the License for the specific language governing permissions and
+ *  *  limitations under the License.
+ *  *
+ *
+ */
+
+package org.apache.eagle.security.oozie.parse;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Since 8/12/16.
+ */
+public class OozieAuditLogParserBolt extends BaseRichBolt{
+    private static Logger LOG = LoggerFactory.getLogger(OozieAuditLogParserBolt.class);
+    private OutputCollector collector;
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector)
{
+        this.collector = collector;
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        String logLine = new String(input.getString(0));
+
+        try {
+            OozieAuditLogParser parser = new OozieAuditLogParser();
+            OozieAuditLogObject entity = null;
+            try {
+                entity = parser.parse(logLine);
+            } catch (Exception ex) {
+                LOG.error("Failing oozie parse audit log message", ex);
+            }
+            Map<String, Object> map = new TreeMap<String, Object>();
+            map.put("timestamp", entity.timestamp);
+            map.put("level", entity.level);
+            map.put("ip", entity.ip);
+            map.put("user", entity.user);
+            map.put("group", entity.group);
+            map.put("app", entity.app);
+            map.put("jobId", entity.jobId);
+            map.put("operation", entity.operation);
+            map.put("parameter", entity.parameter);
+            map.put("status", entity.status);
+            map.put("httpcode", entity.httpcode);
+            map.put("errorcode", entity.errorcode);
+            map.put("errormessage", entity.errormessage);
+            collector.emit(Arrays.asList(map));
+        }catch(Exception ex){
+            LOG.error("error in parsing oozie audit log", ex);
+        }finally {
+            collector.ack(input);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("f1"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogProcessorMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogProcessorMain.java
b/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogProcessorMain.java
deleted file mode 100644
index 85dfa74..0000000
--- a/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogProcessorMain.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.eagle.security.oozie.parse;
-
-import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
-import org.apache.eagle.datastream.ExecutionEnvironments;
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
-import org.apache.eagle.security.oozie.parse.sensitivity.OozieResourceSensitivityDataJoinExecutor;
-
-public class OozieAuditLogProcessorMain {
-    public static void main(String[] args) throws Exception {
-        StormExecutionEnvironment env = ExecutionEnvironments.getStorm(args);
-        env.fromSpout(new KafkaSourcedSpoutProvider()).withOutputFields(1).nameAs("kafkaMsgConsumer")
-                .flatMap(new OozieResourceSensitivityDataJoinExecutor())
-                .alertWithConsumer("oozieSecurityLogEventStream", "oozieAuditLogAlertExecutor");
-        env.execute();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/sensitivity/OozieResourceSensitivityDataJoinBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/sensitivity/OozieResourceSensitivityDataJoinBolt.java
b/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/sensitivity/OozieResourceSensitivityDataJoinBolt.java
new file mode 100644
index 0000000..740cea0
--- /dev/null
+++ b/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/sensitivity/OozieResourceSensitivityDataJoinBolt.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.eagle.security.oozie.parse.sensitivity;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import com.typesafe.config.Config;
+import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.security.entity.OozieResourceSensitivityAPIEntity;
+import org.apache.eagle.security.util.ExternalDataCache;
+import org.apache.eagle.security.util.ExternalDataJoiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.regex.Pattern;
+
+public class OozieResourceSensitivityDataJoinBolt extends BaseRichBolt {
+    private final static Logger LOG = LoggerFactory.getLogger(OozieResourceSensitivityDataJoinBolt.class);
+    private Config config;
+    private OutputCollector collector;
+
+    public OozieResourceSensitivityDataJoinBolt(Config config){
+        this.config = config;
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector)
{
+        this.collector = collector;
+        // start hbase sensitivity data polling
+        try {
+            ExternalDataJoiner joiner = new ExternalDataJoiner(
+                    OozieResourceSensitivityPollingJob.class, config, context.getThisComponentId()
+ "." + context.getThisTaskIndex());
+            joiner.start();
+        } catch(Exception ex){
+            LOG.error("Fail bringing up quartz scheduler.", ex);
+            throw new IllegalStateException(ex);
+        }
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        try {
+            @SuppressWarnings("unchecked")
+            Map<String, Object> event = (Map<String, Object>) input.getValue(0);
+            @SuppressWarnings("unchecked")
+            Map<String, OozieResourceSensitivityAPIEntity> map =
+                    (Map<String, OozieResourceSensitivityAPIEntity>) ExternalDataCache
+                            .getInstance()
+                            .getJobResult(OozieResourceSensitivityPollingJob.class);
+            LOG.info(">>>> event: " + event + " >>>> map: " + map);
+
+            String resource = (String) event.get("jobId");
+
+            OozieResourceSensitivityAPIEntity sensitivityEntity = null;
+
+            if (map != null && resource != "") {
+                for (String key : map.keySet()) {
+                    Pattern pattern = Pattern.compile(key, Pattern.CASE_INSENSITIVE);
+                    if (pattern.matcher(resource).find()) {
+                        sensitivityEntity = map.get(key);
+                        break;
+                    }
+                }
+            }
+            Map<String, Object> newEvent = new TreeMap<String, Object>(event);
+            newEvent.put("sensitivityType", sensitivityEntity == null ? "NA" : sensitivityEntity.getSensitivityType());
+            //newEvent.put("jobId", resource);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("After oozie resource sensitivity lookup: " + newEvent);
+            }
+            LOG.info("After oozie resource sensitivity lookup: " + newEvent);
+            collector.emit(Arrays.asList(newEvent.get("user"), newEvent));
+        }catch(Exception ex){
+            LOG.error("error join external sensitivity data", ex);
+        }finally {
+            collector.ack(input);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("user", "message"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/sensitivity/OozieResourceSensitivityDataJoinExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/sensitivity/OozieResourceSensitivityDataJoinExecutor.java
b/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/sensitivity/OozieResourceSensitivityDataJoinExecutor.java
deleted file mode 100644
index 829ad79..0000000
--- a/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/sensitivity/OozieResourceSensitivityDataJoinExecutor.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.eagle.security.oozie.parse.sensitivity;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.security.entity.OozieResourceSensitivityAPIEntity;
-import org.apache.eagle.security.util.ExternalDataCache;
-import org.apache.eagle.security.util.ExternalDataJoiner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Tuple2;
-
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.regex.Pattern;
-
-public class OozieResourceSensitivityDataJoinExecutor extends JavaStormStreamExecutor2<String,
Map> {
-    private final static Logger LOG = LoggerFactory.getLogger(OozieResourceSensitivityDataJoinExecutor.class);
-    private Config config;
-
-    @Override
-    public void prepareConfig(Config config) {
-        this.config = config;
-    }
-
-    @Override
-    public void init() {
-        // start hive resource data polling
-        try {
-            ExternalDataJoiner joiner = new ExternalDataJoiner(OozieResourceSensitivityPollingJob.class,
config, "1");
-            joiner.start();
-        } catch (Exception ex) {
-            LOG.error("Fail to bring up quartz scheduler.", ex);
-            throw new IllegalStateException(ex);
-        }
-    }
-
-    @Override
-    public void flatMap(List<Object> input, Collector<Tuple2<String, Map>>
outputCollector) {
-        @SuppressWarnings("unchecked")
-        Map<String, Object> event = (Map<String, Object>) input.get(0);
-        @SuppressWarnings("unchecked")
-        Map<String, OozieResourceSensitivityAPIEntity> map =
-                (Map<String, OozieResourceSensitivityAPIEntity>) ExternalDataCache
-                        .getInstance()
-                        .getJobResult(OozieResourceSensitivityPollingJob.class);
-        LOG.info(">>>> event: " + event + " >>>> map: " + map);
-
-        String resource = (String) event.get("jobId");
-
-        OozieResourceSensitivityAPIEntity sensitivityEntity = null;
-
-        if (map != null && resource != "") {
-            for (String key : map.keySet()) {
-                Pattern pattern = Pattern.compile(key, Pattern.CASE_INSENSITIVE);
-                if (pattern.matcher(resource).find()) {
-                    sensitivityEntity = map.get(key);
-                    break;
-                }
-            }
-        }
-        Map<String, Object> newEvent = new TreeMap<String, Object>(event);
-        newEvent.put("sensitivityType", sensitivityEntity == null ? "NA" : sensitivityEntity.getSensitivityType());
-        //newEvent.put("jobId", resource);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("After oozie resource sensitivity lookup: " + newEvent);
-        }
-        LOG.info("After oozie resource sensitivity lookup: " + newEvent);
-        outputCollector.collect(new Tuple2(newEvent.get("user"), newEvent));
-    }
-}


Mime
View raw message