eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yonzhang2...@apache.org
Subject [1/2] incubator-eagle git commit: EAGLE-14 Re-assemble high level hdfs commands for better policy (https://issues.apache.org/jira/browse/EAGLE-14) Author: yonzhang Reviewer: Ralph Closes #25 from yonzhang
Date Sat, 12 Dec 2015 19:56:55 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/master 5334a23d0 -> e362b6f33


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e362b6f3/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java
b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java
new file mode 100644
index 0000000..c6894dc
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java
@@ -0,0 +1,166 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.eagle.security.auditlog;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.alert.siddhi.AttributeType;
+import org.apache.eagle.alert.siddhi.SiddhiStreamMetadataUtils;
+import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.datastream.JavaStormStreamExecutor2;
+import org.apache.eagle.datastream.Tuple2;
+import org.apache.eagle.security.hdfs.entity.HdfsUserCommandPatternEntity;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.query.output.callback.QueryCallback;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+
+import java.util.*;
+
+/**
+ * Created by yonzhang on 11/20/15.
+ */
+public class HdfsUserCommandReassembler extends JavaStormStreamExecutor2<String, Map>
{
+    private static final Logger LOG = LoggerFactory.getLogger(HdfsUserCommandReassembler.class);
+    private Config config;
+    private InputHandler inputHandler;
+    /**
+     * event schema is attribute name/type pairs
+     */
+    private final String streamName = "eventStream";
+    public final static SortedMap<String, String> eventSchema = new TreeMap<String,
String>(){{
+        put("timestamp", AttributeType.LONG.name());
+        put("src", AttributeType.STRING.name());
+        put("dst", AttributeType.STRING.name());
+        put("host", AttributeType.STRING.name());
+        put("allowed", AttributeType.STRING.name());
+        put("user", AttributeType.STRING.name());
+        put("cmd", AttributeType.STRING.name());
+    }};
+
+    @Override
+    public void prepareConfig(Config config) {
+        this.config = config;
+    }
+
+    private static class GenericQueryCallback extends QueryCallback{
+        private Map<String, String> outputSelector;
+        private Map<String, String> outputModifier;
+        public GenericQueryCallback(Map<String, String> outputSelector, Map<String,
String> outputModifier){
+            this.outputSelector = outputSelector;
+            this.outputModifier = outputModifier;
+        }
+        @Override
+        public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+            Object[] attrValues = inEvents[0].getData();
+            Collector<Tuple2<String, Map>> collector = (Collector<Tuple2<String,
Map>>)attrValues[0];
+            SortedMap<String, Object> outputEvent = new TreeMap<String, Object>();
+            int i = 1;  // output is from second element
+            String user = null;
+            for(String attrKey : outputSelector.keySet()){
+                Object v = attrValues[i++];
+                outputEvent.put(attrKey, v);
+                if(attrKey.equals("user"))
+                    user = (String)v;
+            }
+
+            outputEvent.putAll(outputModifier);
+            LOG.debug("outputEvent: " + outputEvent);
+            collector.collect(new Tuple2<String, Map>(user, outputEvent));
+        }
+    }
+
+    @Override
+    public void init() {
+        String streamDef = SiddhiStreamMetadataUtils.convertToStreamDef(streamName, eventSchema);
+        SiddhiManager siddhiManager = new SiddhiManager();
+        StringBuilder sb = new StringBuilder();
+        sb.append(streamDef);
+        String readFrom = null;
+        try {
+            readFrom = config.getString("eagleProps.readHdfsUserCommandPatternFrom");
+        }catch(Exception ex){
+            LOG.warn("no config for readHdfsUserCommandPatternFrom", ex);
+            readFrom = "file";
+        }
+        List<HdfsUserCommandPatternEntity> list = null;
+        try {
+            if (readFrom.equals("file")) {
+                list = new HdfsUserCommandPatternByFileImpl().findAllPatterns();
+            } else {
+                list = new HdfsUserCommandPatternByDBImpl(new EagleServiceConnector(config)).findAllPatterns();
+            }
+        }catch(Exception ex){
+            LOG.error("fail reading hfdsUserCommandPattern", ex);
+            throw new IllegalStateException(ex);
+        }
+        for(HdfsUserCommandPatternEntity rule : list){
+            sb.append(String.format("@info(name = '%s') from ", rule.getTags().get("userCommand")));
+            sb.append(rule.getPattern());
+            sb.append(" select a.context, ");
+            for(Map.Entry<String, String> entry : rule.getFieldSelector().entrySet()){
+                sb.append(entry.getValue());
+                sb.append(" as ");
+                sb.append(entry.getKey());
+                sb.append(", ");
+            }
+            sb.deleteCharAt(sb.lastIndexOf(","));
+            sb.append("insert into ");
+            sb.append(rule.getTags().get("userCommand"));
+            sb.append("_outputStream;");
+        }
+
+        LOG.info("patterns: " + sb.toString());
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(sb.toString());
+
+        for(HdfsUserCommandPatternEntity rule : list){
+            executionPlanRuntime.addCallback(rule.getTags().get("userCommand"), new GenericQueryCallback(rule.getFieldSelector(),
rule.getFieldModifier()));
+        }
+
+        inputHandler = executionPlanRuntime.getInputHandler(streamName);
+        executionPlanRuntime.start();
+    }
+
+    @Override
+    public void flatMap(List<Object> input, Collector<Tuple2<String, Map>>
collector) {
+        LOG.debug("incoming event:" + input.get(1));
+        SortedMap<String, Object> toBeCopied = (SortedMap<String, Object>)input.get(1);
+        SortedMap<String, Object> event = new TreeMap<String, Object>(toBeCopied);
+        Object[] siddhiEvent = convertToSiddhiEvent(collector, event);
+        try {
+            inputHandler.send(siddhiEvent);
+        }catch(Exception ex){
+            LOG.error("fail sending event to Siddhi pattern engine", ex);
+            throw new IllegalStateException(ex);
+        }
+    }
+
+    public Object[] convertToSiddhiEvent(Object context, SortedMap<String, Object>
event){
+        Object[] siddhiEvent = new Object[1+event.size()];
+        siddhiEvent[0] = context; // context
+        int i = 1;
+        for(Object value : event.values()){
+            siddhiEvent[i++] = value;
+        }
+        return siddhiEvent;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e362b6f3/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinExecutor.java
b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinExecutor.java
index 4f8023e..5fbcfd8 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinExecutor.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinExecutor.java
@@ -24,10 +24,12 @@ import org.apache.eagle.security.auditlog.timer.IPZonePollingJob;
 import org.apache.eagle.security.hdfs.entity.IPZoneEntity;
 import org.apache.eagle.security.util.ExternalDataCache;
 import org.apache.eagle.security.util.ExternalDataJoiner;
+import org.apache.storm.guava.collect.ImmutableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
+import java.util.TreeMap;
 
 public class IPZoneDataJoinExecutor extends JavaStormStreamExecutor2<String, Map> {
 	private static final Logger LOG = LoggerFactory.getLogger(IPZoneDataJoinExecutor.class);
@@ -52,7 +54,8 @@ public class IPZoneDataJoinExecutor extends JavaStormStreamExecutor2<String,
Map
 
     @Override
     public void flatMap(java.util.List<Object> input, Collector<Tuple2<String,
Map>> outputCollector){
-        Map<String, Object> event = (Map<String, Object>)input.get(1);
+        Map<String, Object> toBeCopied = (Map<String, Object>)input.get(1);
+        Map<String, Object> event = new TreeMap<String, Object>(toBeCopied);
// shallow copy
         Map<String, IPZoneEntity> map = (Map<String, IPZoneEntity>) ExternalDataCache.getInstance().getJobResult(IPZonePollingJob.class);
         IPZoneEntity e = null;
         if(map != null){

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e362b6f3/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
index 43e5b58..1d13082 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
@@ -55,10 +55,11 @@
     #"kafkaStatisticRangeInMin" : 60,
     "eagleService": {
       "host": "localhost",
-      "port": 38080,
+      "port": 9099,
       "username": "admin",
       "password": "secret"
-    }
+    },
+    "readHdfsUserCommandPatternFrom" : "file"
   },
   "dynamicConfigSource" : {
   	"enabled" : true,

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e362b6f3/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/hdfsIPHosts.txt
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/hdfsIPHosts.txt
b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/hdfsIPHosts.txt
deleted file mode 100644
index fdf5d1a..0000000
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/hdfsIPHosts.txt
+++ /dev/null
@@ -1,2 +0,0 @@
-10.115.1.17
-10.115.1.23
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e362b6f3/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/hdfsUserCommandPattern.json
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/hdfsUserCommandPattern.json
b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/hdfsUserCommandPattern.json
new file mode 100644
index 0000000..59200ac
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/hdfsUserCommandPattern.json
@@ -0,0 +1,54 @@
+[
+  {
+    "tags" : {
+      "userCommand" : "appendToFile"
+    },
+    "pattern" : "every a = eventStream[cmd=='append']",
+    "fieldSelector" : {
+      "timestamp" : "a.timestamp",
+      "src" : "a.src",
+      "dst" : "a.dst",
+      "host" : "a.host",
+      "allowed" : "a.allowed",
+      "user" : "a.user"
+    },
+    "fieldModifier" : {
+      "cmd" : "user:appendToFile"
+    }
+  },
+  {
+    "tags" : {
+      "userCommand" : "read"
+    },
+    "description" : "for all read operations, i.e. cat text tail get copyToLocal getmerge",
+    "pattern": "every a = eventStream[cmd=='open'] ",
+    "fieldSelector": {
+      "timestamp" : "a.timestamp",
+      "src" : "a.src",
+      "dst" : "a.dst",
+      "host" : "a.host",
+      "allowed" : "a.allowed",
+      "user" : "a.user"
+    },
+    "fieldModifier": {
+      "cmd": "user:read"
+    }
+  },
+  {
+    "tags" : {
+      "userCommand" : "copyFromLocal"
+    },
+    "description" : "for force copyFromLocal operation, i.e. copyFromLocal -f",
+    "pattern" : "every (a = eventStream[cmd=='getfileinfo' and str:contains(src,'._COPYING_')]
-> b = eventStream[cmd=='create' and user==a.user and src==a.src and host==a.host] ->
c = eventStream[cmd=='getfileinfo' and user==a.user and src==a.src and host==a.host] ->
d = eventStream[cmd=='delete' and user==a.user and a.src==str:concat(src,'._COPYING_') and
host==a.host] -> e = eventStream[cmd=='rename' and user==a.user and src==a.src and dst==d.src
and host==a.host]) ",
+    "fieldSelector": {
+      "timestamp" : "a.timestamp",
+      "dst": "d.src",
+      "host": "a.host",
+      "allowed": "a.allowed",
+      "user": "a.user"
+    },
+    "fieldModifier": {
+      "cmd": "user:copyFromLocal"
+    }
+  }
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e362b6f3/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/log4j.properties
b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/log4j.properties
index 8a0919a..07f8402 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/log4j.properties
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/log4j.properties
@@ -19,9 +19,10 @@ eagle.log.dir=./logs
 eagle.log.file=eagle.log
 
 
-#log4j.logger.eagle.security.auditlog.IPZoneDataJoinExecutor=DEBUG
-#log4j.logger.eagle.security.auditlog.FileSensitivityDataJoinExecutor=DEBUG
-#log4j.logger.eagle.executor.AlertExecutor=DEBUG
+#log4j.logger.org.apache.eagle.security.auditlog.IPZoneDataJoinExecutor=DEBUG
+#log4j.logger.org.apache.eagle.security.auditlog.FileSensitivityDataJoinExecutor=DEBUG
+log4j.logger.org.apache.eagle.security.auditlog.HdfsUserCommandReassembler=DEBUG
+#log4j.logger.org.apache.eagle.executor.AlertExecutor=DEBUG
 # standard output
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e362b6f3/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestHdfsUserCommandPatternByDB.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestHdfsUserCommandPatternByDB.java
b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestHdfsUserCommandPatternByDB.java
new file mode 100644
index 0000000..0c0423b
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestHdfsUserCommandPatternByDB.java
@@ -0,0 +1,44 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.auditlog;
+
+import org.apache.eagle.security.hdfs.entity.HdfsUserCommandPatternEntity;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * test pattern download and parse
+ */
+public class TestHdfsUserCommandPatternByDB {
+    @Test
+    // not qualified for unit test as it connects to local service
+    public void testPatternDownload() throws Exception{
+        EagleServiceConnector connector = new EagleServiceConnector("localhost", 9099, "admin",
"secret");
+        HdfsUserCommandPatternByDBImpl impl = new HdfsUserCommandPatternByDBImpl(connector);
+        List<HdfsUserCommandPatternEntity> list = impl.findAllPatterns();
+        for(HdfsUserCommandPatternEntity entity : list){
+            System.out.println(entity.getPattern());
+            System.out.println(entity.getFieldSelector());
+            System.out.println(entity.getFieldModifier());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e362b6f3/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestHdfsUserCommandPatternByFile.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestHdfsUserCommandPatternByFile.java
b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestHdfsUserCommandPatternByFile.java
new file mode 100644
index 0000000..2886372
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestHdfsUserCommandPatternByFile.java
@@ -0,0 +1,43 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.auditlog;
+
+import org.apache.eagle.security.hdfs.entity.HdfsUserCommandPatternEntity;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * test pattern download and parse
+ */
+public class TestHdfsUserCommandPatternByFile {
+    @Test
+    // not qualified for unit test as it connects to local service
+    public void testPatternDownload() throws Exception{
+        HdfsUserCommandPatternByFileImpl impl = new HdfsUserCommandPatternByFileImpl();
+        List<HdfsUserCommandPatternEntity> list = impl.findAllPatterns();
+        for(HdfsUserCommandPatternEntity entity : list){
+            System.out.println(entity.getPattern());
+            System.out.println(entity.getFieldSelector());
+            System.out.println(entity.getFieldModifier());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e362b6f3/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestSiddhiPattern.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestSiddhiPattern.java
b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestSiddhiPattern.java
new file mode 100644
index 0000000..55f509b
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestSiddhiPattern.java
@@ -0,0 +1,120 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.eagle.security.auditlog;
+
+import org.apache.eagle.common.DateTimeUtil;
+import junit.framework.Assert;
+import org.junit.Test;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.query.output.callback.QueryCallback;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+import org.wso2.siddhi.core.util.EventPrinter;
+
+import java.lang.reflect.Field;
+
+public class TestSiddhiPattern {
+    @Test
+    public void testPattern() throws Exception{
+        SiddhiManager siddhiManager = new SiddhiManager();
+
+        String cseEventStream = "define stream eventStream (timeStamp long, user string,
src string, cmd string);";
+        String query = "@info(name = 'query1') from " +
+                "every a = eventStream[cmd=='getfileinfo'] " +
+                "-> b = eventStream[cmd=='append' and user==a.user and src==a.src] " +
+                "-> c = eventStream[cmd=='getfileinfo'and user==a.user and src==a.src]
" +
+                "select a.user as user, b.cmd as cmd, a.src as src " +
+                "insert into outputStreams";
+
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream
+ query);
+
+        executionPlanRuntime.addCallback("query1", new QueryCallback() {
+            @Override
+            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                EventPrinter.print(timeStamp, inEvents, removeEvents);
+            }
+        });
+
+        InputHandler inputHandler = executionPlanRuntime.getInputHandler("eventStream");
+        executionPlanRuntime.start();
+        long curTime = DateTimeUtil.humanDateToMilliseconds("2015-09-17 00:00:00,000");
+        System.out.println("curTime : " + curTime);
+        inputHandler.send(new Object[]{curTime, "user", "/tmp/private", "getfileinfo"});
+        inputHandler.send(new Object[]{curTime, "user", "/tmp/private", "append"});
+        inputHandler.send(new Object[]{curTime, "user", "/tmp/private1", "getfileinfo"});
+        inputHandler.send(new Object[]{curTime, "user", "/tmp/private1", "open"});
+        inputHandler.send(new Object[]{curTime, "user", "/tmp/private1", "append"});
+        inputHandler.send(new Object[]{curTime, "user", "/tmp/private", "getfileinfo"});
+        Thread.sleep(100);
+        inputHandler.send(new Object[]{curTime, "user", "/tmp/private1", "getfileinfo"});
+        Thread.sleep(100);
+        executionPlanRuntime.shutdown();
+
+    }
+
+    @Test
+    public void testMultiplePatterns() throws Exception{
+        SiddhiManager siddhiManager = new SiddhiManager();
+
+        String cseEventStream = "define stream eventStream (timeStamp long, user string,
src string, cmd string);";
+        String query1 = "@info(name = 'query1') from " +
+                "every a = eventStream[cmd=='getfileinfo'] " +
+                "-> b = eventStream[cmd=='append' and user==a.user and src==a.src] " +
+                "-> c = eventStream[cmd=='getfileinfo'and user==a.user and src==a.src]
" +
+                "select a.user as user, b.cmd as cmd, a.src as src " +
+                "insert into outputStream";
+        String query2 = ";@info(name = 'query2') from " +
+                "every a = eventStream[cmd=='getfileinfo'] " +
+                "-> b = eventStream[cmd=='open' and user==a.user and src==a.src] " +
+                "select a.user as user, b.cmd as cmd, a.src as src " +
+                "insert into outputStream";
+
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream
+ query1+query2);
+
+        executionPlanRuntime.addCallback("query1", new QueryCallback() {
+            @Override
+            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                EventPrinter.print(timeStamp, inEvents, removeEvents);
+            }
+        });
+        executionPlanRuntime.addCallback("query2", new QueryCallback() {
+            @Override
+            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                EventPrinter.print(timeStamp, inEvents, removeEvents);
+            }
+        });
+
+        InputHandler inputHandler = executionPlanRuntime.getInputHandler("eventStream");
+        executionPlanRuntime.start();
+        long curTime = DateTimeUtil.humanDateToMilliseconds("2015-09-17 00:00:00,000");
+        System.out.println("curTime : " + curTime);
+        inputHandler.send(new Object[]{curTime, "user", "/tmp/private", "getfileinfo"});
+        inputHandler.send(new Object[]{curTime, "user", "/tmp/private", "append"});
+        inputHandler.send(new Object[]{curTime, "user", "/tmp/private1", "getfileinfo"});
+        inputHandler.send(new Object[]{curTime, "user", "/tmp/private1", "open"});
+        inputHandler.send(new Object[]{curTime, "user", "/tmp/private1", "append"});
+        inputHandler.send(new Object[]{curTime, "user", "/tmp/private", "getfileinfo"});
+        Thread.sleep(100);
+        inputHandler.send(new Object[]{curTime, "user", "/tmp/private1", "getfileinfo"});
+        Thread.sleep(100);
+        executionPlanRuntime.shutdown();
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e362b6f3/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java
b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java
new file mode 100644
index 0000000..27a2e7d
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java
@@ -0,0 +1,138 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.auditlog;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import junit.framework.Assert;
+import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.datastream.Tuple2;
+import org.junit.Test;
+
+import java.util.*;
+
+/**
+ * Created by yonzhang on 11/24/15.
+ */
+public class TestUserCommandReassembler {
+    private Map<String, Object> parseEvent(String log){
+        HdfsAuditLogKafkaDeserializer deserializer = new HdfsAuditLogKafkaDeserializer(null);
+        return (Map<String, Object>)deserializer.deserialize(log.getBytes());
+    }
+
+    /**
+     * 2015-11-19 23:57:02,934 INFO FSNamesystem.audit: allowed=true ugi=root (auth:SIMPLE)
ip=/10.0.2.15 cmd=getfileinfo src=/tmp/private dst=null perm=null proto=rpc
+     * 2015-11-19 23:57:03,046 INFO FSNamesystem.audit: allowed=true ugi=root (auth:SIMPLE)
ip=/10.0.2.15 cmd=append src=/tmp/private dst=null perm=null proto=rpc
+     * 2015-11-19 23:57:03,118 INFO FSNamesystem.audit: allowed=true ugi=root (auth:SIMPLE)
ip=/10.0.2.15 cmd=getfileinfo src=/tmp/private dst=null perm=null proto=rpc
+     */
+    @Test
+    public void testAppend() throws Exception{
+        String e1 = "2015-11-19 23:57:02,934 INFO FSNamesystem.audit: allowed=true ugi=root
(auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/tmp/private dst=null perm=null proto=rpc";
+        String e2 = "2015-11-19 23:57:03,046 INFO FSNamesystem.audit: allowed=true ugi=root
(auth:SIMPLE) ip=/10.0.2.15 cmd=append src=/tmp/private dst=null perm=null proto=rpc";
+        String e3 = "2015-11-19 23:57:03,118 INFO FSNamesystem.audit: allowed=true ugi=root
(auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/tmp/private dst=null perm=null proto=rpc";
+        HdfsUserCommandReassembler assembler = new HdfsUserCommandReassembler();Config config
= ConfigFactory.load();
+        assembler.prepareConfig(config);
+        assembler.init();
+
+        Collector<Tuple2<String, Map>> collector = new Collector<Tuple2<String,
Map>>(){
+            @Override
+            public void collect(Tuple2<String, Map> stringMapTuple2) {
+                String cmd = (String)stringMapTuple2.f1().get("cmd");Assert.assertEquals("user:appendToFile",
cmd);
+                System.out.println("assert passed!!!");
+            }
+        };
+        assembler.flatMap(Arrays.asList("user1", parseEvent(e1)), collector);
+        assembler.flatMap(Arrays.asList("user1", parseEvent(e2)), collector);
+        assembler.flatMap(Arrays.asList("user1", parseEvent(e3)), collector);
+        // sleep for a while for Siddhi engine callback to be invoked
+        Thread.sleep(100);
+    }
+
+    /**
+     * 2015-11-19 23:47:28,922 INFO FSNamesystem.audit: allowed=true ugi=root (auth:SIMPLE)
ip=/10.0.2.15 cmd=getfileinfo src=/tmp/private dst=null perm=null proto=rpc
+     * 2015-11-19 23:47:29,026 INFO FSNamesystem.audit: allowed=true ugi=root (auth:SIMPLE)
ip=/10.0.2.15 cmd=open src=/tmp/private dst=null perm=null proto=rpc
+     */
+    @Test
+    public void testRead() throws Exception{
+        String e1 = "2015-11-19 23:47:28,922 INFO FSNamesystem.audit: allowed=true ugi=root
(auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/tmp/private dst=null perm=null proto=rpc";
+        String e2 = "2015-11-19 23:47:29,026 INFO FSNamesystem.audit: allowed=true ugi=root
(auth:SIMPLE) ip=/10.0.2.15 cmd=open src=/tmp/private dst=null perm=null proto=rpc";
+        String e3 = "2015-11-19 23:47:28,925 INFO FSNamesystem.audit: allowed=true ugi=root
(auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/tmp/private dst=null perm=null proto=rpc";
+        String e4 = "2015-11-19 23:47:29,028 INFO FSNamesystem.audit: allowed=true ugi=root
(auth:SIMPLE) ip=/10.0.2.15 cmd=open src=/tmp/private dst=null perm=null proto=rpc";
+        HdfsUserCommandReassembler assembler = new HdfsUserCommandReassembler();
+        Config config = ConfigFactory.load();
+        assembler.prepareConfig(config);
+        assembler.init();
+
+        Collector<Tuple2<String, Map>> collector = new Collector<Tuple2<String,
Map>>(){
+            @Override
+            public void collect(Tuple2<String, Map> stringMapTuple2) {
+                String cmd = (String)stringMapTuple2.f1().get("cmd");
+                Assert.assertEquals("user:read", cmd);
+                System.out.println("assert passed!!!");
+            }
+        };
+        assembler.flatMap(Arrays.asList("user1", parseEvent(e1)), collector);
+        assembler.flatMap(Arrays.asList("user1", parseEvent(e3)), collector);
+        assembler.flatMap(Arrays.asList("user1", parseEvent(e2)), collector);
+        assembler.flatMap(Arrays.asList("user1", parseEvent(e4)), collector);
+        // sleep for a while for Siddhi engine callback to be invoked
+        Thread.sleep(100);
+    }
+
+    /**
+     * 2015-11-20 00:06:47,090 INFO FSNamesystem.audit: allowed=true ugi=root (auth:SIMPLE)
ip=/10.0.2.15 cmd=getfileinfo src=/tmp/private dst=null perm=null proto=rpc
+     * 2015-11-20 00:06:47,185 INFO FSNamesystem.audit: allowed=true ugi=root (auth:SIMPLE)
ip=/10.0.2.15 cmd=getfileinfo src=/tmp/private._COPYING_ dst=null perm=null proto=rpc
+     * 2015-11-20 00:06:47,254 INFO FSNamesystem.audit: allowed=true ugi=root (auth:SIMPLE)
ip=/10.0.2.15 cmd=create src=/tmp/private._COPYING_ dst=null perm=root:hdfs:rw-r--r-- proto=rpc
+     * 2015-11-20 00:06:47,289 INFO FSNamesystem.audit: allowed=true ugi=root (auth:SIMPLE)
ip=/10.0.2.15 cmd=getfileinfo src=/tmp/private._COPYING_ dst=null perm=null proto=rpc
+     * 2015-11-20 00:06:47,609 INFO FSNamesystem.audit: allowed=true ugi=root (auth:SIMPLE)
ip=/10.0.2.15 cmd=delete src=/tmp/private dst=null perm=null proto=rpc
+     * 2015-11-20 00:06:47,624 INFO FSNamesystem.audit: allowed=true ugi=root (auth:SIMPLE)
ip=/10.0.2.15 cmd=rename src=/tmp/private._COPYING_ dst=/tmp/private perm=root:hdfs:rw-r--r--
proto=rpc
+     */
+    @Test
+    public void testCopyFromLocal() throws Exception{
+        String e1 = "2015-12-04 22:03:11,609 INFO FSNamesystem.audit: allowed=true   ugi=hdfs
(auth:SIMPLE)  ip=/10.0.2.15   cmd=getfileinfo src=/tmp/private      dst=null perm=null  
    proto=rpc";
+        String e2 = "2015-12-04 22:03:11,730 INFO FSNamesystem.audit: allowed=true   ugi=hdfs
(auth:SIMPLE)  ip=/10.0.2.15   cmd=getfileinfo src=/tmp/private._COPYING_     dst=null   
    perm=null     proto=rpc";
+        String e3 = "2015-12-04 22:03:11,798 INFO FSNamesystem.audit: allowed=true   ugi=hdfs
(auth:SIMPLE)  ip=/10.0.2.15   cmd=create      src=/tmp/private._COPYING_     dst=null   
    perm=hdfs:hdfs:rw-r--r--        proto=rpc";
+        String e4 = "2015-12-04 22:03:11,827 INFO FSNamesystem.audit: allowed=true   ugi=hdfs
(auth:SIMPLE)  ip=/10.0.2.15   cmd=getfileinfo src=/tmp/private._COPYING_     dst=null   
    perm=null       proto=rpc";
+        String e5 = "2015-12-04 22:03:11,935 INFO FSNamesystem.audit: allowed=true   ugi=hdfs
(auth:SIMPLE)  ip=/10.0.2.15   cmd=delete      src=/tmp/private      dst=null perm=null  
    proto=rpc";
+        String e6 = "2015-12-04 22:03:11,945 INFO FSNamesystem.audit: allowed=true   ugi=hdfs
(auth:SIMPLE)  ip=/10.0.2.15   cmd=rename      src=/tmp/private._COPYING_     dst=/tmp/private
       perm=hdfs:hdfs:rw-r--r--        proto=rpc";
+        HdfsUserCommandReassembler assembler = new HdfsUserCommandReassembler();
+        Config config = ConfigFactory.load();
+        assembler.prepareConfig(config);
+        assembler.init();
+
+        Collector<Tuple2<String, Map>> collector = new Collector<Tuple2<String,
Map>>(){
+            @Override
+            public void collect(Tuple2<String, Map> stringMapTuple2) {
+                String cmd = (String)stringMapTuple2.f1().get("cmd");
+                Assert.assertEquals("user:copyFromLocal", cmd);
+                Assert.assertEquals("user:appendToFile", cmd);
+                System.out.println("assert passed!!!");
+            }
+        };
+        assembler.flatMap(Arrays.asList("user1", parseEvent(e1)), collector);
+        assembler.flatMap(Arrays.asList("user1", parseEvent(e2)), collector);
+        assembler.flatMap(Arrays.asList("user1", parseEvent(e3)), collector);
+        assembler.flatMap(Arrays.asList("user1", parseEvent(e4)), collector);
+        assembler.flatMap(Arrays.asList("user1", parseEvent(e5)), collector);
+        assembler.flatMap(Arrays.asList("user1", parseEvent(e6)), collector);
+        // sleep for a while for Siddhi engine callback to be invoked
+        Thread.sleep(100);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e362b6f3/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileMLAlgorithmEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileMLAlgorithmEvaluator.java
b/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileMLAlgorithmEvaluator.java
index 2e8258b..58e28a4 100644
--- a/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileMLAlgorithmEvaluator.java
+++ b/eagle-security/eagle-security-userprofile/detection/src/main/java/org/apache/eagle/security/userprofile/UserProfileMLAlgorithmEvaluator.java
@@ -29,6 +29,7 @@ import org.apache.eagle.security.userprofile.model.UserActivityAggModelEntity;
 import org.apache.eagle.security.userprofile.model.UserProfileModel;
 import com.typesafe.config.Config;
 import org.apache.eagle.security.userprofile.model.UserActivityAggModelEntity;
+import org.apache.eagle.service.client.EagleServiceConnector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,7 +58,7 @@ public abstract class UserProfileMLAlgorithmEvaluator<M extends UserProfileModel
     protected MLModelDAO getModelDAO(){
         // TODO: 1. Adapt to different model protocol including service, hdfs, file or kafka
         // TODO: 2. Model cache configuration
-        return new MLModelDAOImpl(this.config);
+        return new MLModelDAOImpl(new EagleServiceConnector(this.config));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e362b6f3/test.txt
----------------------------------------------------------------------
diff --git a/test.txt b/test.txt
new file mode 100644
index 0000000..6623e06
--- /dev/null
+++ b/test.txt
@@ -0,0 +1 @@
+test for keep file for user command reassembler



Mime
View raw message