eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ralp...@apache.org
Subject [16/19] incubator-eagle git commit: EAGLE-324: Init branch-v0.5
Date Wed, 01 Jun 2016 05:56:31 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamIdConversion.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamIdConversion.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamIdConversion.java
new file mode 100644
index 0000000..ccdb6f3
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/utils/StreamIdConversion.java
@@ -0,0 +1,36 @@
+package org.apache.eagle.alert.utils;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class StreamIdConversion {
+    private final static String STREAM_ID_TEMPLATE = "stream_%s_to_%s"; // filled with (sourceId, targetId)
+    private final static String STREAM_ID_NUM_TEMPLATE = "stream_%s"; // filled with the partition number
+    public static String generateStreamIdBetween(String sourceId, String targetId){ // yields stream_<sourceId>_to_<targetId>
+        return String.format(STREAM_ID_TEMPLATE,sourceId,targetId);
+    }
+
+    /**
+     * Builds the stream id of a partition using the hard-coded
+     * format {@code stream_<partitionNum>}.
+     *
+     * @param partitionNum partition number substituted into the template
+     * @return the formatted stream id, e.g. {@code stream_3} for partition 3
+     */
+    public static String generateStreamIdByPartition(int partitionNum){
+        return String.format(STREAM_ID_NUM_TEMPLATE,partitionNum);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/utils/TimePeriodUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/utils/TimePeriodUtils.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/utils/TimePeriodUtils.java
new file mode 100644
index 0000000..782188d
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/utils/TimePeriodUtils.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.utils;
+
+import org.joda.time.Period;
+import org.joda.time.Seconds;
+
+import scala.Int;
+
+public class TimePeriodUtils {
+    /**
+     * Truncates seconds to a multiple of the period; e.g. 1990/01/07 12:45 with period PT30M gives 1990/01/07 12:30.
+     *
+     * @param seconds timestamp in seconds
+     * @param period  period length; NOTE(review): a zero-second period makes the modulo below divide by zero
+     *
+     * @return seconds minus (seconds modulo the period length in seconds)
+     */
+    public static long formatSecondsByPeriod(long seconds,Seconds period){
+        return seconds - (seconds % Int.int2long(period.getSeconds())); // presumably int2long only widens the int to long
+    }
+
+    /**
+     * @param seconds timestamp in seconds
+     * @param period  joda period, converted with toStandardSeconds(); a zero-length period divides by zero
+     * @return seconds minus (seconds modulo the period length in seconds)
+     */
+    public static long formatSecondsByPeriod(long seconds,Period period){
+        return seconds - (seconds % Int.int2long(period.toStandardSeconds().getSeconds()));
+    }
+
+    /**
+     * @param milliseconds timestamp in milliseconds; the integer division by 1000 drops the sub-second part
+     * @param period       joda period, handed to formatSecondsByPeriod(long, Period)
+     * @return the truncated timestamp, converted back to milliseconds
+     */
+    public static long formatMillisecondsByPeriod(long milliseconds,Period period){
+        return formatSecondsByPeriod(milliseconds/1000,period)*1000;
+    }
+
+    public static int getSecondsOfPeriod(Period period){ // period length as standard seconds
+        return period.toStandardSeconds().getSeconds();
+    }
+
+    public static int getMillisecondsOfPeriod(Period period){ // NOTE(review): int arithmetic, overflows above Integer.MAX_VALUE ms (~24.8 days)
+        return getSecondsOfPeriod(period) * 1000;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/resources/log4j.properties b/eagle-core/eagle-alert/alert/alert-common/src/main/resources/log4j.properties
new file mode 100644
index 0000000..fb13ad5
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/resources/log4j.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=DEBUG, stdout
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java b/eagle-core/eagle-alert/alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java
new file mode 100644
index 0000000..8123f45
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java
@@ -0,0 +1,61 @@
+package org.apache.eagle.alert.config;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+public class TestConfigBus {
+
+    @Ignore // NOTE(review): presumably ignored because it needs a ZooKeeper at localhost:2181 - confirm
+    @Test
+    public void testConfigChange() throws Exception {
+        String topic = "spout";
+        ZKConfig config = new ZKConfig();
+        config.zkQuorum = "localhost:2181";
+        config.zkRoot = "/alert";
+        config.zkRetryInterval = 1000;
+        config.zkRetryTimes = 3;
+        config.connectionTimeoutMs = 3000;
+        config.zkSessionTimeoutMs = 10000;
+        ConfigBusProducer producer = new ConfigBusProducer(config);
+        final AtomicBoolean validate = new AtomicBoolean(false); // set by the consumer callback below
+        ConfigBusConsumer consumer = new ConfigBusConsumer(config, topic, value -> {
+            validate.set(true);
+            System.out.println("******** get notified of config " + value);
+        });
+        // first change: send "testvalue1" on the topic
+        ConfigValue value = new ConfigValue();
+        value.setValueVersionId(false);
+        value.setValue("testvalue1");
+        producer.send(topic, value);
+
+        Thread.sleep(1000);
+
+        // second change: reuse the same ConfigValue object with "testvalue2"
+        value.setValueVersionId(false);
+        value.setValue("testvalue2");
+        producer.send(topic, value);
+        producer.close();
+        Thread.sleep(1000);
+        consumer.close();
+        Assert.assertTrue(validate.get()); // passes if the consumer callback ran at least once
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/test/java/org/apache/eagle/alert/metric/MetricSystemTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/test/java/org/apache/eagle/alert/metric/MetricSystemTest.java b/eagle-core/eagle-alert/alert/alert-common/src/test/java/org/apache/eagle/alert/metric/MetricSystemTest.java
new file mode 100644
index 0000000..e96ded9
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/test/java/org/apache/eagle/alert/metric/MetricSystemTest.java
@@ -0,0 +1,78 @@
+package org.apache.eagle.alert.metric;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.eagle.alert.metric.sink.KafkaSink;
+import org.apache.eagle.alert.metric.source.JVMMetricSource;
+import org.apache.eagle.alert.metric.source.MetricSource;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.JvmAttributeGaugeSet;
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class MetricSystemTest {
+    @Test @Ignore
+    public void testKafkaSink(){ // smoke test: prepare, report once, stop; no assertions
+        KafkaSink sink = new KafkaSink();
+        MetricRegistry registry = new MetricRegistry();
+        registry.registerAll(new JvmAttributeGaugeSet());
+        sink.prepare(ConfigFactory.load().getConfig("metric.sink.kafka"),registry); // sink settings come from the metric.sink.kafka config path
+        sink.report();
+        sink.stop();
+    }
+
+    @Test @Ignore
+    public void testMetricSystem() throws InterruptedException { // smoke test: register two sources, start, report, stop; no assertions
+        MetricSystem system = MetricSystem.load(ConfigFactory.load());
+        system.register(new JVMMetricSource());
+        system.register(new SampleMetricSource());
+        system.start();
+        system.report();
+        system.stop();
+    }
+
+    private class SampleMetricSource implements MetricSource { // source with two sample gauges: sample.long and sample.map
+        private MetricRegistry registry = new MetricRegistry();
+
+        public SampleMetricSource(){
+            registry.register("sample.long", (Gauge<Long>) System::currentTimeMillis);
+            registry.register("sample.map", (Gauge<Map<String, Object>>) () -> new HashMap<String, Object>(){ // anonymous HashMap subclass
+                private static final long serialVersionUID = 3948508906655117683L;
+            { // instance initializer filling the map each time the gauge is read
+                put("int",1234);
+                put("str","text");
+                put("bool",true);
+            }});
+        }
+
+        @Override
+        public String name() {
+            return "sampleSource";
+        }
+
+        @Override
+        public MetricRegistry registry() {
+            return registry;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/test/java/org/apache/eagle/alert/service/TestMetadataServiceClientImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/test/java/org/apache/eagle/alert/service/TestMetadataServiceClientImpl.java b/eagle-core/eagle-alert/alert/alert-common/src/test/java/org/apache/eagle/alert/service/TestMetadataServiceClientImpl.java
new file mode 100644
index 0000000..4496b8e
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/test/java/org/apache/eagle/alert/service/TestMetadataServiceClientImpl.java
@@ -0,0 +1,40 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.service;
+
+import java.util.List;
+
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestMetadataServiceClientImpl {
+    @SuppressWarnings("resource") // the client created below is never closed
+    @Ignore // NOTE(review): presumably ignored because it calls a metadata service at localhost:58080 - confirm
+    @Test
+    public void test() throws Exception{ // lists policies and prints them as JSON; no assertions
+        MetadataServiceClientImpl impl = new MetadataServiceClientImpl("localhost", 58080, "/api/metadata/policies");
+        List<PolicyDefinition> policies = impl.listPolicies();
+        ObjectMapper mapper = new ObjectMapper();
+        String ret = mapper.writeValueAsString(policies);
+        System.out.println(ret);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/test/java/org/apache/eagle/correlation/meta/JsonTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/test/java/org/apache/eagle/correlation/meta/JsonTest.java b/eagle-core/eagle-alert/alert/alert-common/src/test/java/org/apache/eagle/correlation/meta/JsonTest.java
new file mode 100644
index 0000000..71cce51
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/test/java/org/apache/eagle/correlation/meta/JsonTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.correlation.meta;
+
+import org.junit.Test;
+
+/**
+ * Placeholder test class, created on 3/11/16; policyDefTest has an empty body (TODO) and asserts nothing yet.
+ */
+public class JsonTest {
+
+    @Test
+    public void policyDefTest() {
+        // TODO
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/test/resources/application.conf b/eagle-core/eagle-alert/alert/alert-common/src/test/resources/application.conf
new file mode 100644
index 0000000..1f7bd01
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/test/resources/application.conf
@@ -0,0 +1,17 @@
+{
+	metric {
+		sink {
+			kafka {
+				"topic":"alert_metric_test"
+				"bootstrap.servers": "localhost:9092"
+			}
+			logger {
+				level = "INFO"
+			}
+			elasticsearch {
+        hosts = ["10.64.223.222:9200"]
+        index = "alert_metric_test"
+      }
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/test/resources/log4j.properties b/eagle-core/eagle-alert/alert/alert-common/src/test/resources/log4j.properties
new file mode 100644
index 0000000..fb13ad5
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/test/resources/log4j.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=DEBUG, stdout
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/.gitignore
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/.gitignore b/eagle-core/eagle-alert/alert/alert-coordinator/.gitignore
new file mode 100644
index 0000000..b83d222
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/.gitignore
@@ -0,0 +1 @@
+/target/

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/pom.xml b/eagle-core/eagle-alert/alert/alert-coordinator/pom.xml
new file mode 100644
index 0000000..26c743a
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>io.sherlock</groupId>
+		<artifactId>alert-parent</artifactId>
+		<version>0.0.1-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>alert-coordinator</artifactId>
+	<packaging>war</packaging>
+
+	<name>alert-coordinator</name>
+	<url>http://maven.apache.org</url>
+
+	<dependencies>
+		<dependency>
+			<groupId>io.sherlock</groupId>
+			<artifactId>alert-common</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>com.sun.jersey</groupId>
+			<artifactId>jersey-server</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>com.sun.jersey</groupId>
+			<artifactId>jersey-servlet</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>com.sun.jersey.contribs</groupId>
+			<artifactId>jersey-multipart</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.codehaus.jackson</groupId>
+			<artifactId>jackson-mapper-asl</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.codehaus.jackson</groupId>
+			<artifactId>jackson-jaxrs</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.codehaus.jackson</groupId>
+			<artifactId>jackson-xc</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-api</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>com.typesafe</groupId>
+			<artifactId>config</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.tomcat.embed</groupId>
+			<artifactId>tomcat-embed-core</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>io.swagger</groupId>
+			<artifactId>swagger-jaxrs</artifactId>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.eclipse.jetty</groupId>
+				<artifactId>jetty-maven-plugin</artifactId>
+				<configuration>
+					<scanIntervalSeconds>5</scanIntervalSeconds>
+					<httpConnector>
+						<port>9090</port>
+					</httpConnector>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<version>2.6</version>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
new file mode 100644
index 0000000..ccb4624
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator;
+
+import java.text.MessageFormat;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.eagle.alert.config.ConfigBusProducer;
+import org.apache.eagle.alert.config.ConfigValue;
+import org.apache.eagle.alert.config.ZKConfig;
+import org.apache.eagle.alert.config.ZKConfigBuilder;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordinator.provider.ScheduleContextBuilder;
+import org.apache.eagle.alert.coordinator.trigger.DynamicPolicyLoader;
+import org.apache.eagle.alert.coordinator.trigger.PolicyChangeListener;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.service.IMetadataServiceClient;
+import org.apache.eagle.alert.service.MetadataServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * @since Mar 24, 2016 Coordinator is a standalone java application, which
+ *        listens to policy changes and uses a schedule algorithm to distribute
+ *        policies 1) reacting to shutdown events 2) starting a non-daemon thread
+ *        to pull policies and figure out whether policies have changed
+ */
+public class Coordinator {
+    private static final String COORDINATOR = "coordinator"; // config path loaded by the no-arg constructor and main
+    /**
+     * Topic templates ({0} = topology name), see {@link ZKMetadataChangeNotifyService}
+     *  /alert/{topologyName}/spout
+     *                  /router
+     *                  /alert
+     *                  /publisher
+     */
+    private static final String ZK_ALERT_CONFIG_SPOUT = "{0}/spout";
+    private static final String ZK_ALERT_CONFIG_ROUTER = "{0}/router";
+    private static final String ZK_ALERT_CONFIG_ALERT = "{0}/alert";
+    private static final String ZK_ALERT_CONFIG_PUBLISHER = "{0}/publisher";
+
+    private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class);
+
+    private final static String METADATA_SERVICE_HOST = "metadataService.host";
+    private final static String METADATA_SERVICE_PORT = "metadataService.port";
+    private final static String METADATA_SERVICE_CONTEXT = "metadataService.context";
+    private final static String DYNAMIC_POLICY_LOADER_INIT_MILLS = "metadataDynamicCheck.initDelayMillis";
+    private final static String DYNAMIC_POLICY_LOADER_DELAY_MILLS = "metadataDynamicCheck.delayMillis";
+
+    private volatile ScheduleState currentState = null; // last state produced by schedule(); null until then
+    private final ConfigBusProducer producer;
+    private final IMetadataServiceClient client;
+    private Config config;
+    
+    public Coordinator() { // builds producer and client from the "coordinator" section of the loaded config
+        config = ConfigFactory.load().getConfig(COORDINATOR);
+        ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
+        producer = new ConfigBusProducer(zkConfig);
+        client = new MetadataServiceClientImpl(config);
+    }
+
+    public Coordinator(Config config, ConfigBusProducer producer, IMetadataServiceClient client) { // uses the supplied collaborators as-is
+        this.config = config;
+        this.producer = producer;
+        this.client = client;
+    }
+
+    public ScheduleState schedule(ScheduleOption option) { // runs one scheduling pass and returns the resulting state
+        IScheduleContext context = new ScheduleContextBuilder(client).buildContext();
+        TopologyMgmtService mgmtService = new TopologyMgmtService();
+        IPolicyScheduler scheduler = PolicySchedulerFactory.createScheduler();
+
+        scheduler.init(context, mgmtService);
+        ScheduleState state = scheduler.schedule(option);
+
+        // persist the state through the client & notify through the producer
+        postSchedule(client, state, producer);
+
+        currentState = state;
+        return state;
+    }
+
+    public static void postSchedule(IMetadataServiceClient client, ScheduleState state, ConfigBusProducer producer) {
+        // persist state through the metadata service client
+        client.addScheduleState(state);
+        // TODO, see ScheduleState comments on how to better store these configs
+        // store policy assignment
+        // store monitored stream
+
+        // notify: send the state version to the spout/router/alert/publisher topic of every topology in the state
+        ConfigValue value = new ConfigValue();
+        value.setValue(state.getVersion());
+        value.setValueVersionId(true);
+        for (String topo : state.getSpoutSpecs().keySet()) {
+            producer.send(MessageFormat.format(ZK_ALERT_CONFIG_SPOUT, topo), value);
+        }
+        for (String topo : state.getGroupSpecs().keySet()) {
+            producer.send(MessageFormat.format(ZK_ALERT_CONFIG_ROUTER, topo), value);
+        }
+        for (String topo : state.getAlertSpecs().keySet()) {
+            producer.send(MessageFormat.format(ZK_ALERT_CONFIG_ALERT, topo), value);
+        }
+        for (String topo : state.getPublishSpecs().keySet()) {
+            producer.send(MessageFormat.format(ZK_ALERT_CONFIG_PUBLISHER, topo), value);
+        }
+    }
+
+    public ScheduleState getState() { // state stored by the last schedule() call, or null
+        return currentState;
+    }
+
+    /**
+     * Shutdown hook body: stops the scheduled executor service passed to the constructor.
+     */
+    private static class CoordinatorShutdownHook implements Runnable {
+        private static final Logger LOG = LoggerFactory.getLogger(CoordinatorShutdownHook.class);
+        private ScheduledExecutorService executorSrv;
+
+        public CoordinatorShutdownHook(ScheduledExecutorService executorSrv) {
+            this.executorSrv = executorSrv;
+        }
+
+        @Override
+        public void run() {
+            LOG.info("start shutdown coordinator ...");
+            LOG.info("Step 1 shutdown dynamic policy loader thread ");
+            // catch everything (Throwable) so that shutdown stays best effort;
+            // shutdownNow() in the finally block runs whatever the outcome
+            try {
+                executorSrv.shutdown();
+                executorSrv.awaitTermination(2000, TimeUnit.MILLISECONDS); // wait at most 2 seconds for running tasks
+            } catch (Throwable t) {
+                LOG.error("error shutdown dynamic policy loader", t);
+            } finally {
+                executorSrv.shutdownNow();
+            }
+        }
+    }
+
+    private static class PolicyChangeHandler implements PolicyChangeListener { // re-schedules on every policy change callback
+        private final static Logger LOG = LoggerFactory.getLogger(PolicyChangeHandler.class);
+        private Config config;
+        private IMetadataServiceClient client;
+
+        public PolicyChangeHandler(Config config) {
+            this.config = config;
+            this.client = new MetadataServiceClientImpl(config);
+        }
+
+        @Override
+        public void onPolicyChange(List<PolicyDefinition> allPolicies, Collection<String> addedPolicies,
+                Collection<String> removedPolicies, Collection<String> modifiedPolicies) {
+            LOG.info("policy changed ... ");
+            LOG.info("allPolicies: " + allPolicies + ", addedPolicies: " + addedPolicies + ", removedPolicies: "
+                    + removedPolicies + ", modifiedPolicies: " + modifiedPolicies);
+
+            IScheduleContext context = new ScheduleContextBuilder(client).buildContext();
+            TopologyMgmtService mgmtService = new TopologyMgmtService();
+            IPolicyScheduler scheduler = PolicySchedulerFactory.createScheduler();
+
+            scheduler.init(context, mgmtService);
+
+            ScheduleState state = scheduler.schedule(new ScheduleOption()); // the policy lists above are only logged, not passed on
+
+            ConfigBusProducer producer = new ConfigBusProducer(ZKConfigBuilder.getZKConfig(config)); // NOTE(review): new producer per change, not closed here - confirm
+            postSchedule(client, state, producer);
+            producer.send("spout", new ConfigValue()); // NOTE(review): empty ConfigValue sent to the bare "spout" topic - confirm intent
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        Config config = ConfigFactory.load().getConfig(COORDINATOR);
+        // build dynamic policy loader on top of a metadata service client
+        String host = config.getString(METADATA_SERVICE_HOST);
+        int port = config.getInt(METADATA_SERVICE_PORT);
+        String context = config.getString(METADATA_SERVICE_CONTEXT);
+        IMetadataServiceClient client = new MetadataServiceClientImpl(host, port, context);
+        DynamicPolicyLoader loader = new DynamicPolicyLoader(client);
+        loader.addPolicyChangeListener(new PolicyChangeHandler(config));
+
+        // schedule dynamic policy loader at a fixed rate on a single-thread scheduler
+        long initDelayMillis = config.getLong(DYNAMIC_POLICY_LOADER_INIT_MILLS);
+        long delayMillis = config.getLong(DYNAMIC_POLICY_LOADER_DELAY_MILLS);
+        ScheduledExecutorService scheduleSrv = Executors.newScheduledThreadPool(1);
+        scheduleSrv.scheduleAtFixedRate(loader, initDelayMillis, delayMillis, TimeUnit.MILLISECONDS);
+        Runtime.getRuntime().addShutdownHook(new Thread(new CoordinatorShutdownHook(scheduleSrv)));
+        LOG.info("Eagle Coordinator started ...");
+        
+        Thread.currentThread().join(); // the main thread joins itself, i.e. blocks here until interrupted
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java
new file mode 100644
index 0000000..3458b3e
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator;
+
+/**
+ * Names of the configuration entries used by the alert coordinator.
+ * <ul>
+ * <li>{@code CONFIG_ITEM_COORDINATOR}: the config section that TopologyMgmtService and
+ * GreedyPolicyScheduler load before reading the other keys.</li>
+ * <li>{@code BOLT_PARALLELISM}, {@code NUM_OF_ALERT_BOLTS_PER_TOPOLOGY}: read as ints by
+ * TopologyMgmtService.</li>
+ * <li>{@code POLICIES_PER_BOLT}, {@code POLICY_DEFAULT_PARALLELISM}: read as ints by
+ * GreedyPolicyScheduler.</li>
+ * <li>{@code CONFIG_ITEM_BOLT_LOAD_UPBOUND}: read as a double by GreedyPolicyScheduler.</li>
+ * <li>{@code CONFIG_ITEM_TOPOLOGY_LOAD_UPBOUND}: presumably the topology-level counterpart of
+ * the bolt load bound; its reader is not part of the classes named above - TODO confirm.</li>
+ * </ul>
+ *
+ * @since Apr 22, 2016
+ *
+ */
+public class CoordinatorConstants {
+    public static final String CONFIG_ITEM_COORDINATOR = "coordinator";
+    public static final String CONFIG_ITEM_TOPOLOGY_LOAD_UPBOUND = "topologyLoadUpbound";
+    public static final String CONFIG_ITEM_BOLT_LOAD_UPBOUND = "boltLoadUpbound";
+    public static final String POLICY_DEFAULT_PARALLELISM = "policyDefaultParallelism";
+    public static final String BOLT_PARALLELISM = "boltParallelism";
+    public static final String NUM_OF_ALERT_BOLTS_PER_TOPOLOGY = "numOfAlertBoltsPerTopology";
+    public static final String POLICIES_PER_BOLT = "policiesPerBolt";
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IPolicyScheduler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IPolicyScheduler.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IPolicyScheduler.java
new file mode 100644
index 0000000..5e61443
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IPolicyScheduler.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator;
+
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+
+/**
+ * Contract of a policy scheduler used by the alert coordinator: it receives a schedule context
+ * and a topology management service, and produces a {@link ScheduleState}.
+ * <p>
+ * GreedyPolicyScheduler, the implementation in this module, reads the context stored by
+ * {@code init} while scheduling, so {@code init} is expected to be called before {@code schedule}.
+ *
+ * @since Mar 24, 2016
+ *
+ */
+public interface IPolicyScheduler {
+    /**
+     * Hands the scheduler the context and the topology management service it works with.
+     *
+     * @param context     view of the topologies, policies, streams and assignments to schedule on
+     * @param mgmtService service used to look up or create topologies
+     */
+    void init(IScheduleContext context, TopologyMgmtService mgmtService);
+
+    /**
+     * Build the assignments for all.
+     *
+     * @param option runtime options for this scheduling pass
+     * @return the schedule state computed by this pass
+     */
+    ScheduleState schedule(ScheduleOption option);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IScheduleContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IScheduleContext.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IScheduleContext.java
new file mode 100644
index 0000000..0cde22d
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IScheduleContext.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator;
+
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.coordinator.model.TopologyUsage;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+
+/**
+ * Access to the data a policy scheduler works on, exposed as maps.
+ * <p>
+ * Key semantics as far as GreedyPolicyScheduler shows them: policy assignments are looked up by
+ * policy name, topologies and topology usages by topology name, stream schemas by the entries of
+ * a policy definition's input stream list, and monitored streams by {@link StreamGroup}. The
+ * keys of the policy, data source and publishment maps are not visible from that usage -
+ * TODO confirm.
+ * <p>
+ * NOTE(review): GreedyPolicyScheduler puts new entries into the policy-assignment and
+ * monitored-stream maps of its own copy of the context, so at least that copy has to return
+ * mutable maps - confirm whether every implementation must.
+ *
+ * @since Mar 28, 2016
+ *
+ */
+public interface IScheduleContext {
+
+    Map<String, Topology> getTopologies();
+
+    Map<String, PolicyDefinition> getPolicies();
+
+    // data source
+    Map<String, Kafka2TupleMetadata> getDataSourceMetadata();
+
+    Map<String, StreamDefinition> getStreamSchemas();
+
+    Map<String, TopologyUsage> getTopologyUsages();
+
+    Map<String, PolicyAssignment> getPolicyAssignments();
+
+    Map<StreamGroup, MonitoredStream> getMonitoredStreams();
+    
+    Map<String, Publishment> getPublishments();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/PolicySchedulerFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/PolicySchedulerFactory.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/PolicySchedulerFactory.java
new file mode 100644
index 0000000..8bccb53
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/PolicySchedulerFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator;
+
+import org.apache.eagle.alert.coordinator.impl.GreedyPolicyScheduler;
+
+/**
+ * Factory that hides which {@link IPolicyScheduler} implementation the coordinator uses.
+ *
+ * @since Mar 24, 2016
+ *
+ */
+public class PolicySchedulerFactory {
+
+    /**
+     * Creates a new scheduler instance on every call.
+     *
+     * @return a fresh {@link GreedyPolicyScheduler}
+     */
+    public static IPolicyScheduler createScheduler() {
+        final IPolicyScheduler scheduler = new GreedyPolicyScheduler();
+        return scheduler;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ScheduleOption.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ScheduleOption.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ScheduleOption.java
new file mode 100644
index 0000000..6c04e61
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ScheduleOption.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator;
+
+/**
+ * Runtime options for a single scheduling pass.
+ *
+ * Can be used to override configured values.
+ *
+ * @since Apr 19, 2016
+ *
+ */
+public class ScheduleOption {
+    private int policiesPerBolt;
+    private int boltParallelism;
+    private int policyDefaultParallelism;
+    private double boltLoadUpbound;
+    private double topoLoadUpbound;
+
+    /** @return the policies-per-bolt value held by this option, 0 until set */
+    public int getPoliciesPerBolt() {
+        return this.policiesPerBolt;
+    }
+
+    /** @param policiesPerBolt the policies-per-bolt value to store */
+    public void setPoliciesPerBolt(int policiesPerBolt) {
+        this.policiesPerBolt = policiesPerBolt;
+    }
+
+    /** @return the bolt parallelism held by this option, 0 until set */
+    public int getBoltParallelism() {
+        return this.boltParallelism;
+    }
+
+    /** @param boltParallelism the bolt parallelism to store */
+    public void setBoltParallelism(int boltParallelism) {
+        this.boltParallelism = boltParallelism;
+    }
+
+    /** @return the default policy parallelism held by this option, 0 until set */
+    public int getPolicyDefaultParallelism() {
+        return this.policyDefaultParallelism;
+    }
+
+    /** @param policyDefaultParallelism the default policy parallelism to store */
+    public void setPolicyDefaultParallelism(int policyDefaultParallelism) {
+        this.policyDefaultParallelism = policyDefaultParallelism;
+    }
+
+    /** @return the bolt load upper bound held by this option, 0.0 until set */
+    public double getBoltLoadUpbound() {
+        return this.boltLoadUpbound;
+    }
+
+    /** @param boltLoadUpbound the bolt load upper bound to store */
+    public void setBoltLoadUpbound(double boltLoadUpbound) {
+        this.boltLoadUpbound = boltLoadUpbound;
+    }
+
+    /** @return the topology load upper bound held by this option, 0.0 until set */
+    public double getTopoLoadUpbound() {
+        return this.topoLoadUpbound;
+    }
+
+    /** @param topoLoadUpbound the topology load upper bound to store */
+    public void setTopoLoadUpbound(double topoLoadUpbound) {
+        this.topoLoadUpbound = topoLoadUpbound;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java
new file mode 100644
index 0000000..4ae07f5
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator;
+
+import static org.apache.eagle.alert.coordinator.CoordinatorConstants.CONFIG_ITEM_COORDINATOR;
+import static org.apache.eagle.alert.coordinator.CoordinatorConstants.NUM_OF_ALERT_BOLTS_PER_TOPOLOGY;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.coordinator.model.TopologyUsage;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * Topology management facade of the coordinator. In this revision it only exposes the
+ * configured number of alert bolts per topology; creating and listing topologies are stubs.
+ *
+ * @since Mar 29, 2016
+ *
+ */
+public class TopologyMgmtService {
+
+    /** Plain holder describing one topology together with its usage and cluster coordinates. */
+    public static class TopologyMeta {
+        public String topologyId;
+        public Topology topology;
+        public TopologyUsage usage;
+
+        public String clusterId;
+        public String nimbusHost;
+        public String nimbusPort;
+
+    }
+
+    /** Plain holder describing one storm cluster. */
+    public static class StormClusterMeta {
+        public String clusterId;
+        public String nimbusHost;
+        public String nimbusPort;
+        public String stormVersion;
+    }
+
+    // read from configuration but not used anywhere in this class yet
+    @SuppressWarnings("unused")
+    private int boltParallelism;
+    private int numberOfBoltsPerTopology;
+
+    /**
+     * Reads the bolt parallelism and the number of alert bolts per topology from the
+     * coordinator section of the loaded configuration.
+     */
+    public TopologyMgmtService() {
+        final Config coordinatorConf = ConfigFactory.load().getConfig(CONFIG_ITEM_COORDINATOR);
+        this.boltParallelism = coordinatorConf.getInt(CoordinatorConstants.BOLT_PARALLELISM);
+        this.numberOfBoltsPerTopology = coordinatorConf.getInt(NUM_OF_ALERT_BOLTS_PER_TOPOLOGY);
+    }
+
+    /**
+     * @return the configured number of alert bolts per topology
+     */
+    public int getNumberOfAlertBoltsInTopology() {
+        return this.numberOfBoltsPerTopology;
+    }
+
+    /**
+     * TODO: call topology mgmt API to create a topology
+     *
+     * @return never returns normally in this revision
+     * @throws UnsupportedOperationException always, creation is not implemented yet
+     */
+    public TopologyMeta creatTopology() {
+        // TODO
+        throw new UnsupportedOperationException("not supported yet!");
+    }
+
+    /**
+     * @return the known topologies; always an empty list in this revision
+     */
+    public List<TopologyMeta> listTopologies() {
+        // TODO
+        final List<TopologyMeta> noTopologies = Collections.emptyList();
+        return noTopologies;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
new file mode 100644
index 0000000..a9b6c00
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.impl;
+
+import static org.apache.eagle.alert.coordinator.CoordinatorConstants.CONFIG_ITEM_BOLT_LOAD_UPBOUND;
+import static org.apache.eagle.alert.coordinator.CoordinatorConstants.CONFIG_ITEM_COORDINATOR;
+import static org.apache.eagle.alert.coordinator.CoordinatorConstants.POLICIES_PER_BOLT;
+import static org.apache.eagle.alert.coordinator.CoordinatorConstants.POLICY_DEFAULT_PARALLELISM;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
+import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.coordinator.IPolicyScheduler;
+import org.apache.eagle.alert.coordinator.IScheduleContext;
+import org.apache.eagle.alert.coordinator.ScheduleOption;
+import org.apache.eagle.alert.coordinator.TopologyMgmtService;
+import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
+import org.apache.eagle.alert.coordinator.model.TopologyUsage;
+import org.apache.eagle.alert.coordinator.provider.InMemScheduleConext;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * A simple greedy assigner. <br/>
+ * A greedy assigner simply loops over the policies, finds the most suitable topology
+ * to locate each policy first, then assigns the topics to the corresponding
+ * spouts/group-by bolts.
+ *
+ * <br/>
+ * For each given policy, the greedy steps are
+ * <ul>
+ * <li>1. Find the same topology that already serves the policy without exceeding the load</li>
+ * <li>2. Find the topology that already takes the source traffic without exceeding the load</li>
+ * <li>3. Find the topology that is available to place the source topic without exceeding the load</li>
+ * <li>4. Create a new topology and locate the policy</li>
+ * <li>Route table generated after all policies assigned</li>
+ * </ul>
+ * <br/>
+ * {@link #init} has to be called before {@link #schedule}: scheduling works on the copy of the
+ * context that {@code init} creates.
+ *
+ * @since Mar 24, 2016
+ *
+ */
+public class GreedyPolicyScheduler implements IPolicyScheduler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(GreedyPolicyScheduler.class);
+
+    // limits and defaults, read once from the coordinator configuration section
+    private int policiesPerBolt;
+    private int policyDefaultParallelism;
+    private int initialQueueSize;
+    private double boltLoadUpbound;
+
+    // copied context for scheduling
+    private IScheduleContext context;
+
+    private TopologyMgmtService mgmtService;
+
+    // result of the most recent schedule() call, null before the first call
+    private ScheduleState state;
+
+    /**
+     * Reads policies-per-bolt, default policy parallelism and the bolt load upper bound from the
+     * coordinator configuration section. The initial queue size equals the default parallelism.
+     */
+    public GreedyPolicyScheduler() {
+        Config config = ConfigFactory.load().getConfig(CONFIG_ITEM_COORDINATOR);
+        policiesPerBolt = config.getInt(POLICIES_PER_BOLT);
+        policyDefaultParallelism = config.getInt(POLICY_DEFAULT_PARALLELISM);
+        initialQueueSize = policyDefaultParallelism;
+        boltLoadUpbound = config.getDouble(CONFIG_ITEM_BOLT_LOAD_UPBOUND);
+    }
+
+    /**
+     * Places every policy that has no assignment yet and regenerates the monitor metadata.
+     * Policies that could not be placed are reported in one warning.
+     *
+     * @param option runtime options; not consulted by this implementation, the limits come from
+     *               the configuration read in the constructor
+     * @return the schedule state generated from the updated context; also kept for {@link #getState()}
+     */
+    @Override
+    public synchronized ScheduleState schedule(ScheduleOption option) {
+        // FIXME: never re-assign right now: sticky mode
+        // TODO: how to identify the over-heat nodes? not yet done #Scale of policies
+        // Answer: Use configured policiesPerBolt and configured bolt load up-bound
+        // FIXME: Here could be strategy to define the topology priorities
+        List<WorkItem> workSets = findWorkingSets();
+        /*
+         * Open design questions and current answers:
+         * - How to support taking multiple "dumped" topologies that consume the same input as one
+         *   available set?
+         *   Answer: the spout spec is generated after the policies are assigned.
+         * - How to define the input traffic partition?
+         *   Answer: input traffic might not be supported right now.
+         * - How to support traffic partition between topologies?
+         *   Answer: two possible places: a global route table will be generated, tuples whose
+         *   target is not in the current topology will be dropped. This makes the partition for
+         *   tuple to alert.
+         * - How to support adding topologies on demand by evaluating the available topology
+         *   bandwidth (needs topology level load)?
+         *   Answer: use the configured topology load up-bound once topology load is available.
+         */
+        Map<String, PolicyAssignment> newAssignments = new HashMap<String, PolicyAssignment>();
+        List<ScheduleResult> failures = new ArrayList<ScheduleResult>();
+        for (WorkItem item : workSets) {
+            ScheduleResult r = schedulePolicy(item, newAssignments);
+            if (r.code != 200) {
+                failures.add(r);
+            }
+        }
+        if (!failures.isEmpty()) {
+            // previously the per-policy results were collected and silently discarded
+            LOG.warn("{} policies could not be scheduled: {}", failures.size(), failures);
+        }
+
+        state = generateMonitorMetadata(workSets, newAssignments);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("calculated schedule state: {}", JsonUtils.writeValueAsString(state));
+        }
+        return state;
+    }
+
+    /**
+     * Collects one work item per policy of the context that has no assignment yet. A parallelism
+     * hint of 0 is replaced by the configured default parallelism.
+     */
+    private List<WorkItem> findWorkingSets() {
+        // find the unassigned definition
+        List<WorkItem> workSets = new LinkedList<WorkItem>();
+        for (PolicyDefinition def : context.getPolicies().values()) {
+            int expectParal = def.getParallelismHint();
+            if (expectParal == 0) {
+                expectParal = policyDefaultParallelism;
+            }
+            // how to handle expand of an policy in a smooth transition manner
+            PolicyAssignment assignment = context.getPolicyAssignments().get(def.getName());
+            if (assignment != null) {
+                LOG.info("policy {} already allocated", def.getName());
+                continue;
+            }
+
+            WorkItem item = new WorkItem(def, expectParal);
+            workSets.add(item);
+        }
+        LOG.info("work set calculation: {}", workSets);
+        return workSets;
+    }
+
+    /**
+     * Generates the schedule state from the scheduling context for the given work items.
+     * {@code newAssignments} is currently not used by the generator call.
+     */
+    private ScheduleState generateMonitorMetadata(List<WorkItem> expandworkSets,
+            Map<String, PolicyAssignment> newAssignments) {
+        MonitorMetadataGenerator generator = new MonitorMetadataGenerator(context);
+        return generator.generate(expandworkSets);
+    }
+
+    /**
+     * Records the policy on one alert bolt and on the usage of the topology that owns the bolt.
+     * {@code targetTopology} is currently not read.
+     */
+    private void placePolicy(PolicyDefinition def, AlertBoltUsage alertBoltUsage, Topology targetTopology,
+            TopologyUsage usage) {
+        String policyName = def.getName();
+
+        // topology usage update
+        alertBoltUsage.addPolicies(def);
+
+        // update alert policy
+        usage.getPolicies().add(policyName);
+
+        // update source topics
+        updateDataSource(usage, def);
+
+        // update group-by
+        updateGrouping(usage, def);
+    }
+
+    /**
+     * Intentionally a no-op: the group-by metadata that used to be maintained here was removed
+     * because group spec generation does not need it any more.
+     */
+    private void updateGrouping(TopologyUsage usage, PolicyDefinition def) {
+        // nothing to update, see method comment
+    }
+
+    /** Adds the data sources behind the policy's input streams to the topology usage. */
+    private void updateDataSource(TopologyUsage usage, PolicyDefinition def) {
+        List<String> datasources = findDatasource(def);
+        usage.getDataSources().addAll(datasources);
+    }
+
+    /**
+     * Resolves the data source of every input stream of the policy through the stream schemas of
+     * the context. Input streams without a stream definition are skipped with a warning instead
+     * of failing with a NullPointerException in the middle of a placement.
+     */
+    private List<String> findDatasource(PolicyDefinition def) {
+        List<String> result = new ArrayList<String>();
+
+        List<String> inputStreams = def.getInputStreams();
+        Map<String, StreamDefinition> schemaMaps = context.getStreamSchemas();
+        for (String is : inputStreams) {
+            StreamDefinition ss = schemaMaps.get(is);
+            if (ss == null) {
+                LOG.warn("policy {} refers to input stream {} without stream definition, its data source is skipped",
+                        def.getName(), is);
+                continue;
+            }
+            result.add(ss.getDataSource());
+        }
+        return result;
+    }
+
+    /**
+     * For each given policy, the greedy steps are
+     * <ul>
+     * <li>1. Find the same topology that already serves the policy</li>
+     * <li>2. Find the topology that already takes the source traffic</li>
+     * <li>3. Find the topology that is available to place the source topic</li>
+     * <li>4. Create a new topology and locate the policy</li>
+     * <li>Route table generated after all policies assigned</li>
+     * </ul>
+     * <br/>
+     *
+     * @param item           the policy to place
+     * @param newAssignments receives the assignment when the policy is placed
+     * @return a result with code 200 when the policy was placed, 400 when its partition spec is
+     *         empty or no work queue could be found or created
+     */
+    private ScheduleResult schedulePolicy(WorkItem item, Map<String, PolicyAssignment> newAssignments) {
+        LOG.info(" schedule for {}", item );
+
+        String policyName = item.def.getName();
+        StreamGroup policyStreamPartition = new StreamGroup();
+        if (item.def.getPartitionSpec().isEmpty()) {
+            LOG.error(" policy {} partition spec is empty! ", policyName);
+            ScheduleResult result = new ScheduleResult();
+            result.policyName = policyName;
+            result.code = 400;
+            result.message = "policy doesn't have partition spec";
+            return result;
+        }
+        policyStreamPartition.addStreamPartitions(item.def.getPartitionSpec());
+
+        // one monitored stream per stream group, registered in the context on first use
+        MonitoredStream targetdStream = context.getMonitoredStreams().get(policyStreamPartition);
+        if (targetdStream == null) {
+            targetdStream = new MonitoredStream(policyStreamPartition);
+            context.getMonitoredStreams().put(policyStreamPartition, targetdStream);
+        }
+
+        ScheduleResult result = new ScheduleResult();
+        result.policyName = policyName;
+
+        StreamWorkSlotQueue queue = findWorkSlotQueue(targetdStream, item.def);
+        if (queue == null) {
+            result.code = 400;
+            result.message = String.format("unable to allocate work queue resource for policy %s !", policyName);
+        } else {
+            placePolicyToQueue(item.def, queue, newAssignments);
+            result.code = 200;
+            result.message = "OK";
+        }
+
+        LOG.info(" schedule result : {}", result);
+        return result;
+    }
+
+    /**
+     * Places the policy on every work slot of the queue and records the policy-to-queue
+     * assignment both in the context and in {@code newAssignments}.
+     */
+    private void placePolicyToQueue(PolicyDefinition def, StreamWorkSlotQueue queue,
+            Map<String, PolicyAssignment> newAssignments) {
+        for (WorkSlot slot : queue.getWorkingSlots()) {
+            Topology targetTopology = context.getTopologies().get(slot.getTopologyName());
+            TopologyUsage usage = context.getTopologyUsages().get(slot.getTopologyName());
+            AlertBoltUsage alertBoltUsage = usage.getAlertBoltUsage(slot.getBoltId());
+            placePolicy(def, alertBoltUsage, targetTopology, usage);
+        }
+        PolicyAssignment assignment = new PolicyAssignment(def.getName(), queue.getQueueId());
+        context.getPolicyAssignments().put(def.getName(), assignment);
+        newAssignments.put(def.getName(), assignment);
+    }
+
+    /**
+     * Returns the first queue of the monitored stream that can still take the policy; when there
+     * is none, asks a {@code WorkQueueBuilder} to create a new queue.
+     *
+     * @return the queue to use, or whatever the builder returns when no queue could be created
+     *         (the caller treats {@code null} as an allocation failure)
+     */
+    private StreamWorkSlotQueue findWorkSlotQueue(MonitoredStream targetdStream, PolicyDefinition def) {
+        StreamWorkSlotQueue targetQueue = null;
+        for (StreamWorkSlotQueue queue : targetdStream.getQueues()) {
+            if (isQueueAvailable(queue, def)) {
+                targetQueue = queue;
+                break;
+            }
+        }
+
+        if (targetQueue == null) {
+            WorkQueueBuilder builder = new WorkQueueBuilder(context, mgmtService);
+            // TODO : get the properties from policy definiton
+            targetQueue = builder.createQueue(targetdStream, false, getQueueSize(def.getParallelismHint()),
+                    new HashMap<String, Object>());
+        }
+        return targetQueue;
+    }
+
+    /**
+     * Size to use for a newly created queue. Placeholder for a sizing strategy: the hint is
+     * ignored for now and the configured initial queue size is returned.
+     *
+     * @param hint parallelism hint of the policy, currently unused
+     * @return the initial queue size
+     */
+    private int getQueueSize(int hint) {
+        return initialQueueSize;
+    }
+
+    /**
+     * A queue is available for a policy when it is at least as large as the policy's parallelism
+     * hint and every one of its work slots points to an available bolt. A slot whose topology has
+     * no usage record counts as unavailable instead of raising a NullPointerException.
+     */
+    private boolean isQueueAvailable(StreamWorkSlotQueue queue, PolicyDefinition def) {
+        if (queue.getQueueSize() < def.getParallelismHint()) {
+            return false;
+        }
+
+        for (WorkSlot slot : queue.getWorkingSlots()) {
+            TopologyUsage u = context.getTopologyUsages().get(slot.getTopologyName());
+            AlertBoltUsage usage = (u == null) ? null : u.getAlertBoltUsage(slot.getBoltId());
+            if (!isBoltAvailable(usage, def)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * A bolt is unavailable when its usage is unknown, its load is above the configured upper
+     * bound, it holds more policies than the configured policies-per-bolt, or it already holds
+     * this policy.
+     */
+    private boolean isBoltAvailable(AlertBoltUsage boltUsage, PolicyDefinition def) {
+        // overload or over policy # or already contains
+        // NOTE(review): "size() > policiesPerBolt" still accepts a bolt that already holds exactly
+        // policiesPerBolt policies - confirm whether ">=" is intended before changing it
+        if (boltUsage == null || boltUsage.getLoad() > boltLoadUpbound
+                || boltUsage.getPolicies().size() > policiesPerBolt || boltUsage.getPolicies().contains(def.getName())) {
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Stores the management service and a copy of the given context; all scheduling is done on
+     * that copy.
+     *
+     * @param context     the context to copy
+     * @param mgmtService service handed to the work queue builder when a new queue is needed
+     */
+    @Override
+    public void init(IScheduleContext context, TopologyMgmtService mgmtService) {
+        this.context = new InMemScheduleConext(context);
+        this.mgmtService = mgmtService;
+    }
+
+    /**
+     * @return the scheduler's own copy of the context, {@code null} before {@link #init}
+     */
+    public IScheduleContext getContext() {
+        return context;
+    }
+
+    /**
+     * @return the state produced by the last {@link #schedule} call, {@code null} before the first one
+     */
+    public ScheduleState getState() {
+        return state;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
new file mode 100644
index 0000000..40f16e9
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
+import org.apache.eagle.alert.coordination.model.PublishSpec;
+import org.apache.eagle.alert.coordination.model.RouterSpec;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordination.model.SpoutSpec;
+import org.apache.eagle.alert.coordination.model.StreamRepartitionMetadata;
+import org.apache.eagle.alert.coordination.model.StreamRepartitionStrategy;
+import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
+import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata;
+import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.coordinator.IScheduleContext;
+import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
+import org.apache.eagle.alert.coordinator.model.TopologyUsage;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @since Apr 26, 2016
+ * Given current policy placement, figure out monitor metadata
+ * 
+ * TODO: refactor to eliminate the duplicate of stupid if-notInMap-then-create....
+ * FIXME: too many duplicated code logic : check null; add list to map; add to list.. 
+ */
+public class MonitorMetadataGenerator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MonitorMetadataGenerator.class);
+
+    private final IScheduleContext context;
+
+    public MonitorMetadataGenerator(IScheduleContext context) {
+        this.context = context;
+    }
+
+    /**
+     * Builds a {@link ScheduleState} from the current content of the schedule
+     * context: spout specs, router (group-by) specs, alert bolt specs and
+     * publish specs, each keyed by topology name, plus the context's policy
+     * assignments, monitored streams, policies and stream schemas.
+     *
+     * @param expandworkSets not read by this method; everything is derived from the context
+     * @return the generated schedule state, stamped with a new version string
+     */
+    public ScheduleState generate(List<WorkItem> expandworkSets) {
+        // topologyId -> SpoutSpec
+        Map<String, SpoutSpec> topoSpoutSpecsMap = generateSpoutMonitorMetadata();
+
+        // grp-by meta spec(sort & grp)
+        Map<String, RouterSpec> groupSpecsMap = generateGroupbyMonitorMetadata();
+
+        // alert bolt spec
+        Map<String, AlertBoltSpec> alertSpecsMap = generateAlertMonitorMetadata();
+
+        Map<String, PublishSpec> publishSpecsMap = generatePublishMetadata();
+
+        String uniqueVersion = generateVersion();
+        ScheduleState status = new ScheduleState(uniqueVersion,
+                topoSpoutSpecsMap,
+                groupSpecsMap,
+                alertSpecsMap,
+                publishSpecsMap,
+                context.getPolicyAssignments().values(),
+                context.getMonitoredStreams().values(),
+                context.getPolicies().values(),
+                context.getStreamSchemas().values());
+        return status;
+    }
+
+    /**
+     * Builds one {@link PublishSpec} per topology usage, containing every
+     * publishment that references a policy placed on that topology.
+     * Policies without a definition in the context are skipped.
+     *
+     * @return topology name -> publish spec
+     */
+    private Map<String, PublishSpec> generatePublishMetadata() {
+        Map<String, PublishSpec> pubSpecs = new HashMap<>();
+        // prebuild policy to publishment map
+        Map<String, List<Publishment>> policyToPub = new HashMap<>();
+        for (Publishment pub : context.getPublishments().values()) {
+            for (String policyId : pub.getPolicyIds()) {
+                List<Publishment> policyPubs = policyToPub.get(policyId);
+                if (policyPubs == null) {
+                    policyPubs = new ArrayList<>();
+                    policyToPub.put(policyId, policyPubs);
+                }
+                policyPubs.add(pub);
+            }
+        }
+
+        // build per topology
+        for (TopologyUsage u : context.getTopologyUsages().values()) {
+            PublishSpec pubSpec = pubSpecs.get(u.getTopoName());
+            if (pubSpec == null) {
+                pubSpec = new PublishSpec(u.getTopoName(), context.getTopologies().get(u.getTopoName()).getPubBoltId());
+                pubSpecs.put(u.getTopoName(), pubSpec);
+            }
+
+            for (String p : u.getPolicies()) {
+                PolicyDefinition definition = context.getPolicies().get(p);
+                if (definition == null) {
+                    continue;
+                }
+                List<Publishment> policyPubs = policyToPub.get(p);
+                if (policyPubs != null) {
+                    for (Publishment pub : policyPubs) {
+                        pubSpec.addPublishment(pub);
+                    }
+                }
+            }
+        }
+        return pubSpecs;
+    }
+
+    /**
+     * FIXME: add auto-increment version number?
+     *
+     * @return a version string derived from the current wall-clock time in milliseconds
+     */
+    private String generateVersion() {
+        return "spec_version_" + System.currentTimeMillis();
+    }
+
+    /**
+     * Builds one {@link AlertBoltSpec} per topology usage, registering every
+     * policy found on each alert bolt usage of that topology.
+     *
+     * @return topology name -> alert bolt spec
+     */
+    private Map<String, AlertBoltSpec> generateAlertMonitorMetadata() {
+        Map<String, AlertBoltSpec> alertSpecs = new HashMap<>();
+        for (TopologyUsage u : context.getTopologyUsages().values()) {
+            AlertBoltSpec alertSpec = alertSpecs.get(u.getTopoName());
+            if (alertSpec == null) {
+                alertSpec = new AlertBoltSpec(u.getTopoName());
+                alertSpecs.put(u.getTopoName(), alertSpec);
+            }
+            for (AlertBoltUsage boltUsage : u.getAlertUsages().values()) {
+                for (String policyName : boltUsage.getPolicies()) {
+                    PolicyDefinition definition = context.getPolicies().get(policyName);
+                    // the usage may still reference a policy that has no definition in the
+                    // context; skip it (as generatePublishMetadata does) instead of failing
+                    if (definition == null) {
+                        LOG.error(" can not find definition for policy {} on bolt {} ! ", policyName,
+                                boltUsage.getBoltId());
+                        continue;
+                    }
+                    alertSpec.addBoltPolicy(boltUsage.getBoltId(), definition.getName());
+                }
+            }
+        }
+        return alertSpecs;
+    }
+
+    /**
+     * Builds one {@link RouterSpec} per topology usage. For every stream
+     * partition of every monitored stream on the topology, a
+     * {@link StreamRouterSpec} is added that carries one
+     * {@link PolicyWorkerQueue} per work queue of that monitored stream.
+     *
+     * @return topology name -> router spec
+     */
+    private Map<String, RouterSpec> generateGroupbyMonitorMetadata() {
+        Map<String, RouterSpec> groupSpecsMap = new HashMap<>();
+        for (TopologyUsage u : context.getTopologyUsages().values()) {
+            RouterSpec spec = groupSpecsMap.get(u.getTopoName());
+            if (spec == null) {
+                spec = new RouterSpec(u.getTopoName());
+                groupSpecsMap.put(u.getTopoName(), spec);
+            }
+
+            for (MonitoredStream ms : u.getMonitoredStream()) {
+                // multiple streams on the same policy group : for correlation group case:
+                for (StreamPartition partition : ms.getStreamGroup().getStreamPartitions()) {
+                    StreamRouterSpec routeSpec = new StreamRouterSpec();
+                    routeSpec.setPartition(partition);
+                    routeSpec.setStreamId(partition.getStreamId());
+
+                    for (StreamWorkSlotQueue sq : ms.getQueues()) {
+                        PolicyWorkerQueue queue = new PolicyWorkerQueue();
+                        queue.setWorkers(sq.getWorkingSlots());
+                        queue.setPartition(partition);
+                        routeSpec.addQueue(queue);
+                    }
+
+                    spec.addRouterSpec(routeSpec);
+                }
+            }
+        }
+
+        return groupSpecsMap;
+    }
+
+    /**
+     * Builds one {@link SpoutSpec} per topology usage: the data sources and
+     * tuple-to-stream metadata keyed by topic, plus the repartition metadata
+     * derived from the partition specs of the policies placed on the topology.
+     * Policies whose definition, assignment or work queue cannot be found are
+     * logged and skipped.
+     *
+     * @return topology name -> spout spec
+     */
+    private Map<String, SpoutSpec> generateSpoutMonitorMetadata() {
+        Map<String, StreamWorkSlotQueue> queueMap = buildQueueMap();
+
+        Map<String, SpoutSpec> topoSpoutSpecsMap = new HashMap<>();
+        // streamName -> StreamDefinition
+        Map<String, StreamDefinition> streamSchemaMap = context.getStreamSchemas();
+        Map<String, Kafka2TupleMetadata> datasourcesMap = context.getDataSourceMetadata();
+        for (TopologyUsage usage : context.getTopologyUsages().values()) {
+            Topology topo = context.getTopologies().get(usage.getTopoName());
+
+            // based on data source schemas
+            // generate topic -> Kafka2TupleMetadata
+            // generate topic -> Tuple2StreamMetadata (actually the schema selector)
+            Map<String, Kafka2TupleMetadata> dss = new HashMap<>();
+            Map<String, Tuple2StreamMetadata> tss = new HashMap<>();
+            for (String dataSourceId : usage.getDataSources()) {
+                Kafka2TupleMetadata ds = datasourcesMap.get(dataSourceId);
+                dss.put(ds.getTopic(), ds);
+                tss.put(ds.getTopic(), ds.getCodec());
+            }
+
+            // generate topicId -> StreamRepartitionMetadata
+            Map<String, List<StreamRepartitionMetadata>> streamsMap = new HashMap<>();
+            for (String policyName : usage.getPolicies()) {
+                PolicyDefinition def = context.getPolicies().get(policyName);
+                if (def == null) {
+                    LOG.error(" can not find definition for policy {} ! ", policyName);
+                    continue;
+                }
+
+                PolicyAssignment assignment = context.getPolicyAssignments().get(policyName);
+                if (assignment == null) {
+                    LOG.error(" can not find assignment for policy {} ! ", policyName);
+                    continue;
+                }
+
+                // one lookup per policy; the queue does not depend on the partition
+                StreamWorkSlotQueue assignedQueue = queueMap.get(assignment.getQueueId());
+                if (assignedQueue == null) {
+                    LOG.error(" can not find work queue {} assigned to policy {} ! ", assignment.getQueueId(),
+                            policyName);
+                    continue;
+                }
+
+                for (StreamPartition policyStreamPartition : def.getPartitionSpec()) {
+                    String stream = policyStreamPartition.getStreamId();
+                    StreamDefinition schema = streamSchemaMap.get(stream);
+                    String topic = datasourcesMap.get(schema.getDataSource()).getTopic();
+
+                    // add stream name to tuple metadata
+                    Tuple2StreamMetadata tupleMetadata = tss.get(topic);
+                    if (tupleMetadata != null) {
+                        tupleMetadata.getActiveStreamNames().add(stream);
+                    }
+
+                    // grouping strategy
+                    StreamRepartitionStrategy gs = new StreamRepartitionStrategy();
+                    gs.partition = policyStreamPartition;
+                    gs.numTotalParticipatingRouterBolts = assignedQueue.getNumberOfGroupBolts();
+                    gs.startSequence = assignedQueue.getTopologyGroupStartIndex(topo.getName());
+                    gs.totalTargetBoltIds = new ArrayList<String>(topo.getGroupNodeIds());
+
+                    // add to map
+                    addGroupingStrategy(streamsMap, stream, schema, topic, schema.getDataSource(), gs);
+                }
+            }
+
+            SpoutSpec spoutSpec = new SpoutSpec(topo.getName(), streamsMap, tss, dss);
+            topoSpoutSpecsMap.put(topo.getName(), spoutSpec);
+        }
+        return topoSpoutSpecsMap;
+    }
+
+    /**
+     * Work queue not a root level object, thus we need to build a map from
+     * MonitoredStream for later quick lookup
+     *
+     * @return queue id -> work queue, collected from all monitored streams in the context
+     */
+    private Map<String, StreamWorkSlotQueue> buildQueueMap() {
+        Map<String, StreamWorkSlotQueue> queueMap = new HashMap<>();
+        for (MonitoredStream ms : context.getMonitoredStreams().values()) {
+            for (StreamWorkSlotQueue queue : ms.getQueues()) {
+                queueMap.put(queue.getQueueId(), queue);
+            }
+        }
+        return queueMap;
+    }
+
+    /**
+     * Registers a grouping strategy for a stream under its topic. The
+     * per-topic list and the per-stream {@link StreamRepartitionMetadata}
+     * are created on first use; a strategy already present on the stream's
+     * metadata is not added again.
+     *
+     * @param streamsMap     topic name -> repartition metadata list, updated in place
+     * @param stream         stream id, matched case-insensitively against existing metadata
+     * @param schema         definition of the stream
+     * @param topicName      topic used as the key in {@code streamsMap}
+     * @param datasourceName data source name recorded on newly created metadata
+     * @param gs             the grouping strategy to register
+     */
+    private void addGroupingStrategy(Map<String, List<StreamRepartitionMetadata>> streamsMap, String stream,
+            StreamDefinition schema, String topicName, String datasourceName, StreamRepartitionStrategy gs) {
+        List<StreamRepartitionMetadata> dsStreamMeta = streamsMap.get(topicName);
+        if (dsStreamMeta == null) {
+            dsStreamMeta = new ArrayList<>();
+            streamsMap.put(topicName, dsStreamMeta);
+        }
+        StreamRepartitionMetadata targetSm = null;
+        for (StreamRepartitionMetadata sm : dsStreamMeta) {
+            if (stream.equalsIgnoreCase(sm.getStreamId())) {
+                targetSm = sm;
+                break;
+            }
+        }
+        if (targetSm == null) {
+            targetSm = new StreamRepartitionMetadata(datasourceName, schema.getStreamId());
+            dsStreamMeta.add(targetSm);
+        }
+        if (!targetSm.groupingStrategies.contains(gs)) {
+            targetSm.addGroupStrategy(gs);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java
new file mode 100644
index 0000000..ea96d79
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.impl;
+
+import java.util.List;
+
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+
+/**
+ * Schedule result for one policy
+ * 
+ * 
+ * @since Apr 26, 2016
+ *
+ */
+public class ScheduleResult {
+    // result code of the scheduling attempt
+    int code;
+    // message accompanying the result code
+    String message;
+    // name of the policy this result belongs to
+    String policyName;
+    StreamPartition partition;
+    int index;
+    // NOTE(review): presumably the assignments produced for this policy - confirm against the scheduler
+    List<PolicyAssignment> topoliciesScheduled;
+
+    /**
+     * @return a one-line summary with the policy name, result code and message
+     */
+    @Override
+    public String toString() {
+        // the message was previously passed to format() without a matching specifier and silently dropped
+        return String.format("policy: %s, result code: %d, message: %s", policyName, code, message);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkItem.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkItem.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkItem.java
new file mode 100644
index 0000000..baa489d
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkItem.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.impl;
+
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+
+public class WorkItem {
+    public final PolicyDefinition def;
+    public final int requestParallelism;
+
+    public WorkItem(PolicyDefinition def, int workNum) {
+        this.def = def;
+        this.requestParallelism = workNum;
+    }
+
+    public String toString() {
+        return "policy name: " + def.getName() + "(" + requestParallelism + ")";
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java
new file mode 100644
index 0000000..a32b8fb
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.impl;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
+import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
+import org.apache.eagle.alert.coordinator.IScheduleContext;
+import org.apache.eagle.alert.coordinator.TopologyMgmtService;
+import org.apache.eagle.alert.coordinator.impl.strategies.IWorkSlotStrategy;
+import org.apache.eagle.alert.coordinator.impl.strategies.SameTopologySlotStrategy;
+import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
+import org.apache.eagle.alert.coordinator.model.TopologyUsage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @since Apr 27, 2016
+ *
+ */
+public class WorkQueueBuilder {
+
+    private static final Logger LOG = LoggerFactory.getLogger(WorkQueueBuilder.class);
+
+    private final IScheduleContext context;
+    private final TopologyMgmtService mgmtService;
+
+    public WorkQueueBuilder(IScheduleContext context, TopologyMgmtService mgmtService) {
+        this.context = context;
+        this.mgmtService = mgmtService;
+    }
+
+    public StreamWorkSlotQueue createQueue(MonitoredStream stream, boolean isDedicated, int size,
+            Map<String, Object> properties) {
+        // FIXME: make extensible and configurable
+        IWorkSlotStrategy strategy = new SameTopologySlotStrategy(context, stream.getStreamGroup(), mgmtService);
+        List<WorkSlot> slots = strategy.reserveWorkSlots(size, isDedicated, properties);
+        if (slots.size() < size) {
+            LOG.error("allocat stream work queue failed, required size");
+            return null;
+        }
+        StreamWorkSlotQueue queue = new StreamWorkSlotQueue(stream.getStreamGroup(), isDedicated, properties,
+                slots);
+        calculateGroupIndexAndCount(queue);
+        assignQueueSlots(stream, queue);// build reverse reference
+        stream.addQueues(queue);
+
+        return queue;
+    }
+
+    private void assignQueueSlots(MonitoredStream stream, StreamWorkSlotQueue queue) {
+        for (WorkSlot slot : queue.getWorkingSlots()) {
+            TopologyUsage u = context.getTopologyUsages().get(slot.getTopologyName());
+            AlertBoltUsage boltUsage = u.getAlertBoltUsage(slot.getBoltId());
+            boltUsage.addQueue(stream.getStreamGroup(), queue);
+            u.addMonitoredStream(stream);
+        }
+    }
+
+    private void calculateGroupIndexAndCount(StreamWorkSlotQueue queue) {
+        Map<String, Integer> result = new HashMap<String, Integer>();
+        int total = 0;
+        for (WorkSlot slot : queue.getWorkingSlots()) {
+            if (result.containsKey(slot.getTopologyName())) {
+                continue;
+            }
+            result.put(slot.getTopologyName(), total);
+            total += context.getTopologies().get(slot.getTopologyName()).getNumOfGroupBolt();
+        }
+
+        queue.setNumberOfGroupBolts(total);
+        queue.setTopoGroupStartIndex(result);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java
new file mode 100644
index 0000000..28df3c4
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.impl.strategies;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+
+/**
+ * Strategy for reserving the work slots that back a stream work queue.
+ * <p>
+ * {@code reserveWorkSlots(size, isDedicated, properties)} is asked for
+ * {@code size} slots and returns the slots it reserved. WorkQueueBuilder
+ * treats a result holding fewer than {@code size} slots as a failed
+ * allocation. The {@code isDedicated} flag and the {@code properties} map
+ * are handed over unchanged from the queue creation request.
+ * NOTE(review): how implementations interpret them is not visible here - confirm.
+ *
+ * @since Apr 27, 2016
+ *
+ */
+public interface IWorkSlotStrategy {
+
+    List<WorkSlot> reserveWorkSlots(int size, boolean isDedicated, Map<String, Object> properties);
+
+}



Mime
View raw message