Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 27676200B50 for ; Sat, 13 Aug 2016 08:46:29 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 25FAC160A8B; Sat, 13 Aug 2016 06:46:29 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 9B8F5160A8A for ; Sat, 13 Aug 2016 08:46:27 +0200 (CEST) Received: (qmail 97112 invoked by uid 500); 13 Aug 2016 06:46:26 -0000 Mailing-List: contact commits-help@eagle.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@eagle.incubator.apache.org Delivered-To: mailing list commits@eagle.incubator.apache.org Received: (qmail 97103 invoked by uid 99); 13 Aug 2016 06:46:26 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 13 Aug 2016 06:46:26 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 61FEF1A534C for ; Sat, 13 Aug 2016 06:46:26 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.646 X-Spam-Level: X-Spam-Status: No, score=-4.646 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id kNOncEj0ga-8 for ; Sat, 13 Aug 2016 06:46:22 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id AD3555F1E3 for ; Sat, 13 Aug 2016 06:46:20 +0000 (UTC) Received: (qmail 97065 invoked by uid 99); 13 Aug 2016 06:46:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 13 Aug 2016 06:46:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A8E60E0579; Sat, 13 Aug 2016 06:46:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: yonzhang2012@apache.org To: commits@eagle.incubator.apache.org Date: Sat, 13 Aug 2016 06:46:19 -0000 Message-Id: <06cca014324c474584df743a34612792@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] incubator-eagle git commit: EAGLE-444 convert eagle-gc app to use new app framework convert eagle-gc app to use new app framework archived-at: Sat, 13 Aug 2016 06:46:29 -0000 Repository: incubator-eagle Updated Branches: refs/heads/develop 15e1c8335 -> 984586580 http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/topo/NewKafkaSourcedSpoutProvider.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/topo/NewKafkaSourcedSpoutProvider.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/topo/NewKafkaSourcedSpoutProvider.java deleted file mode 100644 index 3401b3c..0000000 --- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/topo/NewKafkaSourcedSpoutProvider.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.security.topo; - -import backtype.storm.spout.Scheme; -import backtype.storm.spout.SchemeAsMultiScheme; -import backtype.storm.topology.base.BaseRichSpout; -import com.typesafe.config.Config; -import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider; -import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import storm.kafka.BrokerHosts; -import storm.kafka.KafkaSpout; -import storm.kafka.SpoutConfig; -import storm.kafka.ZkHosts; - -import java.util.Arrays; - -/** - * Since 6/8/16. - */ -public class NewKafkaSourcedSpoutProvider implements StormSpoutProvider { - private final static Logger LOG = LoggerFactory.getLogger(NewKafkaSourcedSpoutProvider.class); - - private String configPrefix = "dataSourceConfig"; - - public NewKafkaSourcedSpoutProvider(){} - - public NewKafkaSourcedSpoutProvider(String prefix){ - this.configPrefix = prefix; - } - - @Override - public BaseRichSpout getSpout(Config config){ - Config context = config; - if(this.configPrefix!=null) context = config.getConfig(configPrefix); - // Kafka topic - String topic = context.getString("topic"); - // Kafka consumer group id - String groupId = context.getString("consumerGroupId"); - // Kafka fetch size - int fetchSize = context.getInt("fetchSize"); - // Kafka broker zk connection - String zkConnString = context.getString("zkConnection"); - // transaction zkRoot - String zkRoot = context.getString("transactionZKRoot"); - - LOG.info(String.format("Use topic id: %s",topic)); - - String brokerZkPath = null; - if(context.hasPath("brokerZkPath")) { - brokerZkPath = context.getString("brokerZkPath"); - } - - BrokerHosts hosts; - if(brokerZkPath == null) { - hosts = new ZkHosts(zkConnString); - } else { - hosts = new ZkHosts(zkConnString, brokerZkPath); - } - - SpoutConfig spoutConfig = new SpoutConfig(hosts, - topic, - zkRoot + "/" + topic, - groupId); - - // transaction zkServers - spoutConfig.zkServers = Arrays.asList(context.getString("transactionZKServers").split(",")); - // transaction zkPort - spoutConfig.zkPort = context.getInt("transactionZKPort"); - // transaction update interval - spoutConfig.stateUpdateIntervalMs = context.getLong("transactionStateUpdateMS"); - // Kafka fetch size - spoutConfig.fetchSizeBytes = fetchSize; - // "startOffsetTime" is for test usage, prod should not use this - if (context.hasPath("startOffsetTime")) { - spoutConfig.startOffsetTime = context.getInt("startOffsetTime"); - } - // "forceFromStart" is for test usage, prod should not use this - if (context.hasPath("forceFromStart")) { - spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); - } - - if (context.hasPath("schemeCls")) { - try { - Scheme s = (Scheme)Class.forName(context.getString("schemeCls")).newInstance(); - spoutConfig.scheme = new SchemeAsMultiScheme(s); - }catch(Exception ex){ - LOG.error("error instantiating scheme object"); - throw new IllegalStateException(ex); - } - }else{ - String err = "schemeCls must be present"; - LOG.error(err); - throw new IllegalStateException(err); - } - return new KafkaSpout(spoutConfig); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java index 2cfee10..d6e319a 100644 --- a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java +++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java @@ -19,14 +19,7 @@ package org.apache.eagle.security.hbase; -import com.google.inject.AbstractModule; import org.apache.eagle.app.spi.AbstractApplicationProvider; -import org.apache.eagle.common.module.ModuleRegistry; -import org.apache.eagle.metadata.service.memory.MemoryMetadataStore; -import org.apache.eagle.metadata.store.jdbc.JDBCMetadataStore; -import org.apache.eagle.security.service.ISecurityMetadataDAO; -import org.apache.eagle.security.service.InMemMetadataDaoImpl; -import org.apache.eagle.security.service.JDBCSecurityMetadataDAO; /** * Since 8/5/16. http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java index 030212a..432043f 100644 --- a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java +++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java @@ -26,8 +26,7 @@ import com.typesafe.config.ConfigFactory; import org.apache.eagle.app.StormApplication; import org.apache.eagle.app.environment.impl.StormEnvironment; import org.apache.eagle.app.sink.StormStreamSink; -import org.apache.eagle.security.topo.NewKafkaSourcedSpoutProvider; -import storm.kafka.StringScheme; +import org.apache.eagle.dataproc.impl.storm.kafka.NewKafkaSourcedSpoutProvider; /** * Since 7/27/16. http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java index ccbce98..a4aba68 100644 --- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java +++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java @@ -26,7 +26,6 @@ import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; import org.apache.commons.lang3.time.DateUtils; import org.apache.eagle.app.StormApplication; import org.apache.eagle.app.environment.impl.StormEnvironment; @@ -39,7 +38,7 @@ import org.apache.eagle.partition.PartitionStrategy; import org.apache.eagle.partition.PartitionStrategyImpl; import org.apache.eagle.security.partition.DataDistributionDaoImpl; import org.apache.eagle.security.partition.GreedyPartitionAlgorithm; -import org.apache.eagle.security.topo.NewKafkaSourcedSpoutProvider; +import org.apache.eagle.dataproc.impl.storm.kafka.NewKafkaSourcedSpoutProvider; import storm.kafka.StringScheme; /** http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-security-hdfs-authlog/src/main/java/org/apache/eagle/security/securitylog/HdfsAuthLogMonitoringMain.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-authlog/src/main/java/org/apache/eagle/security/securitylog/HdfsAuthLogMonitoringMain.java b/eagle-security/eagle-security-hdfs-authlog/src/main/java/org/apache/eagle/security/securitylog/HdfsAuthLogMonitoringMain.java index c6f2c1c..2e597fb 100644 --- a/eagle-security/eagle-security-hdfs-authlog/src/main/java/org/apache/eagle/security/securitylog/HdfsAuthLogMonitoringMain.java +++ b/eagle-security/eagle-security-hdfs-authlog/src/main/java/org/apache/eagle/security/securitylog/HdfsAuthLogMonitoringMain.java @@ -21,10 +21,9 @@ import backtype.storm.generated.StormTopology; import backtype.storm.topology.BoltDeclarer; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Fields; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import org.apache.eagle.security.topo.NewKafkaSourcedSpoutProvider; +import org.apache.eagle.dataproc.impl.storm.kafka.NewKafkaSourcedSpoutProvider; import org.apache.eagle.security.topo.TopologySubmitter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-security-oozie-auditlog/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-oozie-auditlog/pom.xml b/eagle-security/eagle-security-oozie-auditlog/pom.xml index d1557ac..bab7e18 100644 --- a/eagle-security/eagle-security-oozie-auditlog/pom.xml +++ b/eagle-security/eagle-security-oozie-auditlog/pom.xml @@ -34,5 +34,10 @@ eagle-security-common ${project.version} + + org.apache.eagle + eagle-app-base + ${project.version} + http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogApplication.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogApplication.java b/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogApplication.java new file mode 100644 index 0000000..361f8df --- /dev/null +++ b/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogApplication.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.eagle.security.oozie.parse; + +import backtype.storm.generated.StormTopology; +import backtype.storm.topology.BoltDeclarer; +import backtype.storm.topology.IRichSpout; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.tuple.Fields; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.apache.eagle.app.StormApplication; +import org.apache.eagle.app.environment.impl.StormEnvironment; +import org.apache.eagle.app.sink.StormStreamSink; +import org.apache.eagle.security.oozie.parse.sensitivity.OozieResourceSensitivityDataJoinBolt; +import org.apache.eagle.dataproc.impl.storm.kafka.NewKafkaSourcedSpoutProvider; + +/** + * Since 8/12/16. + */ +public class OozieAuditLogApplication extends StormApplication { + public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks"; + public final static String PARSER_TASK_NUM = "topology.numOfParserTasks"; + public final static String JOIN_TASK_NUM = "topology.numOfJoinTasks"; + public final static String SINK_TASK_NUM = "topology.numOfSinkTasks"; + + @Override + public StormTopology execute(Config config, StormEnvironment environment) { + TopologyBuilder builder = new TopologyBuilder(); + NewKafkaSourcedSpoutProvider provider = new NewKafkaSourcedSpoutProvider(); + IRichSpout spout = provider.getSpout(config); + + int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM); + int numOfParserTask = config.getInt(PARSER_TASK_NUM); + int numOfJoinTasks = config.getInt(JOIN_TASK_NUM); + int numOfSinkTasks = config.getInt(SINK_TASK_NUM); + + builder.setSpout("ingest", spout, numOfSpoutTasks); + + OozieAuditLogParserBolt parserBolt = new OozieAuditLogParserBolt(); + BoltDeclarer parserBoltDeclarer = builder.setBolt("parserBolt", parserBolt, numOfParserTask); + parserBoltDeclarer.fieldsGrouping("ingest", new Fields("f1")); + + + OozieResourceSensitivityDataJoinBolt joinBolt = new OozieResourceSensitivityDataJoinBolt(config); + BoltDeclarer boltDeclarer = builder.setBolt("joinBolt", joinBolt, numOfJoinTasks); + boltDeclarer.fieldsGrouping("parserBolt", new Fields("f1")); + + StormStreamSink sinkBolt = environment.getStreamSink("oozie_audit_log_stream",config); + BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", sinkBolt, numOfSinkTasks); + kafkaBoltDeclarer.fieldsGrouping("joinBolt", new Fields("user")); + return builder.createTopology(); + } + + public static void main(String[] args){ + Config config = ConfigFactory.load(); + OozieAuditLogApplication app = new OozieAuditLogApplication(); + app.run(config); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogParserBolt.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogParserBolt.java b/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogParserBolt.java new file mode 100644 index 0000000..13f5e8c --- /dev/null +++ b/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogParserBolt.java @@ -0,0 +1,86 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * + * + */ + +package org.apache.eagle.security.oozie.parse; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Map; +import java.util.TreeMap; + +/** + * Since 8/12/16. + */ +public class OozieAuditLogParserBolt extends BaseRichBolt{ + private static Logger LOG = LoggerFactory.getLogger(OozieAuditLogParserBolt.class); + private OutputCollector collector; + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + } + + @Override + public void execute(Tuple input) { + String logLine = new String(input.getString(0)); + + try { + OozieAuditLogParser parser = new OozieAuditLogParser(); + OozieAuditLogObject entity = null; + try { + entity = parser.parse(logLine); + } catch (Exception ex) { + LOG.error("Failing oozie parse audit log message", ex); + } + Map map = new TreeMap(); + map.put("timestamp", entity.timestamp); + map.put("level", entity.level); + map.put("ip", entity.ip); + map.put("user", entity.user); + map.put("group", entity.group); + map.put("app", entity.app); + map.put("jobId", entity.jobId); + map.put("operation", entity.operation); + map.put("parameter", entity.parameter); + map.put("status", entity.status); + map.put("httpcode", entity.httpcode); + map.put("errorcode", entity.errorcode); + map.put("errormessage", entity.errormessage); + collector.emit(Arrays.asList(map)); + }catch(Exception ex){ + LOG.error("error in parsing oozie audit log", ex); + }finally { + collector.ack(input); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("f1")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogProcessorMain.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogProcessorMain.java b/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogProcessorMain.java deleted file mode 100644 index 85dfa74..0000000 --- a/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/OozieAuditLogProcessorMain.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.eagle.security.oozie.parse; - -import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider; -import org.apache.eagle.datastream.ExecutionEnvironments; -import org.apache.eagle.datastream.storm.StormExecutionEnvironment; -import org.apache.eagle.security.oozie.parse.sensitivity.OozieResourceSensitivityDataJoinExecutor; - -public class OozieAuditLogProcessorMain { - public static void main(String[] args) throws Exception { - StormExecutionEnvironment env = ExecutionEnvironments.getStorm(args); - env.fromSpout(new KafkaSourcedSpoutProvider()).withOutputFields(1).nameAs("kafkaMsgConsumer") - .flatMap(new OozieResourceSensitivityDataJoinExecutor()) - .alertWithConsumer("oozieSecurityLogEventStream", "oozieAuditLogAlertExecutor"); - env.execute(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/sensitivity/OozieResourceSensitivityDataJoinBolt.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/sensitivity/OozieResourceSensitivityDataJoinBolt.java b/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/sensitivity/OozieResourceSensitivityDataJoinBolt.java new file mode 100644 index 0000000..740cea0 --- /dev/null +++ b/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/sensitivity/OozieResourceSensitivityDataJoinBolt.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.eagle.security.oozie.parse.sensitivity; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import com.typesafe.config.Config; +import org.apache.eagle.datastream.Collector; +import org.apache.eagle.security.entity.OozieResourceSensitivityAPIEntity; +import org.apache.eagle.security.util.ExternalDataCache; +import org.apache.eagle.security.util.ExternalDataJoiner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.regex.Pattern; + +public class OozieResourceSensitivityDataJoinBolt extends BaseRichBolt { + private final static Logger LOG = LoggerFactory.getLogger(OozieResourceSensitivityDataJoinBolt.class); + private Config config; + private OutputCollector collector; + + public OozieResourceSensitivityDataJoinBolt(Config config){ + this.config = config; + } + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + // start hbase sensitivity data polling + try { + ExternalDataJoiner joiner = new ExternalDataJoiner( + OozieResourceSensitivityPollingJob.class, config, context.getThisComponentId() + "." + context.getThisTaskIndex()); + joiner.start(); + } catch(Exception ex){ + LOG.error("Fail bringing up quartz scheduler.", ex); + throw new IllegalStateException(ex); + } + } + + @Override + public void execute(Tuple input) { + try { + @SuppressWarnings("unchecked") + Map event = (Map) input.getValue(0); + @SuppressWarnings("unchecked") + Map map = + (Map) ExternalDataCache + .getInstance() + .getJobResult(OozieResourceSensitivityPollingJob.class); + LOG.info(">>>> event: " + event + " >>>> map: " + map); + + String resource = (String) event.get("jobId"); + + OozieResourceSensitivityAPIEntity sensitivityEntity = null; + + if (map != null && resource != "") { + for (String key : map.keySet()) { + Pattern pattern = Pattern.compile(key, Pattern.CASE_INSENSITIVE); + if (pattern.matcher(resource).find()) { + sensitivityEntity = map.get(key); + break; + } + } + } + Map newEvent = new TreeMap(event); + newEvent.put("sensitivityType", sensitivityEntity == null ? "NA" : sensitivityEntity.getSensitivityType()); + //newEvent.put("jobId", resource); + if (LOG.isDebugEnabled()) { + LOG.debug("After oozie resource sensitivity lookup: " + newEvent); + } + LOG.info("After oozie resource sensitivity lookup: " + newEvent); + collector.emit(Arrays.asList(newEvent.get("user"), newEvent)); + }catch(Exception ex){ + LOG.error("error join external sensitivity data", ex); + }finally { + collector.ack(input); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("user", "message")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/sensitivity/OozieResourceSensitivityDataJoinExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/sensitivity/OozieResourceSensitivityDataJoinExecutor.java b/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/sensitivity/OozieResourceSensitivityDataJoinExecutor.java deleted file mode 100644 index 829ad79..0000000 --- a/eagle-security/eagle-security-oozie-auditlog/src/main/java/org/apache/eagle/security/oozie/parse/sensitivity/OozieResourceSensitivityDataJoinExecutor.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.eagle.security.oozie.parse.sensitivity; - -import com.typesafe.config.Config; -import org.apache.eagle.datastream.Collector; -import org.apache.eagle.datastream.JavaStormStreamExecutor2; -import org.apache.eagle.security.entity.OozieResourceSensitivityAPIEntity; -import org.apache.eagle.security.util.ExternalDataCache; -import org.apache.eagle.security.util.ExternalDataJoiner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.Tuple2; - -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import java.util.regex.Pattern; - -public class OozieResourceSensitivityDataJoinExecutor extends JavaStormStreamExecutor2 { - private final static Logger LOG = LoggerFactory.getLogger(OozieResourceSensitivityDataJoinExecutor.class); - private Config config; - - @Override - public void prepareConfig(Config config) { - this.config = config; - } - - @Override - public void init() { - // start hive resource data polling - try { - ExternalDataJoiner joiner = new ExternalDataJoiner(OozieResourceSensitivityPollingJob.class, config, "1"); - joiner.start(); - } catch (Exception ex) { - LOG.error("Fail to bring up quartz scheduler.", ex); - throw new IllegalStateException(ex); - } - } - - @Override - public void flatMap(List input, Collector> outputCollector) { - @SuppressWarnings("unchecked") - Map event = (Map) input.get(0); - @SuppressWarnings("unchecked") - Map map = - (Map) ExternalDataCache - .getInstance() - .getJobResult(OozieResourceSensitivityPollingJob.class); - LOG.info(">>>> event: " + event + " >>>> map: " + map); - - String resource = (String) event.get("jobId"); - - OozieResourceSensitivityAPIEntity sensitivityEntity = null; - - if (map != null && resource != "") { - for (String key : map.keySet()) { - Pattern pattern = Pattern.compile(key, Pattern.CASE_INSENSITIVE); - if (pattern.matcher(resource).find()) { - sensitivityEntity = map.get(key); - break; - } - } - } - Map newEvent = new TreeMap(event); - newEvent.put("sensitivityType", sensitivityEntity == null ? "NA" : sensitivityEntity.getSensitivityType()); - //newEvent.put("jobId", resource); - if (LOG.isDebugEnabled()) { - LOG.debug("After oozie resource sensitivity lookup: " + newEvent); - } - LOG.info("After oozie resource sensitivity lookup: " + newEvent); - outputCollector.collect(new Tuple2(newEvent.get("user"), newEvent)); - } -}