eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ralp...@apache.org
Subject [02/18] incubator-eagle git commit: EAGLE-324 Init branch-v0.5
Date Wed, 01 Jun 2016 06:00:02 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/TopologyMgmtResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/TopologyMgmtResource.java b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/TopologyMgmtResource.java
new file mode 100644
index 0000000..437068f
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/TopologyMgmtResource.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.service.topology.resource;
+
+import org.apache.eagle.alert.metadata.resource.OpResult;
+import org.apache.eagle.service.topology.resource.impl.TopologyMgmtResourceImpl;
+import org.apache.eagle.service.topology.resource.impl.TopologyStatus;
+
+import javax.ws.rs.*;
+
+import java.util.List;
+
+/**
+ * JAX-RS resource exposing topology management operations under {@code /alert}.
+ *
+ * <p>All work is delegated to a {@link TopologyMgmtResourceImpl}. Start and stop failures are
+ * not rethrown; the exception's string form is returned to the caller in
+ * {@link OpResult#message}.
+ *
+ * @since May 5, 2016
+ */
+@Path("/alert")
+@Produces("application/json")
+@Consumes("application/json")
+public class TopologyMgmtResource {
+    private TopologyMgmtResourceImpl topologyManager = new TopologyMgmtResourceImpl();
+
+    /** A topology operation keyed by topology name that may fail with any exception. */
+    private interface TopologyAction {
+        void apply(String topologyName) throws Exception;
+    }
+
+    /**
+     * Starts the topology with the given name.
+     *
+     * @param topologyName name taken from the request path
+     * @return a fresh {@link OpResult}; its message is set only when the start failed
+     */
+    @POST
+    @Path("/topologies/{topologyName}/start")
+    public OpResult startTopology(@PathParam("topologyName") String topologyName) {
+        return execute(topologyManager::startTopology, topologyName);
+    }
+
+    /**
+     * Stops the topology with the given name.
+     *
+     * @param topologyName name taken from the request path
+     * @return a fresh {@link OpResult}; its message is set only when the stop failed
+     */
+    @POST
+    @Path("/topologies/{topologyName}/stop")
+    public OpResult stopTopology(@PathParam("topologyName") String topologyName) {
+        return execute(topologyManager::stopTopology, topologyName);
+    }
+
+    /**
+     * Returns the topology statuses reported by the underlying manager.
+     *
+     * @throws Exception whatever the manager throws; unlike start/stop it is not captured
+     */
+    @GET
+    @Path("/topologies")
+    public List<TopologyStatus> getTopologies() throws Exception {
+        return topologyManager.getTopologies();
+    }
+
+    /**
+     * Runs {@code action} for {@code topologyName} and converts any exception into the
+     * message of the returned result.
+     */
+    private static OpResult execute(TopologyAction action, String topologyName) {
+        OpResult outcome = new OpResult();
+        try {
+            action.apply(topologyName);
+        } catch (Exception failure) {
+            outcome.message = failure.toString();
+        }
+        return outcome;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceHelper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceHelper.java b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceHelper.java
new file mode 100644
index 0000000..3876116
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceHelper.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.service.topology.resource.impl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Reflection-based lookup helpers for model objects that expose a public {@code getName()}.
+ */
+public class TopologyMgmtResourceHelper {
+    private static final Logger LOG = LoggerFactory.getLogger(TopologyMgmtResourceHelper.class);
+
+    /**
+     * Finds the first element whose {@code getName()} equals {@code id}, ignoring case.
+     *
+     * @param clusters candidates to search; each must expose a public {@code getName()}
+     * @param id       name to look for; a {@code null} id never matches
+     * @param <T>      element type
+     * @return the first matching element, or an empty {@link Optional} when none matches
+     * @throws RuntimeException if {@code getName()} cannot be invoked on an inspected element
+     */
+    public static <T> Optional<T> findById(List<T> clusters, String id) {
+        if (id == null) {
+            return Optional.empty();
+        }
+        // Compare from the non-null id so an element whose name is null is simply not a match
+        // instead of raising a NullPointerException.
+        return clusters.stream().filter(o -> id.equalsIgnoreCase(getName(o))).findFirst();
+    }
+
+    /**
+     * Reflectively invokes the public no-arg {@code getName()} of {@code t}.
+     *
+     * @param t   object to read the name from
+     * @param <T> type of {@code t}
+     * @return the value returned by {@code getName()}, cast to {@link String}
+     * @throws RuntimeException if the method is missing, inaccessible, or throws; the original
+     *                          exception is attached as the cause
+     */
+    public static <T> String getName(T t) {
+        try {
+            Method m = t.getClass().getMethod("getName");
+            return (String) m.invoke(t);
+        } catch (NoSuchMethodException | SecurityException | InvocationTargetException | IllegalAccessException
+                | IllegalArgumentException e) {
+            // Hand the exception to the logger and chain it as the cause so the real failure
+            // (which may be an error thrown inside getName itself) is not lost.
+            LOG.error(" getName not found on given class :" + t.getClass().getName(), e);
+            throw new RuntimeException(String.format("no getName() found on target class %s for matching", t.getClass()
+                    .getName()), e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImpl.java b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImpl.java
new file mode 100644
index 0000000..ff9a65a
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImpl.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.service.topology.resource.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.engine.UnitTopologyMain;
+import org.apache.eagle.alert.engine.runner.UnitTopologyRunner;
+import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
+import org.apache.eagle.alert.metadata.impl.MetadataDaoFactory;
+import org.apache.eagle.alert.metadata.resource.IMetadataDao;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.Config;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.Nimbus;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.TopologySummary;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * Starts, stops and lists Storm topologies on behalf of the topology management resource.
+ *
+ * <p>Topology and cluster definitions are read from (and new topology definitions written to)
+ * the shared {@link IMetadataDao}; the Storm cluster is reached through Nimbus.
+ */
+public class TopologyMgmtResourceImpl {
+    private static final IMetadataDao dao = MetadataDaoFactory.getInstance().getMetadataDao();
+    @SuppressWarnings("unused")
+    private static final Logger LOG = LoggerFactory.getLogger(TopologyMgmtResourceImpl.class);
+
+    private static final String DEFAULT_NIMBUS_HOST = "sandbox.hortonworks.com";
+    private static final Integer DEFAULT_NIMBUS_THRIFT_PORT = 6627;
+    private static final String STORM_JAR_PATH = "topology.stormJarPath";
+
+
+    /**
+     * Builds the Storm configuration used to reach the Nimbus of the given cluster.
+     *
+     * @param clusters  known clusters, or {@code null} to load them from the DAO
+     * @param clusterId cluster name; {@code null} selects the default Nimbus host and port
+     * @return the Storm config with Nimbus host and thrift port filled in
+     * @throws Exception if {@code clusterId} is non-null and no cluster with that name exists
+     */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private Map getStormConf(List<StreamingCluster> clusters, String clusterId) throws Exception {
+        Map<String, Object> stormConf = Utils.readStormConfig();
+        if (clusterId == null) {
+            stormConf.put(Config.NIMBUS_HOST, DEFAULT_NIMBUS_HOST);
+            stormConf.put(Config.NIMBUS_THRIFT_PORT, DEFAULT_NIMBUS_THRIFT_PORT);
+            return stormConf;
+        }
+        if (clusters == null) {
+            clusters = dao.listClusters();
+        }
+        Optional<StreamingCluster> match = TopologyMgmtResourceHelper.findById(clusters, clusterId);
+        if (!match.isPresent()) {
+            throw new Exception("Fail to find cluster: " + clusterId);
+        }
+        StreamingCluster cluster = match.get();
+        stormConf.put(Config.NIMBUS_HOST, cluster.getDeployments().getOrDefault(StreamingCluster.NIMBUS_HOST, DEFAULT_NIMBUS_HOST));
+        // Fall back to the default port, as is done for the host, when the deployment omits it;
+        // Integer.valueOf(null) would otherwise fail with a NumberFormatException.
+        String thriftPort = cluster.getDeployments().get(StreamingCluster.NIMBUS_THRIFT_PORT);
+        stormConf.put(Config.NIMBUS_THRIFT_PORT, thriftPort == null ? DEFAULT_NIMBUS_THRIFT_PORT : Integer.valueOf(thriftPort));
+        return stormConf;
+    }
+
+    /**
+     * Copies spout/bolt ids and task counts from {@code config} into {@code topologyDef} and
+     * registers the definition with the DAO.
+     */
+    private void createTopologyHelper(Topology topologyDef, com.typesafe.config.Config config) {
+        int numOfSpoutTasks = config.getInt(UnitTopologyRunner.SPOUT_TASK_NUM);
+        int numOfRouterBolts = config.getInt(UnitTopologyRunner.ROUTER_TASK_NUM);
+        int numOfAlertBolts = config.getInt(UnitTopologyRunner.ALERT_TASK_NUM);
+        int numOfPublishTasks = config.getInt(UnitTopologyRunner.PUBLISH_TASK_NUM);
+        topologyDef.setSpoutId(UnitTopologyRunner.spoutName);
+        topologyDef.setPubBoltId(UnitTopologyRunner.alertPublishBoltName);
+        topologyDef.setNumOfSpout(numOfSpoutTasks);
+        topologyDef.setNumOfGroupBolt(numOfRouterBolts);
+        topologyDef.setNumOfAlertBolt(numOfAlertBolts);
+        topologyDef.setNumOfPublishBolt(numOfPublishTasks);
+        dao.addTopology(topologyDef);
+    }
+
+    /**
+     * Builds the Storm topology from {@code topology-sample-definition.conf}.
+     *
+     * <p>Side effects: sets the {@code storm.jar} system property (to the configured jar path,
+     * or an empty string when none is configured) and stores {@code topologyDef} via the DAO.
+     */
+    private StormTopology createTopology(Topology topologyDef) {
+        com.typesafe.config.Config topologyConf = ConfigFactory.load("topology-sample-definition.conf");
+        String stormJarPath = "";
+        if (topologyConf.hasPath(STORM_JAR_PATH)) {
+            stormJarPath = topologyConf.getString(STORM_JAR_PATH);
+        }
+        System.setProperty("storm.jar", stormJarPath);
+        createTopologyHelper(topologyDef, topologyConf);
+        return UnitTopologyMain.createTopology(topologyConf);
+    }
+
+    /**
+     * Submits the named topology to Storm, creating a new definition when none is stored yet.
+     *
+     * @param topologyName name of the topology to submit
+     * @throws Exception if the cluster cannot be resolved or the submission fails
+     */
+    public void startTopology(String topologyName) throws Exception {
+        Optional<Topology> stored = TopologyMgmtResourceHelper.findById(dao.listTopologies(), topologyName);
+        Topology topologyDef;
+        if (stored.isPresent()) {
+            topologyDef = stored.get();
+        } else {
+            topologyDef = new Topology();
+            topologyDef.setName(topologyName);
+        }
+        StormSubmitter.submitTopology(topologyName, getStormConf(null, topologyDef.getClusterName()), createTopology(topologyDef));
+    }
+
+    /**
+     * Kills the named topology on its Storm cluster.
+     *
+     * @param topologyName name of the topology to kill
+     * @throws Exception if no definition with that name is stored, or the kill request fails
+     */
+    public void stopTopology(String topologyName) throws Exception {
+        Optional<Topology> stored = TopologyMgmtResourceHelper.findById(dao.listTopologies(), topologyName);
+        if (!stored.isPresent()) {
+            throw new Exception("Fail to find topology " + topologyName);
+        }
+        Topology topologyDef = stored.get();
+        NimbusClient nimbus = NimbusClient.getConfiguredClient(getStormConf(null, topologyDef.getClusterName()));
+        try {
+            nimbus.getClient().killTopology(topologyName);
+        } finally {
+            // Release the Thrift connection; it was previously left open after every call.
+            nimbus.close();
+        }
+    }
+
+    /**
+     * Looks up the Storm summary of the topology described by {@code topologyDef}.
+     *
+     * @param clusters    known clusters, passed through to the Nimbus config lookup
+     * @param topologyDef definition whose name (case-insensitive) is searched on the cluster
+     * @return the matching summary, or {@code null} when the cluster reports no such topology
+     * @throws Exception if the cluster cannot be resolved or Nimbus cannot be queried
+     */
+    @SuppressWarnings({ "rawtypes" })
+    private TopologySummary getTopologySummery(List<StreamingCluster> clusters, Topology topologyDef) throws Exception {
+        Map stormConf = getStormConf(clusters, topologyDef.getClusterName());
+        NimbusClient nimbus = NimbusClient.getConfiguredClient(stormConf);
+        try {
+            return nimbus.getClient().getClusterInfo().get_topologies().stream()
+                    .filter(topology -> topology.get_name().equalsIgnoreCase(topologyDef.getName()))
+                    .findFirst()
+                    .orElse(null);
+        } finally {
+            // One client is opened per topology definition, so close it to avoid leaking
+            // a connection on every listing.
+            nimbus.close();
+        }
+    }
+
+    /**
+     * Lists the status of every stored topology definition that its Storm cluster reports;
+     * definitions without a matching cluster topology are omitted.
+     *
+     * @return statuses in the order the definitions are returned by the DAO
+     * @throws Exception if a cluster cannot be resolved or Nimbus cannot be queried
+     */
+    public List<TopologyStatus> getTopologies() throws Exception {
+        List<Topology> topologyDefinitions = dao.listTopologies();
+        List<StreamingCluster> clusters = dao.listClusters();
+
+        List<TopologyStatus> topologies = new ArrayList<>();
+        for (Topology topologyDef : topologyDefinitions) {
+            TopologySummary topologySummary = getTopologySummery(clusters, topologyDef);
+            if (topologySummary != null) {
+                TopologyStatus t = new TopologyStatus();
+                t.setName(topologySummary.get_name());
+                t.setId(topologySummary.get_id());
+                t.setState(topologySummary.get_status());
+                t.setTopology(topologyDef);
+                topologies.add(t);
+            }
+        }
+        return topologies;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyStatus.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyStatus.java b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyStatus.java
new file mode 100644
index 0000000..c3381d4
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyStatus.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.service.topology.resource.impl;
+
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Mutable bean pairing a stored {@link Topology} definition with the name, id and state
+ * reported for it, plus per-spout and per-bolt load maps (empty until set).
+ */
+public class TopologyStatus {
+    private String name;
+    private String id;
+    private String state;
+    private Topology topology;
+
+    private Map<String, Double> spoutLoad = new HashMap<>();
+    private Map<String, Double> boltLoad = new HashMap<>();
+
+    /** @return the topology name, or {@code null} if not set */
+    public String getName() {
+        return this.name;
+    }
+
+    /** @param name the topology name */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /** @return the topology id, or {@code null} if not set */
+    public String getId() {
+        return this.id;
+    }
+
+    /** @param id the topology id */
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    /** @return the topology state, or {@code null} if not set */
+    public String getState() {
+        return this.state;
+    }
+
+    /** @param state the topology state */
+    public void setState(String state) {
+        this.state = state;
+    }
+
+    /** @return the live spout-load map (not a copy) */
+    public Map<String, Double> getSpoutLoad() {
+        return this.spoutLoad;
+    }
+
+    /** @param spoutLoad the spout-load map to hold; stored by reference */
+    public void setSpoutLoad(Map<String, Double> spoutLoad) {
+        this.spoutLoad = spoutLoad;
+    }
+
+    /** @return the live bolt-load map (not a copy) */
+    public Map<String, Double> getBoltLoad() {
+        return this.boltLoad;
+    }
+
+    /** @param boltLoad the bolt-load map to hold; stored by reference */
+    public void setBoltLoad(Map<String, Double> boltLoad) {
+        this.boltLoad = boltLoad;
+    }
+
+    /** @return the stored topology definition, or {@code null} if not set */
+    public Topology getTopology() {
+        return this.topology;
+    }
+
+    /** @param topology the stored topology definition */
+    public void setTopology(Topology topology) {
+        this.topology = topology;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/resources/application.conf b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/resources/application.conf
new file mode 100644
index 0000000..20461c8
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/resources/application.conf
@@ -0,0 +1,6 @@
+{
+	"datastore": {
+		"metadataDao": "org.apache.eagle.service.alert.resource.impl.InMemMetadataDaoImpl",
+		"connection": "localhost:27017"
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/webapp/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/webapp/WEB-INF/web.xml b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/webapp/WEB-INF/web.xml
new file mode 100644
index 0000000..ab7b019
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/webapp/WEB-INF/web.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- ~ Licensed to the Apache Software Foundation (ASF) under one or more 
+	~ contributor license agreements. See the NOTICE file distributed with ~ 
+	this work for additional information regarding copyright ownership. ~ The 
+	ASF licenses this file to You under the Apache License, Version 2.0 ~ (the 
+	"License"); you may not use this file except in compliance with ~ the License. 
+	You may obtain a copy of the License at ~ ~ http://www.apache.org/licenses/LICENSE-2.0 
+	~ ~ Unless required by applicable law or agreed to in writing, software ~ 
+	distributed under the License is distributed on an "AS IS" BASIS, ~ WITHOUT 
+	WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the 
+	License for the specific language governing permissions and ~ limitations 
+	under the License. -->
+<web-app xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
+		  http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
+	version="3.0">
+	<welcome-file-list>
+		<welcome-file>index.html</welcome-file>
+	</welcome-file-list>
+	<servlet>
+		<servlet-name>Jersey Web Application</servlet-name>
+		<servlet-class>com.sun.jersey.spi.container.servlet.ServletContainer</servlet-class>
+		<init-param>
+			<param-name>com.sun.jersey.config.property.packages</param-name>
+			<param-value>io.swagger.jaxrs.json,io.swagger.jaxrs.listing,org.apache.eagle.service,org.codehaus.jackson.jaxrs</param-value>
+		</init-param>
+		<init-param>
+			<param-name>com.sun.jersey.spi.container.ContainerRequestFilters</param-name>
+			<param-value>com.sun.jersey.api.container.filter.GZIPContentEncodingFilter;com.sun.jersey.api.container.filter.PostReplaceFilter</param-value>
+		</init-param>
+		<init-param>
+			<param-name>com.sun.jersey.spi.container.ContainerResponseFilters</param-name>
+			<param-value>com.sun.jersey.api.container.filter.GZIPContentEncodingFilter</param-value>
+		</init-param>
+		<load-on-startup>1</load-on-startup>
+	</servlet>
+	<!-- Servlet for swagger initialization only, no URL mapping. -->
+	<servlet>
+		<servlet-name>swaggerConfig</servlet-name>
+		<servlet-class>io.swagger.jaxrs.config.DefaultJaxrsConfig</servlet-class>
+		<init-param>
+			<param-name>api.version</param-name>
+			<param-value>1.0.0</param-value>
+		</init-param>
+		<init-param>
+			<param-name>swagger.api.basepath</param-name>
+			<param-value>/api</param-value>
+		</init-param>
+		<load-on-startup>2</load-on-startup>
+	</servlet>
+
+	<servlet-mapping>
+		<servlet-name>Jersey Web Application</servlet-name>
+		<url-pattern>/api/*</url-pattern>
+	</servlet-mapping>
+	<filter>
+		<filter-name>CorsFilter</filter-name>
+		<!-- Ideally, should be tomcat embed core's CORSFilter. See @SimpleCORSFiler comments. -->
+		<filter-class>org.apache.eagle.alert.resource.SimpleCORSFiler</filter-class>
+		<init-param>
+			<param-name>cors.allowed.origins</param-name>
+			<param-value>*</param-value>
+		</init-param>
+		<init-param>
+			<param-name>cors.allowed.headers</param-name>
+			<param-value>Authorization,Origin, No-Cache, X-Requested-With, If-Modified-Since, Pragma, Last-Modified, Cache-Control, Expires, Content-Type, X-E4M-With, Accept</param-value>
+		</init-param>
+		<init-param>
+			<param-name>cors.allowed.methods</param-name>
+			<param-value>GET,POST,HEAD,OPTIONS,PUT,DELETE</param-value>
+		</init-param>
+		<init-param>
+			<param-name>cors.support.credentials</param-name>
+			<param-value>true</param-value>
+		</init-param>
+	</filter>
+	<filter-mapping>
+		<filter-name>CorsFilter</filter-name>
+		<url-pattern>/*</url-pattern>
+	</filter-mapping>
+</web-app>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/webapp/index.html
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/webapp/index.html b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/webapp/index.html
new file mode 100644
index 0000000..5da5b32
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/webapp/index.html
@@ -0,0 +1,18 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+Hello, this is UMP alert metadata service. You are welcome!
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImplTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImplTest.java b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImplTest.java
new file mode 100644
index 0000000..e46213e
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImplTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.service.topology.resource.impl;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.List;
+
+
+/**
+ * Manual integration tests for {@link TopologyMgmtResourceImpl}.
+ *
+ * <p>All tests are {@code @Ignore}d. NOTE(review): they submit to Storm through
+ * {@code startTopology}, so they presumably need a reachable Nimbus - confirm before enabling.
+ * Every test submits the same topology name, so each one stops what it started to avoid
+ * leaving a topology behind for the next test.
+ */
+public class TopologyMgmtResourceImplTest {
+    TopologyMgmtResourceImpl topologyManager = new TopologyMgmtResourceImpl();
+    String topologyName = "testStartTopology";
+
+    @Ignore
+    @Test
+    public void testStartTopology() throws Exception {
+        topologyManager.startTopology(topologyName);
+        try {
+            // Give the submission time to take effect before tearing down.
+            Thread.sleep(10000);
+        } finally {
+            topologyManager.stopTopology(topologyName);
+        }
+    }
+
+    @Ignore
+    @Test
+    public void testStopTopology() throws Exception {
+        topologyManager.startTopology(topologyName);
+        Thread.sleep(10000);
+        topologyManager.stopTopology(topologyName);
+    }
+
+    @Ignore
+    @Test
+    public void testGetTopologies() throws Exception {
+        topologyManager.startTopology(topologyName);
+        try {
+            Thread.sleep(10000);
+            List<TopologyStatus> topologies = topologyManager.getTopologies();
+            // assertEquals reports the actual size on failure, unlike assertTrue(size == 1).
+            Assert.assertEquals(1, topologies.size());
+        } finally {
+            topologyManager.stopTopology(topologyName);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/.gitignore
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/.gitignore b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/.gitignore
new file mode 100644
index 0000000..1dd3331
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/.gitignore
@@ -0,0 +1,2 @@
+/target/
+/target/

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/pom.xml b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/pom.xml
new file mode 100644
index 0000000..3789dcd
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- ~ /* ~ * Licensed to the Apache Software Foundation (ASF) under one 
+	or more ~ * contributor license agreements. See the NOTICE file distributed 
+	with ~ * this work for additional information regarding copyright ownership. 
+	~ * The ASF licenses this file to You under the Apache License, Version 2.0 
+	~ * (the "License"); you may not use this file except in compliance with 
+	~ * the License. You may obtain a copy of the License at ~ * ~ * http://www.apache.org/licenses/LICENSE-2.0 
+	~ * ~ * Unless required by applicable law or agreed to in writing, software 
+	~ * distributed under the License is distributed on an "AS IS" BASIS, ~ * 
+	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+	~ * See the License for the specific language governing permissions and ~ 
+	* limitations under the License. ~ */ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>io.sherlock</groupId>
+		<artifactId>alert-metadata-parent</artifactId>
+		<version>0.0.1-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>alert-metadata</artifactId>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<!-- Storm depends on org.ow2.asm:asm:4.0 -->
+		<!-- Jersey depends on asm:asm:3.0 -->
+
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>io.sherlock</groupId>
+			<artifactId>alert-common</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.mongodb</groupId>
+			<artifactId>mongo-java-driver</artifactId>
+			<version>${mongodb.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>de.flapdoodle.embed</groupId>
+			<artifactId>de.flapdoodle.embed.mongo</artifactId>
+			<version>1.50.5</version>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
new file mode 100644
index 0000000..1773042
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.metadata.impl;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Predicate;
+
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.PublishmentType;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
+import org.apache.eagle.alert.metadata.resource.IMetadataDao;
+import org.apache.eagle.alert.metadata.resource.Models;
+import org.apache.eagle.alert.metadata.resource.OpResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+/**
+ * In memory service for simple service start. Make all service API as
+ * synchronized.
+ * 
+ * @since Apr 11, 2016
+ *
+ */
+public class InMemMetadataDaoImpl implements IMetadataDao {
+
+    private static final Logger LOG = LoggerFactory.getLogger(InMemMetadataDaoImpl.class);
+
+    private List<StreamingCluster> clusters = new ArrayList<StreamingCluster>();
+    private List<StreamDefinition> schemas = new ArrayList<StreamDefinition>();
+    private List<Kafka2TupleMetadata> datasources = new ArrayList<Kafka2TupleMetadata>();
+    private List<PolicyDefinition> policies = new ArrayList<PolicyDefinition>();
+    private List<Publishment> publishments = new ArrayList<Publishment>();
+    private List<PublishmentType> publishmentTypes = new ArrayList<PublishmentType>();
+    private volatile int maxScheduleState = 100;
+    private SortedMap<String, ScheduleState> scheduleStates = new TreeMap<String, ScheduleState>();
+    private List<PolicyAssignment> assignments = new ArrayList<PolicyAssignment>();
+    private List<Topology> topologies = new ArrayList<Topology>();
+    
+    public InMemMetadataDaoImpl(Config config) {
+    }
+
+    @Override
+    public synchronized List<StreamingCluster> listClusters() {
+        return clusters;
+    }
+
+    @Override
+    public OpResult addCluster(final StreamingCluster cluster) {
+        return addOrReplace(clusters, cluster);
+    }
+
+    private synchronized <T> OpResult addOrReplace(List<T> clusters, T paramT) {
+        Optional<T> scOp = clusters.stream().filter(new Predicate<T>() {
+            @Override
+            public boolean test(T t) {
+                if (getKey(t).equalsIgnoreCase(getKey(paramT))) {
+                    return true;
+                }
+                return false;
+            }
+        }).findFirst();
+
+        OpResult result = new OpResult();
+        // replace
+        if (scOp.isPresent()) {
+            clusters.remove(scOp.get());
+            result.message = "replace the old one!";
+        } else {
+            result.message = "created new config!";
+        }
+        result.code = 200;
+        clusters.add(paramT);
+        return result;
+    }
+
+    public static <T> String getKey(T t) {
+        if (t instanceof StreamDefinition) {
+            return ((StreamDefinition) t).getStreamId();
+        } else if (t instanceof PublishmentType) {
+            return ((PublishmentType) t).getType();
+        }
+
+        try {
+            Method m = t.getClass().getMethod("getName");
+            return (String) m.invoke(t);
+        } catch (NoSuchMethodException | SecurityException | InvocationTargetException | IllegalAccessException
+                | IllegalArgumentException e) {
+            LOG.error(" getName not found on given class :" + t.getClass().getName());
+        }
+        throw new RuntimeException(String.format("no getName() found on target class %s for matching", t.getClass()
+                .getName()));
+    }
+
+    @SuppressWarnings("unchecked")
+    private synchronized <T> OpResult remove(List<T> clusters, String id) {
+        T[] matched = (T[]) clusters.stream().filter(new Predicate<T>() {
+
+            @Override
+            public boolean test(T t) {
+                if (getKey(t).equalsIgnoreCase(id)) {
+                    return true;
+                }
+                return false;
+            }
+        }).toArray();
+
+        OpResult result = new OpResult();
+        result.code = 200;
+        if (clusters.removeAll(Arrays.asList(matched))) {
+            result.message = "removed configuration item succeed";
+        } else {
+            result.message = "no configuration item removed";
+        }
+        return result;
+    }
+
+    @Override
+    public OpResult removeCluster(final String clusterId) {
+        return remove(clusters, clusterId);
+    }
+
+    @Override
+    public synchronized List<StreamDefinition> listStreams() {
+        return schemas;
+    }
+
+    @Override
+    public OpResult createStream(StreamDefinition stream) {
+        return addOrReplace(schemas, stream);
+    }
+
+    @Override
+    public OpResult removeStream(String streamId) {
+        return remove(schemas, streamId);
+    }
+
+    @Override
+    public synchronized List<Kafka2TupleMetadata> listDataSources() {
+        return datasources;
+    }
+
+    @Override
+    public OpResult addDataSource(Kafka2TupleMetadata dataSource) {
+        return addOrReplace(datasources, dataSource);
+    }
+
+    @Override
+    public OpResult removeDataSource(String datasourceId) {
+        return remove(datasources, datasourceId);
+    }
+
+    @Override
+    public synchronized List<PolicyDefinition> listPolicies() {
+        return policies;
+    }
+
+    @Override
+    public OpResult addPolicy(PolicyDefinition policy) {
+        return addOrReplace(policies, policy);
+    }
+
+    @Override
+    public OpResult removePolicy(String policyId) {
+        return remove(policies, policyId);
+    }
+
+    @Override
+    public synchronized List<Publishment> listPublishment() {
+        return publishments;
+    }
+
+    @Override
+    public OpResult addPublishment(Publishment publishment) {
+        return addOrReplace(publishments, publishment);
+    }
+
+    @Override
+    public OpResult removePublishment(String pubId) {
+        return remove(publishments, pubId);
+    }
+
+    @Override
+    public List<PublishmentType> listPublishmentType() {
+        return publishmentTypes;
+    }
+
+    @Override
+    public OpResult addPublishmentType(PublishmentType publishmentType) {
+        return addOrReplace(publishmentTypes, publishmentType);
+    }
+
+    @Override
+    public OpResult removePublishmentType(String pubType) {
+        return remove(publishmentTypes, pubType);
+    }
+
+    @Override
+    public ScheduleState getScheduleState(String versionId) {
+        return scheduleStates.get(versionId);
+    }
+
+    @Override
+    public OpResult addScheduleState(ScheduleState state) {
+        // FIXME : might concurrent issue
+        String toRemove = null;
+        if (scheduleStates.size() > maxScheduleState) {
+            toRemove = scheduleStates.firstKey();
+        }
+        scheduleStates.put(state.getVersion(), state);
+        if (toRemove != null) {
+            scheduleStates.remove(toRemove);
+        }
+
+        OpResult result = new OpResult();
+        result.code = 200;
+        result.message = "OK";
+        return result;
+    }
+
+    @Override
+    public ScheduleState getScheduleState() {
+        if (scheduleStates.size() > 0) {
+            return scheduleStates.get(scheduleStates.lastKey());
+        }
+        return null;
+    }
+
+    @Override
+    public List<PolicyAssignment> listAssignments() {
+        return assignments;
+    }
+
+    @Override
+    public OpResult addAssignment(PolicyAssignment assignment) {
+        OpResult result = new OpResult();
+        result.code = 200;
+        result.message = "OK";
+        assignments.add(assignment);
+        return result;
+    }
+
+    @Override
+    public List<Topology> listTopologies() {
+        return topologies;
+    }
+
+    @Override
+    public OpResult addTopology(Topology t) {
+        return addOrReplace(topologies, t);
+    }
+
+    @Override
+    public OpResult removeTopology(String topologyName) {
+        return remove(topologies, topologyName);
+    }
+
+    @Override
+    public OpResult clear() {
+        this.assignments.clear();
+        this.clusters.clear();
+        this.datasources.clear();
+        this.policies.clear();
+        this.publishments.clear();
+        this.scheduleStates.clear();
+        this.schemas.clear();
+        this.topologies.clear();
+        OpResult result = new OpResult();
+        result.code = 200;
+        result.message = "OK";
+        return result;
+    }
+
+    @Override
+    public Models export() {
+        Models models = new Models();
+        models.assignments.addAll(this.assignments);
+        models.clusters.addAll(this.clusters);
+        models.datasources.addAll(this.datasources);
+        models.policies.addAll(this.policies);
+        models.publishments.addAll(this.publishments);
+        models.scheduleStates.putAll(this.scheduleStates);
+        models.schemas.addAll(this.schemas);
+        models.topologies.addAll(this.topologies);
+        return models;
+    }
+
+    @Override
+    public OpResult importModels(Models models) {
+        clear();
+        this.assignments.addAll(models.assignments);
+        this.clusters.addAll(models.clusters);
+        this.datasources.addAll(models.datasources);
+        this.policies.addAll(models.policies);
+        this.publishments.addAll(models.publishments);
+        this.scheduleStates.putAll(models.scheduleStates);
+        this.schemas.addAll(models.schemas);
+        this.topologies.addAll(models.topologies);
+        OpResult result = new OpResult();
+        result.code = 200;
+        result.message = "OK";
+        return result;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MetadataDaoFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MetadataDaoFactory.java b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MetadataDaoFactory.java
new file mode 100644
index 0000000..94717ce
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MetadataDaoFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.metadata.impl;
+
+import java.lang.reflect.Constructor;
+
+import org.apache.eagle.alert.metadata.resource.IMetadataDao;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * Singleton that builds the {@link IMetadataDao} used by the service.
+ * <p>
+ * The implementation class is read from the {@code datastore.metadataDao}
+ * configuration entry and instantiated through its {@code (Config)}
+ * constructor with the {@code datastore} section. When the section is absent,
+ * or when anything goes wrong while creating the configured implementation,
+ * an {@link InMemMetadataDaoImpl} is used instead.
+ *
+ * @since Apr 12, 2016
+ *
+ */
+public class MetadataDaoFactory {
+
+    // LOG must be declared before INSTANCE: static initializers run in textual order and the
+    // constructor invoked for INSTANCE logs, so the reverse order would hit a null LOG.
+    private static final Logger LOG = LoggerFactory.getLogger(MetadataDaoFactory.class);
+    private static final MetadataDaoFactory INSTANCE = new MetadataDaoFactory();
+
+    private IMetadataDao dao;
+
+    private MetadataDaoFactory() {
+        Config config = ConfigFactory.load();
+        // Config#getConfig throws for a missing path instead of returning null, so probe with hasPath first.
+        Config datastoreConfig = config.hasPath("datastore") ? config.getConfig("datastore") : null;
+        if (datastoreConfig == null) {
+            LOG.warn("datastore is not configured, use in-memory store !!!");
+            dao = new InMemMetadataDaoImpl(datastoreConfig);
+        } else {
+            try {
+                // Read inside the try block so a missing "metadataDao" entry also falls back to in-memory mode.
+                String clsName = datastoreConfig.getString("metadataDao");
+                Class<?> clz = Thread.currentThread().getContextClassLoader().loadClass(clsName);
+                if (!IMetadataDao.class.isAssignableFrom(clz)) {
+                    throw new IllegalArgumentException(
+                            "metadataDao configuration need to be implementation of IMetadataDao! ");
+                }
+                Constructor<?> cotr = clz.getConstructor(Config.class);
+                dao = (IMetadataDao) cotr.newInstance(datastoreConfig);
+            } catch (Exception e) {
+                LOG.error("error when initialize the dao, fall back to in memory mode!", e);
+                dao = new InMemMetadataDaoImpl(datastoreConfig);
+            }
+        }
+    }
+
+    /**
+     * @return the single factory instance
+     */
+    public static MetadataDaoFactory getInstance() {
+        return INSTANCE;
+    }
+
+    /**
+     * @return the DAO chosen when the factory was created
+     */
+    public IMetadataDao getMetadataDao() {
+        return dao;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
new file mode 100644
index 0000000..a46f0c7
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.metadata.impl;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.PublishmentType;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
+import org.apache.eagle.alert.metadata.resource.IMetadataDao;
+import org.apache.eagle.alert.metadata.resource.Models;
+import org.apache.eagle.alert.metadata.resource.OpResult;
+import org.bson.BsonDocument;
+import org.bson.BsonInt32;
+import org.bson.BsonString;
+import org.bson.Document;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.mongodb.Function;
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.IndexOptions;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.result.DeleteResult;
+import com.mongodb.client.result.UpdateResult;
+import com.typesafe.config.Config;
+
+/**
+ * {@link IMetadataDao} implementation backed by MongoDB. Each kind of metadata
+ * lives in its own collection of the {@code ump_alert_metadata} database and
+ * is stored as the Jackson JSON form of the model object. A unique index on
+ * the identifying field of each collection is created at construction time.
+ * <p>
+ * {@link #clear()}, {@link #export()} and {@link #importModels(Models)} are
+ * not supported and throw {@link UnsupportedOperationException}.
+ *
+ * @since Apr 11, 2016
+ *
+ */
+public class MongoMetadataDaoImpl implements IMetadataDao {
+
+    private static final String DB_NAME = "ump_alert_metadata";
+    // Names of the document fields that identify an item, matching the indexes built in init().
+    private static final String KEY_NAME = "name";
+    private static final String KEY_STREAM_ID = "streamId";
+    private static final String KEY_TYPE = "type";
+    private static final Logger LOG = LoggerFactory.getLogger(MongoMetadataDaoImpl.class);
+    private static final ObjectMapper mapper = new ObjectMapper();
+    static {
+        // Stored documents carry fields the model classes do not declare (e.g. Mongo's _id).
+        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    }
+
+    private final String connection;
+    private final MongoClient client;
+
+    private MongoDatabase db;
+    private MongoCollection<Document> cluster;
+    private MongoCollection<Document> schema;
+    private MongoCollection<Document> datasource;
+    private MongoCollection<Document> policy;
+    private MongoCollection<Document> publishment;
+    private MongoCollection<Document> publishmentType;
+    private MongoCollection<Document> scheduleStates;
+    private MongoCollection<Document> assignments;
+    private MongoCollection<Document> topologies;
+
+    /**
+     * @param config configuration section providing the {@code connection}
+     *               string handed to {@link MongoClient}
+     */
+    public MongoMetadataDaoImpl(Config config) {
+        this.connection = config.getString("connection");
+        this.client = new MongoClient(connection);
+        init();
+    }
+
+    /**
+     * Looks up every collection and creates its unique background index.
+     */
+    private void init() {
+        db = client.getDatabase(DB_NAME);
+        IndexOptions io = new IndexOptions().background(true).unique(true).name("nameIndex");
+        BsonDocument doc = new BsonDocument();
+        doc.append("name", new BsonInt32(1));
+        cluster = db.getCollection("clusters");
+        cluster.createIndex(doc, io);
+        {
+            // Streams are identified by streamId; the index nevertheless reuses the "nameIndex" options.
+            BsonDocument doc2 = new BsonDocument();
+            doc2.append("streamId", new BsonInt32(1));
+            schema = db.getCollection("schemas");
+            schema.createIndex(doc2, io);
+        }
+        datasource = db.getCollection("datasources");
+        datasource.createIndex(doc, io);
+        policy = db.getCollection("policies");
+        policy.createIndex(doc, io);
+        publishment = db.getCollection("publishments");
+        publishment.createIndex(doc, io);
+        topologies = db.getCollection("topologies");
+        topologies.createIndex(doc, io);
+
+        publishmentType = db.getCollection("publishmentTypes");
+        {
+            IndexOptions io1 = new IndexOptions().background(true).unique(true).name("pubTypeIndex");
+            BsonDocument doc1 = new BsonDocument();
+            doc1.append("type", new BsonInt32(1));
+            publishmentType.createIndex(doc1, io1);
+        }
+
+        scheduleStates = db.getCollection("schedule_specs");
+        {
+            IndexOptions io1 = new IndexOptions().background(true).unique(true).name("nameIndex");
+            BsonDocument doc1 = new BsonDocument();
+            doc1.append("version", new BsonInt32(1));
+            scheduleStates.createIndex(doc1, io1);
+        }
+
+        assignments = db.getCollection("assignments");
+        {
+            IndexOptions io1 = new IndexOptions().background(true).unique(true).name("policyNameIndex");
+            BsonDocument doc1 = new BsonDocument();
+            doc1.append("policyName", new BsonInt32(1));
+            assignments.createIndex(doc1, io1);
+        }
+    }
+
+    /**
+     * Builds a mapper turning a stored document into an instance of
+     * {@code clz}. A document that cannot be deserialized is logged and mapped
+     * to {@code null}.
+     */
+    private static <T> Function<Document, T> deserializer(final Class<T> clz) {
+        return new Function<Document, T>() {
+            @Override
+            public T apply(Document t) {
+                String json = t.toJson();
+                try {
+                    return mapper.readValue(json, clz);
+                } catch (IOException e) {
+                    LOG.error("deserialize config item failed!", e);
+                }
+                return null;
+            }
+        };
+    }
+
+    /**
+     * Name of the document field that identifies {@code item}, consistent with
+     * {@link InMemMetadataDaoImpl#getKey(Object)} and the unique indexes.
+     */
+    private static String keyFieldOf(Object item) {
+        if (item instanceof StreamDefinition) {
+            return KEY_STREAM_ID;
+        } else if (item instanceof PublishmentType) {
+            return KEY_TYPE;
+        }
+        return KEY_NAME;
+    }
+
+    @Override
+    public List<StreamingCluster> listClusters() {
+        return list(cluster, StreamingCluster.class);
+    }
+
+    /**
+     * Reads every document of {@code collection} as an instance of
+     * {@code clz}. Documents that fail to deserialize are logged and left out
+     * of the returned list.
+     */
+    private <T> List<T> list(MongoCollection<Document> collection, Class<T> clz) {
+        List<T> result = new LinkedList<T>();
+        collection.find().map(deserializer(clz)).into(result);
+        // The deserializer yields null for broken documents; do not hand those to callers.
+        result.removeIf(item -> item == null);
+        return result;
+    }
+
+    /**
+     * Upserts {@code t}: the document whose key field equals the item's key is
+     * replaced, or a new document is inserted when none matches.
+     *
+     * @return code 200 when a document was matched or inserted; code 500 when
+     *         neither happened or when serialization or the database call
+     *         failed (the message then carries the exception message)
+     */
+    private <T> OpResult addOrReplace(MongoCollection<Document> collection, T t) {
+        BsonDocument filter = new BsonDocument();
+        filter.append(keyFieldOf(t), new BsonString(InMemMetadataDaoImpl.getKey(t)));
+
+        String json = "";
+        OpResult result = new OpResult();
+        try {
+            json = mapper.writeValueAsString(t);
+            UpdateOptions options = new UpdateOptions();
+            options.upsert(true);
+            UpdateResult ur = collection.replaceOne(filter, Document.parse(json), options);
+            // Judge by the matched count: replacing a document with identical content matches
+            // it without modifying it, and that is still a successful operation.
+            if (ur.getMatchedCount() > 0 || ur.getUpsertedId() != null) {
+                result.code = 200;
+                result.message = String.format("update %d configuration item.", ur.getMatchedCount());
+            } else {
+                result.code = 500;
+                result.message = "no configuration item create/updated.";
+            }
+        } catch (Exception e) {
+            result.code = 500;
+            result.message = e.getMessage();
+            LOG.error("", e);
+        }
+        return result;
+    }
+
+    /**
+     * Deletes one document whose {@code name} field equals {@code name}.
+     */
+    private OpResult remove(MongoCollection<Document> collection, String name) {
+        return remove(collection, KEY_NAME, name);
+    }
+
+    /**
+     * Deletes one document whose {@code keyField} equals {@code key}.
+     *
+     * @return a result with code 200 whose message reports the deleted count
+     */
+    private OpResult remove(MongoCollection<Document> collection, String keyField, String key) {
+        BsonDocument filter = new BsonDocument();
+        filter.append(keyField, new BsonString(key));
+        DeleteResult dr = collection.deleteOne(filter);
+        OpResult result = new OpResult();
+        result.code = 200;
+        result.message = String.format(" %d config item removed!", dr.getDeletedCount());
+        return result;
+    }
+
+    @Override
+    public OpResult addCluster(StreamingCluster cluster) {
+        return addOrReplace(this.cluster, cluster);
+    }
+
+    @Override
+    public OpResult removeCluster(String clusterId) {
+        return remove(cluster, clusterId);
+    }
+
+    @Override
+    public List<StreamDefinition> listStreams() {
+        return list(schema, StreamDefinition.class);
+    }
+
+    @Override
+    public OpResult createStream(StreamDefinition stream) {
+        return addOrReplace(this.schema, stream);
+    }
+
+    @Override
+    public OpResult removeStream(String streamId) {
+        // Streams are keyed by streamId, not by name.
+        return remove(schema, KEY_STREAM_ID, streamId);
+    }
+
+    @Override
+    public List<Kafka2TupleMetadata> listDataSources() {
+        return list(datasource, Kafka2TupleMetadata.class);
+    }
+
+    @Override
+    public OpResult addDataSource(Kafka2TupleMetadata dataSource) {
+        return addOrReplace(this.datasource, dataSource);
+    }
+
+    @Override
+    public OpResult removeDataSource(String datasourceId) {
+        return remove(datasource, datasourceId);
+    }
+
+    @Override
+    public List<PolicyDefinition> listPolicies() {
+        return list(policy, PolicyDefinition.class);
+    }
+
+    @Override
+    public OpResult addPolicy(PolicyDefinition policy) {
+        return addOrReplace(this.policy, policy);
+    }
+
+    @Override
+    public OpResult removePolicy(String policyId) {
+        return remove(policy, policyId);
+    }
+
+    @Override
+    public List<Publishment> listPublishment() {
+        return list(publishment, Publishment.class);
+    }
+
+    @Override
+    public OpResult addPublishment(Publishment publishment) {
+        return addOrReplace(this.publishment, publishment);
+    }
+
+    @Override
+    public OpResult removePublishment(String pubId) {
+        return remove(publishment, pubId);
+    }
+
+    @Override
+    public List<PublishmentType> listPublishmentType() {
+        return list(publishmentType, PublishmentType.class);
+    }
+
+    @Override
+    public OpResult addPublishmentType(PublishmentType pubType) {
+        return addOrReplace(this.publishmentType, pubType);
+    }
+
+    @Override
+    public OpResult removePublishmentType(String pubType) {
+        // Publishment types are keyed by type, not by name.
+        return remove(publishmentType, KEY_TYPE, pubType);
+    }
+
+    /**
+     * @return the first state whose {@code version} equals {@code versionId};
+     *         {@code null} when none is found or it cannot be deserialized
+     */
+    @Override
+    public ScheduleState getScheduleState(String versionId) {
+        BsonDocument doc = new BsonDocument();
+        doc.append("version", new BsonString(versionId));
+        return scheduleStates.find(doc).map(deserializer(ScheduleState.class)).first();
+    }
+
+    @Override
+    public OpResult addScheduleState(ScheduleState state) {
+        return addOne(scheduleStates, state);
+    }
+
+    /**
+     * Inserts {@code t} as a new document, without any upsert semantics.
+     *
+     * @return code 200 on success; code 400 with the exception message when
+     *         serialization or the insert failed
+     */
+    private <T> OpResult addOne(MongoCollection<Document> collection, T t) {
+        OpResult result = new OpResult();
+        try {
+            String json = mapper.writeValueAsString(t);
+            collection.insertOne(Document.parse(json));
+            result.code = 200;
+            result.message = "add state succeed!";
+        } catch (Exception e) {
+            result.code = 400;
+            result.message = e.getMessage();
+            LOG.error("", e);
+        }
+        return result;
+    }
+
+    /**
+     * @return the first state when sorting by {@code generateTime}
+     *         descending; {@code null} when there is none or it cannot be
+     *         deserialized
+     */
+    @Override
+    public ScheduleState getScheduleState() {
+        BsonDocument sort = new BsonDocument();
+        sort.append("generateTime", new BsonInt32(-1));
+        return scheduleStates.find().sort(sort).map(deserializer(ScheduleState.class)).first();
+    }
+
+    @Override
+    public List<PolicyAssignment> listAssignments() {
+        return list(assignments, PolicyAssignment.class);
+    }
+
+    @Override
+    public OpResult addAssignment(PolicyAssignment assignment) {
+        return addOne(assignments, assignment);
+    }
+
+    @Override
+    public List<Topology> listTopologies() {
+        return list(topologies, Topology.class);
+    }
+
+    @Override
+    public OpResult addTopology(Topology t) {
+        return addOrReplace(this.topologies, t);
+    }
+
+    @Override
+    public OpResult removeTopology(String topologyName) {
+        return remove(topologies, topologyName);
+    }
+
+    /**
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public OpResult clear() {
+        throw new UnsupportedOperationException("clear not support!");
+    }
+
+    /**
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public Models export() {
+        throw new UnsupportedOperationException("export not support!");
+    }
+
+    /**
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public OpResult importModels(Models models) {
+        throw new UnsupportedOperationException("importModels not support!");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/jdbc/JdbcMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/jdbc/JdbcMetadataDaoImpl.java b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/jdbc/JdbcMetadataDaoImpl.java
new file mode 100644
index 0000000..9ec119a
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/jdbc/JdbcMetadataDaoImpl.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.metadata.impl.jdbc;
+
+import java.util.List;
+
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.PublishmentType;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
+import org.apache.eagle.alert.metadata.resource.OpResult;
+import org.apache.eagle.alert.metadata.resource.IMetadataDao;
+import org.apache.eagle.alert.metadata.resource.Models;
+
+/**
+ * Placeholder for a JDBC-backed {@link IMetadataDao}: no operation is implemented yet and every method returns {@code null}.
+ * @since May 26, 2016
+ */
+public class JdbcMetadataDaoImpl implements IMetadataDao {
+
+    /* (non-Javadoc)
+     * @see org.apache.eagle.alert.metadata.resource.IMetadataDao#listTopologies()
+     */
+    @Override
+    public List<Topology> listTopologies() {
+        // TODO not implemented yet: this stub returns null
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.eagle.alert.metadata.resource.IMetadataDao#addTopology(org.apache.eagle.alert.coordination.model.internal.Topology)
+     */
+    @Override
+    public OpResult addTopology(Topology t) {
+        // TODO not implemented yet: this stub returns null
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.eagle.alert.metadata.resource.IMetadataDao#removeTopology(java.lang.String)
+     */
+    @Override
+    public OpResult removeTopology(String topologyName) {
+        // TODO not implemented yet: this stub returns null
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.eagle.alert.metadata.resource.IMetadataDao#listClusters()
+     */
+    @Override
+    public List<StreamingCluster> listClusters() {
+        // TODO not implemented yet: this stub returns null
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.eagle.alert.metadata.resource.IMetadataDao#addCluster(org.apache.eagle.alert.engine.coordinator.StreamingCluster)
+     */
+    @Override
+    public OpResult addCluster(StreamingCluster cluster) {
+        // TODO not implemented yet: this stub returns null
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.eagle.alert.metadata.resource.IMetadataDao#removeCluster(java.lang.String)
+     */
+    @Override
+    public OpResult removeCluster(String clusterId) {
+        // TODO not implemented yet: this stub returns null
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.eagle.alert.metadata.resource.IMetadataDao#listStreams()
+     */
+    @Override
+    public List<StreamDefinition> listStreams() {
+        // TODO not implemented yet: this stub returns null
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.eagle.alert.metadata.resource.IMetadataDao#createStream(org.apache.eagle.alert.engine.coordinator.StreamDefinition)
+     */
+    @Override
+    public OpResult createStream(StreamDefinition stream) {
+        // TODO not implemented yet: this stub returns null
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.eagle.alert.metadata.resource.IMetadataDao#removeStream(java.lang.String)
+     */
+    @Override
+    public OpResult removeStream(String streamId) {
+        // TODO not implemented yet: this stub returns null
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.eagle.alert.metadata.resource.IMetadataDao#listDataSources()
+     */
+    @Override
+    public List<Kafka2TupleMetadata> listDataSources() {
+        // TODO not implemented yet: this stub returns null
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.eagle.alert.metadata.resource.IMetadataDao#addDataSource(org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata)
+     */
+    @Override
+    public OpResult addDataSource(Kafka2TupleMetadata dataSource) {
+        // TODO not implemented yet: this stub returns null
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.eagle.alert.metadata.resource.IMetadataDao#removeDataSource(java.lang.String)
+     */
+    @Override
+    public OpResult removeDataSource(String datasourceId) {
+        // TODO not implemented yet: this stub returns null
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.eagle.alert.metadata.resource.IMetadataDao#listPolicies()
+     */
+    @Override
+    public List<PolicyDefinition> listPolicies() {
+        // TODO not implemented yet: this stub returns null
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.eagle.alert.metadata.resource.IMetadataDao#addPolicy(org.apache.eagle.alert.engine.coordinator.PolicyDefinition)
+     */
+    @Override
+    public OpResult addPolicy(PolicyDefinition policy) {
+        // TODO not implemented yet: this stub returns null
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.eagle.alert.metadata.resource.IMetadataDao#removePolicy(java.lang.String)
+     */
+    @Override
+    public OpResult removePolicy(String policyId) {
+        // TODO not implemented yet: this stub returns null
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.eagle.alert.metadata.resource.IMetadataDao#listPublishment()
+     */
+    @Override
+    public List<Publishment> listPublishment() {
+        // TODO not implemented yet: this stub returns null
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.eagle.alert.metadata.resource.IMetadataDao#addPublishment(org.apache.eagle.alert.engine.coordinator.Publishment)
+     */
+    @Override
+    public OpResult addPublishment(Publishment publishment) {
+        // TODO not implemented yet: this stub returns null
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.eagle.alert.metadata.resource.IMetadataDao#removePublishment(java.lang.String)
+     */
+    @Override
+    public OpResult removePublishment(String pubId) {
+        // TODO not implemented yet: this stub returns null
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.eagle.alert.metadata.resource.IMetadataDao#getScheduleState(java.lang.String)
+     */
+    @Override
+    public ScheduleState getScheduleState(String versionId) {
+        // TODO not implemented yet: this stub returns null
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.eagle.alert.metadata.resource.IMetadataDao#getScheduleState()
+     */
+    @Override
+    public ScheduleState getScheduleState() {
+        // TODO not implemented yet: this stub returns null
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.eagle.alert.metadata.resource.IMetadataDao#addScheduleState(org.apache.eagle.alert.coordination.model.ScheduleState)
+     */
+    @Override
+    public OpResult addScheduleState(ScheduleState state) {
+        // TODO not implemented yet: this stub returns null
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.eagle.alert.metadata.resource.IMetadataDao#listAssignments()
+     */
+    @Override
+    public List<PolicyAssignment> listAssignments() {
+        // TODO not implemented yet: this stub returns null
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.eagle.alert.metadata.resource.IMetadataDao#addAssignment(org.apache.eagle.alert.coordination.model.internal.PolicyAssignment)
+     */
+    @Override
+    public OpResult addAssignment(PolicyAssignment assignment) {
+        // TODO not implemented yet: this stub returns null
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.eagle.alert.metadata.resource.IMetadataDao#clear()
+     */
+    @Override
+    public OpResult clear() {
+        // TODO not implemented yet: this stub returns null
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.eagle.alert.metadata.resource.IMetadataDao#export()
+     */
+    @Override
+    public Models export() {
+        // TODO not implemented yet: this stub returns null
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.eagle.alert.metadata.resource.IMetadataDao#importModels(org.apache.eagle.alert.metadata.resource.Models)
+     */
+    @Override
+    public OpResult importModels(Models models) {
+        // TODO not implemented yet: this stub returns null
+        return null;
+    }
+
+    @Override
+    public List<PublishmentType> listPublishmentType() {
+        // TODO not implemented yet: this stub returns null
+        return null;
+    }
+
+    @Override
+    public OpResult addPublishmentType(PublishmentType publishmentType) {
+        // TODO not implemented yet: this stub returns null
+        return null;
+    }
+
+    @Override
+    public OpResult removePublishmentType(String pubType) {
+        // TODO not implemented yet: this stub returns null
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/resource/IMetadataDao.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/resource/IMetadataDao.java b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/resource/IMetadataDao.java
new file mode 100644
index 0000000..8eef985
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/resource/IMetadataDao.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.metadata.resource;
+
+import java.util.List;
+
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.engine.coordinator.*;
+
+public interface IMetadataDao { // data-access contract for alert metadata; mutating calls report their outcome as an OpResult
+
+    List<Topology> listTopologies();
+
+    OpResult addTopology(Topology t);
+
+    OpResult removeTopology(String topologyName);
+
+    List<StreamingCluster> listClusters();
+
+    OpResult addCluster(StreamingCluster cluster);
+
+    OpResult removeCluster(String clusterId);
+
+    List<StreamDefinition> listStreams();
+
+    OpResult createStream(StreamDefinition stream); // NOTE(review): named create* while every other entity uses add* - confirm before renaming
+
+    OpResult removeStream(String streamId);
+
+    List<Kafka2TupleMetadata> listDataSources();
+
+    OpResult addDataSource(Kafka2TupleMetadata dataSource);
+
+    OpResult removeDataSource(String datasourceId);
+
+    List<PolicyDefinition> listPolicies();
+
+    OpResult addPolicy(PolicyDefinition policy);
+
+    OpResult removePolicy(String policyId);
+
+    List<Publishment> listPublishment();
+
+    OpResult addPublishment(Publishment publishment);
+
+    OpResult removePublishment(String pubId);
+
+    List<PublishmentType> listPublishmentType();
+
+    OpResult addPublishmentType(PublishmentType publishmentType);
+
+    OpResult removePublishmentType(String pubType);
+
+    ScheduleState getScheduleState(String versionId);
+
+    ScheduleState getScheduleState(); // no-arg overload; presumably the most recent state - TODO confirm with implementations
+
+    OpResult addScheduleState(ScheduleState state);
+
+    List<PolicyAssignment> listAssignments();
+
+    OpResult addAssignment(PolicyAssignment assignment);
+
+    // Test-friendly APIs; implementations may not support them (at least one throws UnsupportedOperationException)
+    OpResult clear();
+    Models export();
+    OpResult importModels(Models models);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/resource/Models.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/resource/Models.java b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/resource/Models.java
new file mode 100644
index 0000000..6bb38f1
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/resource/Models.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.metadata.resource;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
+
+/**
+ * Bundle of metadata collections, used to export/import metadata in one piece to ease testing.
+ * NOTE(review): there is no PublishmentType list here although IMetadataDao manages that type - confirm whether export/import should cover it.
+ * @since May 23, 2016
+ *
+ */
+public class Models {
+    public List<StreamingCluster> clusters = new ArrayList<StreamingCluster>();
+    public List<StreamDefinition> schemas = new ArrayList<StreamDefinition>();
+    public List<Kafka2TupleMetadata> datasources = new ArrayList<Kafka2TupleMetadata>();
+    public List<PolicyDefinition> policies = new ArrayList<PolicyDefinition>();
+    public List<Publishment> publishments = new ArrayList<Publishment>();
+    public SortedMap<String, ScheduleState> scheduleStates = new TreeMap<String, ScheduleState>();
+    public List<PolicyAssignment> assignments = new ArrayList<PolicyAssignment>();
+    public List<Topology> topologies = new ArrayList<Topology>();
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/resource/OpResult.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/resource/OpResult.java b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/resource/OpResult.java
new file mode 100644
index 0000000..44f507d
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/resource/OpResult.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.metadata.resource;
+
+/**
+ * Result of a metadata operation: a numeric code (200 by default, presumably HTTP-style) and a message (empty by default).
+ * @since Apr 11, 2016
+ */
+public class OpResult {
+
+    public int code = 200;
+    public String message = "";
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/resources/application.conf b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/resources/application.conf
new file mode 100644
index 0000000..efba546
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/resources/application.conf
@@ -0,0 +1,8 @@
+{
+	"datastore": {
+		"metadataDao": "org.apache.eagle.alert.metadata.impl.InMemMetadataDaoImpl",
+		"connection": "localhost:27017",
+		"properties" : {
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/InMemoryTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/InMemoryTest.java b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/InMemoryTest.java
new file mode 100644
index 0000000..54a2ffc
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/test/java/org/apache/eagle/service/alert/resource/impl/InMemoryTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.service.alert.resource.impl;
+
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.metadata.impl.InMemMetadataDaoImpl;
+import org.apache.eagle.alert.metadata.resource.IMetadataDao;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * Smoke test for {@link InMemMetadataDaoImpl}: adds one policy and checks that exactly one policy is listed.
+ * @since May 1, 2016
+ */
+public class InMemoryTest {
+
+    private IMetadataDao dao = new InMemMetadataDaoImpl(ConfigFactory.load());
+
+    @Test
+    public void test_AddPolicy() {
+        PolicyDefinition pd = new PolicyDefinition();
+        pd.setName("pd1");
+        dao.addPolicy(pd);
+
+        Assert.assertEquals(1, dao.listPolicies().size()); // assumes the DAO starts empty - holds only if InMemMetadataDaoImpl keeps no static state (TODO confirm)
+    }
+}


Mime
View raw message