eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jinh...@apache.org
Subject incubator-eagle git commit: [EAGLE-776] add unit test for eagle-alert-parent
Date Wed, 23 Nov 2016 06:30:02 GMT
Repository: incubator-eagle
Updated Branches:
  refs/heads/master 7499be694 -> 8d7f81e1c


[EAGLE-776] add unit test for eagle-alert-parent

add unit test for eagle-alert-parent

https://issues.apache.org/jira/browse/EAGLE-776

Author: koone <luokun1028@126.com>

Closes #673 from koone/EAGLE-777.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/8d7f81e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/8d7f81e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/8d7f81e1

Branch: refs/heads/master
Commit: 8d7f81e1cdcc61e16769677dda2c371897f25dca
Parents: 7499be6
Author: koone <luokun1028@126.com>
Authored: Wed Nov 23 14:29:48 2016 +0800
Committer: wujinhu <wujinhu920@126.com>
Committed: Wed Nov 23 14:29:48 2016 +0800

----------------------------------------------------------------------
 .../alert-metadata-service/pom.xml              |  13 +-
 .../impl/TopologyMgmtResourceImplTest.java      |  72 ++++
 .../src/test/resources/application.conf         |  10 +-
 .../alert-metadata/pom.xml                      |  12 +
 .../eagle/alert/metadata/MetadataUtils.java     |   1 +
 .../metadata/impl/InMemMetadataDaoImpl.java     |   6 +-
 .../eagle/alert/metadata/TestMetadataUtils.java |  59 ++++
 .../eagle/alert/metadata/impl/InMemoryTest.java | 117 +++++++
 .../eagle/alert/metadata/impl/JdbcImplTest.java | 164 +++++++++
 .../alert/metadata/impl/MongoImplTest.java      | 344 ++++++++++++++++++
 .../alert/resource/impl/InMemoryTest.java       |  48 ---
 .../alert/resource/impl/JdbcImplTest.java       | 165 ---------
 .../alert/resource/impl/MongoImplTest.java      | 345 -------------------
 13 files changed, 791 insertions(+), 565 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml
index 9d5e8f1..cf1f0fc 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml
@@ -84,7 +84,18 @@
             <groupId>io.swagger</groupId>
             <artifactId>swagger-jaxrs</artifactId>
         </dependency>
-
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImplTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImplTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImplTest.java
index e46213e..b9a7634 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImplTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImplTest.java
@@ -18,13 +18,40 @@
 
 package org.apache.eagle.service.topology.resource.impl;
 
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.ClusterSummary;
+import backtype.storm.generated.Nimbus;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.TopologySummary;
+import backtype.storm.utils.NimbusClient;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
+import org.apache.eagle.alert.metadata.IMetadataDao;
+import org.apache.eagle.alert.metadata.impl.InMemMetadataDaoImpl;
+import org.apache.eagle.alert.metadata.impl.MetadataDaoFactory;
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import static org.powermock.api.mockito.PowerMockito.when;
 
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({TopologyMgmtResourceImpl.class, StormSubmitter.class})
 public class TopologyMgmtResourceImplTest {
     TopologyMgmtResourceImpl topologyManager = new TopologyMgmtResourceImpl();
     String topologyName = "testStartTopology";
@@ -52,4 +79,49 @@ public class TopologyMgmtResourceImplTest {
         List<TopologyStatus> topologies = topologyManager.getTopologies();
         Assert.assertTrue(topologies.size() == 1);
     }
+
+    @Test
+    public void testGetTopologies1() throws Exception {
+        IMetadataDao dao = MetadataDaoFactory.getInstance().getMetadataDao();
+        TopologyMgmtResourceImpl service = new TopologyMgmtResourceImpl();
+        Field daoField = TopologyMgmtResourceImpl.class.getDeclaredField("dao");
+        daoField.setAccessible(true);
+        Field modifiersField = Field.class.getDeclaredField("modifiers");
+        modifiersField.setAccessible(true);
+        modifiersField.setInt(daoField, daoField.getModifiers() & ~Modifier.FINAL);
+        daoField.set(null, dao);
+        // set data
+        Topology topology = new Topology("test", 1, 1);
+        StreamingCluster cluster =new StreamingCluster();
+        dao.clear();
+        dao.addTopology(topology);
+        dao.addCluster(cluster);
+        TopologyMgmtResourceImpl spy = PowerMockito.spy(service);
+        PowerMockito.doReturn(new TopologySummary()).when(spy,"getTopologySummery", Mockito.anyCollection(), Mockito.any(Topology.class));
+        Assert.assertEquals(1, spy.getTopologies().size());
+    }
+
+    @Test
+    public void testStartTopology1() throws Exception {
+        IMetadataDao dao = MetadataDaoFactory.getInstance().getMetadataDao();
+        TopologyMgmtResourceImpl service = new TopologyMgmtResourceImpl();
+        Field daoField = TopologyMgmtResourceImpl.class.getDeclaredField("dao");
+        daoField.setAccessible(true);
+        Field modifiersField = Field.class.getDeclaredField("modifiers");
+        modifiersField.setAccessible(true);
+        modifiersField.setInt(daoField, daoField.getModifiers() & ~Modifier.FINAL);
+        daoField.set(null, dao);
+        // set data
+        Topology topology = new Topology("test", 1, 1);
+        StreamingCluster cluster =new StreamingCluster();
+        dao.clear();
+        dao.addTopology(topology);
+        dao.addCluster(cluster);
+        PowerMockito.mockStatic(StormSubmitter.class);
+        PowerMockito.doNothing().when(StormSubmitter.class, "submitTopology",Mockito.eq("test"), Mockito.anyMap(), Mockito.any(StormTopology.class));
+        TopologyMgmtResourceImpl spy = PowerMockito.spy(service);
+        PowerMockito.doReturn(null).when(spy,"createTopology", Mockito.any(Topology.class));
+        spy.startTopology("test");
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/resources/application.conf
index f760241..1b6a281 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/resources/application.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/resources/application.conf
@@ -13,8 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-{
-  "datastore": {
-    "metadataDao": "org.apache.eagle.alert.metadata.impl.InMemMetadataDaoImpl"
+metadata {
+  metadataDao = org.apache.eagle.alert.metadata.impl.InMemMetadataDaoImpl
+  jdbc {
+    url = "localhost:27017"
+  }
+  properties {
+
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml
index 07f373a..ebe24e2 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml
@@ -64,6 +64,18 @@
             <artifactId>guice</artifactId>
             <version>3.0</version>
         </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java
index 3e03b57..be22280 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java
@@ -75,6 +75,7 @@ public class MetadataUtils {
     }
 
     public static Connection getJdbcConnection(Config config) {
+
         Connection connection = null;
         try {
             if (config.hasPath(JDBC_USERNAME_PATH)) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
index b608516..611bbb4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
@@ -16,6 +16,8 @@
  */
 package org.apache.eagle.alert.metadata.impl;
 
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
 import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
 import org.apache.eagle.alert.coordination.model.ScheduleState;
 import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
@@ -26,8 +28,6 @@ import org.apache.eagle.alert.metadata.IMetadataDao;
 import org.apache.eagle.alert.metadata.MetadataUtils;
 import org.apache.eagle.alert.metadata.resource.Models;
 import org.apache.eagle.alert.metadata.resource.OpResult;
-import com.google.inject.Inject;
-import com.typesafe.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -296,7 +296,7 @@ public class InMemMetadataDaoImpl implements IMetadataDao {
     }
 
     @Override
-    public OpResult clear() {
+    public synchronized OpResult clear() {
         LOG.info("clear models...");
         this.assignments.clear();
         this.clusters.clear();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/TestMetadataUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/TestMetadataUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/TestMetadataUtils.java
new file mode 100644
index 0000000..1191dcb
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/TestMetadataUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.metadata;
+
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Created by luokun on 2016/11/16.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(MetadataUtils.class)
+public class TestMetadataUtils {
+
+    @Rule
+    public ExpectedException thrown= ExpectedException.none();
+
+    @Test
+    public void testGetKey() throws Exception {
+        StreamDefinition stream = new StreamDefinition();
+        Assert.assertNull(MetadataUtils.getKey(stream));
+        PolicyAssignment policyAssignment = new PolicyAssignment();
+        policyAssignment.setPolicyName("test");
+        Assert.assertEquals("test", MetadataUtils.getKey(policyAssignment));
+        ScheduleState scheduleState = new ScheduleState();
+        scheduleState.setVersion("1.0");
+        Assert.assertEquals("1.0", MetadataUtils.getKey(scheduleState));
+    }
+
+    @Test
+    public void testGetKeyThrowable() {
+        thrown.expect(RuntimeException.class);
+        Object obj = new Object();
+        MetadataUtils.getKey(obj);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/InMemoryTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/InMemoryTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/InMemoryTest.java
new file mode 100644
index 0000000..7655f54
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/InMemoryTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.metadata.impl;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
+import org.apache.eagle.alert.engine.model.AlertPublishEvent;
+import org.apache.eagle.alert.metadata.IMetadataDao;
+import org.apache.eagle.alert.metadata.resource.OpResult;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * @since May 1, 2016
+ */
+public class InMemoryTest {
+
+    private IMetadataDao dao = new InMemMetadataDaoImpl(ConfigFactory.load());
+
+    @Test
+    public void test_AddPolicy() {
+
+        LoggerFactory.getLogger(InMemoryTest.class);
+
+        MetadataDaoFactory.getInstance().getMetadataDao();
+
+        PolicyDefinition pd = new PolicyDefinition();
+        pd.setName("pd1");
+        dao.addPolicy(pd);
+
+        Assert.assertEquals(1, dao.listPolicies().size());
+    }
+
+    @Test
+    public void testAddCluster(){
+        StreamingCluster cluster1 = new StreamingCluster();
+        cluster1.setName("test1");
+        StreamingCluster cluster2 = new StreamingCluster();
+        cluster2.setName("test2");
+        StreamingCluster cluster3 = new StreamingCluster();
+        cluster3.setName("test2");
+        OpResult opResult1 = dao.addCluster(cluster1);
+        Assert.assertEquals(OpResult.SUCCESS,opResult1.code);
+        OpResult opResult2 = dao.addCluster(cluster2);
+        Assert.assertEquals(OpResult.SUCCESS,opResult2.code);
+        OpResult opResult3 = dao.addCluster(cluster3);
+        Assert.assertEquals(OpResult.SUCCESS,opResult3.code);
+        Assert.assertTrue(opResult3.message.contains("replace"));
+        dao.clear();
+    }
+
+    @Test
+    public void testRemoveDataSource(){
+        Kafka2TupleMetadata dataSource1 = new Kafka2TupleMetadata();
+        Kafka2TupleMetadata dataSource2 = new Kafka2TupleMetadata();
+        dataSource1.setName("test1");
+        dataSource2.setName("test2");
+        dao.addDataSource(dataSource1);
+        dao.addDataSource(dataSource2);
+        OpResult opResult1 = dao.removeDataSource("test1");
+        Assert.assertEquals(OpResult.SUCCESS, opResult1.code);
+        OpResult opResult2 = dao.removeDataSource("test1");
+        Assert.assertEquals(OpResult.SUCCESS, opResult2.code);
+        Assert.assertTrue(opResult2.message.contains("no configuration"));
+        dao.clear();
+    }
+
+    @Test
+    public void testListAlertPublishEvent(){
+        dao.addAlertPublishEvent(new AlertPublishEvent());
+        dao.addAlertPublishEvent(new AlertPublishEvent());
+        Assert.assertEquals(2,dao.listAlertPublishEvent(5).size());
+    }
+
+    @Test
+    public void testGetAlertPublishEventByPolicyId(){
+        AlertPublishEvent alert1 = new AlertPublishEvent();
+        AlertPublishEvent alert2 = new AlertPublishEvent();
+        alert1.setAlertId("1");
+        alert1.setPolicyId("1");
+        alert2.setAlertId("2");
+        alert2.setPolicyId("1");
+        dao.addAlertPublishEvent(alert1);
+        dao.addAlertPublishEvent(alert2);
+        Assert.assertNotNull(dao.getAlertPublishEvent("1"));
+        Assert.assertEquals(2, dao.getAlertPublishEventByPolicyId("1", 2).size());
+    }
+
+    @Test
+    public void testAddScheduleState(){
+        ScheduleState scheduleState = new ScheduleState();
+        scheduleState.setVersion("1");
+        Assert.assertEquals(OpResult.SUCCESS,dao.addScheduleState(scheduleState).code);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/JdbcImplTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/JdbcImplTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/JdbcImplTest.java
new file mode 100644
index 0000000..7a2fcb5
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/JdbcImplTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.metadata.impl;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.PublishmentType;
+import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
+import org.apache.eagle.alert.metadata.IMetadataDao;
+import org.apache.eagle.alert.metadata.MetadataUtils;
+import org.apache.eagle.alert.metadata.resource.OpResult;
+import org.junit.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+
+public class JdbcImplTest {
+    private static Logger LOG = LoggerFactory.getLogger(JdbcImplTest.class);
+    static IMetadataDao dao;
+
+    @BeforeClass
+    public static void setup() {
+        System.setProperty("config.resource", "/application-mysql.conf");
+        ConfigFactory.invalidateCaches();
+        Config config = ConfigFactory.load();
+        dao = new JdbcMetadataDaoImpl(config.getConfig(MetadataUtils.META_DATA));
+    }
+
+    @AfterClass
+    public static void teardown() {
+        if (dao != null) {
+            try {
+                dao.close();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private String TOPO_NAME = "topoName";
+
+    @Ignore
+    @Test
+    public void test_apis() {
+        // publishment
+        {
+            Publishment publishment = new Publishment();
+            publishment.setName("pub-");
+            OpResult result = dao.addPublishment(publishment);
+            Assert.assertEquals(200, result.code);
+            List<Publishment> assigns = dao.listPublishment();
+            Assert.assertEquals(1, assigns.size());
+            result = dao.removePublishment("pub-");
+            Assert.assertTrue(200 == result.code);
+        }
+        // topology
+        {
+            OpResult result = dao.addTopology(new Topology(TOPO_NAME, 3, 5));
+            System.out.println(result.message);
+            Assert.assertEquals(200, result.code);
+            List<Topology> topos = dao.listTopologies();
+            Assert.assertEquals(1, topos.size());
+            // add again: replace existing one
+            result = dao.addTopology(new Topology(TOPO_NAME, 4, 5));
+            topos = dao.listTopologies();
+            Assert.assertEquals(1, topos.size());
+            Assert.assertEquals(TOPO_NAME, topos.get(0).getName());
+            Assert.assertEquals(4, topos.get(0).getNumOfGroupBolt());
+        }
+        // assignment
+        {
+            PolicyAssignment assignment = new PolicyAssignment();
+            assignment.setPolicyName("policy1");
+            OpResult result = dao.addAssignment(assignment);
+            Assert.assertEquals(200, result.code);
+            List<PolicyAssignment> assigns = dao.listAssignments();
+            Assert.assertEquals(1, assigns.size());
+        }
+        // cluster
+        {
+            StreamingCluster cluster = new StreamingCluster();
+            cluster.setName("dd");
+            OpResult result = dao.addCluster(cluster);
+            Assert.assertEquals(200, result.code);
+            List<StreamingCluster> assigns = dao.listClusters();
+            Assert.assertEquals(1, assigns.size());
+        }
+        // data source
+        {
+            Kafka2TupleMetadata dataSource = new Kafka2TupleMetadata();
+            dataSource.setName("ds");
+            OpResult result = dao.addDataSource(dataSource);
+            Assert.assertEquals(200, result.code);
+            List<Kafka2TupleMetadata> assigns = dao.listDataSources();
+            Assert.assertEquals(1, assigns.size());
+        }
+        // policy
+        {
+            PolicyDefinition policy = new PolicyDefinition();
+            policy.setName("ds");
+            OpResult result = dao.addPolicy(policy);
+            Assert.assertEquals(200, result.code);
+            List<PolicyDefinition> assigns = dao.listPolicies();
+            Assert.assertEquals(1, assigns.size());
+        }
+
+        // publishmentType
+        {
+            PublishmentType publishmentType = new PublishmentType();
+            publishmentType.setType("KAFKA");
+            OpResult result = dao.addPublishmentType(publishmentType);
+            Assert.assertEquals(200, result.code);
+            List<PublishmentType> assigns = dao.listPublishmentType();
+            Assert.assertEquals(1, assigns.size());
+        }
+    }
+
+    private void test_addstate() {
+        ScheduleState state = new ScheduleState();
+        String versionId = "state-" + System.currentTimeMillis();
+        state.setVersion(versionId);
+        state.setGenerateTime(String.valueOf(new Date().getTime()));
+        OpResult result = dao.addScheduleState(state);
+        Assert.assertEquals(200, result.code);
+        state = dao.getScheduleState();
+        Assert.assertEquals(state.getVersion(), versionId);
+    }
+
+    @Ignore
+    @Test
+    public void test_readCurrentState() {
+        test_addstate();
+        ScheduleState state = dao.getScheduleState();
+        Assert.assertNotNull(state);
+
+        LOG.debug(state.getVersion());
+        LOG.debug(state.getGenerateTime());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/MongoImplTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/MongoImplTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/MongoImplTest.java
new file mode 100644
index 0000000..3b3ddf9
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/alert/metadata/impl/MongoImplTest.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.metadata.impl;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import de.flapdoodle.embed.mongo.MongodExecutable;
+import de.flapdoodle.embed.mongo.MongodProcess;
+import de.flapdoodle.embed.mongo.MongodStarter;
+import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
+import de.flapdoodle.embed.mongo.config.Net;
+import de.flapdoodle.embed.mongo.distribution.Version;
+import de.flapdoodle.embed.process.runtime.Network;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.eagle.alert.coordination.model.*;
+import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.engine.coordinator.*;
+import org.apache.eagle.alert.engine.model.AlertPublishEvent;
+import org.apache.eagle.alert.metadata.IMetadataDao;
+import org.apache.eagle.alert.metadata.MetadataUtils;
+import org.apache.eagle.alert.metadata.resource.OpResult;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * @since May 1, 2016
+ */
+public class MongoImplTest {
+    private static Logger LOG = LoggerFactory.getLogger(MongoImplTest.class);
+    static IMetadataDao dao;
+
+    private static MongodExecutable mongodExe;
+    private static MongodProcess mongod;
+
+    public static void before() {
+        try {
+            MongodStarter starter = MongodStarter.getDefaultInstance();
+            mongodExe = starter.prepare(new MongodConfigBuilder().version(Version.V3_2_1)
+                .net(new Net(27017, Network.localhostIsIPv6())).build());
+            mongod = mongodExe.start();
+        } catch (Exception e) {
+            LOG.error("start embed mongod failed, assume some external mongo running. continue run test!", e);
+        }
+    }
+
+    @BeforeClass
+    public static void setup() {
+        before();
+
+        System.setProperty("config.resource", "/application-mongo.conf");
+        ConfigFactory.invalidateCaches();
+        Config config = ConfigFactory.load();
+        dao = new MongoMetadataDaoImpl(config.getConfig(MetadataUtils.META_DATA));
+
+    }
+
+    @AfterClass
+    public static void teardown() {
+        if (mongod != null) {
+            try {
+                mongod.stop();
+            } catch (IllegalStateException e) {
+                // catch this exception for the unstable stopping mongodb
+                // reason: the exception is usually thrown out with below message format when stop() returns null value,
+                //         but actually this should have been captured in ProcessControl.stopOrDestroyProcess() by destroying
+                //         the process ultimately
+                if (e.getMessage() != null && e.getMessage().matches("^Couldn't kill.*process!.*")) {
+                    // if matches, do nothing, just ignore the exception
+                } else {
+                    LOG.warn(String.format("Ignored error for stopping mongod process, see stack trace: %s", ExceptionUtils.getStackTrace(e)));
+                }
+            }
+            mongodExe.stop();
+        }
+    }
+
+    private String TOPO_NAME = "topoName";
+
+    @Test
+    public void test_apis() throws Exception {
+        // topology
+        {
+            OpResult result = dao.addTopology(new Topology(TOPO_NAME, 3, 5));
+            System.out.println(result.message);
+            Assert.assertEquals(200, result.code);
+            List<Topology> topos = dao.listTopologies();
+            Assert.assertEquals(1, topos.size());
+
+            result = dao.addTopology(new Topology(TOPO_NAME + "-new", 3, 5));
+            topos = dao.listTopologies();
+            Assert.assertEquals(2, topos.size());
+            // add again: replace existing one
+            result = dao.addTopology(new Topology(TOPO_NAME, 4, 5));
+            topos = dao.listTopologies();
+            Assert.assertEquals(2, topos.size());
+            Assert.assertEquals(TOPO_NAME, topos.get(0).getName());
+            Assert.assertEquals(4, topos.get(0).getNumOfGroupBolt());
+        }
+        // assignment
+        {
+            PolicyAssignment assignment = new PolicyAssignment();
+            assignment.setPolicyName("policy1");
+            OpResult result = dao.addAssignment(assignment);
+            Assert.assertEquals(200, result.code);
+            List<PolicyAssignment> assigns = dao.listAssignments();
+            Assert.assertEquals(1, assigns.size());
+        }
+        // cluster
+        {
+            StreamingCluster cluster = new StreamingCluster();
+            cluster.setName("dd");
+            OpResult result = dao.addCluster(cluster);
+            Assert.assertEquals(200, result.code);
+            List<StreamingCluster> assigns = dao.listClusters();
+            Assert.assertEquals(1, assigns.size());
+        }
+        // data source
+        {
+            Kafka2TupleMetadata dataSource = new Kafka2TupleMetadata();
+            dataSource.setName("ds");
+            OpResult result = dao.addDataSource(dataSource);
+            Assert.assertEquals(200, result.code);
+            List<Kafka2TupleMetadata> assigns = dao.listDataSources();
+            Assert.assertEquals(1, assigns.size());
+        }
+        // policy
+        {
+            PolicyDefinition policy = new PolicyDefinition();
+            policy.setName("ds");
+            OpResult result = dao.addPolicy(policy);
+            Assert.assertEquals(200, result.code);
+            List<PolicyDefinition> assigns = dao.listPolicies();
+            Assert.assertEquals(1, assigns.size());
+        }
+        // publishment
+        {
+            Publishment publishment = new Publishment();
+            publishment.setName("pub-");
+            OpResult result = dao.addPublishment(publishment);
+            Assert.assertEquals(200, result.code);
+            List<Publishment> assigns = dao.listPublishment();
+            Assert.assertEquals(1, assigns.size());
+        }
+        // publishmentType
+        {
+            PublishmentType publishmentType = new PublishmentType();
+            publishmentType.setType("KAFKA");
+            OpResult result = dao.addPublishmentType(publishmentType);
+            Assert.assertEquals(200, result.code);
+            List<PublishmentType> assigns = dao.listPublishmentType();
+            Assert.assertEquals(1, assigns.size());
+        }
+
+        // schedule state
+        {
+            ScheduleState state = new ScheduleState();
+            state.setVersion("001");
+            state.setScheduleTimeMillis(3000);
+            state.setCode(200);
+            OpResult result = dao.addScheduleState(state);
+            Assert.assertEquals(200, result.code);
+
+            Thread.sleep(1000);
+
+            state = new ScheduleState();
+            state.setScheduleTimeMillis(3000);
+            state.setVersion("002");
+            state.setCode(201);
+            result = dao.addScheduleState(state);
+            Assert.assertEquals(200, result.code);
+
+            ScheduleState getState = dao.getScheduleState();
+            Assert.assertEquals(201, getState.getCode());
+        }
+        // stream
+        {
+            StreamDefinition stream = new StreamDefinition();
+            stream.setStreamId("stream");
+            OpResult result = dao.createStream(stream);
+            Assert.assertEquals(200, result.code);
+            List<StreamDefinition> assigns = dao.listStreams();
+            Assert.assertEquals(1, assigns.size());
+        }
+        // alert
+        {
+            AlertPublishEvent alert = new AlertPublishEvent();
+            alert.setAlertTimestamp(System.currentTimeMillis());
+            alert.setAlertId(UUID.randomUUID().toString());
+            OpResult result = dao.addAlertPublishEvent(alert);
+            Assert.assertEquals(200, result.code);
+            List<AlertPublishEvent> alerts = dao.listAlertPublishEvent(2);
+            Assert.assertEquals(1, alerts.size());
+        }
+    }
+
+    private void test_addstate() {
+        ScheduleState state = new ScheduleState();
+        state.setVersion("state-" + System.currentTimeMillis());
+        state.setGenerateTime(String.valueOf(new Date().getTime()));
+        OpResult result = dao.addScheduleState(state);
+        Assert.assertEquals(200, result.code);
+    }
+
+    @Test
+    public void test_readCurrentState() {
+        test_addstate();
+        ScheduleState state = dao.getScheduleState();
+        Assert.assertNotNull(state);
+
+        System.out.println(state.getVersion());
+        System.out.println(state.getGenerateTime());
+    }
+
+    private void test_addCompleteScheduleState() {
+        Long timestamp = System.currentTimeMillis();
+        String version = "state-" + timestamp;
+
+        // SpoutSpec
+        Map<String, SpoutSpec> spoutSpecsMap = new HashMap<>();
+        SpoutSpec spoutSpec1 = new SpoutSpec();
+        String topologyId1 = "testUnitTopology1_" + timestamp;
+        spoutSpec1.setTopologyId(topologyId1);
+
+        Map<String, Kafka2TupleMetadata> kafka2TupleMetadataMap = new HashMap<>();
+        Kafka2TupleMetadata kafka2TupleMetadata = new Kafka2TupleMetadata();
+        kafka2TupleMetadata.setType("KAFKA");
+        kafka2TupleMetadataMap.put("preprocess.network-sherlock.events", kafka2TupleMetadata);
+        spoutSpec1.setKafka2TupleMetadataMap(kafka2TupleMetadataMap);
+
+        Map<String, List<StreamRepartitionMetadata>> streamRepartitionMetadataMap= new HashMap<>();
+        List<StreamRepartitionMetadata> StreamRepartitionMetadataList = new ArrayList<>();
+        StreamRepartitionMetadata streamRepartitionMetadata = new StreamRepartitionMetadata();
+        List<StreamRepartitionStrategy> groupingStrategies = new ArrayList();
+        StreamRepartitionStrategy streamRepartitionStrategy = new StreamRepartitionStrategy();
+        streamRepartitionStrategy.setStartSequence(4);
+        groupingStrategies.add(streamRepartitionStrategy);
+        streamRepartitionMetadata.setGroupingStrategies(groupingStrategies);
+        StreamRepartitionMetadataList.add(streamRepartitionMetadata);
+        streamRepartitionMetadataMap.put("preprocess.network-nervecenter.events", StreamRepartitionMetadataList);
+        spoutSpec1.setStreamRepartitionMetadataMap(streamRepartitionMetadataMap);
+        spoutSpecsMap.put(topologyId1, spoutSpec1);
+
+        SpoutSpec spoutSpec2 = new SpoutSpec();
+        String topologyId2 = "testUnitTopology2_" + timestamp;
+        spoutSpec2.setTopologyId(topologyId2);
+        spoutSpec2.setKafka2TupleMetadataMap(kafka2TupleMetadataMap);
+        spoutSpec2.setStreamRepartitionMetadataMap(streamRepartitionMetadataMap);
+        spoutSpecsMap.put(topologyId2, spoutSpec2);
+
+        // Alert Spec
+        Map<String, AlertBoltSpec> alertSpecsMap = new HashMap<>();
+        alertSpecsMap.put(topologyId1, new AlertBoltSpec(topologyId1));
+
+        // GroupSpec
+        Map<String, RouterSpec> groupSpecsMap = new HashMap<>();
+        groupSpecsMap.put(topologyId1, new RouterSpec(topologyId1));
+
+        // PublishSpec
+        Map<String, PublishSpec> pubMap = new HashMap<>();
+        pubMap.put(topologyId1, new PublishSpec(topologyId1, "testPublishBolt"));
+
+        // Policy Snapshots
+        Collection<PolicyDefinition> policySnapshots = new ArrayList<>();
+        PolicyDefinition policy = new PolicyDefinition();
+        policy.setName("testPolicyDefinition");
+        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+        def.setValue("1,jobID,job1,daily_rule,14:00:00,15:00:00");
+        def.setType("absencealert");
+        policy.setDefinition(def);
+        policySnapshots.add(policy);
+
+        // Stream Snapshots
+        Collection<StreamDefinition> streams = new ArrayList<>();
+        StreamDefinition stream = new StreamDefinition();
+        stream.setStreamId("testStream");
+        streams.add(stream);
+
+        // Monitored Streams
+        Collection<MonitoredStream> monitoredStreams = new ArrayList<>();
+        StreamPartition partition = new StreamPartition();
+        partition.setType(StreamPartition.Type.GLOBAL);
+        partition.setStreamId("s1");
+        partition.setColumns(Arrays.asList("f1", "f2"));
+        StreamGroup sg = new StreamGroup();
+        sg.addStreamPartition(partition);
+        MonitoredStream monitoredStream = new MonitoredStream(sg);
+        monitoredStreams.add(monitoredStream);
+
+        // Assignments
+        Collection<PolicyAssignment> assignments = new ArrayList<>();
+        assignments.add(new PolicyAssignment("syslog_regex", "SG[syslog_stream-]" + timestamp));
+
+        ScheduleState state = new ScheduleState(version, spoutSpecsMap, groupSpecsMap, alertSpecsMap, pubMap,
+            assignments, monitoredStreams, policySnapshots, streams);
+
+        OpResult result = dao.addScheduleState(state);
+        Assert.assertEquals(200, result.code);
+    }
+
+    @Test
+    public void test_readCompleteScheduleState() {
+        test_addCompleteScheduleState();
+
+        ScheduleState state = dao.getScheduleState();
+        Assert.assertNotNull(state);
+        Assert.assertEquals(2, state.getSpoutSpecs().size());
+        Assert.assertEquals(1, state.getAlertSpecs().size());
+        Assert.assertEquals(1, state.getGroupSpecs().size());
+        Assert.assertEquals(1, state.getPublishSpecs().size());
+        Assert.assertEquals(1, state.getPolicySnapshots().size());
+        Assert.assertEquals(1, state.getStreamSnapshots().size());
+        Assert.assertEquals(1, state.getMonitoredStreams().size());
+        Assert.assertEquals(1, state.getAssignments().size());
+
+
+        System.out.println(state.getVersion());
+        System.out.println(state.getGenerateTime());
+
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/InMemoryTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/InMemoryTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/InMemoryTest.java
deleted file mode 100644
index 840f4a7..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/InMemoryTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.service.alert.resource.impl;
-
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.metadata.IMetadataDao;
-import org.apache.eagle.alert.metadata.impl.InMemMetadataDaoImpl;
-import org.apache.eagle.alert.metadata.impl.MetadataDaoFactory;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.LoggerFactory;
-
-/**
- * @since May 1, 2016
- */
-public class InMemoryTest {
-
-    private IMetadataDao dao = new InMemMetadataDaoImpl(ConfigFactory.load());
-
-    @Test
-    public void test_AddPolicy() {
-
-        LoggerFactory.getLogger(InMemoryTest.class);
-
-        MetadataDaoFactory.getInstance().getMetadataDao();
-
-        PolicyDefinition pd = new PolicyDefinition();
-        pd.setName("pd1");
-        dao.addPolicy(pd);
-
-        Assert.assertEquals(1, dao.listPolicies().size());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/JdbcImplTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/JdbcImplTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/JdbcImplTest.java
deleted file mode 100644
index 158c0c2..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/JdbcImplTest.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.service.alert.resource.impl;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.PublishmentType;
-import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
-import org.apache.eagle.alert.metadata.IMetadataDao;
-import org.apache.eagle.alert.metadata.MetadataUtils;
-import org.apache.eagle.alert.metadata.impl.JdbcMetadataDaoImpl;
-import org.apache.eagle.alert.metadata.resource.OpResult;
-import org.junit.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Date;
-import java.util.List;
-
-public class JdbcImplTest {
-    private static Logger LOG = LoggerFactory.getLogger(JdbcImplTest.class);
-    static IMetadataDao dao;
-
-    @BeforeClass
-    public static void setup() {
-        System.setProperty("config.resource", "/application-mysql.conf");
-        ConfigFactory.invalidateCaches();
-        Config config = ConfigFactory.load();
-        dao = new JdbcMetadataDaoImpl(config.getConfig(MetadataUtils.META_DATA));
-    }
-
-    @AfterClass
-    public static void teardown() {
-        if (dao != null) {
-            try {
-                dao.close();
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-    private String TOPO_NAME = "topoName";
-
-    @Ignore
-    @Test
-    public void test_apis() {
-        // publishment
-        {
-            Publishment publishment = new Publishment();
-            publishment.setName("pub-");
-            OpResult result = dao.addPublishment(publishment);
-            Assert.assertEquals(200, result.code);
-            List<Publishment> assigns = dao.listPublishment();
-            Assert.assertEquals(1, assigns.size());
-            result = dao.removePublishment("pub-");
-            Assert.assertTrue(200 == result.code);
-        }
-        // topology
-        {
-            OpResult result = dao.addTopology(new Topology(TOPO_NAME, 3, 5));
-            System.out.println(result.message);
-            Assert.assertEquals(200, result.code);
-            List<Topology> topos = dao.listTopologies();
-            Assert.assertEquals(1, topos.size());
-            // add again: replace existing one
-            result = dao.addTopology(new Topology(TOPO_NAME, 4, 5));
-            topos = dao.listTopologies();
-            Assert.assertEquals(1, topos.size());
-            Assert.assertEquals(TOPO_NAME, topos.get(0).getName());
-            Assert.assertEquals(4, topos.get(0).getNumOfGroupBolt());
-        }
-        // assignment
-        {
-            PolicyAssignment assignment = new PolicyAssignment();
-            assignment.setPolicyName("policy1");
-            OpResult result = dao.addAssignment(assignment);
-            Assert.assertEquals(200, result.code);
-            List<PolicyAssignment> assigns = dao.listAssignments();
-            Assert.assertEquals(1, assigns.size());
-        }
-        // cluster
-        {
-            StreamingCluster cluster = new StreamingCluster();
-            cluster.setName("dd");
-            OpResult result = dao.addCluster(cluster);
-            Assert.assertEquals(200, result.code);
-            List<StreamingCluster> assigns = dao.listClusters();
-            Assert.assertEquals(1, assigns.size());
-        }
-        // data source
-        {
-            Kafka2TupleMetadata dataSource = new Kafka2TupleMetadata();
-            dataSource.setName("ds");
-            OpResult result = dao.addDataSource(dataSource);
-            Assert.assertEquals(200, result.code);
-            List<Kafka2TupleMetadata> assigns = dao.listDataSources();
-            Assert.assertEquals(1, assigns.size());
-        }
-        // policy
-        {
-            PolicyDefinition policy = new PolicyDefinition();
-            policy.setName("ds");
-            OpResult result = dao.addPolicy(policy);
-            Assert.assertEquals(200, result.code);
-            List<PolicyDefinition> assigns = dao.listPolicies();
-            Assert.assertEquals(1, assigns.size());
-        }
-
-        // publishmentType
-        {
-            PublishmentType publishmentType = new PublishmentType();
-            publishmentType.setType("KAFKA");
-            OpResult result = dao.addPublishmentType(publishmentType);
-            Assert.assertEquals(200, result.code);
-            List<PublishmentType> assigns = dao.listPublishmentType();
-            Assert.assertEquals(1, assigns.size());
-        }
-    }
-
-    private void test_addstate() {
-        ScheduleState state = new ScheduleState();
-        String versionId = "state-" + System.currentTimeMillis();
-        state.setVersion(versionId);
-        state.setGenerateTime(String.valueOf(new Date().getTime()));
-        OpResult result = dao.addScheduleState(state);
-        Assert.assertEquals(200, result.code);
-        state = dao.getScheduleState();
-        Assert.assertEquals(state.getVersion(), versionId);
-    }
-
-    @Ignore
-    @Test
-    public void test_readCurrentState() {
-        test_addstate();
-        ScheduleState state = dao.getScheduleState();
-        Assert.assertNotNull(state);
-
-        LOG.debug(state.getVersion());
-        LOG.debug(state.getGenerateTime());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8d7f81e1/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java
deleted file mode 100644
index 4328be3..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/MongoImplTest.java
+++ /dev/null
@@ -1,345 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.service.alert.resource.impl;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import de.flapdoodle.embed.mongo.MongodExecutable;
-import de.flapdoodle.embed.mongo.MongodProcess;
-import de.flapdoodle.embed.mongo.MongodStarter;
-import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
-import de.flapdoodle.embed.mongo.config.Net;
-import de.flapdoodle.embed.mongo.distribution.Version;
-import de.flapdoodle.embed.process.runtime.Network;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.eagle.alert.coordination.model.*;
-import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.coordinator.*;
-import org.apache.eagle.alert.engine.model.AlertPublishEvent;
-import org.apache.eagle.alert.metadata.IMetadataDao;
-import org.apache.eagle.alert.metadata.MetadataUtils;
-import org.apache.eagle.alert.metadata.impl.MongoMetadataDaoImpl;
-import org.apache.eagle.alert.metadata.resource.OpResult;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-/**
- * @since May 1, 2016
- */
-public class MongoImplTest {
-    private static Logger LOG = LoggerFactory.getLogger(MongoImplTest.class);
-    static IMetadataDao dao;
-
-    private static MongodExecutable mongodExe;
-    private static MongodProcess mongod;
-
-    public static void before() {
-        try {
-            MongodStarter starter = MongodStarter.getDefaultInstance();
-            mongodExe = starter.prepare(new MongodConfigBuilder().version(Version.V3_2_1)
-                .net(new Net(27017, Network.localhostIsIPv6())).build());
-            mongod = mongodExe.start();
-        } catch (Exception e) {
-            LOG.error("start embed mongod failed, assume some external mongo running. continue run test!", e);
-        }
-    }
-
-    @BeforeClass
-    public static void setup() {
-        before();
-
-        System.setProperty("config.resource", "/application-mongo.conf");
-        ConfigFactory.invalidateCaches();
-        Config config = ConfigFactory.load();
-        dao = new MongoMetadataDaoImpl(config.getConfig(MetadataUtils.META_DATA));
-
-    }
-
-    @AfterClass
-    public static void teardown() {
-        if (mongod != null) {
-            try {
-                mongod.stop();
-            } catch (IllegalStateException e) {
-                // catch this exception for the unstable stopping mongodb
-                // reason: the exception is usually thrown out with below message format when stop() returns null value,
-                //         but actually this should have been captured in ProcessControl.stopOrDestroyProcess() by destroying
-                //         the process ultimately
-                if (e.getMessage() != null && e.getMessage().matches("^Couldn't kill.*process!.*")) {
-                    // if matches, do nothing, just ignore the exception
-                } else {
-                    LOG.warn(String.format("Ignored error for stopping mongod process, see stack trace: %s", ExceptionUtils.getStackTrace(e)));
-                }
-            }
-            mongodExe.stop();
-        }
-    }
-
-    private String TOPO_NAME = "topoName";
-
-    @Test
-    public void test_apis() throws Exception {
-        // topology
-        {
-            OpResult result = dao.addTopology(new Topology(TOPO_NAME, 3, 5));
-            System.out.println(result.message);
-            Assert.assertEquals(200, result.code);
-            List<Topology> topos = dao.listTopologies();
-            Assert.assertEquals(1, topos.size());
-
-            result = dao.addTopology(new Topology(TOPO_NAME + "-new", 3, 5));
-            topos = dao.listTopologies();
-            Assert.assertEquals(2, topos.size());
-            // add again: replace existing one
-            result = dao.addTopology(new Topology(TOPO_NAME, 4, 5));
-            topos = dao.listTopologies();
-            Assert.assertEquals(2, topos.size());
-            Assert.assertEquals(TOPO_NAME, topos.get(0).getName());
-            Assert.assertEquals(4, topos.get(0).getNumOfGroupBolt());
-        }
-        // assignment
-        {
-            PolicyAssignment assignment = new PolicyAssignment();
-            assignment.setPolicyName("policy1");
-            OpResult result = dao.addAssignment(assignment);
-            Assert.assertEquals(200, result.code);
-            List<PolicyAssignment> assigns = dao.listAssignments();
-            Assert.assertEquals(1, assigns.size());
-        }
-        // cluster
-        {
-            StreamingCluster cluster = new StreamingCluster();
-            cluster.setName("dd");
-            OpResult result = dao.addCluster(cluster);
-            Assert.assertEquals(200, result.code);
-            List<StreamingCluster> assigns = dao.listClusters();
-            Assert.assertEquals(1, assigns.size());
-        }
-        // data source
-        {
-            Kafka2TupleMetadata dataSource = new Kafka2TupleMetadata();
-            dataSource.setName("ds");
-            OpResult result = dao.addDataSource(dataSource);
-            Assert.assertEquals(200, result.code);
-            List<Kafka2TupleMetadata> assigns = dao.listDataSources();
-            Assert.assertEquals(1, assigns.size());
-        }
-        // policy
-        {
-            PolicyDefinition policy = new PolicyDefinition();
-            policy.setName("ds");
-            OpResult result = dao.addPolicy(policy);
-            Assert.assertEquals(200, result.code);
-            List<PolicyDefinition> assigns = dao.listPolicies();
-            Assert.assertEquals(1, assigns.size());
-        }
-        // publishment
-        {
-            Publishment publishment = new Publishment();
-            publishment.setName("pub-");
-            OpResult result = dao.addPublishment(publishment);
-            Assert.assertEquals(200, result.code);
-            List<Publishment> assigns = dao.listPublishment();
-            Assert.assertEquals(1, assigns.size());
-        }
-        // publishmentType
-        {
-            PublishmentType publishmentType = new PublishmentType();
-            publishmentType.setType("KAFKA");
-            OpResult result = dao.addPublishmentType(publishmentType);
-            Assert.assertEquals(200, result.code);
-            List<PublishmentType> assigns = dao.listPublishmentType();
-            Assert.assertEquals(1, assigns.size());
-        }
-
-        // schedule state
-        {
-            ScheduleState state = new ScheduleState();
-            state.setVersion("001");
-            state.setScheduleTimeMillis(3000);
-            state.setCode(200);
-            OpResult result = dao.addScheduleState(state);
-            Assert.assertEquals(200, result.code);
-
-            Thread.sleep(1000);
-
-            state = new ScheduleState();
-            state.setScheduleTimeMillis(3000);
-            state.setVersion("002");
-            state.setCode(201);
-            result = dao.addScheduleState(state);
-            Assert.assertEquals(200, result.code);
-
-            ScheduleState getState = dao.getScheduleState();
-            Assert.assertEquals(201, getState.getCode());
-        }
-        // stream
-        {
-            StreamDefinition stream = new StreamDefinition();
-            stream.setStreamId("stream");
-            OpResult result = dao.createStream(stream);
-            Assert.assertEquals(200, result.code);
-            List<StreamDefinition> assigns = dao.listStreams();
-            Assert.assertEquals(1, assigns.size());
-        }
-        // alert
-        {
-            AlertPublishEvent alert = new AlertPublishEvent();
-            alert.setAlertTimestamp(System.currentTimeMillis());
-            alert.setAlertId(UUID.randomUUID().toString());
-            OpResult result = dao.addAlertPublishEvent(alert);
-            Assert.assertEquals(200, result.code);
-            List<AlertPublishEvent> alerts = dao.listAlertPublishEvent(2);
-            Assert.assertEquals(1, alerts.size());
-        }
-    }
-
-    private void test_addstate() {
-        ScheduleState state = new ScheduleState();
-        state.setVersion("state-" + System.currentTimeMillis());
-        state.setGenerateTime(String.valueOf(new Date().getTime()));
-        OpResult result = dao.addScheduleState(state);
-        Assert.assertEquals(200, result.code);
-    }
-
-    @Test
-    public void test_readCurrentState() {
-        test_addstate();
-        ScheduleState state = dao.getScheduleState();
-        Assert.assertNotNull(state);
-
-        System.out.println(state.getVersion());
-        System.out.println(state.getGenerateTime());
-    }
-
-    private void test_addCompleteScheduleState() {
-        Long timestamp = System.currentTimeMillis();
-        String version = "state-" + timestamp;
-
-        // SpoutSpec
-        Map<String, SpoutSpec> spoutSpecsMap = new HashMap<>();
-        SpoutSpec spoutSpec1 = new SpoutSpec();
-        String topologyId1 = "testUnitTopology1_" + timestamp;
-        spoutSpec1.setTopologyId(topologyId1);
-
-        Map<String, Kafka2TupleMetadata> kafka2TupleMetadataMap = new HashMap<>();
-        Kafka2TupleMetadata kafka2TupleMetadata = new Kafka2TupleMetadata();
-        kafka2TupleMetadata.setType("KAFKA");
-        kafka2TupleMetadataMap.put("preprocess.network-sherlock.events", kafka2TupleMetadata);
-        spoutSpec1.setKafka2TupleMetadataMap(kafka2TupleMetadataMap);
-
-        Map<String, List<StreamRepartitionMetadata>> streamRepartitionMetadataMap= new HashMap<>();
-        List<StreamRepartitionMetadata> StreamRepartitionMetadataList = new ArrayList<>();
-        StreamRepartitionMetadata streamRepartitionMetadata = new StreamRepartitionMetadata();
-        List<StreamRepartitionStrategy> groupingStrategies = new ArrayList();
-        StreamRepartitionStrategy streamRepartitionStrategy = new StreamRepartitionStrategy();
-        streamRepartitionStrategy.setStartSequence(4);
-        groupingStrategies.add(streamRepartitionStrategy);
-        streamRepartitionMetadata.setGroupingStrategies(groupingStrategies);
-        StreamRepartitionMetadataList.add(streamRepartitionMetadata);
-        streamRepartitionMetadataMap.put("preprocess.network-nervecenter.events", StreamRepartitionMetadataList);
-        spoutSpec1.setStreamRepartitionMetadataMap(streamRepartitionMetadataMap);
-        spoutSpecsMap.put(topologyId1, spoutSpec1);
-
-        SpoutSpec spoutSpec2 = new SpoutSpec();
-        String topologyId2 = "testUnitTopology2_" + timestamp;
-        spoutSpec2.setTopologyId(topologyId2);
-        spoutSpec2.setKafka2TupleMetadataMap(kafka2TupleMetadataMap);
-        spoutSpec2.setStreamRepartitionMetadataMap(streamRepartitionMetadataMap);
-        spoutSpecsMap.put(topologyId2, spoutSpec2);
-
-        // Alert Spec
-        Map<String, AlertBoltSpec> alertSpecsMap = new HashMap<>();
-        alertSpecsMap.put(topologyId1, new AlertBoltSpec(topologyId1));
-
-        // GroupSpec
-        Map<String, RouterSpec> groupSpecsMap = new HashMap<>();
-        groupSpecsMap.put(topologyId1, new RouterSpec(topologyId1));
-
-        // PublishSpec
-        Map<String, PublishSpec> pubMap = new HashMap<>();
-        pubMap.put(topologyId1, new PublishSpec(topologyId1, "testPublishBolt"));
-
-        // Policy Snapshots
-        Collection<PolicyDefinition> policySnapshots = new ArrayList<>();
-        PolicyDefinition policy = new PolicyDefinition();
-        policy.setName("testPolicyDefinition");
-        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
-        def.setValue("1,jobID,job1,daily_rule,14:00:00,15:00:00");
-        def.setType("absencealert");
-        policy.setDefinition(def);
-        policySnapshots.add(policy);
-
-        // Stream Snapshots
-        Collection<StreamDefinition> streams = new ArrayList<>();
-        StreamDefinition stream = new StreamDefinition();
-        stream.setStreamId("testStream");
-        streams.add(stream);
-
-        // Monitored Streams
-        Collection<MonitoredStream> monitoredStreams = new ArrayList<>();
-        StreamPartition partition = new StreamPartition();
-        partition.setType(StreamPartition.Type.GLOBAL);
-        partition.setStreamId("s1");
-        partition.setColumns(Arrays.asList("f1", "f2"));
-        StreamGroup sg = new StreamGroup();
-        sg.addStreamPartition(partition);
-        MonitoredStream monitoredStream = new MonitoredStream(sg);
-        monitoredStreams.add(monitoredStream);
-
-        // Assignments
-        Collection<PolicyAssignment> assignments = new ArrayList<>();
-        assignments.add(new PolicyAssignment("syslog_regex", "SG[syslog_stream-]" + timestamp));
-
-        ScheduleState state = new ScheduleState(version, spoutSpecsMap, groupSpecsMap, alertSpecsMap, pubMap,
-            assignments, monitoredStreams, policySnapshots, streams);
-
-        OpResult result = dao.addScheduleState(state);
-        Assert.assertEquals(200, result.code);
-    }
-
-    @Test
-    public void test_readCompleteScheduleState() {
-        test_addCompleteScheduleState();
-
-        ScheduleState state = dao.getScheduleState();
-        Assert.assertNotNull(state);
-        Assert.assertEquals(2, state.getSpoutSpecs().size());
-        Assert.assertEquals(1, state.getAlertSpecs().size());
-        Assert.assertEquals(1, state.getGroupSpecs().size());
-        Assert.assertEquals(1, state.getPublishSpecs().size());
-        Assert.assertEquals(1, state.getPolicySnapshots().size());
-        Assert.assertEquals(1, state.getStreamSnapshots().size());
-        Assert.assertEquals(1, state.getMonitoredStreams().size());
-        Assert.assertEquals(1, state.getAssignments().size());
-
-
-        System.out.println(state.getVersion());
-        System.out.println(state.getGenerateTime());
-
-
-    }
-}


Mime
View raw message