eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [1/5] incubator-eagle git commit: EAGLE-66 Typesafe Streaming DSL and KeyValue based Grouping
Date Wed, 16 Dec 2015 06:01:43 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/master 2734b4222 -> 52b8e58b1


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningMonitoringMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningMonitoringMain.java
b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningMonitoringMain.java
index 09e7475..5b1bee6 100644
--- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningMonitoringMain.java
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningMonitoringMain.java
@@ -18,12 +18,8 @@
  */
 package org.apache.eagle.security.hive.jobrunning;
 
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigRenderOptions;
-import org.apache.eagle.dataproc.util.ConfigOptionParser;
-import org.apache.eagle.datastream.ExecutionEnvironmentFactory;
-import org.apache.eagle.datastream.StormExecutionEnvironment;
+import org.apache.eagle.datastream.ExecutionEnvironments;
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
 import org.apache.eagle.security.hive.sensitivity.HiveResourceSensitivityDataJoinExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,20 +29,15 @@ public class HiveJobRunningMonitoringMain {
 	private static final Logger LOG = LoggerFactory.getLogger(HiveJobRunningMonitoringMain.class);
 
 	public static void main(String[] args) throws Exception{
-        Config config = new ConfigOptionParser().load(args);
-
-        LOG.info("Config class: " + config.getClass().getCanonicalName());
-
-        if(LOG.isDebugEnabled()) LOG.debug("Config content:"+config.root().render(ConfigRenderOptions.concise()));
-
+        StormExecutionEnvironment env = ExecutionEnvironments.getStorm(args);
         String spoutName = "msgConsumer";
-        int parallelism = config.getInt("envContextConfig.parallelismConfig." + spoutName);
-        StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config);
-        env.newSource(new HiveJobRunningSourcedStormSpoutProvider().getSpout(config, parallelism)).renameOutputFields(4).withName(spoutName).groupBy(Arrays.asList(0))
+        int parallelism = env.getConfig().getInt("envContextConfig.parallelismConfig." +
spoutName);
+        env.fromSpout(new HiveJobRunningSourcedStormSpoutProvider().getSpout(env.getConfig(),
parallelism))
+                .withOutputFields(4).nameAs(spoutName).groupBy(Arrays.asList(0))
                 .flatMap(new JobConfigurationAdaptorExecutor()).groupBy(Arrays.asList(0))
                 .flatMap(new HiveQueryParserExecutor()).groupBy(Arrays.asList(0))
                 .flatMap(new HiveResourceSensitivityDataJoinExecutor())
                 .alertWithConsumer("hiveAccessLogStream", "hiveAccessAlertByRunningJob");
         env.execute();
 	}
-};
\ No newline at end of file
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileDetectionBatchMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileDetectionBatchMain.java
b/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileDetectionBatchMain.java
index c95dfe0..1fdb3bd 100644
--- a/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileDetectionBatchMain.java
+++ b/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileDetectionBatchMain.java
@@ -19,11 +19,9 @@
 package org.apache.eagle.security.userprofile;
 
 import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigRenderOptions;
 import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
-import org.apache.eagle.dataproc.util.ConfigOptionParser;
 import org.apache.eagle.datastream.*;
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
 import org.apache.eagle.security.userprofile.model.UserActivityAggModelEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,19 +29,9 @@ import org.slf4j.LoggerFactory;
 import java.util.Map;
 
 public class UserProfileDetectionBatchMain {
-    private final static Logger LOG = LoggerFactory.getLogger(UserProfileDetectionBatchMain.class);
-
     public static void main(String[] args) throws Exception{
-        new ConfigOptionParser().load(args);
-        System.setProperty("config.trace", "loads");
-        Config config = ConfigFactory.load();
-
-        LOG.info("Config class: " + config.getClass().getCanonicalName());
-
-        if(LOG.isDebugEnabled()) LOG.debug("Config content:"+config.root().render(ConfigRenderOptions.concise()));
-
-        StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config);
-        env.newSource(new KafkaSourcedSpoutProvider().getSpout(config)).renameOutputFields(1)
+        StormExecutionEnvironment env = ExecutionEnvironments.getStorm(args);
+        env.fromSpout(new KafkaSourcedSpoutProvider()).withOutputFields(1)
                 .flatMap(new UserActivityPartitionExecutor())
                 .alertWithConsumer(UserProfileDetectionConstants.USER_ACTIVITY_AGGREGATION_STREAM,
                         UserProfileDetectionConstants.USER_PROFILE_ANOMALY_DETECTION_EXECUTOR);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileDetectionStreamMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileDetectionStreamMain.java
b/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileDetectionStreamMain.java
index caea5e9..1048bdd 100644
--- a/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileDetectionStreamMain.java
+++ b/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileDetectionStreamMain.java
@@ -19,35 +19,23 @@
 package org.apache.eagle.security.userprofile;
 
 import com.typesafe.config.Config;
-import com.typesafe.config.ConfigRenderOptions;
 import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
-import org.apache.eagle.dataproc.util.ConfigOptionParser;
 import org.apache.eagle.datastream.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
 
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
 public class UserProfileDetectionStreamMain {
-    private final static Logger LOG = LoggerFactory.getLogger(UserProfileDetectionStreamMain.class);
-
     public static void main(String[] args) throws Exception{
-        Config config = new ConfigOptionParser().load(args);
-
-        LOG.info("Config class: " + config.getClass().getCanonicalName());
-
-        if(LOG.isDebugEnabled()) LOG.debug("Config content:"+config.root().render(ConfigRenderOptions.concise()));
-
-        StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config);
-        env.newSource(new KafkaSourcedSpoutProvider().getSpout(config)).renameOutputFields(1).withName("kafkaMsgConsumer")
-                .flatMap(new AuditLogTransformer()).withName("transformer")     // [user,map]
+        StormExecutionEnvironment env = ExecutionEnvironments.getStorm(args);
+        env.fromSpout(new KafkaSourcedSpoutProvider()).withOutputFields(1).nameAs("kafkaMsgConsumer")
+                .flatMap(new AuditLogTransformer()).nameAs("transformer")     // [user,map]
                 .groupBy(Arrays.asList(0))                                      // group
by [user]
-                .flatMap(new UserProfileAggregatorExecutor()).withName("aggregator")
-                .alertWithConsumer(Arrays.asList(UserProfileDetectionConstants.USER_ACTIVITY_AGGREGATION_STREAM),
+                .flatMap(new UserProfileAggregatorExecutor()).nameAs("aggregator")
+                .alertWithConsumer(UserProfileDetectionConstants.USER_ACTIVITY_AGGREGATION_STREAM,
                         UserProfileDetectionConstants.USER_PROFILE_ANOMALY_DETECTION_EXECUTOR);
// alert
-                ;
         env.execute();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/52b8e58b/test.txt
----------------------------------------------------------------------
diff --git a/test.txt b/test.txt
deleted file mode 100644
index 6623e06..0000000
--- a/test.txt
+++ /dev/null
@@ -1 +0,0 @@
-test for keep file for user command reassembler


Mime
View raw message