eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [52/55] [abbrv] incubator-eagle git commit: [EAGLE-46] Merge HDFSAuditLogParser changes
Date Thu, 19 Nov 2015 10:47:59 GMT
[EAGLE-46] Merge HDFSAuditLogParser changes


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/e05fac6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/e05fac6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/e05fac6d

Branch: refs/heads/master
Commit: e05fac6d5c4140b3cd0b1e1c34b6b1154348cbb6
Parents: afe8683 5f618fa
Author: Hao Chen <hao@apache.org>
Authored: Thu Nov 19 16:52:17 2015 +0800
Committer: Hao Chen <hao@apache.org>
Committed: Thu Nov 19 16:52:17 2015 +0800

----------------------------------------------------------------------
 .../bin/hdfs-securitylog-metadata-create.sh     | 40 ++++++++++
 .../conf/sandbox-hdfsAuditLog-application.conf  |  1 +
 .../sandbox-hdfsSecurityLog-application.conf    | 51 ++++++++++++
 .../storm/kafka/KafkaSourcedSpoutProvider.java  | 15 +++-
 .../datastream/StormTopologyExecutorImpl.scala  | 11 ++-
 .../java/eagle/security/util/LogParseUtil.java  | 65 ++++++++++++++++
 .../eagle/security/hdfs/HDFSAuditLogParser.java | 44 +----------
 .../eagle-security-hdfs-securitylog/pom.xml     | 41 ++++++++++
 .../HDFSSecurityLogKafkaDeserializer.java       | 63 +++++++++++++++
 .../HDFSSecurityLogProcessorMain.java           | 50 ++++++++++++
 .../parse/HDFSSecurityLogObject.java            | 27 +++++++
 .../parse/HDFSSecurityLogParser.java            | 82 ++++++++++++++++++++
 .../src/main/resources/application.conf         | 50 ++++++++++++
 .../security/TestHDFSSecuritylogParser.java     | 41 ++++++++++
 eagle-security/pom.xml                          |  1 +
 eagle-topology-assembly/pom.xml                 |  5 ++
 16 files changed, 540 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e05fac6d/eagle-assembly/src/main/conf/sandbox-hdfsAuditLog-application.conf
----------------------------------------------------------------------
diff --cc eagle-assembly/src/main/conf/sandbox-hdfsAuditLog-application.conf
index 04a0466,2cf18fa..1d15007
--- a/eagle-assembly/src/main/conf/sandbox-hdfsAuditLog-application.conf
+++ b/eagle-assembly/src/main/conf/sandbox-hdfsAuditLog-application.conf
@@@ -12,9 -12,10 +12,10 @@@
    "dataSourceConfig": {
      "topic" : "sandbox_hdfs_audit_log",
      "zkConnection" : "127.0.0.1:2181",
+     "brokerZkPath" : "/brokers",
      "zkConnectionTimeoutMS" : 15000,
      "fetchSize" : 1048586,
 -    "deserializerClass" : "eagle.security.auditlog.HdfsAuditLogKafkaDeserializer",
 +    "deserializerClass" : "org.apache.eagle.security.auditlog.HdfsAuditLogKafkaDeserializer",
      "transactionZKServers" : "127.0.0.1",
      "transactionZKPort" : 2181,
      "transactionZKRoot" : "/consumers",

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e05fac6d/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
----------------------------------------------------------------------
diff --cc eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
index a7d7079,0000000..06d37ef
mode 100644,000000..100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
@@@ -1,88 -1,0 +1,99 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.eagle.dataproc.impl.storm.kafka;
 +
 +import java.util.Arrays;
 +
 +import com.typesafe.config.Config;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +import storm.kafka.BrokerHosts;
 +import storm.kafka.KafkaSpout;
 +import storm.kafka.SpoutConfig;
 +import storm.kafka.ZkHosts;
 +import backtype.storm.spout.SchemeAsMultiScheme;
 +import backtype.storm.topology.base.BaseRichSpout;
 +
 +import org.apache.eagle.dataproc.impl.storm.AbstractStormSpoutProvider;
 +
 +public class KafkaSourcedSpoutProvider extends AbstractStormSpoutProvider{
 +    private final static Logger LOG = LoggerFactory.getLogger(KafkaSourcedSpoutProvider.class);
 +
 +	public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) {
 +		return new SchemeAsMultiScheme(new KafkaSourcedSpoutScheme(deserClsName, context));
 +	}
 +
 +	@Override
 +	public BaseRichSpout getSpout(Config context){
 +		// Kafka topic
 +		String topic = context.getString("dataSourceConfig.topic");
 +		// Kafka consumer group id
 +		String groupId = context.getString("dataSourceConfig.consumerGroupId");
 +		// Kafka fetch size
 +		int fetchSize = context.getInt("dataSourceConfig.fetchSize");
 +		// Kafka deserializer class
 +		String deserClsName = context.getString("dataSourceConfig.deserializerClass");
 +		// Kafka broker zk connection
 +		String zkConnString = context.getString("dataSourceConfig.zkConnection");
 +		// transaction zkRoot
 +		String zkRoot = context.getString("dataSourceConfig.transactionZKRoot");
 +		// Site
 +		String site = context.getString("eagleProps.site");
 +
 +        //String realTopic = (site ==null)? topic : String.format("%s_%s",site,topic);
 +
 +        LOG.info(String.format("Use topic id: %s",topic));
- 		
- 		BrokerHosts hosts = new ZkHosts(zkConnString);
++
++        String brokerZkPath = null;
++        if(context.hasPath("dataSourceConfig.brokerZkPath")) {
++            brokerZkPath = context.getString("dataSourceConfig.brokerZkPath");
++        }
++
++        BrokerHosts hosts;
++        if(brokerZkPath == null) {
++            hosts = new ZkHosts(zkConnString);
++        } else {
++            hosts = new ZkHosts(zkConnString, brokerZkPath);
++        }
++        
 +		SpoutConfig spoutConfig = new SpoutConfig(hosts, 
 +				topic,
 +				zkRoot + "/" + topic,
 +				groupId);
 +		
 +		// transaction zkServers
 +		spoutConfig.zkServers = Arrays.asList(context.getString("dataSourceConfig.transactionZKServers").split(","));
 +		// transaction zkPort
 +		spoutConfig.zkPort = context.getInt("dataSourceConfig.transactionZKPort");
 +		// transaction update interval
 +		spoutConfig.stateUpdateIntervalMs = context.getLong("dataSourceConfig.transactionStateUpdateMS");
 +		// Kafka fetch size
 +		spoutConfig.fetchSizeBytes = fetchSize;		
 +		// "startOffsetTime" is for test usage, prod should not use this
 +		if (context.hasPath("dataSourceConfig.startOffsetTime")) {
 +			spoutConfig.startOffsetTime = context.getInt("dataSourceConfig.startOffsetTime");
 +		}		
 +		// "forceFromStart" is for test usage, prod should not use this 
 +		if (context.hasPath("dataSourceConfig.forceFromStart")) {
 +			spoutConfig.forceFromStart = context.getBoolean("dataSourceConfig.forceFromStart");
 +		}
 +		
 +		spoutConfig.scheme = getStreamScheme(deserClsName, context);
 +		KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
 +		return kafkaSpout;
 +	}
 +}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e05fac6d/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyExecutorImpl.scala
----------------------------------------------------------------------
diff --cc eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyExecutorImpl.scala
index 4cecb85,0000000..737fe44
mode 100644,000000..100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyExecutorImpl.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyExecutorImpl.scala
@@@ -1,47 -1,0 +1,52 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.eagle.datastream
 +
 +import backtype.storm.generated.StormTopology
 +import backtype.storm.utils.Utils
 +import backtype.storm.{Config, LocalCluster, StormSubmitter}
 +import storm.trident.spout.RichSpoutBatchExecutor
 +
 +case class StormTopologyExecutorImpl(topology: StormTopology, config: com.typesafe.config.Config) extends AbstractTopologyExecutor {
 +  @throws(classOf[Exception])
 +  def execute {
 +    val localMode: Boolean = config.getString("envContextConfig.mode").equalsIgnoreCase("local")
 +    val conf: Config = new Config
 +    conf.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, Int.box(64 * 1024))
 +    conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, Int.box(8))
 +    conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, Int.box(32))
 +    conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, Int.box(16384))
 +    conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Int.box(16384))
 +
 +    val topologyName = config.getString("envContextConfig.topologyName")
 +    if (!localMode) {
 +      StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology)
 +    }
 +    else {
 +      val cluster: LocalCluster = new LocalCluster
 +      cluster.submitTopology(topologyName, conf, topology)
-       Utils.sleep(Integer.MAX_VALUE)
-       cluster.killTopology(topologyName)
-       cluster.shutdown
++      while(true) {
++        try {
++          Utils.sleep(Integer.MAX_VALUE)
++        }
++        catch {
++          case _ => () // Do nothing
++        }
++      }
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e05fac6d/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogParser.java
----------------------------------------------------------------------
diff --cc eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogParser.java
index 9edb82b,0000000..ac57855
mode 100644,000000..100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogParser.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/HDFSAuditLogParser.java
@@@ -1,175 -1,0 +1,135 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *    http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.eagle.security.hdfs;
 +
++import eagle.security.util.LogParseUtil;
 +import org.apache.eagle.common.DateTimeUtil;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.Serializable;
 +import java.util.HashMap;
 +import java.util.Map;
 +import java.util.regex.Matcher;
 +import java.util.regex.Pattern;
 +
 +public final class HDFSAuditLogParser implements Serializable{
 +	private final static Logger LOG = LoggerFactory.getLogger(HDFSAuditLogParser.class);
 +
 +	private final static int LOGDATE_INDEX = 1;
 +	private final static int LOGLEVEL_INDEX = 2;
 +	private final static int LOGGER_INDEX = 3;
 +	private final static int LOGATTRS_INDEX = 4;
 +	private final static String LOGDATE="logdate";
 +	private final static String LOGLEVEL="loglevel";
 +	private final static String LOGHEADER="logheader";
 +	private final static String ALLOWED="allowed";
 +	private final static String UGI="ugi";
 +	private final static String IP="ip";
 +	private final static String CMD="cmd";
 +	private final static String SRC="src";
 +	private final static String DST="dst";
 +	private final static String PERM="perm";
 +	private final static Pattern loggerPattern = Pattern.compile("^([\\d\\s\\-:,]+)\\s+(\\w+)\\s+(.*):\\s+(.*)");
 +	private final static Pattern loggerAttributesPattern = Pattern.compile("(\\w+=[/.@\\-\\w\\\\$\\s\\\\(\\\\):]+)\\s+");
 +
 +	public HDFSAuditLogParser(){
 +	}
 +
 +	public HDFSAuditLogObject parse(String logLine) throws Exception{
 +		Map<String,String> auditMaps = parseAudit(logLine);
 +		if(auditMaps == null) return null;
 +        HDFSAuditLogObject entity = new HDFSAuditLogObject();
 +
 +		String ugi = auditMaps.get(UGI);
 +		if(ugi == null){
 +			LOG.warn("Ugi is null from audit log: " + logLine);
 +		}
 +
- 		String user = parseUserFromUGI(ugi);
++		String user = LogParseUtil.parseUserFromUGI(ugi);
 +
 +		if(user == null){
 +			LOG.warn("User is null from ugi" + ugi + ", audit log: " + logLine);
 +		}
 +
 +		String src = auditMaps.get(SRC);
 +		if(src != null && src.equals("null")){
 +			src = null;
 +		}
 +
 +		String dst = auditMaps.get(DST);
 +		if(dst != null && dst.equals("null")){
 +			dst = null;
 +		}
 +
 +
 +		String cmd = auditMaps.get(CMD);
 +		if(cmd == null){
 +			LOG.warn("Cmd is null from audit log: " + logLine);
 +		}
 +
 +        entity.user = user;
 +        entity.cmd = cmd;
 +        entity.src = src;
 +        entity.dst = dst;
 +        entity.host = auditMaps.get(IP);
 +        entity.allowed = Boolean.valueOf(auditMaps.get(ALLOWED));
 +        entity.timestamp = DateTimeUtil.humanDateToMilliseconds(auditMaps.get(LOGDATE));
 +
 +		return entity;
 +	}
 +
- 
- 	/**
- 	 * .e.g. hchen9@APD.xyz.com
- 	 */
- 	private final static Pattern UGI_PATTERN_DEFAULT = Pattern.compile("^([\\w\\d\\-]+)@.*");
- 	/**
- 	 * .e.g. hadoop/123.dc1.xyz.com@xyz.com (auth:KERBEROS)
- 	 */
- 	private final static Pattern UGI_PATTERN_SYSTEM = Pattern.compile("^([\\w\\d\\-]+)/[\\w\\d\\-.]+@.*");
- 
- 	/**
- 	 * .e.g. hadoop/123.dc1.xyz.com@xyz.com (auth:KERBEROS)
- 	 */
- 	private final static Pattern UGI_PATTERN_WITHBLANK = Pattern.compile("^([\\w\\d.\\-_]+)[\\s(]+.*");
- 
- 	/**
- 	 * @param ugi UGI field of audit log
- 	 * @return resultToMetrics user from UGI field of audit log
- 	 */
- 	public static  String parseUserFromUGI(String ugi) {
- 		if(ugi == null) return null;
- 		String newugi = ugi.trim();
- 
- 		Matcher defaultMatcher = UGI_PATTERN_DEFAULT.matcher(newugi);
- 		if(defaultMatcher.matches()){
- 			return defaultMatcher.group(1);
- 		}
- 
- 		Matcher sysMatcher = UGI_PATTERN_SYSTEM.matcher(newugi);
- 		if(sysMatcher.matches()){
- 			return sysMatcher.group(1);
- 		}
- 
- 		Matcher viaMatcher = UGI_PATTERN_WITHBLANK.matcher(newugi);
- 		if(viaMatcher.matches()){
- 			return viaMatcher.group(1);
- 		}
- 
- 		return newugi;
- 	}
- 
 +	private static Map<String,String> parseAttribute(String attrs){
 +		Matcher matcher = loggerAttributesPattern.matcher(attrs+" ");
 +		Map<String,String> attrMap=new HashMap<String, String>();
 +		while(matcher.find()){
 +			String kv = matcher.group();
 +			String[] kvs = kv.split("=");
 +			if(kvs.length>=2){
 +				attrMap.put(kvs[0].toLowerCase(),kvs[1].trim());
 +			}else{
 +				attrMap.put(kvs[0].toLowerCase(),null);
 +			}
 +		}
 +		return attrMap;
 +	}
 +
 +	private Map<String,String> parseAudit(String log){
 +		Matcher loggerMatcher = loggerPattern.matcher(log);
 +		Map<String,String> map = null;
 +		if(loggerMatcher.find()){
 +			try{
 +				map = new HashMap<String, String>();
 +				map.put(LOGDATE, loggerMatcher.group(LOGDATE_INDEX)); // logdate
 +				map.put(LOGLEVEL, loggerMatcher.group(LOGLEVEL_INDEX)); // level
 +				map.put(LOGHEADER, loggerMatcher.group(LOGGER_INDEX)); // logg
 +				Map<String,String> loggerAttributes = parseAttribute(loggerMatcher.group(LOGATTRS_INDEX));
 +				map.put(ALLOWED, loggerAttributes.get(ALLOWED));
 +				map.put(UGI, loggerAttributes.get(UGI));
 +				map.put(IP, loggerAttributes.get(IP));
 +				map.put(CMD, loggerAttributes.get(CMD));
 +				map.put(SRC, loggerAttributes.get(SRC));
 +				map.put(DST, loggerAttributes.get(DST));
 +				map.put(PERM, loggerAttributes.get(PERM));
 +			}catch (IndexOutOfBoundsException e){
 +				LOG.error("Got exception when parsing audit log:" + log + ", exception:" + e.getMessage(), e);
 +				map = null;
 +			}
 +		}
 +		return map;
 +	}
 +}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e05fac6d/eagle-security/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e05fac6d/eagle-topology-assembly/pom.xml
----------------------------------------------------------------------


Mime
View raw message