eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [03/46] incubator-eagle git commit: [EAGLE-325] Initialize next-gen alert engine code on branch-0.5
Date Thu, 02 Jun 2016 07:07:42 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/eagle-alert-process/src/test/java/META-INF/MANIFEST.MF
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/META-INF/MANIFEST.MF b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/META-INF/MANIFEST.MF
deleted file mode 100644
index edfb15f..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/META-INF/MANIFEST.MF
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-Manifest-Version: 1.0
-Class-Path: 
-

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
deleted file mode 100644
index 602e117..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.cep;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import junit.framework.Assert;
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import org.apache.eagle.alert.executor.AlertExecutor;
-import org.apache.eagle.alert.siddhi.SiddhiAlertAPIEntityRender;
-import org.apache.eagle.dataproc.core.ValuesArray;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.policy.PolicyEvaluationContext;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.policy.dao.*;
-import org.apache.eagle.policy.siddhi.SiddhiPolicyDefinition;
-import org.apache.eagle.policy.siddhi.SiddhiPolicyEvaluator;
-import org.apache.eagle.policy.siddhi.StreamMetadataManager;
-import org.apache.eagle.service.client.EagleServiceConnector;
-import org.junit.Test;
-import scala.Tuple2;
-
-import java.util.*;
-
-public class TestSiddhiEvaluator {
-
-	int alertCount = 0;
-
-	public AlertStreamSchemaEntity createStreamMetaEntity(String attrName, String type) {
-		AlertStreamSchemaEntity entity = new AlertStreamSchemaEntity();
-		Map<String, String> tags = new HashMap<String, String>();
-		tags.put("application", "hdfsAuditLog");
-		tags.put("streamName", "hdfsAuditLogEventStream");
-		tags.put("attrName", attrName);
-		entity.setTags(tags);
-		entity.setAttrType(type);
-		return entity;
-	}
-
-	@Test
-	public void test() throws Exception{
-        Config config = ConfigFactory.load("unittest.conf");
-		AlertStreamSchemaDAO streamDao = new AlertStreamSchemaDAOImpl(null, null) {
-			@Override
-			public List<AlertStreamSchemaEntity> findAlertStreamSchemaByApplication(String dataSource) throws Exception {
-				List<AlertStreamSchemaEntity> list = new ArrayList<AlertStreamSchemaEntity>();
-				list.add(createStreamMetaEntity("cmd", "string"));
-				list.add(createStreamMetaEntity("dst", "string"));
-				list.add(createStreamMetaEntity("src", "string"));
-				list.add(createStreamMetaEntity("host", "string"));
-				list.add(createStreamMetaEntity("user", "string"));
-				list.add(createStreamMetaEntity("timestamp", "long"));
-				list.add(createStreamMetaEntity("securityZone", "string"));
-				list.add(createStreamMetaEntity("sensitivityType", "string"));
-				list.add(createStreamMetaEntity("allowed", "string"));
-				return list;
-			}
-		};
-        StreamMetadataManager.getInstance().reset();
-        StreamMetadataManager.getInstance().init(config, streamDao);
-
-		Map<String, Object> data1 =  new TreeMap<String, Object>(){{
-			put("cmd", "open");
-			put("dst", "");
-			put("src", "");
-			put("host", "");
-			put("user", "");
-			put("timestamp", String.valueOf(System.currentTimeMillis()));
-			put("securityZone", "");
-			put("sensitivityType", "");
-			put("allowed", "true");
-		}};
-        final SiddhiPolicyDefinition policyDef = new SiddhiPolicyDefinition();
-        policyDef.setType("siddhiCEPEngine");
-        String expression = "from hdfsAuditLogEventStream[cmd=='open'] " +
-							"select * " +
-							"insert into outputStream ;";
-        policyDef.setExpression(expression);
-
-		PolicyDefinitionDAO alertDao = new PolicyDefinitionEntityDAOImpl(new EagleServiceConnector(null, null),
-				Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME) {
-			@Override
-			public Map<String, Map<String, AlertDefinitionAPIEntity>> findActivePoliciesGroupbyExecutorId(String site, String dataSource) throws Exception {
-				return null;
-			}
-
-            @Override
-            public void updatePolicyDetails(AbstractPolicyDefinitionEntity entity) { /* do nothing */ }
-        };
-
-		AlertExecutor alertExecutor = new AlertExecutor("alertExecutorId", null, 3, 1, alertDao, new String[]{"hdfsAuditLogEventStream"}) {
-			@Override
-			protected Map<String, String> getDimensions(String policyId) {
-				return new HashMap<String, String>();
-			}
-		};
-		alertExecutor.prepareConfig(config);
-		alertExecutor.init();
-
-		PolicyEvaluationContext<AlertDefinitionAPIEntity, AlertAPIEntity> context = new PolicyEvaluationContext<>();
-		context.alertExecutor = alertExecutor;
-		context.policyId = "testPolicy";
-		context.resultRender = new SiddhiAlertAPIEntityRender();
-		context.outputCollector = new Collector<Tuple2<String, AlertAPIEntity>> () {
-			@Override
-			public void collect(Tuple2<String, AlertAPIEntity> stringAlertAPIEntityTuple2) {
-				alertCount++;
-			}
-		};
-
-		SiddhiPolicyEvaluator<AlertDefinitionAPIEntity, AlertAPIEntity> evaluator =
-				new SiddhiPolicyEvaluator<>(config, context, policyDef, new String[]{"hdfsAuditLogEventStream"}, false);
-
-		evaluator.evaluate(new ValuesArray(context.outputCollector, "hdfsAuditLogEventStream", data1));
-		Thread.sleep(2 * 1000);
-		Assert.assertEquals(alertCount, 1);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/config/TestAlertDedup.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/config/TestAlertDedup.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/config/TestAlertDedup.java
deleted file mode 100644
index f6d6a63..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/config/TestAlertDedup.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.config;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
-
-public class TestAlertDedup {
-
-	@Test
-	public void test() throws Exception{
-		String alertDef = "{\"alertDedupIntervalMin\":\"10\",\"fields\":[\"key1\",\"key2\",\"key3\"]}";
-		DeduplicatorConfig dedupConfig = JsonSerDeserUtils.deserialize(alertDef, DeduplicatorConfig.class);
-		Assert.assertEquals(dedupConfig.getAlertDedupIntervalMin(), 10);
-		Assert.assertEquals(dedupConfig.getFields().size(), 3);
-		
-		alertDef = "null";
-		dedupConfig = JsonSerDeserUtils.deserialize(alertDef, DeduplicatorConfig.class);
-		Assert.assertEquals(dedupConfig, null);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java
deleted file mode 100644
index f7dcdde..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.dao;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
-import org.apache.eagle.policy.dao.PolicyDefinitionEntityDAOImpl;
-import org.apache.eagle.service.client.EagleServiceConnector;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class TestAlertDefinitionDAOImpl {
-
-	public AlertDefinitionAPIEntity buildTestAlertDefEntity(String site, String programId, String alertExecutorId, String policyId, String policyType) {
-		AlertDefinitionAPIEntity entity = new AlertDefinitionAPIEntity();
-		entity.setEnabled(true);
-		Map<String, String> tags = new HashMap<String, String>();
-		tags.put("site", site);
-		tags.put("programId", programId);
-		tags.put("alertExecutorId", alertExecutorId);
-		tags.put("policyId", policyId);
-		tags.put("policyType", policyType);
-		entity.setTags(tags);
-		return entity;
-	}
-	
-	@Test
-	public void test() throws Exception{
-		Config config = ConfigFactory.load();
-		String eagleServiceHost = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
-		int eagleServicePort = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
-
-		String site = "sandbox";
-		String dataSource = "UnitTest";
-		PolicyDefinitionDAO dao = new PolicyDefinitionEntityDAOImpl(new EagleServiceConnector(eagleServiceHost, eagleServicePort),
-				Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME) {
-			@Override
-			public List<AlertDefinitionAPIEntity> findActivePolicies(String site, String dataSource) throws Exception {
-				List<AlertDefinitionAPIEntity> list = new ArrayList<AlertDefinitionAPIEntity>();
-				list.add(buildTestAlertDefEntity(site, dataSource, "TestExecutor1", "TestPolicyIDA", "TestPolicyTypeA"));
-				list.add(buildTestAlertDefEntity(site, dataSource, "TestExecutor1", "TestPolicyIDB", "TestPolicyTypeB"));
-				list.add(buildTestAlertDefEntity(site, dataSource, "TestExecutor2", "TestPolicyIDC", "TestPolicyTypeC"));
-				list.add(buildTestAlertDefEntity(site, dataSource, "TestExecutor2", "TestPolicyIDD", "TestPolicyTypeD"));
-				return list;
-			}
-
-            @Override
-			public void updatePolicyDetails(AbstractPolicyDefinitionEntity entity) { /* do nothing */ }
-		};
-
-		Map<String, Map<String, AlertDefinitionAPIEntity>> retMap = dao.findActivePoliciesGroupbyExecutorId(site, dataSource);
-		
-		Assert.assertEquals(2, retMap.size());
-		Assert.assertEquals(2, retMap.get("TestExecutor1").size());
-		Assert.assertEquals(2, retMap.get("TestExecutor2").size());
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java
deleted file mode 100644
index a2b8a2d..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.dao;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import junit.framework.Assert;
-import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import org.apache.eagle.policy.dao.AlertStreamSchemaDAO;
-import org.apache.eagle.policy.siddhi.SiddhiStreamMetadataUtils;
-import org.apache.eagle.policy.siddhi.StreamMetadataManager;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-
-public class TestSiddhiStreamMetadataUtils {
-	@Test
-	public void test() throws Exception {
-        Config config = ConfigFactory.load();
-        StreamMetadataManager.getInstance().reset();
-        StreamMetadataManager.getInstance().init(config, new AlertStreamSchemaDAO() {
-            @Override
-            public List<AlertStreamSchemaEntity> findAlertStreamSchemaByApplication(
-                    String application) {
-                return Arrays.asList(generateStreamMetadataAPIEntity("attrName1", "STRING"),
-                        generateStreamMetadataAPIEntity("attrName2", "LONG")
-                );
-            }
-        });
-		String siddhiStreamDef = SiddhiStreamMetadataUtils.convertToStreamDef("testStreamName");
-		Assert.assertEquals("define stream " + "testStreamName" + "(attrName1 string,attrName2 long);", siddhiStreamDef);
-	}
-	
-	private AlertStreamSchemaEntity generateStreamMetadataAPIEntity(final String attrName, String attrType){
-		AlertStreamSchemaEntity entity = new AlertStreamSchemaEntity();
-		entity.setTags(new HashMap<String, String>(){{
-			put("programId", "testProgramId");
-			put("streamName", "testStreamName");
-			put("attrName", attrName);
-		}});
-		entity.setAttrType(attrType);
-		return entity;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java
deleted file mode 100644
index 0bbfc4a..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.dao;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.policy.dao.AlertStreamSchemaDAO;
-import org.apache.eagle.policy.dao.AlertStreamSchemaDAOImpl;
-import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import org.apache.eagle.policy.siddhi.StreamMetadataManager;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class TestStreamDefinitionDAOImpl {
-	
-	public AlertStreamSchemaEntity buildTestStreamDefEntity(String programId, String streamName, String attrName) {
-		AlertStreamSchemaEntity entity = new AlertStreamSchemaEntity();
-		entity.setAttrType("String");
-		entity.setAttrValueResolver("DefaultAttrValueResolver");
-		entity.setCategory("SimpleType");
-		Map<String, String> tags = new HashMap<String, String>();
-		tags.put("programId", programId);
-		tags.put("streamName", streamName);
-		tags.put("attrName", attrName);
-		entity.setTags(tags);
-		return entity;
-	}
-	
-	@Test
-	public void test() throws Exception{
-        Config config = ConfigFactory.load();
-		AlertStreamSchemaDAO dao = new AlertStreamSchemaDAOImpl(null, null) {
-			public List<AlertStreamSchemaEntity> findAlertStreamSchemaByApplication(String application) throws Exception {
-				List<AlertStreamSchemaEntity> list = new ArrayList<AlertStreamSchemaEntity>();
-				String programId = "UnitTest";
-				list.add(buildTestStreamDefEntity(programId, "TestStream", "Attr1"));
-				list.add(buildTestStreamDefEntity(programId, "TestStream", "Attr2"));
-				list.add(buildTestStreamDefEntity(programId, "TestStream", "Attr3"));
-				list.add(buildTestStreamDefEntity(programId, "TestStream", "Attr4"));
-				return list;
-			}
-		};
-        StreamMetadataManager.getInstance().reset();
-		StreamMetadataManager.getInstance().init(config, dao);
-		Map<String, List<AlertStreamSchemaEntity>> retMap = StreamMetadataManager.getInstance().getMetadataEntitiesForAllStreams();
-		Assert.assertTrue(retMap.get("TestStream").size() == 4);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/executor/TestPolicyExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/executor/TestPolicyExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/executor/TestPolicyExecutor.java
deleted file mode 100644
index 90353be..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/executor/TestPolicyExecutor.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.executor;
-
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.policy.config.AbstractPolicyDefinition;
-import org.apache.eagle.policy.dao.AlertStreamSchemaDAOImpl;
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.policy.PolicyEvaluator;
-import org.apache.eagle.policy.PolicyManager;
-import org.apache.eagle.policy.siddhi.SiddhiPolicyDefinition;
-import org.apache.eagle.policy.siddhi.StreamMetadataManager;
-import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import junit.framework.Assert;
-
-/**
- * @since Dec 18, 2015
- *
- */
-public class TestPolicyExecutor {
-
-	public static class T2 extends AbstractPolicyDefinitionEntity {
-		@Override
-		public String getPolicyDef() {
-			return null;
-		}
-		@Override
-		public boolean isEnabled() {
-			return false;
-		}
-	}
-
-	// not feasible to Unit test, it requires the local service.
-	@Ignore
-	@Test
-	public void testReflectCreatePolicyEvaluator() throws Exception {
-		System.setProperty("config.resource", "/unittest.conf");
-		String policyType = Constants.policyType.siddhiCEPEngine.name();
-		Class<? extends PolicyEvaluator> evalCls = PolicyManager.getInstance().getPolicyEvaluator(policyType);
-		Config config = ConfigFactory.load();
-
-		String def = "{\"expression\":\"from hdfsAuditLogEventStream select * insert into outputStream;\",\"type\":\"siddhiCEPEngine\"}";
-		// test1 : test json deserialization
-		AbstractPolicyDefinition policyDef = null;
-		policyDef = JsonSerDeserUtils.deserialize(def, AbstractPolicyDefinition.class,
-				PolicyManager.getInstance().getPolicyModules(policyType));
-		// Assert conversion succeed
-		Assert.assertEquals(SiddhiPolicyDefinition.class, policyDef.getClass());
-
-		// make sure meta data manager initialized
-		StreamMetadataManager.getInstance().init(config, new AlertStreamSchemaDAOImpl(config));
-
-		String[] sourceStreams = new String[] { "hdfsAuditLogEventStream" };
-		// test2 : test evaluator
-		PolicyEvaluator pe = evalCls.getConstructor(Config.class, String.class, AbstractPolicyDefinition.class,
-				String[].class, boolean.class).newInstance(config, "policy-id", policyDef, sourceStreams, false);
-
-		PolicyEvaluator<AlertDefinitionAPIEntity> e1 = (PolicyEvaluator<AlertDefinitionAPIEntity>) pe;
-
-		PolicyEvaluator<T2> e2 = (PolicyEvaluator<T2>) pe;
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestDynamicPolicyLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestDynamicPolicyLoader.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestDynamicPolicyLoader.java
deleted file mode 100644
index 0504784..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestDynamicPolicyLoader.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.policy;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.policy.DynamicPolicyLoader;
-import org.apache.eagle.policy.PolicyLifecycleMethods;
-import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import junit.framework.Assert;
-
-public class TestDynamicPolicyLoader {
-	private final static Logger LOG = LoggerFactory.getLogger(TestDynamicPolicyLoader.class);
-
-	@Test
-	public void test() throws Exception{
-		System.setProperty("config.resource", "/unittest.conf");
-		Config config = ConfigFactory.load();
-		Map<String, PolicyLifecycleMethods<AlertDefinitionAPIEntity>> policyChangeListeners = new HashMap<String, PolicyLifecycleMethods<AlertDefinitionAPIEntity>>();
-		policyChangeListeners.put("testAlertExecutorId", new PolicyLifecycleMethods<AlertDefinitionAPIEntity>() {
-			@Override
-			public void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> deleted) {
-				LOG.info("deleted : " + deleted);
-			}
-			
-			@Override
-			public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added) {
-				Assert.assertTrue(added.size() == 1);
-				LOG.info("added : " + added);
-			}
-			
-			@Override
-			public void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> changed) {
-				Assert.assertTrue(changed.size() == 1);
-				LOG.info("changed :" + changed);
-			}
-		});
-		
-		Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs = new HashMap<String, Map<String, AlertDefinitionAPIEntity>>();
-		initialAlertDefs.put("testAlertExecutorId", new HashMap<String, AlertDefinitionAPIEntity>());
-		Map<String, AlertDefinitionAPIEntity> map = initialAlertDefs.get("testAlertExecutorId");
-		map.put("policyId_1", buildTestAlertDefEntity("testProgramId", "testAlertExecutorId", "policyId_1", "siddhi", "policyDef_1"));
-		map.put("policyId_3", buildTestAlertDefEntity("testProgramId", "testAlertExecutorId", "policyId_3", "siddhi", "policyDef_3"));
-		
-		PolicyDefinitionDAO<AlertDefinitionAPIEntity> dao = new PolicyDefinitionDAO<AlertDefinitionAPIEntity>() {
-			@Override
-			public Map<String, Map<String, AlertDefinitionAPIEntity>> findActivePoliciesGroupbyExecutorId(
-					String site, String dataSource) {
-				Map<String, Map<String, AlertDefinitionAPIEntity>> currentAlertDefs = new HashMap<String, Map<String, AlertDefinitionAPIEntity>>();
-				currentAlertDefs.put("testAlertExecutorId", new HashMap<String, AlertDefinitionAPIEntity>());
-				Map<String, AlertDefinitionAPIEntity> map = currentAlertDefs.get("testAlertExecutorId");
-				map.put("policyId_1", buildTestAlertDefEntity("testProgramId", "testAlertExecutorId", "policyId_1", "siddhi", "policyDef_1_1"));
-				map.put("policyId_2", buildTestAlertDefEntity("testProgramId", "testAlertExecutorId", "policyId_2", "siddhi", "policyDef_2"));
-				return currentAlertDefs;
-			}
-			
-			@Override
-			public List<AlertDefinitionAPIEntity> findActivePolicies(String site, String dataSource) {
-				return null;
-			}
-
-            @Override
-            public void updatePolicyDetails(AlertDefinitionAPIEntity entity) { /* do nothing */ }
-		};
-		
-		DynamicPolicyLoader<AlertDefinitionAPIEntity> loader = DynamicPolicyLoader.getInstanceOf(AlertDefinitionAPIEntity.class);
-		loader.init(initialAlertDefs, dao, config);
-		
-		try{
-			Thread.sleep(5000);
-		}catch(Exception ex){
-			
-		}
-	}
-	
-	public AlertDefinitionAPIEntity buildTestAlertDefEntity(String programId, String alertExecutorId, String policyId, String policyType, String policyDef) {
-		AlertDefinitionAPIEntity entity = new AlertDefinitionAPIEntity();
-		entity.setEnabled(true);
-		Map<String, String> tags = new HashMap<String, String>();
-		tags.put("programId", programId);
-		tags.put("alertExecutorId", alertExecutorId);
-		tags.put("policyId", policyId);
-		tags.put("policyType", policyType);
-		entity.setTags(tags);
-		entity.setPolicyDef(policyDef);
-		return entity;
-	}
-	
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistribution.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistribution.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistribution.java
deleted file mode 100644
index 77aaec3..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistribution.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.policy;
-
-import org.apache.eagle.policy.DefaultPolicyPartitioner;
-import org.junit.Test;
-
-public class TestPolicyDistribution {
-    @Test
-    public void test(){
-        DefaultPolicyPartitioner p = new DefaultPolicyPartitioner();
-        System.out.println(p.partition(2, "siddhiCEPEngine", "readtmpPII"));
-        System.out.println(p.partition(2, "siddhiCEPEngine", "readtmpPII2"));
-        System.out.println(p.partition(2, "siddhiCEPEngine", "tmpPIIpii"));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java
deleted file mode 100644
index 1fb97fd..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.policy;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import junit.framework.Assert;
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import org.apache.eagle.alert.executor.AlertExecutor;
-import org.apache.eagle.policy.DefaultPolicyPartitioner;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.policy.dao.AlertStreamSchemaDAO;
-import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
-import org.apache.eagle.policy.dao.PolicyDefinitionEntityDAOImpl;
-import org.apache.eagle.service.client.EagleServiceConnector;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class TestPolicyDistributionUpdater {
-    private static Logger LOG = LoggerFactory.getLogger(TestPolicyDistributionUpdater.class);
-
-    @Test
-    public void testPolicyDistributionReporter() throws Exception{
-        PolicyDefinitionDAO alertDao = new PolicyDefinitionEntityDAOImpl(new EagleServiceConnector(null, 1),
-                Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME) {
-            @Override
-            public Map<String, Map<String, AlertDefinitionAPIEntity>> findActivePoliciesGroupbyExecutorId(String site, String dataSource) throws Exception {
-                final AlertDefinitionAPIEntity entity = new AlertDefinitionAPIEntity();
-                entity.setTags(new HashMap<String, String>() {{
-                    put(Constants.POLICY_TYPE, "siddhiCEPEngine");
-                    put(Constants.POLICY_ID, "policyId_1");
-                }});
-                Map<String, Map<String, AlertDefinitionAPIEntity>> map = new HashMap<String, Map<String, AlertDefinitionAPIEntity>>();
-                map.put("alertExecutorId_1", new HashMap<String, AlertDefinitionAPIEntity>() {{
-                    put("policyId_1", entity);
-                }});
-                entity.setPolicyDef("{\"type\":\"siddhiCEPEngine\",\"expression\":\"from testStream select name insert into outputStream ;\"}");
-                return map;
-            }
-
-            @Override
-            public void updatePolicyDetails(AbstractPolicyDefinitionEntity entity) { /* do nothing */ }
-        };
-
-        AlertExecutor alertExecutor = new AlertExecutor("alertExecutorId_1", new DefaultPolicyPartitioner(), 1, 0, alertDao, new String[]{"testStream"}){
-            public AlertStreamSchemaDAO getAlertStreamSchemaDAO(Config config){
-                return new AlertStreamSchemaDAO(){
-                    @Override
-                    public List<AlertStreamSchemaEntity> findAlertStreamSchemaByApplication(String application) throws Exception {
-                        AlertStreamSchemaEntity entity = new AlertStreamSchemaEntity();
-                        entity.setTags(new HashMap<String, String>(){{
-                            put("application", "UnitTest");
-                            put("streamName", "testStream");
-                            put("attrName", "name");
-                        }});
-                        entity.setAttrType("string");
-                        return Arrays.asList(entity);
-                    }
-                };
-            }
-
-            @Override
-            public void report() {
-                Assert.assertEquals(1, getPolicyEvaluators().size());
-                LOG.info("successuflly reported");
-            }
-        };
-
-        Config config = ConfigFactory.load();
-        alertExecutor.prepareConfig(config);
-        alertExecutor.init();
-        Thread.sleep(100);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyPartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyPartitioner.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyPartitioner.java
deleted file mode 100644
index c3bc4c9..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyPartitioner.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.policy;
-
-import org.apache.eagle.policy.DefaultPolicyPartitioner;
-import org.junit.Test;
-
-public class TestPolicyPartitioner {
-    @Test
-    public void test(){
-        DefaultPolicyPartitioner p = new DefaultPolicyPartitioner();
-        System.out.println(p.partition(2, "siddhiCEPEngine", "readtmpPII"));
-        System.out.println(p.partition(2, "siddhiCEPEngine", "readtmpPII2"));
-        System.out.println(p.partition(2, "siddhiCEPEngine", "tmpPIIpii"));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestExternalBatchWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestExternalBatchWindow.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestExternalBatchWindow.java
deleted file mode 100644
index 7903e0d..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestExternalBatchWindow.java
+++ /dev/null
@@ -1,307 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.siddhi;
-
-import java.text.MessageFormat;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
-
-/**
- * @since Dec 23, 2015
- *
- */
-public class TestExternalBatchWindow {
-
-    private static SiddhiManager siddhiManager;
-    
-    @BeforeClass
-    public static void beforeClass() {
-        siddhiManager = new SiddhiManager();
-    }
-    
-    @AfterClass
-    public static void afterClass() {
-        siddhiManager.shutdown();
-    }
-
-    @Test
-    public void test02NoMsg() throws Exception {
-        ExecutionPlanRuntime runtime = simpleQueryRuntime();
-
-        final AtomicBoolean recieved = new AtomicBoolean();
-        runtime.addCallback("query", new QueryCallback() {
-
-            @Override
-            public void receive(long arg0, Event[] arg1, Event[] arg2) {
-                recieved.set(true);
-                System.out.println(arg1);
-            }
-        });
-
-        InputHandler input = runtime.getInputHandler("jmxMetric");
-
-        runtime.start();
-        // external events' time stamp less than the window, should not have event recieved in call back.
-        long now = System.currentTimeMillis();
-        int length = 5;
-        for (int i = 0; i < length; i++) {
-            input.send(new Object[] { 15, now + i * 1000 });
-        }
-        
-        Thread.sleep(1000);
-        Assert.assertFalse("Event happens inner external time batch window, should not have event recieved in callback!", recieved.get());
-
-        runtime.shutdown();
-    }
-
-    private ExecutionPlanRuntime simpleQueryRuntime() {
-        String query = "define stream jmxMetric(cpu int, timestamp long); " 
-                + "@info(name='query')"
-                + "from jmxMetric#window.eagle:externalTimeBatch(timestamp, 10 sec) " 
-                + "select avg(cpu) as avgCpu, count(1) as count insert into tmp;";
-
-        return siddhiManager.createExecutionPlanRuntime(query);
-    }
-
-    /**
-     * This case try to capture the case that the window get a chunk of event that exceed the time batch.
-     * In this case, two next processor should be triggered.
-     */
-    @Test
-    public void test03BunchChunkExceedBatch() {
-        // TODO
-    }
-    @Test
-    public void test04MultiThread() {
-        // TODO
-    }
-
-    @Test
-    public void test05ExternalJoin() {
-        // TODO
-    }
-    
-    @Test
-    public void test06EdgeCase() throws Exception {
-        // every 10 sec
-        ExecutionPlanRuntime runtime = simpleQueryRuntime();
-
-        final AtomicInteger recCount = new AtomicInteger(0);
-//        final CountDownLatch latch = new CountDownLatch(2);// for debug
-        runtime.addCallback("query", new QueryCallback() {
-            @Override
-            public void receive(long arg0, Event[] arg1, Event[] arg2) {
-//                latch.countDown();
-                Assert.assertEquals(1, arg1.length);
-                recCount.incrementAndGet();
-                int avgCpu = ((Double) arg1[0].getData()[0]).intValue();
-                if (recCount.get() == 1) {
-                    Assert.assertEquals(15, avgCpu);
-                } else if (recCount.get() == 2) {
-                    Assert.assertEquals(85, avgCpu);
-                }
-                int count = ((Long) arg1[0].getData()[1]).intValue();
-                Assert.assertEquals(3, count);
-            }
-        });
-
-        InputHandler input = runtime.getInputHandler("jmxMetric");
-        runtime.start();
-        // external events' time stamp less than the window, should not have event recieved in call back.
-        long now = 0;
-        int length = 3;
-        for (int i = 0; i < length; i++) {
-            input.send(new Object[] { 15, now + i * 10 });
-        }
-        
-        // second round
-        // if the trigger event mix with the last window, we should see the avgValue is not expected
-        for (int i = 0; i < length; i++) {
-            input.send(new Object[] { 85, now + 10000 + i * 10 }); // the first entity of the second round
-        }
-        // to trigger second round
-        input.send(new Object[] { 10000, now + 10 * 10000 });
-        
-//        latch.await();// for debug
-
-        Thread.sleep(1000);
-        
-        Assert.assertEquals(2, recCount.get());
-    }
-
-    @Test
-    public void test07Pull76() throws Exception {
-        String defaultStream = "define stream LoginEvents (myTime long, ip string, phone string,price int);";
-
-        String query = " @info(name='pull76') "
-                + " from LoginEvents#window.eagle:externalTimeBatch(myTime, 5 sec)  "
-                + " select myTime, phone, ip, price, count(ip) as cntip , "
-                + " min(myTime) as mintime, max(myTime) as maxtime "
-                + " insert into events ;";
-        
-        ExecutionPlanRuntime runtime = siddhiManager.createExecutionPlanRuntime(defaultStream + query);
-
-        InputHandler inputHandler = runtime.getInputHandler("LoginEvents");
-
-        runtime.addCallback("pull76", new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                if (inEvents != null) {
-                    System.out.println("======================== START ===============================");
-                    int i = 0;
-                    System.out.println(" Events Size:" + inEvents.length);
-                    for (i = 0; i < inEvents.length; i++) {
-                        Event e = inEvents[i];
-                        System.out.println("----------------------------");
-                        System.out.println(new Date((Long) e.getData(0)));
-                        System.out.println("IP:" + e.getData(2));
-                        System.out.println("price :" + e.getData(3));
-                        System.out.println("count :" + e.getData(4));
-                        System.out.println("mintime :" + new Date((Long) e.getData(5)) );
-                        System.out.println("maxtime :" + new Date((Long) e.getData(6)) );
-                        System.out.println("----------------------------");
-                    }
-                    System.out.println("======================== END  ===============================");
-
-                }
-            }
-        });
-        
-        
-        runtime.start();
-        
-        long start = System.currentTimeMillis();
-        Calendar c = Calendar.getInstance();
-        c.add(Calendar.HOUR, 1);
-        c.add(Calendar.SECOND, 1);
-        int i = 0;
-        for (i = 0; i <= 10000; i++) {
-            c.add(Calendar.SECOND, 1);
-            inputHandler.send(c.getTime().getTime(),
-                    new Object[] { c.getTime().getTime(), new String("192.10.1.1"), "1", new Random().nextInt(1000) });
-        }
-        long end = System.currentTimeMillis();
-        System.out.printf("End : %d ", end - start);
-
-        Thread.sleep(1000);
-        runtime.shutdown();
-    }
-    
-    @Test
-    public void test01DownSampling() throws Exception {
-        String stream = "define stream jmxMetric(cpu int, memory int, bytesIn long, bytesOut long, timestamp long);";
-        String query = "@info(name = 'downSample') " 
-                + "from jmxMetric#window.eagle:externalTimeBatch(timestamp, 10 sec) "
-                + "select "
-                + "avg(cpu) as avgCpu, max(cpu) as maxCpu, min(cpu) as minCpu, "
-                + " '|' as s, "
-                + " avg(memory) as avgMem, max(memory) as maxMem, min(memory) as minMem, "
-                + " '|' as s1, "
-                + " avg(bytesIn) as avgBytesIn, max(bytesIn) as maxBytesIn, min(bytesIn) as minBytesIn, "
-                + " '|' as s2, "
-                + " avg(bytesOut) as avgBytesOut, max(bytesOut) as maxBytesOut, min(bytesOut) as minBytesOut, " 
-                + " '|' as s3, "
-                + " timestamp as timeWindowEnds, "
-                + " '|' as s4, "
-                + " count(1) as metric_count "
-                + " INSERT INTO tmp;";
-
-        SiddhiManager sm = new SiddhiManager();
-        ExecutionPlanRuntime plan = sm.createExecutionPlanRuntime(stream + query);
-
-        InputHandler input = plan.getInputHandler("jmxMetric");
-
-        // stream call back doesn't follow the counter
-        final AtomicInteger counter = new AtomicInteger();
-        {
-            // stream callback
-            plan.addCallback("jmxMetric", new StreamCallback() {
-                @Override
-                public void receive(Event[] arg0) {
-                    counter.addAndGet(arg0.length);
-                }
-            });
-        }
-        final AtomicInteger queryWideCounter = new AtomicInteger();
-        {
-            plan.addCallback("downSample", new QueryCallback() {
-                @Override
-                public void receive(long arg0, Event[] inevents, Event[] removeevents) {
-                    int currentCount = queryWideCounter.addAndGet(inevents.length);
-                    System.out.println(MessageFormat.format("Round {0} ====", currentCount));
-                    System.out.println(" events count " + inevents.length);
-
-                    for (Event e : inevents) {
-                        Object[] tranformedData = e.getData();
-                        for (Object o : tranformedData) {
-                            System.out.print(o);
-                            System.out.print(' ');
-                        }
-                        System.out.println(" events endendend");
-                    }
-                }
-
-            });
-        }
-
-        plan.start();
-
-        int round = 4;
-        int eventsPerRound= 0;
-        long externalTs = System.currentTimeMillis();
-        for (int i = 0; i < round; i++) {
-            eventsPerRound = sendEvent(input, i, externalTs);
-            Thread.sleep(3000);
-        }
-        //
-        sendEvent(input, round, externalTs);
-
-        plan.shutdown();
-        Thread.sleep(1000);
-        Assert.assertEquals(round * eventsPerRound + eventsPerRound, counter.get());
-        Assert.assertEquals(round, queryWideCounter.get());
-    }
-
-    // one round of sending events
-    private int sendEvent(InputHandler input, int ite, long externalTs) throws Exception {
-        int len = 3;
-        Event[] events = new Event[len];
-        for (int i = 0; i < len; i++) {
-            // cpu int, memory int, bytesIn long, bytesOut long, timestamp long
-            events[i] = new Event(externalTs,
-                    new Object[] { 15 + 10 * i * ite, 1500 + 10 * i * ite, 1000L, 2000L, externalTs + ite * 10000 + i * 50 });
-        }
-
-        input.send(events);
-        return len;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiEngine.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiEngine.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiEngine.java
deleted file mode 100644
index acead47..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiEngine.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.siddhi;
-
-import java.lang.reflect.Field;
-
-import org.apache.eagle.alert.executor.AlertExecutor;
-import junit.framework.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.util.EventPrinter;
-
-public class TestSiddhiEngine {
-    static final Logger log = LoggerFactory.getLogger(TestSiddhiEngine.class);
-    int alertCount = 0;
-
-    @Test
-    public void TestStrContains() throws Exception {
-        alertCount = 0;
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "" +
-                "@config(async = 'true') " +
-                "define stream typeStream (cmd string, src string, dst string) ;";
-        String queryString = "" +
-                "@info(name = 'query1') " +
-                "from typeStream[(cmd == 'rename') and (src == '/tmp/pii') and (str:contains(dst,'/user/hdfs/.Trash/Current/tmp/pii')==true)] " +
-                "select cmd, src, dst " +
-                "insert into outputStream ;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
-
-        QueryCallback callback = new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-                alertCount++;
-            }
-        };
-        executionPlanRuntime.addCallback("query1", callback);
-
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
-        executionPlanRuntime.start();
-        inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/hdfs/.Trash/Current/tmp/pii"});
-        Thread.sleep(100);
-        Assert.assertTrue(alertCount == 1);
-        executionPlanRuntime.shutdown();
-    }
-
-    @Test
-    public void TestRegexp() throws Exception {
-        alertCount = 0;
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "" +
-                "@config(async = 'true') " +
-                "define stream typeStream (str string, other string, num double) ;";
-        String queryString = "" +
-                "@info(name = 'query1') " +
-                "from typeStream " +
-                "select str as str1, other as other1 , num as num1, count(num) as number " +
-                "having str:regexp(str1, '/usr/data/[0-9]+/[0-9]+/[0-9]+') == true " + 
-                "insert into outputStream ;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
-
-        QueryCallback callback = new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                alertCount++;
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        };
-        
-        executionPlanRuntime.addCallback("query1", callback);
-
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
-        executionPlanRuntime.start();
-        inputHandler.send(new Object[]{"/usr/data/000/001/002", "other", 1.0});
-        Thread.sleep(100);
-        Assert.assertTrue(alertCount == 1);
-        executionPlanRuntime.shutdown();
-    }
-
-    @Test
-    public void TestStrEqualsIgnoreCase() throws Exception {
-        alertCount = 0;
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "define stream typeStream (cmd string, src string, dst string) ;";
-        String queryString = "" +
-                "@info(name = 'query1') " +
-                "from typeStream[(cmd == 'rename') and (src == '/tmp/pii') and (str:equalsIgnoreCase(dst,'/user/hdfs/.TRAsh/current/TMP/PII')==true)] " +
-                "select cmd, src, dst " +
-                "insert into outputStream ;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
-
-        QueryCallback callback = new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                alertCount++;
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        };
-
-        executionPlanRuntime.addCallback("query1", callback);
-
-        Field field = QueryCallback.class.getDeclaredField("query");
-        field.setAccessible(true);
-
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
-        executionPlanRuntime.start();
-        inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/HDFS/.Trash/Current/TMP/pii"}); // match case
-        inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/HDFS/.Trash///Current/TMP/pii"}); //non-match case
-        Thread.sleep(100);
-        Assert.assertTrue(alertCount == 1);
-        executionPlanRuntime.shutdown();
-    }
-
-    @Test
-    public void TestStrContainsIgnoreCase() throws Exception {
-        alertCount = 0;
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "define stream typeStream (cmd string, src string, dst string) ;";
-        String queryString = "" +
-                "@info(name = 'query1') " +
-                "from typeStream[(cmd == 'rename') and (src == '/tmp/pii') and (str:containsIgnoreCase(dst,'.TRASH/CURRENT/tMp/pII')==true)] " +
-                "select cmd, src, dst " +
-                "insert into outputStream ;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
-
-        QueryCallback callback = new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-                alertCount++;
-            }
-        };
-
-        executionPlanRuntime.addCallback("query1", callback);
-
-        Field field = QueryCallback.class.getDeclaredField("query");
-        field.setAccessible(true);
-
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
-        executionPlanRuntime.start();
-        inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/hdfs/.Trash/Current/TMP/pii"}); // match case
-        inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/hdfs/.Trash///Current/TMP/pii"}); //non-match case
-        Thread.sleep(100);
-        Assert.assertTrue(alertCount == 1);
-        executionPlanRuntime.shutdown();
-    }
-
-    @Test
-    public void TestRegexpIgnoreCase() throws Exception {
-        alertCount = 0;
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "define stream typeStream (str string, other string, num double) ;";
-        String queryString = "" +
-                "@info(name = 'query1') " +
-                "from typeStream " +
-                "select str as str1, other as other1 , num as num1, count(num) as number " +
-                "having str:regexpIgnoreCase(str1, '/usr/DATA/[0-9]+/[0-9]+/[0-9]+') == true " +
-                "insert into outputStream ;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
-
-        QueryCallback callback = new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                alertCount++;
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        };
-        executionPlanRuntime.addCallback("query1", callback);
-
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
-        executionPlanRuntime.start();
-        inputHandler.send(new Object[]{"/USR/data/000/001/002", "other", 1.0});
-        Thread.sleep(100);
-        Assert.assertTrue(alertCount == 1);
-        executionPlanRuntime.shutdown();
-    }
-    
-    @Test
-    public void TestDataObject() throws Exception {
-        alertCount = 0;
-        SiddhiManager siddhiManager = new SiddhiManager();
-        
-        String cseEventStream = "" +
-                "@config(async = 'true') " +
-                "define stream typeStream (dataobj object, str string, first string) ;";
-        String queryString = "" +
-                "@info(name = 'query1') " +
-                "from typeStream " +
-                "select * " +
-                "having str:regexp(str, '/usr/data/[0-9]+/[0-9]+/[0-9]+') == true " + 
-                "insert into outputStream ;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
-
-        QueryCallback callback = new QueryCallback() {
-            @Override
-           public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                alertCount++;
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        };
-
-        executionPlanRuntime.addCallback("query1", callback);
-
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
-        executionPlanRuntime.start();
-        inputHandler.send(new Object[]{new AlertExecutor(queryString, null, 0, 1, null, null), "/usr/data/000/001/002", "second"});
-        Thread.sleep(100);
-        Assert.assertTrue(alertCount == 1);
-        executionPlanRuntime.shutdown();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiSlideWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiSlideWindow.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiSlideWindow.java
deleted file mode 100644
index 521317c..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiSlideWindow.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.siddhi;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.util.EventPrinter;
-
-import org.apache.eagle.common.DateTimeUtil;
-
-public class TestSiddhiSlideWindow {
-
-    int alertCount = 0;
-
-    @Test
-    public void testSlideWindow1() throws Exception{
-        alertCount = 0;
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "define stream eventStream (user string, path string, cmd string);";
-//        String query = "@info(name = 'query1') from eventStream[cmd == 'open' AND str:regexp(path, '/usr/data/[0-9]+/[0-9]+/[0-9]+')]#window.time(1 sec)"
-//        		     + " select user, path, cmd, count(path) as cnt" 
-//        			 + " group by user"
-//        			 + " having cnt > 3 insert all events into outputStream;";
-
-//        String query = "@info(name = 'query1') from eventStream[cmd == 'open' AND str:regexp(path, '/usr/data/[0-9]+/[0-9]+/[0-9]+')]#window.length(10)"
-//        		+ " select user, path, cmd, count(path) as cnt" 
-//        		+ " group by user"
-//        		+ " having cnt > 3 insert all events into outputStream;";
-
-//      String query = "@info(name = 'query1') from eventStream[cmd == 'open' AND str:regexp(path, '/usr/data/[0-9]+/[0-9]+/[0-9]+')]#window.timeBatch(1 sec)"
-//				+ " select user, path, cmd, count(path) as cnt" 
-//				+ " group by user"
-//				+ " having cnt > 3 insert all events into outputStream;";
-
-        String query = "@info(name = 'query1') from eventStream[cmd == 'open' AND str:regexp(path, '/usr/data/[0-9]+/[0-9]+/[0-9]+')]#window.lengthBatch(10)"
-                + " select user, path, cmd, count(path) as cnt"
-                + " group by user"
-                + " having cnt > 3 insert all events into outputStream;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);
-
-        executionPlanRuntime.addCallback("query1", new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-                alertCount++;
-            }
-        });
-
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("eventStream");
-        executionPlanRuntime.start();
-        inputHandler.send(new Object[]{"user", "/usr/data/0000/0000/0000", "open"});
-        inputHandler.send(new Object[]{"user", "/usr/data/0000/0000/1111", "open"});
-        inputHandler.send(new Object[]{"user", "/usr/data/0000/0000/2222", "open"});
-        inputHandler.send(new Object[]{"user", "/usr/data/0000/0000/3333", "open"});
-
-        inputHandler.send(new Object[]{"user", "/usr/data/1111/0000/0000", "open"});
-        inputHandler.send(new Object[]{"user", "/usr/data/1111/0000/1111", "open"});
-
-        inputHandler.send(new Object[]{"user", "/usr/data/2222/0000/0000", "open"});
-        inputHandler.send(new Object[]{"user", "/usr/data/2222/0000/1111", "open"});
-        inputHandler.send(new Object[]{"user", "/usr/data/2222/0000/2222", "open"});
-        Thread.sleep(100);
-        Assert.assertTrue(alertCount == 0);
-        inputHandler.send(new Object[]{"user", "/usr/data/2222/0000/3333", "open"});
-        Thread.sleep(100);
-        Assert.assertTrue(alertCount == 1);
-        executionPlanRuntime.shutdown();
-    }
-
-    @Test
-    public void testSlideWindow2() throws Exception{
-        alertCount = 0;
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "define stream eventStream (timeStamp long, user string, path string, cmd string);";
-        String query = "@info(name = 'query1') from eventStream[cmd == 'open' AND str:regexp(path, '/usr/data/[0-9]+/[0-9]+/[0-9]+')]#window.externalTime(timeStamp,1 sec)"
-                + " select user, path, cmd, count(path) as cnt"
-                + " group by user"
-                + " having cnt > 3 insert all events into outputStream;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);
-
-        executionPlanRuntime.addCallback("query1", new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                alertCount++;
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        });
-
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("eventStream");
-        executionPlanRuntime.start();
-        long curTime = DateTimeUtil.humanDateToMilliseconds("2015-09-17 00:00:00,000");
-        inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/0000", "open"});
-        Thread.sleep(1100);
-        inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/1111", "open"});
-        Thread.sleep(100);
-        inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/2222", "open"});
-        Thread.sleep(100);
-        inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/3333", "open"});
-        Thread.sleep(100);
-        Assert.assertTrue(alertCount == 1);
-        inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/5555", "open"});
-        Thread.sleep(100);
-        Assert.assertTrue(alertCount == 2);
-        executionPlanRuntime.shutdown();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiStream.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiStream.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiStream.java
deleted file mode 100644
index 0027bce..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiStream.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.siddhi;
-
-import org.apache.eagle.policy.siddhi.SiddhiPolicyEvaluator;
-import org.junit.Assert;
-import org.junit.Test;
-
-
-public class TestSiddhiStream {
-	
-	@Test
-	public void test() {
-		String rule = "from hiveAccessLogStream[sensitivityType=='PHONE_NUMBER'] select user,timestamp,resource,define stream hdfsAuditLogEventStream(eagleAlertContext object, allowed string,cmd string,dst string,host string,securityZone string,sensitivityType string,src string,timestamp long,user string); @info(name = 'query') from hdfsAuditLogEventStream[cmd=='open'] select * insert into outputStream ; insert into outputStream;";
-		Assert.assertFalse(SiddhiPolicyEvaluator.addContextFieldIfNotExist(rule).equals(rule));
-		
-		rule = "from hiveAccessLogStream[sensitivityType=='PHONE_NUMBER'] select    * insert into outputStream;";
-		Assert.assertTrue(SiddhiPolicyEvaluator.addContextFieldIfNotExist(rule).equals(rule));
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestAggregation.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestAggregation.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestAggregation.java
deleted file mode 100644
index adcd728..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestAggregation.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.state;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.Test;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-
-import junit.framework.Assert;
-import org.wso2.siddhi.core.util.EventPrinter;
-
-public class TestAggregation {
-    @Test
-    public void test01DownSampling() throws Exception {
-        String stream = "define stream jmxMetric(cpu double, memory int, bytesIn int, bytesOut long, timestamp long);";
-        String query = "@info(name = 'downSample') "
-                + "from jmxMetric#window.timeBatch(1 sec) "
-                + "select "
-                + " min(cpu) as minCpu, max(cpu) as maxCpu, avg(cpu) as avgCpu, "
-                + " min(memory) as minMem, max(memory) as maxMem, avg(memory) as avgMem, "
-                + " min(bytesIn) as minBytesIn, max(bytesIn) as maxBytesIn, avg(bytesIn) as avgBytesIn, sum(bytesIn) as totalBytesIn, "
-                + " min(bytesOut) as minBytesOut, max(bytesOut) as maxBytesOut, avg(bytesOut) as avgBytesOut, sum(bytesOut) as totalBytesOut, "
-                + " timestamp as timeWindowEnds "
-                + " INSERT  INTO tmp;";
-
-        SiddhiManager sm = new SiddhiManager();
-        ExecutionPlanRuntime plan = sm.createExecutionPlanRuntime(stream + query);
-
-        final AtomicInteger counter = new AtomicInteger();
-        plan.addCallback("downSample", new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-                int count = counter.incrementAndGet();
-                if (count == 1) {
-                    Assert.assertEquals(6000L, inEvents[0].getData(9));
-                } else if(count == 2) {
-                    Assert.assertEquals(6000L, inEvents[0].getData(9));
-                }
-            }
-        });
-        InputHandler input = plan.getInputHandler("jmxMetric");
-
-        plan.start();
-        sendEvent(input);
-        Thread.sleep(100);
-        sendEvent(input);
-        Thread.sleep(1000);
-        sendEvent(input);
-        Thread.sleep(1000);
-        sendEvent(input);
-        Thread.sleep(200);
-        plan.shutdown();
-    }
-
-    // send 3 events
-    private void sendEvent(InputHandler input) throws Exception {
-        int len = 3;
-        Event[] events = new Event[len];
-        for (int i = 0; i < len; i++) {
-            long externalTs = System.currentTimeMillis();
-            // cpu int, memory int, bytesIn long, bytesOut long, timestamp long
-            events[i] = new Event(externalTs + i, new Object[] {
-                    15.0,
-                    15,
-                    1000,
-                    2000L,
-                    externalTs + i
-            });
-        }
-
-        for (Event e : events) {
-            input.send(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiExpiredEvents.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiExpiredEvents.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiExpiredEvents.java
deleted file mode 100644
index e0be82c..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiExpiredEvents.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.state;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.util.EventPrinter;
-
-/**
- * Created by yonzhang on 11/25/15.
- */
-public class TestSiddhiExpiredEvents {
-    @Test
-    public void testExpiredEventsInLengthWindow() throws Exception{
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "define stream TempStream (user string, cmd string);";
-        String query = "@info(name = 'query1') from TempStream#window.length(3) "
-                + " select *"
-                + " insert all events into DelayedTempStream";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);
-
-        executionPlanRuntime.addCallback("query1", new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        });
-
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("TempStream");
-        executionPlanRuntime.start();
-        inputHandler.send(new Object[]{"user", "open1"});
-        inputHandler.send(new Object[]{"user", "open2"});
-        inputHandler.send(new Object[]{"user", "open3"});
-        inputHandler.send(new Object[]{"user", "open4"});
-        inputHandler.send(new Object[]{"user", "open5"});
-        inputHandler.send(new Object[]{"user", "open6"});
-        Thread.sleep(1000);
-        executionPlanRuntime.shutdown();
-    }
-
-    @Test
-    public void testExpiredEventsInLengthBatchWindow() throws Exception{
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "define stream TempStream (user string, cmd string);";
-        String query = "@info(name = 'query1') from TempStream#window.lengthBatch(2) "
-                + " select *"
-                + " insert all events into DelayedTempStream";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);
-
-        executionPlanRuntime.addCallback("query1", new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        });
-
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("TempStream");
-        executionPlanRuntime.start();
-        inputHandler.send(new Object[]{"user", "open1"});
-        inputHandler.send(new Object[]{"user", "open2"});
-        inputHandler.send(new Object[]{"user", "open3"});
-        inputHandler.send(new Object[]{"user", "open4"});
-        inputHandler.send(new Object[]{"user", "open5"});
-        inputHandler.send(new Object[]{"user", "open6"});
-        Thread.sleep(1000);
-        executionPlanRuntime.shutdown();
-    }
-
-    @Test
-    public void testExpireEvents2() throws Exception{
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "define stream TempStream (user string, cmd string);";
-        String query = "@info(name = 'query1') from TempStream#window.length(4) "
-                + " select user, cmd, count(user) as cnt " +
-                " group by user " +
-                "having cnt > 2 "
-                + " insert all events into DelayedTempStream";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);
-
-        executionPlanRuntime.addCallback("query1", new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        });
-
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("TempStream");
-        executionPlanRuntime.start();
-        inputHandler.send(new Object[]{"user", "open1"});
-        inputHandler.send(new Object[]{"user", "open2"});
-        inputHandler.send(new Object[]{"user", "open3"});
-        inputHandler.send(new Object[]{"user", "open4"});
-        inputHandler.send(new Object[]{"user", "open5"});
-//        inputHandler.send(new Object[]{"user", "open6"});
-//        inputHandler.send(new Object[]{"user", "open7"});
-//        inputHandler.send(new Object[]{"user", "open8"});
-//        inputHandler.send(new Object[]{"user", "open9"});
-        Thread.sleep(1000);
-        executionPlanRuntime.shutdown();
-    }
-}


Mime
View raw message