Return-Path: X-Original-To: apmail-eagle-commits-archive@minotaur.apache.org Delivered-To: apmail-eagle-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4B64018A02 for ; Wed, 16 Dec 2015 06:01:48 +0000 (UTC) Received: (qmail 35007 invoked by uid 500); 16 Dec 2015 06:01:48 -0000 Delivered-To: apmail-eagle-commits-archive@eagle.apache.org Received: (qmail 34979 invoked by uid 500); 16 Dec 2015 06:01:48 -0000 Mailing-List: contact commits-help@eagle.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@eagle.incubator.apache.org Delivered-To: mailing list commits@eagle.incubator.apache.org Received: (qmail 34970 invoked by uid 99); 16 Dec 2015 06:01:48 -0000 Received: from Unknown (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Dec 2015 06:01:48 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 9FE1D1A0681 for ; Wed, 16 Dec 2015 06:01:47 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.226 X-Spam-Level: * X-Spam-Status: No, score=1.226 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.554] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id VCAALhraFcRl for ; Wed, 16 Dec 2015 06:01:46 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id 08B3626ACF for ; Wed, 16 Dec 2015 06:01:44 +0000 (UTC) Received: (qmail 34734 invoked by uid 99); 16 Dec 2015 06:01:44 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Dec 2015 06:01:43 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D44E2E0AF9; Wed, 16 Dec 2015 06:01:43 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hao@apache.org To: commits@eagle.incubator.apache.org Date: Wed, 16 Dec 2015 06:01:43 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/5] incubator-eagle git commit: EAGLE-66 Typesafe Streaming DSL and KeyValue based Grouping Repository: incubator-eagle Updated Branches: refs/heads/master 2734b4222 -> 52b8e58b1 http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningMonitoringMain.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningMonitoringMain.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningMonitoringMain.java index 09e7475..5b1bee6 100644 --- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningMonitoringMain.java +++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningMonitoringMain.java @@ -18,12 +18,8 @@ */ package org.apache.eagle.security.hive.jobrunning; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigRenderOptions; -import org.apache.eagle.dataproc.util.ConfigOptionParser; -import org.apache.eagle.datastream.ExecutionEnvironmentFactory; -import org.apache.eagle.datastream.StormExecutionEnvironment; +import org.apache.eagle.datastream.ExecutionEnvironments; +import org.apache.eagle.datastream.storm.StormExecutionEnvironment; import org.apache.eagle.security.hive.sensitivity.HiveResourceSensitivityDataJoinExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,20 +29,15 @@ public class HiveJobRunningMonitoringMain { private static final Logger LOG = LoggerFactory.getLogger(HiveJobRunningMonitoringMain.class); public static void main(String[] args) throws Exception{ - Config config = new ConfigOptionParser().load(args); - - LOG.info("Config class: " + config.getClass().getCanonicalName()); - - if(LOG.isDebugEnabled()) LOG.debug("Config content:"+config.root().render(ConfigRenderOptions.concise())); - + StormExecutionEnvironment env = ExecutionEnvironments.getStorm(args); String spoutName = "msgConsumer"; - int parallelism = config.getInt("envContextConfig.parallelismConfig." + spoutName); - StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config); - env.newSource(new HiveJobRunningSourcedStormSpoutProvider().getSpout(config, parallelism)).renameOutputFields(4).withName(spoutName).groupBy(Arrays.asList(0)) + int parallelism = env.getConfig().getInt("envContextConfig.parallelismConfig." + spoutName); + env.fromSpout(new HiveJobRunningSourcedStormSpoutProvider().getSpout(env.getConfig(), parallelism)) + .withOutputFields(4).nameAs(spoutName).groupBy(Arrays.asList(0)) .flatMap(new JobConfigurationAdaptorExecutor()).groupBy(Arrays.asList(0)) .flatMap(new HiveQueryParserExecutor()).groupBy(Arrays.asList(0)) .flatMap(new HiveResourceSensitivityDataJoinExecutor()) .alertWithConsumer("hiveAccessLogStream", "hiveAccessAlertByRunningJob"); env.execute(); } -}; \ No newline at end of file +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileDetectionBatchMain.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileDetectionBatchMain.java b/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileDetectionBatchMain.java index c95dfe0..1fdb3bd 100644 --- a/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileDetectionBatchMain.java +++ b/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileDetectionBatchMain.java @@ -19,11 +19,9 @@ package org.apache.eagle.security.userprofile; import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigRenderOptions; import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider; -import org.apache.eagle.dataproc.util.ConfigOptionParser; import org.apache.eagle.datastream.*; +import org.apache.eagle.datastream.storm.StormExecutionEnvironment; import org.apache.eagle.security.userprofile.model.UserActivityAggModelEntity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,19 +29,9 @@ import org.slf4j.LoggerFactory; import java.util.Map; public class UserProfileDetectionBatchMain { - private final static Logger LOG = LoggerFactory.getLogger(UserProfileDetectionBatchMain.class); - public static void main(String[] args) throws Exception{ - new ConfigOptionParser().load(args); - System.setProperty("config.trace", "loads"); - Config config = ConfigFactory.load(); - - LOG.info("Config class: " + config.getClass().getCanonicalName()); - - if(LOG.isDebugEnabled()) LOG.debug("Config content:"+config.root().render(ConfigRenderOptions.concise())); - - StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config); - env.newSource(new KafkaSourcedSpoutProvider().getSpout(config)).renameOutputFields(1) + StormExecutionEnvironment env = ExecutionEnvironments.getStorm(args); + env.fromSpout(new KafkaSourcedSpoutProvider()).withOutputFields(1) .flatMap(new UserActivityPartitionExecutor()) .alertWithConsumer(UserProfileDetectionConstants.USER_ACTIVITY_AGGREGATION_STREAM, UserProfileDetectionConstants.USER_PROFILE_ANOMALY_DETECTION_EXECUTOR); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileDetectionStreamMain.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileDetectionStreamMain.java b/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileDetectionStreamMain.java index caea5e9..1048bdd 100644 --- a/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileDetectionStreamMain.java +++ b/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileDetectionStreamMain.java @@ -19,35 +19,23 @@ package org.apache.eagle.security.userprofile; import com.typesafe.config.Config; -import com.typesafe.config.ConfigRenderOptions; import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider; -import org.apache.eagle.dataproc.util.ConfigOptionParser; import org.apache.eagle.datastream.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.eagle.datastream.storm.StormExecutionEnvironment; import java.util.Arrays; import java.util.List; import java.util.Map; public class UserProfileDetectionStreamMain { - private final static Logger LOG = LoggerFactory.getLogger(UserProfileDetectionStreamMain.class); - public static void main(String[] args) throws Exception{ - Config config = new ConfigOptionParser().load(args); - - LOG.info("Config class: " + config.getClass().getCanonicalName()); - - if(LOG.isDebugEnabled()) LOG.debug("Config content:"+config.root().render(ConfigRenderOptions.concise())); - - StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config); - env.newSource(new KafkaSourcedSpoutProvider().getSpout(config)).renameOutputFields(1).withName("kafkaMsgConsumer") - .flatMap(new AuditLogTransformer()).withName("transformer") // [user,map] + StormExecutionEnvironment env = ExecutionEnvironments.getStorm(args); + env.fromSpout(new KafkaSourcedSpoutProvider()).withOutputFields(1).nameAs("kafkaMsgConsumer") + .flatMap(new AuditLogTransformer()).nameAs("transformer") // [user,map] .groupBy(Arrays.asList(0)) // group by [user] - .flatMap(new UserProfileAggregatorExecutor()).withName("aggregator") - .alertWithConsumer(Arrays.asList(UserProfileDetectionConstants.USER_ACTIVITY_AGGREGATION_STREAM), + .flatMap(new UserProfileAggregatorExecutor()).nameAs("aggregator") + .alertWithConsumer(UserProfileDetectionConstants.USER_ACTIVITY_AGGREGATION_STREAM, UserProfileDetectionConstants.USER_PROFILE_ANOMALY_DETECTION_EXECUTOR); // alert - ; env.execute(); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/test.txt ---------------------------------------------------------------------- diff --git a/test.txt b/test.txt deleted file mode 100644 index 6623e06..0000000 --- a/test.txt +++ /dev/null @@ -1 +0,0 @@ -test for keep file for user command reassembler