eagle-commits mailing list archives

From yonzhang2...@apache.org
Subject [1/2] incubator-eagle git commit: EAGLE-446 convert eagle-hive app to use new app framework
Date Sat, 13 Aug 2016 06:11:56 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/develop 502c7e37f -> 15e1c8335


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/15e1c833/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveQueryParserExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveQueryParserExecutor.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveQueryParserExecutor.java
deleted file mode 100644
index ad06bd4..0000000
--- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveQueryParserExecutor.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.hive.jobrunning;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.security.hive.ql.HiveQLParserContent;
-import org.apache.eagle.security.hive.ql.Parser;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Tuple2;
-
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-
-/**
- * parse hive query log
- */
-public class HiveQueryParserExecutor extends JavaStormStreamExecutor2<String, Map> {
-	private static final long serialVersionUID = -5878930561335302957L;
-	private static final Logger LOG = LoggerFactory.getLogger(HiveQueryParserExecutor.class);
-
-	private Config config;
-
-    @Override
-	public void prepareConfig(Config config) {
-		this.config = config;
-	}
-
-    @Override
-	public void init(){
-
-	}
-
-    @Override
-    public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, Map>> outputCollector){
-        /**
-         * hiveQueryLog includes the following key value pair
-         * "hive.current.database" -> <database name>
-         * "hive.query.string" -> <hive query statement>
-         * "mapreduce.job.user.name" -> <user name>
-         * TODO we need hive job start and end time
-         */
-        String user = (String)input.get(0);
-        @SuppressWarnings("unchecked")
-        Map<String, Object> hiveQueryLog = (Map<String, Object>)input.get(1);
-        //if(LOG.isDebugEnabled()) LOG.debug("Receive hive query log: " + hiveQueryLog);
-
-        String query = null;
-        String db = null;
-        String userName = null;
-        long timestamp = -1;
-        for (Entry<String, Object> entry : hiveQueryLog.entrySet()) {
-            switch (entry.getKey()) {
-                case "hive.query.string":
-                    if (entry.getValue() != null) {
-                        query = entry.getValue().toString();
-                    }
-                    break;
-                case "hive.current.database":
-                    if (entry.getValue() != null) {
-                        db = entry.getValue().toString();
-                    }
-                    break;
-                case "mapreduce.job.user.name":
-                    if (entry.getValue() != null) {
-                        userName = entry.getValue().toString();
-                    }
-                    break;
-                case "mapreduce.job.cache.files.timestamps":
-                    if (entry.getValue() != null) {
-                        String timestampString = (String) entry.getValue();
-                        String[] timestampArray = timestampString.split("\\s*,\\s*");
-              /* Get timestamp of start time. */
-                        timestamp = Long.parseLong(timestampArray[0]);
-                    }
-                    break;
-            }
-        }
-
-        HiveQLParserContent parserContent = null;
-        Parser queryParser = new Parser();
-        try {
-            parserContent = queryParser.run(query);
-        } catch (Exception ex) {
-            LOG.error("Failed running hive query parser.", ex);
-            //throw new IllegalStateException(ex);
-        }
-        if(parserContent == null) {
-            LOG.warn("Event ignored as it can't be correctly parsed, the query log is " + query);
-            return;
-        }
-        if(parserContent.getTableColumnMap().size() == 0) {
-            LOG.warn("Unsupported command for parsing " + query);
-            return;
-        }
-        /**
-         * Generate "resource" field: /db/table/column
-         * "resource" -> </db/table/column1,/db/table/column2,...>
-         */
-        StringBuilder resources = new StringBuilder();
-        String prefix = ",";
-        String connector = "/";
-        for (Entry<String, Set<String>> entry : parserContent.getTableColumnMap().entrySet()) {
-            String table = entry.getKey();
-            Set<String> colSet = entry.getValue();
-            /**
-             * If colSet is empty, it means no column is accessed in the table.
-             * So column is not added to the event stream.
-             * Only /db/table
-             */
-            if (colSet.isEmpty()) {
-                resources.append(connector).append(db).append(connector).append(table).append(prefix);
-            } else {
-                for (String col : colSet) {
-                    resources.append(connector).append(db).append(connector).append(table);
-                    if (col != null && col.length() > 0) {
-                        resources.append(connector).append(col);
-                    }
-                    resources.append(prefix);
-                }
-            }
-        }
-        /* Remove the last prefix: "," */
-        resources.setLength(resources.length() - 1);
-
-        /* <event> has to be SortedMap. */
-        Map<String, Object> event = new TreeMap<String, Object>();
-        event.put("user", userName);
-        event.put("command", parserContent.getOperation());
-        event.put("timestamp", timestamp);
-        event.put("resource", resources.toString());
-        LOG.info("HiveQL Parser event stream. " + event);
-
-        outputCollector.collect(new Tuple2(user, event));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/15e1c833/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobConfigurationAdaptorExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobConfigurationAdaptorExecutor.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobConfigurationAdaptorExecutor.java
deleted file mode 100644
index 29c4fd2..0000000
--- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobConfigurationAdaptorExecutor.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.hive.jobrunning;
-
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.jobrunning.common.JobConstants;
-import org.apache.eagle.jobrunning.common.JobConstants.ResourceType;
-import org.apache.eagle.jobrunning.storm.JobRunningContentFilter;
-import org.apache.eagle.jobrunning.storm.JobRunningContentFilterImpl;
-import com.typesafe.config.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Tuple2;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-public class JobConfigurationAdaptorExecutor extends JavaStormStreamExecutor2<String, Map> {
-	private static final long serialVersionUID = 1L;	
-	private static final Logger LOG = LoggerFactory.getLogger(JobConfigurationAdaptorExecutor.class);
-	private JobRunningContentFilter filter;
-	
-	@Override
-	public void prepareConfig(Config config) {
-	}
-
-	@Override
-	public void init() {
-		filter = new JobRunningContentFilterImpl();
-	}
-	
-	private Map<String, Object> convertMap(Map<String, String> configs) {
-		Map<String, Object> map = new HashMap<String, Object>();
-		for (Entry<String, String> config : configs.entrySet()) {
-			map.put(config.getKey(), config.getValue());
-		}
-		return map;
-	}
-
-    @Override
-    public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, Map>> outputCollector){
-		String user = (String)input.get(0);
-        String jobId = (String)input.get(1);
-        ResourceType type = (ResourceType)input.get(2);
-        if (type.equals(ResourceType.JOB_CONFIGURATION)) {
-            Map<String, String> configs = (Map<String, String>)input.get(3);
-            if (filter.acceptJobConf(configs)) {
-                if(LOG.isDebugEnabled()) {
-                    LOG.debug("Got a hive job, jobID: " + jobId + ", query: " + configs.get(JobConstants.HIVE_QUERY_STRING));
-                } else {
-                    LOG.info("Got a hive job, jobID: " + jobId);
-                }
-
-                Map<String, Object> map = convertMap(configs);
-                outputCollector.collect(new Tuple2(user, map));
-            }
-            else {
-                LOG.info("skip non hive job, jobId: " + jobId);
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/15e1c833/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobFilterBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobFilterBolt.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobFilterBolt.java
new file mode 100644
index 0000000..3f0b95b
--- /dev/null
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/JobFilterBolt.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.security.hive.jobrunning;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.jobrunning.common.JobConstants;
+import org.apache.eagle.jobrunning.common.JobConstants.ResourceType;
+import org.apache.eagle.jobrunning.storm.JobRunningContentFilter;
+import org.apache.eagle.jobrunning.storm.JobRunningContentFilterImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public class JobFilterBolt extends BaseRichBolt {
+	private static final Logger LOG = LoggerFactory.getLogger(JobFilterBolt.class);
+    private OutputCollector collector;
+	private JobRunningContentFilter filter;
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+        filter = new JobRunningContentFilterImpl();
+    }
+
+	private Map<String, Object> convertMap(Map<String, String> configs) {
+		Map<String, Object> map = new HashMap<String, Object>();
+		for (Entry<String, String> config : configs.entrySet()) {
+			map.put(config.getKey(), config.getValue());
+		}
+		return map;
+	}
+
+    @Override
+    public void execute(Tuple input) {
+		String user = input.getString(0);
+        String jobId = input.getString(1);
+        ResourceType type = (ResourceType)input.getValue(2);
+        if (type.equals(ResourceType.JOB_CONFIGURATION)) {
+            Map<String, String> configs = (Map<String, String>)input.getValue(3);
+            if (filter.acceptJobConf(configs)) {
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Got a hive job, jobID: " + jobId + ", query: " + configs.get(JobConstants.HIVE_QUERY_STRING));
+                } else {
+                    LOG.info("Got a hive job, jobID: " + jobId);
+                }
+
+                Map<String, Object> map = convertMap(configs);
+                collector.emit(Arrays.asList(user, map));
+            }
+            else {
+                LOG.info("skip non hive job, jobId: " + jobId);
+            }
+        }
+        collector.ack(input);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("user", "message"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/15e1c833/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityDataJoinBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityDataJoinBolt.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityDataJoinBolt.java
new file mode 100644
index 0000000..57da65b
--- /dev/null
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityDataJoinBolt.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.security.hive.sensitivity;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import com.typesafe.config.Config;
+import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.datastream.JavaStormStreamExecutor2;
+import org.apache.eagle.jobrunning.storm.JobRunningContentFilterImpl;
+import org.apache.eagle.security.entity.HiveResourceSensitivityAPIEntity;
+import org.apache.eagle.security.util.ExternalDataCache;
+import org.apache.eagle.security.util.ExternalDataJoiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class HiveResourceSensitivityDataJoinBolt extends BaseRichBolt {
+    private final static Logger LOG = LoggerFactory.getLogger(HiveResourceSensitivityDataJoinBolt.class);
+    private OutputCollector collector;
+    private Config config;
+
+    public HiveResourceSensitivityDataJoinBolt(Config config){
+        this.config = config;
+    }
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+        // start hive resource data polling
+        try {
+            ExternalDataJoiner joiner = new ExternalDataJoiner(
+                    HiveResourceSensitivityPollingJob.class, config, context.getThisComponentId() + "." + context.getThisTaskIndex());
+            joiner.start();
+        } catch(Exception ex){
+            LOG.error("Fail to bring up quartz scheduler.", ex);
+            throw new IllegalStateException(ex);
+        }
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        String user = input.getString(0);
+        Map<String, Object> event = (Map<String, Object>)input.getValue(1);
+        Map<String, HiveResourceSensitivityAPIEntity> map =
+                (Map<String, HiveResourceSensitivityAPIEntity>) ExternalDataCache
+                        .getInstance()
+                        .getJobResult(HiveResourceSensitivityPollingJob.class);
+
+        String resource = (String)event.get("resource");
+        List<String> resourceList = Arrays.asList(resource.split("\\s*,\\s*"));
+        HiveResourceSensitivityAPIEntity sensitivityEntity = null;
+
+        // Check if hive resource contains sensitive data.
+        for (String s : resourceList) {
+            if (map != null) {
+                sensitivityEntity = null;
+                for (String r : map.keySet()) {
+                    Pattern pattern = Pattern.compile(r,Pattern.CASE_INSENSITIVE);
+                    Matcher matcher = pattern.matcher(s);
+                    boolean isMatched = matcher.matches();
+                    if (isMatched) {
+                        sensitivityEntity = map.get(r);
+                        break;
+                    }
+                }
+            }
+            Map<String, Object> newEvent = new TreeMap<String, Object>(event);
+            newEvent.put("sensitivityType", sensitivityEntity  == null ?
+                    "NA" : sensitivityEntity.getSensitivityType());
+            newEvent.put("resource", s);
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("After hive resource sensitivity lookup: " + newEvent);
+            }
+            LOG.info("After hive resource sensitivity lookup: " + newEvent);
+            collector.emit(Arrays.asList(user, newEvent));
+            collector.ack(input);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("user", "message"));
+    }
+}
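
[Editor's note, not part of this commit] The two bolts added above replace the old JavaStormStreamExecutor2 classes with plain Storm BaseRichBolts, so they can be wired together with Storm's standard TopologyBuilder. The sketch below is illustrative only; the spout id "jobRunningSpout", the component ids, the parallelism values, and the parser step are assumptions (the real wiring lives in the new application class, which is not shown in this message).

import backtype.storm.Config;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import com.typesafe.config.ConfigFactory;
import org.apache.eagle.security.hive.jobrunning.JobFilterBolt;
import org.apache.eagle.security.hive.sensitivity.HiveResourceSensitivityDataJoinBolt;

public class HiveQueryTopologySketch {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        // A job-running spout (not shown in this message) would be registered
        // under the id referenced below, e.g.:
        // builder.setSpout("jobRunningSpout", new JobRunningSpout(...), 2);

        // Keep only hive jobs; emits the declared fields ("user", "message").
        builder.setBolt("jobFilterBolt", new JobFilterBolt(), 2)
                .shuffleGrouping("jobRunningSpout");

        // In the full application a hive-query parser bolt sits between the
        // filter and the join; its new bolt class is not shown in this message:
        // builder.setBolt("hiveQueryParserBolt", new HiveQueryParserBolt(), 2)
        //         .fieldsGrouping("jobFilterBolt", new Fields("user"));

        // Join each parsed event with sensitivity metadata, partitioned by user.
        builder.setBolt("sensitivityJoinBolt",
                new HiveResourceSensitivityDataJoinBolt(ConfigFactory.load()), 2)
                .fieldsGrouping("hiveQueryParserBolt", new Fields("user"));

        // Submission is omitted because the spout and parser above are only
        // placeholders; with them registered, a LocalCluster could run this:
        // new backtype.storm.LocalCluster().submitTopology(
        //         "hiveQueryMonitoringSketch", new Config(), builder.createTopology());
    }
}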

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/15e1c833/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityDataJoinExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityDataJoinExecutor.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityDataJoinExecutor.java
deleted file mode 100644
index d26abea..0000000
--- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityDataJoinExecutor.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.hive.sensitivity;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.security.entity.HiveResourceSensitivityAPIEntity;
-import org.apache.eagle.security.util.ExternalDataCache;
-import org.apache.eagle.security.util.ExternalDataJoiner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Tuple2;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-public class HiveResourceSensitivityDataJoinExecutor extends JavaStormStreamExecutor2<String, Map> {
-    private final static Logger LOG = LoggerFactory.getLogger(
-            HiveResourceSensitivityDataJoinExecutor.class);
-    private Config config;
-    
-    @Override
-    public void prepareConfig(Config config) {
-        this.config = config;       
-    }
-
-    @Override
-    public void init() {
-        // start hive resource data polling
-        try {
-            ExternalDataJoiner joiner = new ExternalDataJoiner(
-                    HiveResourceSensitivityPollingJob.class, config, "1");
-            joiner.start();
-        } catch(Exception ex){
-            LOG.error("Fail to bring up quartz scheduler.", ex);
-            throw new IllegalStateException(ex);
-        }
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, Map>> outputCollector){
-        String user = (String)input.get(0);
-        Map<String, Object> event = (Map<String, Object>)input.get(1);
-        Map<String, HiveResourceSensitivityAPIEntity> map =
-                (Map<String, HiveResourceSensitivityAPIEntity>) ExternalDataCache
-                        .getInstance()
-                        .getJobResult(HiveResourceSensitivityPollingJob.class);
-
-        String resource = (String)event.get("resource");
-        List<String> resourceList = Arrays.asList(resource.split("\\s*,\\s*"));
-        HiveResourceSensitivityAPIEntity sensitivityEntity = null;
-
-        // Check if hive resource contains sensitive data.
-        for (String s : resourceList) {
-            if (map != null) {
-                sensitivityEntity = null;
-                for (String r : map.keySet()) {
-                    Pattern pattern = Pattern.compile(r,Pattern.CASE_INSENSITIVE);
-                    Matcher matcher = pattern.matcher(s);
-                    boolean isMatched = matcher.matches();
-                    if (isMatched) {
-                        sensitivityEntity = map.get(r);
-                        break;
-                    }
-                }
-            }
-            Map<String, Object> newEvent = new TreeMap<String, Object>(event);
-            newEvent.put("sensitivityType", sensitivityEntity  == null ?
-                    "NA" : sensitivityEntity.getSensitivityType());
-            newEvent.put("resource", s);
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("After hive resource sensitivity lookup: " + newEvent);
-            }
-            LOG.info("After hive resource sensitivity lookup: " + newEvent);
-            outputCollector.collect(new Tuple2(
-                    user,
-                    newEvent));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/15e1c833/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityPollingJob.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityPollingJob.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityPollingJob.java
index b7d9e9c..061ef19 100644
--- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityPollingJob.java
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityPollingJob.java
@@ -20,6 +20,9 @@ import com.google.common.base.Function;
 import com.google.common.collect.Maps;
 import org.apache.eagle.common.config.EagleConfigConstants;
 import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.security.service.HiveSensitivityEntity;
+import org.apache.eagle.security.service.IMetadataServiceClient;
+import org.apache.eagle.security.service.MetadataServiceClientImpl;
 import org.apache.eagle.security.util.ExternalDataCache;
 import org.apache.eagle.service.client.IEagleServiceClient;
 import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
@@ -31,6 +34,7 @@ import org.quartz.JobExecutionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
@@ -43,18 +47,18 @@ public class HiveResourceSensitivityPollingJob implements Job {
             throws JobExecutionException {
         JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
         try {
-            List<HiveResourceSensitivityAPIEntity>
+            Collection<HiveSensitivityEntity>
             hiveResourceSensitivity = load(jobDataMap);
             if(hiveResourceSensitivity == null) {
             	LOG.warn("Hive resource sensitivity information is empty");
             	return;
             }
-            Map<String, HiveResourceSensitivityAPIEntity> map = Maps.uniqueIndex(
+            Map<String, HiveSensitivityEntity> map = Maps.uniqueIndex(
             		hiveResourceSensitivity,
-            		new Function<HiveResourceSensitivityAPIEntity, String>() {
+            		new Function<HiveSensitivityEntity, String>() {
             			@Override
-            			public String apply(HiveResourceSensitivityAPIEntity input) {
-            				return input.getTags().get("hiveResource");
+            			public String apply(HiveSensitivityEntity input) {
+            				return input.getHiveResource();
             			}
             		});
             ExternalDataCache.getInstance().setJobResult(getClass(), map);
@@ -63,7 +67,7 @@ public class HiveResourceSensitivityPollingJob implements Job {
         }
     }
 
-    private List<HiveResourceSensitivityAPIEntity> load(JobDataMap jobDataMap) throws Exception {
+    private Collection<HiveSensitivityEntity> load(JobDataMap jobDataMap) throws Exception {
         Map<String, Object> map = (Map<String,Object>)jobDataMap.get(EagleConfigConstants.EAGLE_SERVICE);
         String eagleServiceHost = (String)map.get(EagleConfigConstants.HOST);
         Integer eagleServicePort = Integer.parseInt(map.get(EagleConfigConstants.PORT).toString());
@@ -74,13 +78,7 @@ public class HiveResourceSensitivityPollingJob implements Job {
         LOG.info("Load hive resource sensitivity information from eagle service "
             + eagleServiceHost + ":" + eagleServicePort);
 
-        IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password);
-        String query = "HiveResourceSensitivityService[]{*}";
-        GenericServiceAPIResponseEntity<HiveResourceSensitivityAPIEntity> response =
-                client.search().pageSize(Integer.MAX_VALUE).query(query).send();
-        client.close();
-        if (response.getException() != null)
-            throw new IllegalStateException(response.getException());
-        return response.getObj();
+        IMetadataServiceClient client = new MetadataServiceClientImpl(eagleServiceHost, eagleServicePort, "/rest");
+        return client.listHiveSensitivities();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/15e1c833/eagle-security/eagle-security-hive/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HiveQueryMonitoringAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HiveQueryMonitoringAppProvider.xml b/eagle-security/eagle-security-hive/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HiveQueryMonitoringAppProvider.xml
new file mode 100644
index 0000000..931bee6
--- /dev/null
+++ b/eagle-security/eagle-security-hive/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HiveQueryMonitoringAppProvider.xml
@@ -0,0 +1,218 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<application>
+    <type>HiveQueryMonitoringApplication</type>
+    <name>Hdfs Audit Log Monitoring Application</name>
+    <version>0.5.0-incubating</version>
+    <appClass>org.apache.eagle.security.auditlog.HdfsAuditLogApplication</appClass>
+    <viewPath>/apps/example</viewPath>
+    <configuration>
+        <property>
+            <name>dataSourceConfig.zkQuorum</name>
+            <displayName>dataSourceConfig.zkQuorum</displayName>
+            <value>server.eagle.apache.org:2181</value>
+            <description>zookeeper quorum for storing hive job processing status</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.zkRoot</name>
+            <displayName>dataSourceConfig.zkRoot</displayName>
+            <value>/jobrunning</value>
+            <description>zookeeper znode path for storing hive job processing status</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.zkSessionTimeoutMs</name>
+            <displayName>dataSourceConfig.zkSessionTimeoutMs</displayName>
+            <value>15000</value>
+            <description>zk connection timeout in milliseconds</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.zkRetryTimes</name>
+            <displayName>dataSourceConfig.zkRetryTimes</displayName>
+            <value>3</value>
+            <description>retry times when zookeeper fails</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.zkRetryInterval</name>
+            <displayName>dataSourceConfig.zkRetryInterval</displayName>
+            <value>2000</value>
+            <description>interval for retrying when zookeeper fails</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.RMEndPoints</name>
+            <displayName>dataSourceConfig.RMEndPoints</displayName>
+            <value>http://server.eagle.apache.org:8088/</value>
+            <description>resource manager endpoint</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.HSEndPoint</name>
+            <displayName>dataSourceConfig.HSEndPoint</displayName>
+            <value>http://server.eagle.apache.org:19888/</value>
+            <description>history server endpoint</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.partitionerCls</name>
+            <displayName>dataSourceConfig.partitionerCls</displayName>
+            <value>org.apache.eagle.job.DefaultJobPartitionerImpl</value>
+            <description>partition class for job</description>
+        </property>
+        <property>
+            <name>topology.numOfSpoutTasks</name>
+            <displayName>topology.numOfSpoutTasks</displayName>
+            <value>2</value>
+            <description>number of spout tasks</description>
+        </property>
+        <property>
+            <name>topology.numOfParserTasks</name>
+            <displayName>topology.numOfParserTasks</displayName>
+            <value>2</value>
+            <description>number of parser tasks</description>
+        </property>
+        <property>
+            <name>topology.numOfJoinTasks</name>
+            <displayName>topology.numOfJoinTasks</displayName>
+            <value>2</value>
+            <description>number of external join tasks</description>
+        </property>
+        <property>
+            <name>topology.numOfSinkTasks</name>
+            <displayName>topology.numOfSinkTasks</displayName>
+            <value>2</value>
+            <description>number of sink tasks</description>
+        </property>
+        <property>
+            <name>eagleProps.dataJoinPollIntervalSec</name>
+            <displayName>eagleProps.dataJoinPollIntervalSec</displayName>
+            <value>30</value>
+            <description>interval in seconds for polling</description>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.host</name>
+            <displayName>eagleProps.eagleService.host</displayName>
+            <value>localhost</value>
+            <description>eagle service host</description>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.port</name>
+            <displayName>eagleProps.eagleService.port</displayName>
+            <value>8080</value>
+            <description>eagle service port</description>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.username</name>
+            <displayName>eagleProps.eagleService.username</displayName>
+            <value>admin</value>
+            <description>eagle service username</description>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.password</name>
+            <displayName>eagleProps.eagleService.password</displayName>
+            <value>secret</value>
+            <description>eagle service password</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.topic</name>
+            <displayName>dataSinkConfig.topic</displayName>
+            <value>hive_query_parsed</value>
+            <description>topic for kafka data sink</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.brokerList</name>
+            <displayName>dataSinkConfig.brokerList</displayName>
+            <value>server.eagle.apache.org:6667</value>
+            <description>kafka broker list</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.serializerClass</name>
+            <displayName>dataSinkConfig.serializerClass</displayName>
+            <value>kafka.serializer.StringEncoder</value>
+            <description>serializer class Kafka message value</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.keySerializerClass</name>
+            <displayName>dataSinkConfig.keySerializerClass</displayName>
+            <value>kafka.serializer.StringEncoder</value>
+            <description>serializer class Kafka message key</description>
+        </property>
+
+        <!-- properties for hdfs file system access and attribute resolver-->
+        <property>
+            <name>fs.defaultFS</name>
+            <displayName>fs.defaultFS</displayName>
+            <value>hdfs://server.eagle.apache.org:8020</value>
+            <description>hdfs endpoint</description>
+        </property>
+    </configuration>
+    <streams>
+        <stream>
+            <streamId>hdfs_audit_log_stream</streamId>
+            <description>Hdfs Audit Log Stream</description>
+            <validate>true</validate>
+            <timeseries>true</timeseries>
+            <columns>
+                <column>
+                    <name>action</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>host</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>status</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>timestamp</name>
+                    <type>long</type>
+                </column>
+            </columns>
+        </stream>
+    </streams>
+    <docs>
+        <install>
+# Step 1: Create source kafka topic named "${site}_example_source_topic"
+
+./bin/kafka-topics.sh --create --topic example_source_topic --replication-factor 1 --replication 1
+
+# Step 2: Set up data collector to flow data into kafka topic in
+
+./bin/logstash -f log_collector.conf
+
+## `log_collector.conf` sample as following:
+
+input {
+
+}
+filter {
+
+}
+output{
+
+}
+
+# Step 3: start application
+
+# Step 4: monitor with featured portal or alert with policies
+        </install>
+        <uninstall>
+# Step 1: stop and uninstall application
+# Step 2: delete kafka topic named "${site}_example_source_topic"
+# Step 3: stop logstash
+        </uninstall>
+    </docs>
+</application>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/15e1c833/eagle-security/eagle-security-hive/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider b/eagle-security/eagle-security-hive/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
new file mode 100644
index 0000000..fdd2754
--- /dev/null
+++ b/eagle-security/eagle-security-hive/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.eagle.security.hive.jobrunning.HiveQueryMonitoringAppProvider

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/15e1c833/eagle-security/eagle-security-hive/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/resources/application.conf b/eagle-security/eagle-security-hive/src/main/resources/application.conf
index 22be461..f21b4a0 100644
--- a/eagle-security/eagle-security-hive/src/main/resources/application.conf
+++ b/eagle-security/eagle-security-hive/src/main/resources/application.conf
@@ -14,50 +14,39 @@
 # limitations under the License.
 
 {
-  "envContextConfig" : {
-    "env" : "storm",
-    "mode" : "local",
-    "topologyName" : "hiveQueryRunningTopology",
-    "stormConfigFile" : "hive.storm.yaml",
-    "parallelismConfig" : {
-      "msgConsumer" : 1
-    }
+  "appId" : "HiveQueryMonitoringApp",
+  "mode" : "LOCAL",
+  "siteId" : "testsite",
+  "topology" : {
+    "numOfSpoutTasks" : 2,
+    "numOfFilterTasks" : 2,
+    "numOfParserTasks" : 2,
+    "numOfJoinTasks" : 2,
+    "numOfSinkTasks" : 2
   },
   "dataSourceConfig": {
-    "flavor" : "stormrunning",
-    "zkQuorum" : "localhost:2181",
+    "zkQuorum" : "server.eagle.apache.org:2181",
     "zkRoot" : "/jobrunning",
     "zkSessionTimeoutMs" : 15000,
     "zkRetryTimes" : 3,
     "zkRetryInterval" : 2000,
-    "RMEndPoints" : "http://localhost:8088/",
-    "HSEndPoint" : "http://localhost:19888/",
+    "RMEndPoints" : "http://server.eagle.apache.org:8088/",
+    "HSEndPoint" : "http://server.eagle.apache.org:19888/",
     "partitionerCls" : "org.apache.eagle.job.DefaultJobPartitionerImpl",
   },
   "eagleProps" : {
-    "site" : "sandbox",
-    "application" : "hiveQueryLog",
-    "mailHost" : "mailHost.com",
-    "mailSmtpPort":"25",
-    "mailDebug" : "true",
+    "dataJoinPollIntervalSec" : 30,
     "eagleService": {
       "host": "localhost",
-      "port": 38080,
+      "port": 9090,
       "username": "admin",
       "password": "secret"
     }
   },
-  "alertExecutorConfigs" : {
-    "hiveAccessAlertByRunningJob" : {
-      "parallelism" : 1,
-      "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner",
-      "needValidation" : "true"
-    }
-  },
-  "dynamicConfigSource" : {
-    "enabled" : true,
-    "initDelayMillis" : 0,
-    "delayMillis" : 30000,
-    "ignoreDeleteFromSource" : true
+  "dataSinkConfig": {
+    "topic" : "hive_query_parsed",
+    "brokerList" : "server.eagle.apache.org:6667",
+    "serializerClass" : "kafka.serializer.StringEncoder",
+    "keySerializerClass" : "kafka.serializer.StringEncoder"
   }
-}
\ No newline at end of file
+}
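
[Editor's note, not part of this commit] The rewritten application.conf is plain HOCON read through the Typesafe Config library, which the bolts above already depend on. A minimal, hypothetical read of the new keys would look like:

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class AppConfSketch {
    public static void main(String[] args) {
        // ConfigFactory.load() picks up application.conf from the classpath.
        Config config = ConfigFactory.load();
        System.out.println(config.getString("appId"));                      // HiveQueryMonitoringApp
        System.out.println(config.getString("dataSourceConfig.zkQuorum"));  // server.eagle.apache.org:2181
        System.out.println(config.getInt("topology.numOfParserTasks"));     // 2
        System.out.println(config.getString("dataSinkConfig.topic"));       // hive_query_parsed
    }
}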

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/15e1c833/eagle-server-assembly/src/main/conf/configuration.yml
----------------------------------------------------------------------
diff --git a/eagle-server-assembly/src/main/conf/configuration.yml b/eagle-server-assembly/src/main/conf/configuration.yml
new file mode 100644
index 0000000..c671ade
--- /dev/null
+++ b/eagle-server-assembly/src/main/conf/configuration.yml
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+server:
+  applicationConnectors:
+    - type: http
+      port: 9090
+  adminConnectors:
+    - type: http
+      port: 9091

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/15e1c833/eagle-server-assembly/src/main/conf/configuration.yml~HEAD
----------------------------------------------------------------------
diff --git a/eagle-server-assembly/src/main/conf/configuration.yml~HEAD b/eagle-server-assembly/src/main/conf/configuration.yml~HEAD
deleted file mode 100644
index c671ade..0000000
--- a/eagle-server-assembly/src/main/conf/configuration.yml~HEAD
+++ /dev/null
@@ -1,21 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-server:
-  applicationConnectors:
-    - type: http
-      port: 9090
-  adminConnectors:
-    - type: http
-      port: 9091

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/15e1c833/eagle-server-assembly/src/main/conf/configuration.yml~upstream_develop
----------------------------------------------------------------------
diff --git a/eagle-server-assembly/src/main/conf/configuration.yml~upstream_develop b/eagle-server-assembly/src/main/conf/configuration.yml~upstream_develop
deleted file mode 100644
index c671ade..0000000
--- a/eagle-server-assembly/src/main/conf/configuration.yml~upstream_develop
+++ /dev/null
@@ -1,21 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-server:
-  applicationConnectors:
-    - type: http
-      port: 9090
-  adminConnectors:
-    - type: http
-      port: 9091


