eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ralp...@apache.org
Subject [13/18] incubator-eagle git commit: EAGLE-324 Init branch-v0.5
Date Wed, 01 Jun 2016 06:00:13 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java
new file mode 100644
index 0000000..f90decc
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java
@@ -0,0 +1,635 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.alert.coordinator;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.alert.coordinator.mock.TestTopologyMgmtService;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
+import org.apache.eagle.alert.coordination.model.RouterSpec;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordination.model.SpoutSpec;
+import org.apache.eagle.alert.coordination.model.StreamRepartitionMetadata;
+import org.apache.eagle.alert.coordination.model.StreamRepartitionStrategy;
+import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
+import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata;
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.coordinator.IScheduleContext;
+import org.apache.eagle.alert.coordinator.ScheduleOption;
+import org.apache.eagle.alert.coordinator.TopologyMgmtService;
+import org.apache.eagle.alert.coordinator.impl.GreedyPolicyScheduler;
+import org.apache.eagle.alert.coordinator.model.TopologyUsage;
+import org.apache.eagle.alert.coordinator.provider.InMemScheduleConext;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition.Definition;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn.Type;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * Tests for {@code GreedyPolicyScheduler}. Each case builds an in-memory
+ * schedule context (topologies, data sources, stream schemas, policies),
+ * runs the scheduler, and asserts the generated spout, stream-router and
+ * alert-bolt specifications.
+ *
+ * @since Apr 22, 2016
+ *
+ */
+public class SchedulerTest {
+
+    private static final String STREAM2 = "stream2";
+    private static final String JOIN_POLICY_1 = "join-policy-1";
+    private static final String TEST_TOPIC = "test-topic";
+    private static final String TEST_POLICY_1 = "test-policy1";
+    private static final String TEST_POLICY_2 = "test-policy2";
+    private static final String TEST_POLICY_3 = "test-policy3";
+    private static final String STREAM1 = "stream1";
+    private static final String DS_NAME = "ds1";
+    private static ObjectMapper mapper = new ObjectMapper();
+    private static final Logger LOG = LoggerFactory.getLogger(SchedulerTest.class);
+
+    /** Points the typesafe-config loader at the test application.conf before any test runs. */
+    @BeforeClass
+    public static void setup() {
+        ConfigFactory.invalidateCaches();
+        System.setProperty("config.resource", "/application.conf");
+    }
+
+    /** Schedules a single policy across two topologies and verifies it lands on topo1. */
+    @Test
+    public void test01_simple() throws Exception {
+        GreedyPolicyScheduler ps = new GreedyPolicyScheduler();
+        IScheduleContext context = createScheduleContext();
+        ps.init(context, createMgmtService());
+        ps.schedule(new ScheduleOption());
+
+        ScheduleState status = ps.getState();
+        context = ps.getContext(); // context updated!
+        Map<String, SpoutSpec> spec = status.getSpoutSpecs();
+
+        LOG.info(mapper.writeValueAsString(spec));
+        Assert.assertEquals(2, spec.size());
+        Assert.assertTrue(spec.containsKey("topo1"));
+        assertFirstPolicyScheduled(context, status);
+    }
+
+    /**
+     * Asserts the expected state after the first (and only) policy is scheduled:
+     * topo1 carries the spout/router/alert assignment, topo2 stays empty.
+     */
+    private void assertFirstPolicyScheduled(IScheduleContext context, ScheduleState status) {
+        String version = status.getVersion();
+        // assert spout spec
+        {
+            Iterator<SpoutSpec> it = status.getSpoutSpecs().values().iterator();
+            {
+                // assert spout 1
+                SpoutSpec ss = it.next();
+                Assert.assertEquals(version, ss.getVersion());
+                Assert.assertEquals(1, ss.getKafka2TupleMetadataMap().size());
+                Assert.assertEquals(TEST_TOPIC, ss.getKafka2TupleMetadataMap().keySet().iterator().next());
+
+                Assert.assertEquals(1, ss.getStreamRepartitionMetadataMap().size());
+                List<StreamRepartitionMetadata> metas = ss.getStreamRepartitionMetadataMap().values().iterator().next();
+                Assert.assertEquals(1, metas.size());
+
+                StreamRepartitionMetadata streamMeta = metas.iterator().next();
+                Assert.assertEquals(STREAM1, streamMeta.getStreamId());
+                Assert.assertEquals(DS_NAME, streamMeta.getTopicName());
+                Assert.assertEquals(1, streamMeta.groupingStrategies.size());
+
+                StreamRepartitionStrategy gs = streamMeta.groupingStrategies.iterator().next();
+                Assert.assertEquals(5, gs.numTotalParticipatingRouterBolts);
+                Assert.assertEquals(5, gs.totalTargetBoltIds.size());
+                Assert.assertEquals(0, gs.startSequence);
+
+                Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
+            }
+            {
+                // assert spout 2
+                SpoutSpec ss = it.next();
+                Assert.assertEquals(version, ss.getVersion());
+                Assert.assertEquals(0, ss.getKafka2TupleMetadataMap().size());
+            }
+        }
+        // assert grp-by spec
+        {
+            Iterator<RouterSpec> gsit = status.getGroupSpecs().values().iterator();
+            {
+                Assert.assertEquals(2, status.getGroupSpecs().values().size());
+                Assert.assertTrue(gsit.hasNext());
+                RouterSpec gspec = gsit.next();
+                Assert.assertEquals(version, gspec.getVersion());
+                String topo1 = gspec.getTopologyName();
+                // NOTE: "{}" placeholder required, otherwise the argument is dropped by SLF4J
+                LOG.info("group spec topology name: {}", topo1);
+                List<StreamRouterSpec> routeSpecs = gspec.getRouterSpecs();
+                Assert.assertEquals(1, routeSpecs.size());
+                for (StreamRouterSpec spec : routeSpecs) {
+                    StreamPartition par = spec.getPartition();
+                    Assert.assertEquals(STREAM1, par.getStreamId());
+                    Assert.assertEquals(Arrays.asList("col1"), par.getColumns());
+                    Assert.assertEquals(STREAM1, spec.getStreamId());
+
+                    Assert.assertEquals(1, spec.getTargetQueue().size());
+                    List<PolicyWorkerQueue> queues = spec.getTargetQueue();
+                    Assert.assertEquals(1, queues.size());
+                    Assert.assertEquals(5, queues.get(0).getWorkers().size());
+                    for (WorkSlot slot : queues.get(0).getWorkers()) {
+                        Assert.assertEquals(topo1, slot.getTopologyName());
+                        LOG.info(slot.getBoltId());
+                    }
+                }
+            }
+            // grp-spec2
+            {
+                RouterSpec gs2 = gsit.next();
+                Assert.assertEquals(version, gs2.getVersion());
+                List<StreamRouterSpec> routeSpecs = gs2.getRouterSpecs();
+                Assert.assertEquals(0, routeSpecs.size());
+            }
+        }
+        // alert spec
+        {
+            Assert.assertEquals(2, status.getAlertSpecs().values().size());
+            Iterator<AlertBoltSpec> asit = status.getAlertSpecs().values().iterator();
+            // topo1
+            {
+                AlertBoltSpec alertSpec = asit.next();
+                Assert.assertEquals(version, alertSpec.getVersion());
+                String topo1 = alertSpec.getTopologyName();
+                LOG.info("alert spec topology name {}", topo1);
+                for (List<String> definitions : alertSpec.getBoltPolicyIdsMap().values()) {
+                    Assert.assertEquals(1, definitions.size());
+                    Assert.assertEquals(TEST_POLICY_1, definitions.get(0));
+                }
+            }
+            // topo2
+            {
+                AlertBoltSpec alertSpec = asit.next();
+                Assert.assertEquals(version, alertSpec.getVersion());
+                String topo1 = alertSpec.getTopologyName();
+                LOG.info("alert spec topology name {}", topo1);
+                Assert.assertEquals(0, alertSpec.getBoltPolicyIdsMap().size());
+            }
+        }
+    }
+
+    /** Creates the mock topology management service with 5 bolts per topology. */
+    private TopologyMgmtService createMgmtService() {
+        TestTopologyMgmtService.BOLT_NUMBER = 5;
+        TopologyMgmtService mgmtService = new TestTopologyMgmtService();
+        return mgmtService;
+    }
+
+    /**
+     * Builds an in-memory schedule context with two empty topologies, one sample
+     * policy on stream1, one kafka data source, and schemas for stream1/stream2.
+     */
+    private InMemScheduleConext createScheduleContext() {
+        InMemScheduleConext context = new InMemScheduleConext();
+        // topo
+        Pair<Topology, TopologyUsage> pair1 = TestTopologyMgmtService.createEmptyTopology("topo1");
+        Pair<Topology, TopologyUsage> pair2 = TestTopologyMgmtService.createEmptyTopology("topo2");
+        context.addTopology(pair1.getLeft());
+        context.addTopologyUsages(pair1.getRight());
+        context.addTopology(pair2.getLeft());
+        context.addTopologyUsages(pair2.getRight());
+
+        // policy
+        createSamplePolicy(context, TEST_POLICY_1, STREAM1);
+
+        // data source
+        Kafka2TupleMetadata ds = new Kafka2TupleMetadata();
+        ds.setName(DS_NAME);
+        ds.setTopic(TEST_TOPIC);
+        ds.setCodec(new Tuple2StreamMetadata());
+        context.addDataSource(ds);
+
+        // schema
+        {
+            StreamDefinition schema = new StreamDefinition();
+            {
+                StreamColumn c = new StreamColumn();
+                c.setName("col1");
+                c.setType(Type.STRING);
+                c.setDefaultValue("dflt");
+                schema.getColumns().add(c);
+            }
+            {
+                StreamColumn c = new StreamColumn();
+                c.setName("col2");
+                c.setType(Type.DOUBLE);
+                c.setDefaultValue("0.0");
+                schema.getColumns().add(c);
+            }
+            schema.setStreamId(STREAM1);
+            schema.setValidate(false);
+            schema.setDataSource(DS_NAME);
+            context.addSchema(schema);
+        }
+        {
+            StreamDefinition schema = new StreamDefinition();
+            {
+                StreamColumn c = new StreamColumn();
+                c.setName("col1");
+                c.setType(Type.STRING);
+                c.setDefaultValue("dflt");
+                schema.getColumns().add(c);
+            }
+            schema.setStreamId(STREAM2);
+            schema.setValidate(false);
+            schema.setDataSource(DS_NAME);
+            context.addSchema(schema);
+        }
+
+        return context;
+    }
+
+    /**
+     * Add policy after add policy: second policy on the same stream must share
+     * the first policy's worker queue; a third policy on a different stream of
+     * the same topic must be placed on a different topology.
+     */
+    @Test
+    public void test_schedule_add2() {
+        IScheduleContext context = createScheduleContext();
+        GreedyPolicyScheduler ps = new GreedyPolicyScheduler();
+        TopologyMgmtService mgmtService = new TopologyMgmtService();
+        ps.init(context, mgmtService);
+
+        ScheduleOption option = new ScheduleOption();
+        ps.schedule(option);
+        ScheduleState status = ps.getState();
+        context = ps.getContext(); // context updated!
+        assertFirstPolicyScheduled(context, status);
+
+        createSamplePolicy((InMemScheduleConext) context, TEST_POLICY_2, STREAM1);
+
+        ps.init(context, mgmtService); // reinit
+        ps.schedule(option);
+        status = ps.getState();
+        context = ps.getContext(); // context updated!
+        // now assert two policy on the same queue
+        assertSecondPolicyCreated(context, status);
+
+        // add one policy on different stream of the same topic
+        createSamplePolicy((InMemScheduleConext) context, TEST_POLICY_3, STREAM2);
+
+        ps.init(context, mgmtService); // re-init
+        ps.schedule(option);
+        status = ps.getState();
+        context = ps.getContext(); // context updated!
+        assertThirdPolicyScheduled(context, status);
+    }
+
+    /**
+     * Asserts the state after the third policy (on stream2) is scheduled:
+     * policies 1 and 2 share a queue, policy 3 is on a different topology.
+     */
+    private void assertThirdPolicyScheduled(IScheduleContext context, ScheduleState status) {
+        {
+            // now assert two policy on the same queue
+            Assert.assertEquals(2, status.getSpoutSpecs().values().size());
+            Iterator<SpoutSpec> it = status.getSpoutSpecs().values().iterator();
+            {
+                // assert spout 1
+                SpoutSpec ss = it.next();
+                Assert.assertEquals(1, ss.getKafka2TupleMetadataMap().size());
+                Assert.assertEquals(TEST_TOPIC, ss.getKafka2TupleMetadataMap().keySet().iterator().next());
+
+                Assert.assertEquals(1, ss.getStreamRepartitionMetadataMap().size());
+                List<StreamRepartitionMetadata> metas = ss.getStreamRepartitionMetadataMap().values().iterator().next();
+                Assert.assertEquals(1, metas.size());
+
+                StreamRepartitionMetadata streamMeta = metas.iterator().next();
+                Assert.assertEquals(STREAM1, streamMeta.getStreamId());
+                Assert.assertEquals(DS_NAME, streamMeta.getTopicName());
+                Assert.assertEquals(1, streamMeta.groupingStrategies.size());
+
+                StreamRepartitionStrategy gs = streamMeta.groupingStrategies.iterator().next();
+                Assert.assertEquals(5, gs.numTotalParticipatingRouterBolts);
+                Assert.assertEquals(5, gs.totalTargetBoltIds.size());
+                Assert.assertEquals(0, gs.startSequence);
+
+                PolicyAssignment pa1 = context.getPolicyAssignments().get(TEST_POLICY_1);
+                PolicyAssignment pa2 = context.getPolicyAssignments().get(TEST_POLICY_2);
+                Assert.assertNotNull(pa1);
+                Assert.assertNotNull(pa2);
+                Assert.assertEquals(pa1.getQueueId(), pa2.getQueueId());
+            }
+            {
+                // assert spout 2
+                SpoutSpec ss = it.next();
+                Assert.assertEquals(1, ss.getKafka2TupleMetadataMap().size());
+
+                Assert.assertEquals(TEST_TOPIC, ss.getKafka2TupleMetadataMap().keySet().iterator().next());
+
+                Assert.assertEquals(1, ss.getStreamRepartitionMetadataMap().size());
+                List<StreamRepartitionMetadata> metas = ss.getStreamRepartitionMetadataMap().values().iterator().next();
+                Assert.assertEquals(1, metas.size());
+
+                StreamRepartitionMetadata streamMeta = metas.iterator().next();
+                Assert.assertEquals(STREAM2, streamMeta.getStreamId());
+                Assert.assertEquals(DS_NAME, streamMeta.getTopicName());
+                Assert.assertEquals(1, streamMeta.groupingStrategies.size());
+
+                StreamRepartitionStrategy gs = streamMeta.groupingStrategies.iterator().next();
+                Assert.assertEquals(5, gs.numTotalParticipatingRouterBolts);
+                Assert.assertEquals(5, gs.totalTargetBoltIds.size());
+                Assert.assertEquals(0, gs.startSequence);
+
+                // assert policy assignment for the three policies
+                PolicyAssignment pa1 = context.getPolicyAssignments().get(TEST_POLICY_1);
+                PolicyAssignment pa2 = context.getPolicyAssignments().get(TEST_POLICY_2);
+                PolicyAssignment pa3 = context.getPolicyAssignments().get(TEST_POLICY_3);
+                Assert.assertNotNull(pa1);
+                Assert.assertNotNull(pa2);
+                Assert.assertNotNull(pa3);
+                Assert.assertEquals(pa1.getQueueId(), pa2.getQueueId());
+                Assert.assertNotEquals(pa1.getQueueId(), pa3.getQueueId());
+                StreamWorkSlotQueue queue1 = getQueue(context, pa1.getQueueId()).getRight();
+                StreamWorkSlotQueue queue3 = getQueue(context, pa3.getQueueId()).getRight();
+                Assert.assertNotEquals(queue1.getWorkingSlots().get(0).getTopologyName(), queue3.getWorkingSlots().get(0).getTopologyName());
+            }
+        }
+        // group spec
+        {
+            Iterator<RouterSpec> gsit = status.getGroupSpecs().values().iterator();
+            Assert.assertEquals(2, status.getGroupSpecs().values().size());
+            {
+                // first topology's grp - spec
+                gsit.next();
+                // should be same with second policy scheduled, not assert here
+            }
+            {
+                // second topology's grp - spec
+                RouterSpec spec = gsit.next();
+                Assert.assertEquals(1, spec.getRouterSpecs().size());
+                StreamRouterSpec routeSpec = spec.getRouterSpecs().get(0);
+                Assert.assertEquals(STREAM2, routeSpec.getStreamId());
+                Assert.assertEquals(Arrays.asList("col1"), routeSpec.getPartition().getColumns());
+            }
+        }
+        // alert spec
+        {
+            Assert.assertEquals(2, status.getAlertSpecs().values().size());
+            Iterator<AlertBoltSpec> asit = status.getAlertSpecs().values().iterator();
+            {
+                // same to the two policy case, not assert here
+                asit.next();
+            }
+            {
+                // second topology's alert spec
+                AlertBoltSpec as = asit.next();
+                Assert.assertEquals(5, as.getBoltPolicyIdsMap().size());
+                for (List<String> pdList : as.getBoltPolicyIdsMap().values()) {
+                    Assert.assertEquals(1, pdList.size());
+                    Assert.assertEquals(TEST_POLICY_3, pdList.get(0));
+                }
+            }
+        }
+    }
+
+    /**
+     * Asserts the state after the second policy (same stream as the first) is
+     * scheduled: both policies share the same worker queue on topology one,
+     * topology two remains unused.
+     */
+    private void assertSecondPolicyCreated(IScheduleContext context, ScheduleState status) {
+        String version = status.getVersion();
+        {
+            // spout : assert two policy on the same topology (same worker
+            // queue)
+            Iterator<SpoutSpec> it = status.getSpoutSpecs().values().iterator();
+            {
+                // assert spout 1 has two policy
+                SpoutSpec ss = it.next();
+                Assert.assertEquals(1, ss.getKafka2TupleMetadataMap().size());
+                Assert.assertEquals(TEST_TOPIC, ss.getKafka2TupleMetadataMap().keySet().iterator().next());
+
+                Assert.assertEquals(1, ss.getStreamRepartitionMetadataMap().size());
+                List<StreamRepartitionMetadata> metas = ss.getStreamRepartitionMetadataMap().values().iterator().next();
+                Assert.assertEquals(1, metas.size());
+
+                StreamRepartitionMetadata streamMeta = metas.iterator().next();
+                Assert.assertEquals(STREAM1, streamMeta.getStreamId());
+                Assert.assertEquals(DS_NAME, streamMeta.getTopicName());
+                Assert.assertEquals(1, streamMeta.groupingStrategies.size());
+
+                StreamRepartitionStrategy gs = streamMeta.groupingStrategies.iterator().next();
+                Assert.assertEquals(5, gs.numTotalParticipatingRouterBolts);
+                Assert.assertEquals(5, gs.totalTargetBoltIds.size());
+                Assert.assertEquals(0, gs.startSequence);
+
+                // assert two policy on the same queue
+                PolicyAssignment pa1 = context.getPolicyAssignments().get(TEST_POLICY_1);
+                PolicyAssignment pa2 = context.getPolicyAssignments().get(TEST_POLICY_2);
+                Assert.assertNotNull(pa1);
+                Assert.assertNotNull(pa2);
+                Assert.assertEquals(pa1.getQueueId(), pa2.getQueueId());
+                StreamWorkSlotQueue queue = getQueue(context, pa1.getQueueId()).getRight();
+                Assert.assertNotNull(queue);
+            }
+            {
+                // assert spout 2 is still empty
+                SpoutSpec ss = it.next();
+                Assert.assertEquals(0, ss.getKafka2TupleMetadataMap().size());
+            }
+        }
+
+        // assert grp-by spec. This is nothing different compare to first policy
+        {
+            Iterator<RouterSpec> gsit = status.getGroupSpecs().values().iterator();
+            {
+                Assert.assertEquals(2, status.getGroupSpecs().values().size());
+                Assert.assertTrue(gsit.hasNext());
+                RouterSpec gspec = gsit.next();
+                Assert.assertEquals(version, gspec.getVersion());
+                String topo1 = gspec.getTopologyName();
+                // NOTE: "{}" placeholder required, otherwise the argument is dropped by SLF4J
+                LOG.info("group spec topology name: {}", topo1);
+                List<StreamRouterSpec> routeSpecs = gspec.getRouterSpecs();
+                Assert.assertEquals(1, routeSpecs.size());
+                for (StreamRouterSpec spec : routeSpecs) {
+                    StreamPartition par = spec.getPartition();
+                    Assert.assertEquals(STREAM1, par.getStreamId());
+                    Assert.assertEquals(Arrays.asList("col1"), par.getColumns());
+                    Assert.assertEquals(STREAM1, spec.getStreamId());
+
+                    Assert.assertEquals(1, spec.getTargetQueue().size());
+                    List<PolicyWorkerQueue> queues = spec.getTargetQueue();
+                    Assert.assertEquals(1, queues.size());
+                    Assert.assertEquals(5, queues.get(0).getWorkers().size());
+                    for (WorkSlot slot : queues.get(0).getWorkers()) {
+                        Assert.assertEquals(topo1, slot.getTopologyName());
+                        LOG.info(slot.getBoltId());
+                    }
+                }
+            }
+            // grp-spec for second topology is still empty
+            {
+                RouterSpec gs2 = gsit.next();
+                Assert.assertEquals(version, gs2.getVersion());
+                List<StreamRouterSpec> routeSpecs = gs2.getRouterSpecs();
+                Assert.assertEquals(0, routeSpecs.size());
+            }
+        }
+        // alert spec
+        {
+            Assert.assertEquals(2, status.getAlertSpecs().values().size());
+            Iterator<AlertBoltSpec> asit = status.getAlertSpecs().values().iterator();
+            {
+                AlertBoltSpec alertSpec = asit.next();
+                Assert.assertEquals(version, alertSpec.getVersion());
+                String topo1 = alertSpec.getTopologyName();
+                LOG.info("alert spec topology name {}", topo1);
+                for (List<String> definitions : alertSpec.getBoltPolicyIdsMap().values()) {
+                    Assert.assertEquals(2, definitions.size());
+                    Assert.assertTrue(definitions.contains(TEST_POLICY_1));
+                    Assert.assertTrue(definitions.contains(TEST_POLICY_2));
+                }
+            }
+            // second spout
+            {
+                AlertBoltSpec spec = asit.next();
+                Assert.assertEquals(0, spec.getBoltPolicyIdsMap().size());
+            }
+        }
+    }
+
+    /**
+     * Looks up the monitored stream / work-slot queue pair for the given queue id.
+     *
+     * @return the matching pair, or {@code null} when no queue has that id
+     */
+    public static Pair<MonitoredStream, StreamWorkSlotQueue> getQueue(IScheduleContext context, String queueId) {
+        for (MonitoredStream ms : context.getMonitoredStreams().values()) {
+            for (StreamWorkSlotQueue q : ms.getQueues()) {
+                if (q.getQueueId().equals(queueId)) {
+                    return Pair.of(ms, q);
+                }
+            }
+        }
+        return null;
+    }
+
+    /** Two strategies built from equal partitions must be equal (and hash into collections). */
+    @Test
+    public void testGroupEquals() {
+        StreamRepartitionStrategy gs1 = new StreamRepartitionStrategy();
+        StreamPartition sp = new StreamPartition();
+        sp.setColumns(Arrays.asList("col1"));
+        sp.setSortSpec(new StreamSortSpec());
+        sp.setStreamId("testStream");
+        sp.setType(StreamPartition.Type.GROUPBY);
+        gs1.partition = sp;
+
+        StreamRepartitionStrategy gs2 = new StreamRepartitionStrategy();
+        sp = new StreamPartition();
+        sp.setColumns(Arrays.asList("col1"));
+        sp.setSortSpec(new StreamSortSpec());
+        sp.setStreamId("testStream");
+        sp.setType(StreamPartition.Type.GROUPBY);
+        gs2.partition = sp;
+
+        Assert.assertTrue(gs1.equals(gs2));
+        List<StreamRepartitionStrategy> list = new ArrayList<StreamRepartitionStrategy>();
+        list.add(gs1);
+        Assert.assertTrue(list.contains(gs2));
+    }
+
+    /** Registers a GLOBAL-partitioned policy on the given stream with parallelism 5. */
+    private void createSamplePolicy(InMemScheduleConext context, String policyName, String stream) {
+        PolicyDefinition pd = new PolicyDefinition();
+        pd.setParallelismHint(5);
+        Definition def = new Definition();
+        pd.setDefinition(def);
+        pd.setName(policyName);
+        pd.setInputStreams(Arrays.asList(stream));
+        pd.setOutputStreams(Arrays.asList("outputStream2"));
+        StreamPartition par = new StreamPartition();
+        par.setColumns(Arrays.asList("col1"));
+        par.setType(StreamPartition.Type.GLOBAL);
+        par.setStreamId(stream);
+        pd.setPartitionSpec(Arrays.asList(par));
+        context.addPoilcy(pd);
+    }
+
+    /**
+     * Add and remove
+     */
+    @Test
+    public void test_schedule2_remove() {
+        // TODO
+    }
+
+    @Test
+    public void test_schedule_updatePartition() {
+        // TODO
+    }
+
+    @Test
+    public void test_schedule_nogroupby() {
+        // TODO
+    }
+
+    /** A join policy on two streams must not share a topology with the single-stream policy. */
+    @SuppressWarnings("unused")
+    @Test
+    public void test_schedule_multipleStream() throws Exception {
+        IScheduleContext context = createScheduleContext();
+        GreedyPolicyScheduler ps = new GreedyPolicyScheduler();
+        TopologyMgmtService mgmtService = new TopologyMgmtService();
+
+        createJoinPolicy((InMemScheduleConext) context, JOIN_POLICY_1, Arrays.asList(STREAM1, STREAM2));
+
+        ps.init(context, mgmtService);
+        ScheduleOption option = new ScheduleOption();
+        ps.schedule(option);
+        ScheduleState state = ps.getState();
+
+        context = ps.getContext(); // context updated!
+        // assert
+        Assert.assertTrue(context.getPolicyAssignments().containsKey(JOIN_POLICY_1));
+        Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
+        PolicyAssignment pa1 = context.getPolicyAssignments().get(JOIN_POLICY_1);
+        PolicyAssignment pa2 = context.getPolicyAssignments().get(TEST_POLICY_1);
+        Assert.assertNotEquals(pa1.getQueueId(), pa2.getQueueId());
+
+        StreamWorkSlotQueue joinPair = getQueue(context, pa1.getQueueId()).getRight();
+        String joinTopo = joinPair.getWorkingSlots().get(0).topologyName;
+        StreamWorkSlotQueue streamPair = getQueue(context, pa2.getQueueId()).getRight();
+        String streamTopo = streamPair.getWorkingSlots().get(0).topologyName;
+        Assert.assertNotEquals(joinTopo, streamTopo);
+
+        // TODO more assert on state
+        SpoutSpec joinSpout = state.getSpoutSpecs().get(joinTopo);
+        RouterSpec groupSpec = state.getGroupSpecs().get(joinTopo);
+        AlertBoltSpec alertSpec = state.getAlertSpecs().get(joinTopo);
+
+        Assert.assertEquals(1, joinSpout.getStreamRepartitionMetadataMap().size());
+        Assert.assertEquals(2, joinSpout.getStreamRepartitionMetadataMap().get(TEST_TOPIC).size());
+
+        Assert.assertEquals(2, groupSpec.getRouterSpecs().size());
+
+        LOG.info(new ObjectMapper().writeValueAsString(state));
+    }
+
+    /** Registers a GROUPBY-partitioned join policy over the given input streams. */
+    private void createJoinPolicy(InMemScheduleConext context, String policyName, List<String> asList) {
+        PolicyDefinition pd = new PolicyDefinition();
+        pd.setParallelismHint(5);
+        Definition def = new Definition();
+        pd.setDefinition(def);
+        pd.setName(policyName);
+        pd.setInputStreams(asList);
+        pd.setOutputStreams(Arrays.asList("outputStream2"));
+        for (String streamId : pd.getInputStreams()) {
+            StreamPartition par = new StreamPartition();
+            par.setColumns(Arrays.asList("col1"));
+            par.setType(StreamPartition.Type.GROUPBY);
+            par.setStreamId(streamId);
+            pd.addPartition(par);
+        }
+        context.addPoilcy(pd);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/WorkSlotStrategyTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/WorkSlotStrategyTest.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/WorkSlotStrategyTest.java
new file mode 100644
index 0000000..e17ebab
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/WorkSlotStrategyTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.alert.coordinator;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.alert.coordinator.mock.TestTopologyMgmtService;
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
+import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
+import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
+import org.apache.eagle.alert.coordinator.impl.WorkQueueBuilder;
+import org.apache.eagle.alert.coordinator.impl.strategies.SameTopologySlotStrategy;
+import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
+import org.apache.eagle.alert.coordinator.model.TopologyUsage;
+import org.apache.eagle.alert.coordinator.provider.InMemScheduleConext;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * @since Apr 27, 2016
+ *
+ */
+public class WorkSlotStrategyTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(WorkSlotStrategyTest.class);
+
+    @Test
+    public void test() {
+        InMemScheduleConext context = new InMemScheduleConext();
+
+        StreamPartition partition = new StreamPartition();
+        partition.setType(StreamPartition.Type.GLOBAL);
+        partition.setStreamId("s1");
+        partition.setColumns(Arrays.asList("f1", "f2"));
+        
+        StreamGroup group = new StreamGroup();
+        group.addStreamPartition(partition);
+
+        TestTopologyMgmtService.BOLT_NUMBER = 3;
+        TestTopologyMgmtService mgmtService = new TestTopologyMgmtService();
+
+        SameTopologySlotStrategy strategy = new SameTopologySlotStrategy(context, group, mgmtService);
+        {
+            List<WorkSlot> slots = strategy.reserveWorkSlots(5, false, new HashMap<String, Object>());
+            Assert.assertEquals(0, slots.size());
+            Assert.assertEquals(1, context.getTopologies().size());
+        }
+
+        TestTopologyMgmtService.BOLT_NUMBER = 5;
+        {
+            List<WorkSlot> slots = strategy.reserveWorkSlots(5, false, new HashMap<String, Object>());
+            Assert.assertEquals(5, slots.size());
+            LOG.info(slots.get(0).getTopologyName());
+            Assert.assertEquals(2, context.getTopologies().size());
+            Assert.assertEquals(2, context.getTopologyUsages().size());
+
+            // assert all on same topology
+            for (WorkSlot ws : slots) {
+                Assert.assertEquals(slots.get(0).getTopologyName(), ws.getTopologyName());
+            }
+            Iterator<TopologyUsage> it = context.getTopologyUsages().values().iterator();
+            TopologyUsage usage = it.next();
+            for (AlertBoltUsage u : usage.getAlertUsages().values()) {
+                Assert.assertTrue(u.getPartitions().size() == 0);
+                Assert.assertTrue(u.getQueueSize() == 0);
+            }
+            // assert
+            usage = it.next();
+            for (AlertBoltUsage u : usage.getAlertUsages().values()) {
+                LOG.info(u.getBoltId());
+                Assert.assertTrue(u.getPartitions().size() == 0);
+                Assert.assertTrue(u.getBoltId(), u.getQueueSize() == 0);
+            }
+        }
+    }
+
+    @SuppressWarnings("unused")
+    @Test
+    public void test2_overlap() {
+        InMemScheduleConext context = new InMemScheduleConext();
+
+        StreamPartition partition = new StreamPartition();
+        partition.setType(StreamPartition.Type.GLOBAL);
+        partition.setStreamId("s1");
+        partition.setColumns(Arrays.asList("f1", "f2"));
+        StreamGroup sg = new StreamGroup();
+        sg.addStreamPartition(partition);
+
+        MonitoredStream ms1 = new MonitoredStream(sg);
+
+        TestTopologyMgmtService.BOLT_NUMBER = 5;
+        TestTopologyMgmtService mgmtService = new TestTopologyMgmtService();
+
+        String topo1 = null;
+        String bolt1 = null;
+        WorkQueueBuilder wrb = new WorkQueueBuilder(context, mgmtService);
+        StreamWorkSlotQueue queue = wrb.createQueue(ms1, false, 5, new HashMap<String, Object>());
+        {
+            Assert.assertEquals(5, queue.getWorkingSlots().size());
+            topo1 = queue.getWorkingSlots().get(0).getTopologyName();
+            bolt1 = queue.getWorkingSlots().get(0).getBoltId();
+            Assert.assertEquals(1, context.getTopologies().size());
+            Assert.assertEquals(1, context.getTopologyUsages().size());
+            LOG.info(queue.getWorkingSlots().get(0).getTopologyName());
+            for (WorkSlot ws : queue.getWorkingSlots()) {
+                Assert.assertEquals(topo1, ws.getTopologyName());
+            }
+
+            TopologyUsage usage = context.getTopologyUsages().values().iterator().next();
+            for (AlertBoltUsage u : usage.getAlertUsages().values()) {
+                Assert.assertTrue(u.getPartitions().size() > 0);
+                Assert.assertTrue(u.getBoltId(), u.getQueueSize() > 0);
+            }
+        }
+
+        // second partition
+        StreamPartition partition2 = new StreamPartition();
+        partition2.setType(StreamPartition.Type.GLOBAL);
+        partition2.setStreamId("s2");
+        partition2.setColumns(Arrays.asList("f1", "f2"));
+
+        StreamGroup sg2 = new StreamGroup();
+        sg2.addStreamPartition(partition2);
+        MonitoredStream ms2 = new MonitoredStream(sg2);
+        queue = wrb.createQueue(ms2, false, 5, new HashMap<String, Object>());
+        TestTopologyMgmtService.BOLT_NUMBER = 5;
+        {
+            Assert.assertEquals(5, queue.getWorkingSlots().size());
+            Assert.assertEquals(2, context.getTopologies().size());
+            Assert.assertEquals(2, context.getTopologyUsages().size());
+            
+            String topo2 = queue.getWorkingSlots().get(0).getTopologyName();
+            String bolt2 = queue.getWorkingSlots().get(0).getBoltId();
+            for (WorkSlot ws : queue.getWorkingSlots()) {
+                Assert.assertEquals(topo2, ws.getTopologyName());
+            }
+            Assert.assertNotEquals(topo1, topo2);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java
new file mode 100644
index 0000000..2fc1794
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.alert.coordinator.mock;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordination.model.SpoutSpec;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
+import org.apache.eagle.alert.service.IMetadataServiceClient;
+
+/**
+ * @since May 5, 2016
+ *
+ */
+@SuppressWarnings("serial")
+public class InMemMetadataServiceClient implements IMetadataServiceClient {
+
+    private List<StreamingCluster> clusters = new ArrayList<StreamingCluster>();
+    private List<Topology> topologies = new ArrayList<Topology>();
+    private List<PolicyDefinition> policies = new ArrayList<PolicyDefinition>();
+    private List<StreamDefinition> definitions = new ArrayList<StreamDefinition>();
+    private List<Kafka2TupleMetadata> datasources = new ArrayList<Kafka2TupleMetadata>();
+
+    private SortedMap<String, ScheduleState> scheduleStates = new TreeMap<String, ScheduleState>();
+    private List<SpoutSpec> spoutSpecs = new ArrayList<SpoutSpec>();
+    private List<Publishment> publishmetns = new ArrayList<Publishment>();
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public List<StreamingCluster> listClusters() {
+        return clusters;
+    }
+
+    @Override
+    public List<Topology> listTopologies() {
+        return topologies;
+    }
+
+    @Override
+    public List<PolicyDefinition> listPolicies() {
+        return policies;
+    }
+
+    @Override
+    public List<StreamDefinition> listStreams() {
+        return definitions;
+    }
+
+    @Override
+    public List<Kafka2TupleMetadata> listDataSources() {
+        return datasources;
+    }
+
+    @Override
+    public List<Publishment> listPublishment() {
+        return publishmetns;
+    }
+
+    @Override
+    public List<SpoutSpec> listSpoutMetadata() {
+        return spoutSpecs;
+    }
+
+    @Override
+    public ScheduleState getVersionedSpec() {
+        Iterator<Entry<String, ScheduleState>> it = scheduleStates.entrySet().iterator();
+        if (it.hasNext()) {
+            return it.next().getValue();
+        }
+        return null;
+    }
+
+    @Override
+    public ScheduleState getVersionedSpec(String version) {
+        return scheduleStates.get(version);
+    }
+
+    @Override
+    public void addScheduleState(ScheduleState state) {
+        scheduleStates.put(state.getVersion(), state);
+    }
+
+    @Override
+    public void addStreamingCluster(StreamingCluster cluster) {
+        clusters.add(cluster);
+    }
+
+    @Override
+    public void addTopology(Topology t) {
+        topologies.add(t);
+    }
+
+    @Override
+    public void addPolicy(PolicyDefinition policy) {
+        policies.add(policy);
+    }
+
+    @Override
+    public void addStreamDefinition(StreamDefinition streamDef) {
+        definitions.add(streamDef);
+    }
+
+    @Override
+    public void addDataSource(Kafka2TupleMetadata k2t) {
+        datasources.add(k2t);
+    }
+
+    @Override
+    public void addPublishment(Publishment pub) {
+        publishmetns.add(pub);
+    }
+
+    @Override
+    public void clear() {
+        // do nothing
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/TestTopologyMgmtService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/TestTopologyMgmtService.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/TestTopologyMgmtService.java
new file mode 100644
index 0000000..1a690d6
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/TestTopologyMgmtService.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.alert.coordinator.mock;
+
+import java.util.List;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.coordinator.TopologyMgmtService;
+import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
+import org.apache.eagle.alert.coordinator.model.GroupBoltUsage;
+import org.apache.eagle.alert.coordinator.model.TopologyUsage;
+
+public class TestTopologyMgmtService extends TopologyMgmtService {
+
+    public static int BOLT_NUMBER = 5;
+    
+    private int i = 0;
+
+    @Override
+    public TopologyMeta creatTopology() {
+        TopologyMeta tm = new TopologyMeta();
+        tm.topologyId = "Topoloy" + (i++);
+        tm.clusterId = "default-cluster";
+        tm.nimbusHost = "localhost";
+        tm.nimbusPort = "3000";
+        Pair<Topology, TopologyUsage> pair = TestTopologyMgmtService.createEmptyTopology(tm.topologyId);
+        tm.topology = pair.getLeft();
+        tm.usage = pair.getRight();
+        
+        return tm;
+    }
+
+    @Override
+    public List<TopologyMeta> listTopologies() {
+        // TODO Auto-generated method stub
+        return super.listTopologies();
+    }
+
+    public static Pair<Topology, TopologyUsage> createEmptyTopology(String topoName) {
+        Topology t = new Topology(topoName, TestTopologyMgmtService.BOLT_NUMBER, TestTopologyMgmtService.BOLT_NUMBER);
+        for (int i = 0; i < t.getNumOfGroupBolt(); i++) {
+            t.getGroupNodeIds().add(t.getName() + "-grp-" + i);
+        }
+        for (int i = 0; i < t.getNumOfAlertBolt(); i++) {
+            t.getAlertBoltIds().add(t.getName() + "-alert-" + i);
+        }
+    
+        TopologyUsage u = new TopologyUsage(topoName);
+        for (String gnid : t.getGroupNodeIds()) {
+            u.getGroupUsages().put(gnid, new GroupBoltUsage(gnid));
+        }
+        for (String anid : t.getAlertBoltIds()) {
+            u.getAlertUsages().put(anid, new AlertBoltUsage(anid));
+        }
+    
+        return Pair.of(t, u);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-coordinator/src/test/resources/test-application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/test/resources/test-application.conf b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/resources/test-application.conf
new file mode 100644
index 0000000..deb9793
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/resources/test-application.conf
@@ -0,0 +1,27 @@
+{
+	"coordinator" : {
+		"policiesPerBolt" : 5,
+		"boltParallelism" : 5,
+		"policyDefaultParallelism" : 5,
+		"boltLoadUpbound": 0.8,
+		"topologyLoadUpbound" : 0.8,
+		"numOfAlertBoltsPerTopology" : 5,
+		"zkConfig": {
+			"zkQuorum" : "localhost:2181",
+			"zkRoot": "/alert",
+			"zkSessionTimeoutMs" : 10000,
+			"connectionTimeoutMs" : 5000,
+			"zkRetryTimes" : 5,
+			"zkRetryInterval" : 1000
+		},
+		"metadataService": {
+			"context" : "/api",
+			"host" : "localhost",
+			"port" : 8080
+		},
+		"metadataDynamicCheck" : {
+			"initDelayMillis" : 1000,
+			"delayMillis" : 30000
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-devtools/.gitignore
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/.gitignore b/eagle-core/eagle-alert/alert/alert-devtools/.gitignore
new file mode 100644
index 0000000..415b892
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-devtools/.gitignore
@@ -0,0 +1,3 @@
+logs/
+target/
+/target/

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-devtools/bin/kafka-producer.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/bin/kafka-producer.sh b/eagle-core/eagle-alert/alert/alert-devtools/bin/kafka-producer.sh
new file mode 100755
index 0000000..59068de
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-devtools/bin/kafka-producer.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [ "x$EAGLE_HEAP_OPTS" = "x" ]; then
+    export EAGLE_HEAP_OPTS="-Xmx512M"
+fi
+
+exec $(dirname $0)/run-class.sh org.apache.eagle.contrib.kafka.ProducerTool "$@"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-devtools/bin/kafka-server-start.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/bin/kafka-server-start.sh b/eagle-core/eagle-alert/alert/alert-devtools/bin/kafka-server-start.sh
new file mode 100755
index 0000000..a5c0ed0
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-devtools/bin/kafka-server-start.sh
@@ -0,0 +1,51 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [ $# -lt 1 ];
+then
+	echo "USAGE: $0 [-daemon] conf/kafka-server.properties [--override property=value]*"
+	exit 1
+fi
+
+base_dir=$(dirname $0)
+
+if [ "x$LOG4J_OPTS" = "x" ]; then
+    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../conf/log4j.properties"
+fi
+
+if [ "x$HEAP_OPTS" = "x" ]; then
+    export EAGLE_HEAP_OPTS="-Xmx1G -Xms1G"
+fi
+
+EXTRA_ARGS="-name kafkaServer -loggc"
+
+COMMAND=$1
+case $COMMAND in
+  -daemon)
+    EXTRA_ARGS="-daemon "$EXTRA_ARGS
+    shift
+    ;;
+  *)
+    ;;
+esac
+
+$base_dir/kafka-server-status.sh 1>/dev/null 2>&1
+if [ "$?" == "" ];then
+	echo "Kafka is still running, please stop firstly before starting"
+	exit 0
+else
+	exec $base_dir/run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
+fi
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-devtools/bin/kafka-server-status.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/bin/kafka-server-status.sh b/eagle-core/eagle-alert/alert/alert-devtools/bin/kafka-server-status.sh
new file mode 100755
index 0000000..c794425
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-devtools/bin/kafka-server-status.sh
@@ -0,0 +1,24 @@
+#!/bin/sh
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')
+
+if [ -z "$PIDS" ]; then
+  echo "No kafka server is running"
+  exit 1
+else
+  echo "Kafka server is running at $PIDS"
+  exit 0
+fi
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-devtools/bin/kafka-server-stop.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/bin/kafka-server-stop.sh b/eagle-core/eagle-alert/alert/alert-devtools/bin/kafka-server-stop.sh
new file mode 100755
index 0000000..9de6d76
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-devtools/bin/kafka-server-stop.sh
@@ -0,0 +1,23 @@
+#!/bin/sh
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')
+
+if [ -z "$PIDS" ]; then
+  echo "No kafka server to stop"
+  exit 1
+else
+  kill -s TERM $PIDS
+fi
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-devtools/bin/kafka-topics.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/bin/kafka-topics.sh b/eagle-core/eagle-alert/alert/alert-devtools/bin/kafka-topics.sh
new file mode 100755
index 0000000..6f40511
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-devtools/bin/kafka-topics.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/run-class.sh kafka.admin.TopicCommand "$@"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-devtools/bin/run-class.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/bin/run-class.sh b/eagle-core/eagle-alert/alert/alert-devtools/bin/run-class.sh
new file mode 100755
index 0000000..8886bae
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-devtools/bin/run-class.sh
@@ -0,0 +1,112 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [ $# -lt 1 ];
+then
+  echo "USAGE: $0 [-daemon] [-name servicename] [-loggc] classname [opts]"
+  exit 1
+fi
+
+project_base=$(dirname $0)/..
+
+for file in $project_base/target/lib/*;do
+	export CLASSPATH=$file:$CLASSPATH
+done
+for file in $project_base/target/*.jar;do
+	export CLASSPATH=$file:$CLASSPATH
+done
+
+if [ -z "JMX_OPTS" ]; then
+  export JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false  -Dcom.sun.management.jmxremote.ssl=false "
+fi
+
+# Log directory to use
+if [ "x$LOG_DIR" = "x" ]; then
+    LOG_DIR="$project_base/logs"
+fi
+
+# create logs directory
+if [ ! -d "$LOG_DIR" ]; then
+	mkdir -p "$LOG_DIR"
+fi
+
+# Log4j settings
+if [ -z "$LOG4J_OPTS" ]; then
+  # Log to console. This is a tool.
+  LOG4J_OPTS="-Dlog4j.configuration=file:$project_base/conf/cli-log4j.properties"
+else
+  # create logs directory
+  if [ ! -d "$LOG_DIR" ]; then
+    mkdir -p "$LOG_DIR"
+  fi
+fi
+
+# Which java to use
+if [ -z "$JAVA_HOME" ]; then
+  JAVA="java"
+else
+  JAVA="$JAVA_HOME/bin/java"
+fi
+
+# Memory options
+if [ -z "$HEAP_OPTS" ]; then
+  HEAP_OPTS="-Xmx256M"
+fi
+
+# JVM performance options
+if [ -z "$JVM_PERFORMANCE_OPTS" ]; then
+  JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true"
+fi
+
+while [ $# -gt 0 ]; do
+  COMMAND=$1
+  case $COMMAND in
+    -name)
+      DAEMON_NAME=$2
+      CONSOLE_OUTPUT_FILE=$LOG_DIR/$DAEMON_NAME.out
+      shift 2
+      ;;
+    -loggc)
+      if [ -z "$GC_LOG_OPTS" ]; then
+        GC_LOG_ENABLED="true"
+      fi
+      shift
+      ;;
+    -daemon)
+      DAEMON_MODE="true"
+      shift
+      ;;
+    *)
+      break
+      ;;
+  esac
+done
+
+# GC options
+GC_FILE_SUFFIX='-gc.log'
+GC_LOG_FILE_NAME=''
+if [ "x$GC_LOG_ENABLED" = "xtrue" ]; then
+  GC_LOG_FILE_NAME=$DAEMON_NAME$GC_FILE_SUFFIX
+  GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps "
+fi
+
+# Launch mode
+if [ "x$DAEMON_MODE" = "xtrue" ]; then
+  nohup $JAVA $HEAP_OPTS $JVM_PERFORMANCE_OPTS $GC_LOG_OPTS $EAGLE_JMX_OPTS $LOG4J_OPTS -cp $CLASSPATH $EAGLE_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
+else
+  exec $JAVA $HEAP_OPTS $JVM_PERFORMANCE_OPTS $GC_LOG_OPTS $EAGLE_JMX_OPTS $LOG4J_OPTS -cp $CLASSPATH $EAGLE_OPTS "$@"
+fi
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-devtools/bin/start-coordinator.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/bin/start-coordinator.sh b/eagle-core/eagle-alert/alert/alert-devtools/bin/start-coordinator.sh
new file mode 100755
index 0000000..1f5bea0
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-devtools/bin/start-coordinator.sh
@@ -0,0 +1,20 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cd $(dirname $0)/../../alert-coordinator
+
+mvn jetty:run 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-devtools/bin/start-integration1.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/bin/start-integration1.sh b/eagle-core/eagle-alert/alert/alert-devtools/bin/start-integration1.sh
new file mode 100755
index 0000000..cb66149
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-devtools/bin/start-integration1.sh
@@ -0,0 +1,16 @@
+#!/bin/bash
+
+echo $(dirname $0)
+
+#start topology
+echo "starting topology..."
+cd $(dirname $0)/../../alert-engine/alert-engine-base/
+
+
+echo " as dev tests, tail -f test.log | grep AlertStreamEvent for alert stream....."
+
+mvn test -Dtest=org.apache.eagle.alert.engine.e2e.Integration1 | tee test.log
+
+# tail log output
+# tail -f test.log | grep AlertStreamEvent
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-devtools/bin/start-integration2.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/bin/start-integration2.sh b/eagle-core/eagle-alert/alert/alert-devtools/bin/start-integration2.sh
new file mode 100755
index 0000000..b7797b0
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-devtools/bin/start-integration2.sh
@@ -0,0 +1,19 @@
+#!/bin/bash
+
+echo $(dirname $0)
+
+export MAVEN_OPTS=-Xms256M -Xmx1024M
+
+#start topology
+echo "starting topology..."
+cd $(dirname $0)/../../alert-engine/alert-engine-base/
+
+
+echo " as dev tests, tail -f test.log | grep AlertStreamEvent for alert stream....."
+
+
+mvn test -Dtest=org.apache.eagle.alert.engine.e2e.Integration2 | tee test.log
+
+# tail log output
+# tail -f test.log | grep AlertStreamEvent
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-devtools/bin/start-metadata.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/bin/start-metadata.sh b/eagle-core/eagle-alert/alert/alert-devtools/bin/start-metadata.sh
new file mode 100755
index 0000000..1732c04
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-devtools/bin/start-metadata.sh
@@ -0,0 +1,20 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cd $(dirname $0)/../../alert-metadata-parent/alert-metadata-service
+
+mvn jetty:run 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-devtools/bin/start-sampleclient1.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/bin/start-sampleclient1.sh b/eagle-core/eagle-alert/alert/alert-devtools/bin/start-sampleclient1.sh
new file mode 100755
index 0000000..df65875
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-devtools/bin/start-sampleclient1.sh
@@ -0,0 +1,6 @@
+#!/bin/bash
+
+cd $(dirname $0)/../../alert-assembly/
+
+java -cp ${MAVEN_REPO}/org/apache/storm/storm-core/0.9.3/storm-core-0.9.3.jar:target/alert-engine-0.0.1-SNAPSHOT-alert-assembly.jar org.apache.eagle.alert.engine.e2e.SampleClient1
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-devtools/bin/start-sampleclient2.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/bin/start-sampleclient2.sh b/eagle-core/eagle-alert/alert/alert-devtools/bin/start-sampleclient2.sh
new file mode 100755
index 0000000..742b612
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-devtools/bin/start-sampleclient2.sh
@@ -0,0 +1,6 @@
+#!/bin/bash
+
+cd $(dirname $0)/../../alert-assembly/
+
+java -cp ${MAVEN_REPO}/org/apache/storm/storm-core/0.9.3/storm-core-0.9.3.jar:target/alert-engine-0.0.1-SNAPSHOT-alert-assembly.jar org.apache.eagle.alert.engine.e2e.SampleClient2
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-devtools/bin/start-zk-kafka.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/bin/start-zk-kafka.sh b/eagle-core/eagle-alert/alert/alert-devtools/bin/start-zk-kafka.sh
new file mode 100755
index 0000000..c58de65
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-devtools/bin/start-zk-kafka.sh
@@ -0,0 +1,28 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Run from the devtools directory so the relative bin/ and conf/ paths resolve.
+cd $(dirname $0)/..
+
+echo "Starting zookeeper"
+bin/zookeeper-server-start.sh -daemon conf/zookeeper-server.properties
+sleep 1
+bin/zookeeper-server-status.sh
+
+echo "Starting kafka"
+bin/kafka-server-start.sh -daemon conf/kafka-server.properties
+sleep 1
+bin/kafka-server-status.sh
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-devtools/bin/stop-zk-kafka.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/bin/stop-zk-kafka.sh b/eagle-core/eagle-alert/alert/alert-devtools/bin/stop-zk-kafka.sh
new file mode 100755
index 0000000..dc8a898
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-devtools/bin/stop-zk-kafka.sh
@@ -0,0 +1,28 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Run from the devtools directory so the relative bin/ and conf/ paths resolve.
+cd $(dirname $0)/..
+
+echo "Stopping zookeeper"
+# NOTE(review): zookeeper-server-stop.sh ignores its arguments (it only greps
+# for the QuorumPeerMain pid), so the -daemon/conf args here are dead weight.
+bin/zookeeper-server-stop.sh -daemon conf/zookeeper-server.properties
+sleep 1
+bin/zookeeper-server-status.sh
+
+echo "Stopping kafka"
+# NOTE(review): presumably kafka-server-stop.sh also ignores these args - confirm.
+bin/kafka-server-stop.sh -daemon conf/kafka-server.properties
+sleep 1
+bin/kafka-server-status.sh
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-devtools/bin/zookeeper-server-start.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/bin/zookeeper-server-start.sh b/eagle-core/eagle-alert/alert/alert-devtools/bin/zookeeper-server-start.sh
new file mode 100755
index 0000000..a68c271
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-devtools/bin/zookeeper-server-start.sh
@@ -0,0 +1,50 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [ $# -lt 1 ];
+then
+	echo "USAGE: $0 [-daemon] conf/zookeeper-server.properties"
+	exit 1
+fi
+base_dir=$(dirname $0)
+
+if [ "x$LOG4J_OPTS" = "x" ]; then
+    export LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../conf/log4j.properties"
+fi
+
+if [ "x$EAGLE_HEAP_OPTS" = "x" ]; then
+    export EAGLE_HEAP_OPTS="-Xmx512M -Xms512M"
+fi
+
+EXTRA_ARGS="-name zookeeper -loggc"
+
+COMMAND=$1
+case $COMMAND in
+  -daemon)
+     EXTRA_ARGS="-daemon "$EXTRA_ARGS
+     shift
+     ;;
+ *)
+     ;;
+esac
+
+$base_dir/zookeeper-server-status.sh 1>/dev/null 2>&1
+# BUGFIX: the status script exits 0 when zookeeper is running. The original
+# test `[ "$?" == "" ]` can never be true ($? is always a number), so the
+# already-running guard was dead and a second server was always started.
+if [ $? -eq 0 ];then
+	echo "Zookeeper is still running, please stop firstly before starting"
+	exit 0
+else
+	exec $base_dir/run-class.sh $EXTRA_ARGS org.apache.zookeeper.server.quorum.QuorumPeerMain "$@"
+fi
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-devtools/bin/zookeeper-server-status.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/bin/zookeeper-server-status.sh b/eagle-core/eagle-alert/alert/alert-devtools/bin/zookeeper-server-status.sh
new file mode 100755
index 0000000..223a310
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-devtools/bin/zookeeper-server-status.sh
@@ -0,0 +1,24 @@
+#!/bin/sh
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# Collect pids of java processes running ZooKeeper's QuorumPeerMain.
+PIDS=$(ps ax | grep java | grep -i QuorumPeerMain | grep -v grep | awk '{print $1}')
+
+# Exit status doubles as the result: 0 = running, 1 = not running, so callers
+# (e.g. zookeeper-server-start.sh) can test $? directly.
+if [ -z "$PIDS" ]; then
+  echo "No zookeeper server is running"
+  exit 1
+else
+  echo "Zookeeper server is running at $PIDS"
+  exit 0
+fi
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-devtools/bin/zookeeper-server-stop.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/bin/zookeeper-server-stop.sh b/eagle-core/eagle-alert/alert/alert-devtools/bin/zookeeper-server-stop.sh
new file mode 100755
index 0000000..4155182
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-devtools/bin/zookeeper-server-stop.sh
@@ -0,0 +1,24 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# Collect pids of java processes running ZooKeeper's QuorumPeerMain.
+PIDS=$(ps ax | grep java | grep -i QuorumPeerMain | grep -v grep | awk '{print $1}')
+
+if [ -z "$PIDS" ]; then
+  echo "No zookeeper server to stop"
+  exit 1
+else
+  # SIGTERM lets zookeeper shut down gracefully rather than being killed hard.
+  kill -s TERM $PIDS
+fi
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-devtools/conf/cli-log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/conf/cli-log4j.properties b/eagle-core/eagle-alert/alert/alert-devtools/conf/cli-log4j.properties
new file mode 100644
index 0000000..d59ded6
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-devtools/conf/cli-log4j.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-devtools/conf/kafka-server.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/conf/kafka-server.properties b/eagle-core/eagle-alert/alert/alert-devtools/conf/kafka-server.properties
new file mode 100644
index 0000000..a9e0010
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-devtools/conf/kafka-server.properties
@@ -0,0 +1,115 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=0
+
+############################# Socket Server Settings #############################
+
+# The address the socket server listens on. It will get the value returned from
+# java.net.InetAddress.getCanonicalHostName() if not configured.
+#   FORMAT:
+#     listeners = security_protocol://host_name:port
+#   EXAMPLE:
+#     listeners = PLAINTEXT://your.host.name:9092
+#listeners=PLAINTEXT://:9092
+
+# Hostname and port the broker will advertise to producers and consumers. If not set,
+# it uses the value for "listeners" if configured.  Otherwise, it will use the value
+# returned from java.net.InetAddress.getCanonicalHostName().
+#advertised.listeners=PLAINTEXT://your.host.name:9092
+
+# The number of threads handling network requests
+num.network.threads=3
+
+# The number of threads doing disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=102400
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=102400
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma separated list of directories under which to store log files
+log.dirs=/tmp/dev-kafka-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=1
+
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs located in RAID array.
+num.recovery.threads.per.data.dir=1
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data may be lost if you are not using replication.
+#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=1073741824
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=300000
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# This is a comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect=localhost:2181
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=6000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-devtools/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/conf/log4j.properties b/eagle-core/eagle-alert/alert/alert-devtools/conf/log4j.properties
new file mode 100644
index 0000000..d59ded6
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-devtools/conf/log4j.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-devtools/conf/zookeeper-server.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-devtools/conf/zookeeper-server.properties b/eagle-core/eagle-alert/alert/alert-devtools/conf/zookeeper-server.properties
new file mode 100644
index 0000000..dd71ffc
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-devtools/conf/zookeeper-server.properties
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir=/tmp/dev-zookeeper-data
+# the port at which the clients will connect
+clientPort=2181
+# disable the per-ip limit on the number of connections since this is a non-production config
+maxClientCnxns=0
\ No newline at end of file


Mime
View raw message