eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [2/2] incubator-eagle git commit: [EAGLE-749] Add unit test for model.
Date Wed, 09 Nov 2016 13:56:06 GMT
[EAGLE-749] Add unit test for model.

 - Add unit tests for the model classes under the alert-common module.
 - Fix inconsistency between equals() and hashCode() for PartitionedEvent, StreamEvent and Publishment.
 - Fix bug in StreamColumn where a BOOL default value was parsed with Double.valueOf instead of Boolean.valueOf.

https://issues.apache.org/jira/browse/EAGLE-749

Author: r7raul1984 <tangjijun@yhd.com>

Closes #632 from r7raul1984/EAGLE-749.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/2b61cef5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/2b61cef5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/2b61cef5

Branch: refs/heads/master
Commit: 2b61cef585b02fb6f40d5c3de7f7c5c060926ecd
Parents: 1da8dc4
Author: r7raul1984 <tangjijun@yhd.com>
Authored: Wed Nov 9 21:55:51 2016 +0800
Committer: Hao Chen <hao@apache.org>
Committed: Wed Nov 9 21:55:51 2016 +0800

----------------------------------------------------------------------
 .../alert/engine/coordinator/Publishment.java   |  21 +-
 .../alert/engine/coordinator/StreamColumn.java  |   2 +-
 .../engine/coordinator/StreamSortSpec.java      |  17 +-
 .../alert/engine/model/PartitionedEvent.java    |  16 +-
 .../eagle/alert/engine/model/StreamEvent.java   |  11 +-
 .../eagle/alert/config/TestConfigBus.java       |   5 +-
 .../model/Kafka2TupleMetadataTest.java          |  49 +++++
 .../model/PolicyWorkerQueueTest.java            |  63 ++++++
 .../model/StreamRepartitionStrategyTest.java    |  74 +++++++
 .../model/StreamRouterSpecTest.java             |  59 ++++++
 .../model/Tuple2StreamMetadataTest.java         |  50 +++++
 .../alert/coordination/model/WorkSlotTest.java  |  43 +++++
 .../model/internal/MonitoredStreamTest.java     |  69 +++++++
 .../model/internal/PolicyAssignmentTest.java    |  37 ++++
 .../model/internal/StreamGroupTest.java         |  67 +++++++
 .../model/internal/StreamWorkSlotQueueTest.java |  61 ++++++
 .../model/internal/TopologyTest.java            |  47 +++++
 .../OverrideDeduplicatorSpecTest.java           |  61 ++++++
 .../coordinator/PolicyDefinitionTest.java       | 140 ++++++++++++++
 .../engine/coordinator/PublishmentTest.java     | 100 ++++++++++
 .../engine/coordinator/PublishmentTypeTest.java |  47 +++++
 .../engine/coordinator/StreamColumnTest.java    | 153 +++++++++++++++
 .../coordinator/StreamDefinitionTest.java       |  52 +++++
 .../engine/coordinator/StreamPartitionTest.java |  43 +++++
 .../engine/coordinator/StreamSortSpecTest.java  |  45 +++++
 .../coordinator/StreamingClusterTest.java       |  47 +++++
 .../engine/model/AlertPublishEventTest.java     | 110 +++++++++++
 .../engine/model/AlertStreamEventTest.java      |  63 ++++++
 .../engine/model/PartitionedEventTest.java      |  54 ++++++
 .../engine/model/StreamEventBuilderTest.java    | 166 ++++++++++++++++
 .../alert/engine/model/StreamEventTest.java     | 191 +++++++++++++++++++
 .../eagle/alert/model/StreamEventTest.java      |  68 -------
 .../eagle/alert/model/TestPolicyDefinition.java |  45 -----
 33 files changed, 1930 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
index dbb1844..d1cc33a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
@@ -135,28 +135,29 @@ public class Publishment {
         if (obj instanceof Publishment) {
             Publishment p = (Publishment) obj;
             return (Objects.equals(name, p.getName()) && Objects.equals(type, p.getType())
-                && Objects.equals(dedupIntervalMin, p.getDedupIntervalMin())
-                && Objects.equals(dedupFields, p.getDedupFields())
-                && Objects.equals(dedupStateField, p.getDedupStateField())
-                && Objects.equals(overrideDeduplicator, p.getOverrideDeduplicator())
-                && Objects.equals(policyIds, p.getPolicyIds())
-                && Objects.equals(streamIds, p.getStreamIds())
-                && properties.equals(p.getProperties()));
+                    && Objects.equals(dedupIntervalMin, p.getDedupIntervalMin())
+                    && Objects.equals(dedupFields, p.getDedupFields())
+                    && Objects.equals(dedupStateField, p.getDedupStateField())
+                    && Objects.equals(overrideDeduplicator, p.getOverrideDeduplicator())
+                    && Objects.equals(policyIds, p.getPolicyIds())
+                    && Objects.equals(streamIds, p.getStreamIds())
+                    && properties.equals(p.getProperties()));
         }
         return false;
     }
 
     @Override
     public int hashCode() {
-        return new HashCodeBuilder().append(name).append(type).append(dedupIntervalMin).append(policyIds)
-            .append(properties).build();
+        return new HashCodeBuilder().append(name).append(type).append(dedupIntervalMin).append(dedupFields)
+                .append(dedupStateField).append(overrideDeduplicator).append(policyIds).append(streamIds)
+                .append(properties).build();
     }
 
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
         sb.append("Publishment[name:").append(name).append(",type:").append(type).append(",policyId:")
-            .append(policyIds).append(",properties:").append(properties);
+                .append(policyIds).append(",properties:").append(properties);
         return sb.toString();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
index 5a5f2cc..4628043 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
@@ -85,7 +85,7 @@ public class StreamColumn implements Serializable {
                     this.setDefaultValue(Double.valueOf((String) this.getDefaultValue()));
                     break;
                 case BOOL:
-                    this.setDefaultValue(Double.valueOf((String) this.getDefaultValue()));
+                    this.setDefaultValue(Boolean.valueOf((String) this.getDefaultValue()));
                     break;
                 case OBJECT:
                     try {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
index 65b9151..ff05fc8 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
@@ -16,6 +16,7 @@
  */
 package org.apache.eagle.alert.engine.coordinator;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.eagle.alert.utils.TimePeriodUtils;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import org.apache.commons.lang.builder.HashCodeBuilder;
@@ -45,7 +46,7 @@ public class StreamSortSpec implements Serializable {
     }
 
     public int getWindowPeriodMillis() {
-        if (windowPeriod != null) {
+        if (StringUtils.isNotBlank(windowPeriod)) {
             return TimePeriodUtils.getMillisecondsOfPeriod(Period.parse(windowPeriod));
         } else {
             return 0;
@@ -76,9 +77,9 @@ public class StreamSortSpec implements Serializable {
     @Override
     public int hashCode() {
         return new HashCodeBuilder()
-            .append(windowPeriod)
-            .append(windowMargin)
-            .toHashCode();
+                .append(windowPeriod)
+                .append(windowMargin)
+                .toHashCode();
     }
 
     @Override
@@ -92,14 +93,14 @@ public class StreamSortSpec implements Serializable {
 
         StreamSortSpec another = (StreamSortSpec) that;
         return
-            another.windowPeriod.equals(this.windowPeriod)
-                && another.windowMargin == this.windowMargin;
+                another.windowPeriod.equals(this.windowPeriod)
+                        && another.windowMargin == this.windowMargin;
     }
 
     @Override
     public String toString() {
         return String.format("StreamSortSpec[windowPeriod=%s,windowMargin=%d]",
-            this.getWindowPeriod(),
-            this.getWindowMargin());
+                this.getWindowPeriod(),
+                this.getWindowMargin());
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
index 51e4532..ecca0ff 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
@@ -19,6 +19,7 @@ package org.apache.eagle.alert.engine.model;
 import org.apache.eagle.alert.engine.coordinator.StreamPartition;
 import backtype.storm.tuple.Tuple;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
+
 import java.io.Serializable;
 import java.util.Objects;
 
@@ -63,9 +64,9 @@ public class PartitionedEvent implements Serializable {
         if (obj instanceof PartitionedEvent) {
             PartitionedEvent another = (PartitionedEvent) obj;
             return !(this.partitionKey != another.getPartitionKey()
-                || !Objects.equals(this.event, another.getEvent())
-                || !Objects.equals(this.partition, another.getPartition())
-                || !Objects.equals(this.anchor, another.anchor));
+                    || !Objects.equals(this.event, another.getEvent())
+                    || !Objects.equals(this.partition, another.getPartition())
+                    || !Objects.equals(this.anchor, another.anchor));
         } else {
             return false;
         }
@@ -74,10 +75,11 @@ public class PartitionedEvent implements Serializable {
     @Override
     public int hashCode() {
         return new HashCodeBuilder()
-            .append(partitionKey)
-            .append(event)
-            .append(partition)
-            .build();
+                .append(partitionKey)
+                .append(event)
+                .append(partition)
+                .append(anchor)
+                .build();
     }
 
     public StreamEvent getEvent() {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
index 8480bc5..130985f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
@@ -95,7 +95,8 @@ public class StreamEvent implements Serializable {
         }
         if (obj instanceof StreamEvent) {
             StreamEvent another = (StreamEvent) obj;
-            return Objects.equals(this.streamId, another.streamId) && this.timestamp == another.timestamp && Arrays.deepEquals(this.data, another.data);
+            return Objects.equals(this.streamId, another.streamId) && this.timestamp == another.timestamp
+                    && Arrays.deepEquals(this.data, another.data) && Objects.equals(this.metaVersion, another.metaVersion);
         }
         return false;
     }
@@ -113,10 +114,10 @@ public class StreamEvent implements Serializable {
             }
         }
         return String.format("StreamEvent[stream=%S,timestamp=%s,data=[%s],metaVersion=%s]",
-            this.getStreamId(),
-            DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()),
-            StringUtils.join(dataStrings, ","),
-            this.getMetaVersion());
+                this.getStreamId(),
+                DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()),
+                StringUtils.join(dataStrings, ","),
+                this.getMetaVersion());
     }
 
     public static StreamEventBuilder builder() {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java
index 5c3f35e..e37e9be 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/config/TestConfigBus.java
@@ -16,6 +16,7 @@
  */
 package org.apache.eagle.alert.config;
 
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -69,8 +70,8 @@ public class TestConfigBus {
     }
 
     @After
-    public void shutdown() {
-        CloseableUtils.closeQuietly(server);
+    public void shutdown() throws IOException {
+        server.stop();
         producer.close();
         consumer.close();
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadataTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadataTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadataTest.java
new file mode 100644
index 0000000..a252fae
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadataTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class Kafka2TupleMetadataTest {
+    @Test
+    public void testKafka2TupleMetadata() {
+        Kafka2TupleMetadata kafka2TupleMetadata = new Kafka2TupleMetadata();
+        kafka2TupleMetadata.setName("setName");
+        kafka2TupleMetadata.setCodec(new Tuple2StreamMetadata());
+        kafka2TupleMetadata.setType("setType");
+        kafka2TupleMetadata.setTopic("setTopic");
+        kafka2TupleMetadata.setSchemeCls("org.apache.eagle.alert.engine.scheme.PlainStringScheme");
+
+        Kafka2TupleMetadata kafka2TupleMetadata1 = new Kafka2TupleMetadata();
+        kafka2TupleMetadata1.setName("setName");
+        kafka2TupleMetadata1.setCodec(new Tuple2StreamMetadata());
+        kafka2TupleMetadata1.setType("setType");
+        kafka2TupleMetadata1.setTopic("setTopic");
+        kafka2TupleMetadata1.setSchemeCls("org.apache.eagle.alert.engine.scheme.PlainStringScheme");
+
+        Assert.assertFalse(kafka2TupleMetadata1 == kafka2TupleMetadata);
+        Assert.assertTrue(kafka2TupleMetadata1.equals(kafka2TupleMetadata));
+        Assert.assertTrue(kafka2TupleMetadata1.hashCode() == kafka2TupleMetadata.hashCode());
+
+        kafka2TupleMetadata1.setType("setType1");
+
+        Assert.assertFalse(kafka2TupleMetadata1.equals(kafka2TupleMetadata));
+        Assert.assertFalse(kafka2TupleMetadata1.hashCode() == kafka2TupleMetadata.hashCode());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueueTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueueTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueueTest.java
new file mode 100644
index 0000000..71f3188
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueueTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model;
+
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class PolicyWorkerQueueTest {
+    @Test
+    public void testPolicyWorkerQueue() {
+
+        List<WorkSlot> workers = new ArrayList<>();
+        WorkSlot workSlot1 = new WorkSlot("setTopologyName1", "setBoltId1");
+        WorkSlot workSlot2 = new WorkSlot("setTopologyName1", "setBoltId2");
+        workers.add(workSlot1);
+        workers.add(workSlot2);
+        PolicyWorkerQueue policyWorkerQueue = new PolicyWorkerQueue(workers);
+        Assert.assertEquals(null, policyWorkerQueue.getPartition());
+        Assert.assertEquals(workSlot1, policyWorkerQueue.getWorkers().get(0));
+        Assert.assertEquals(workSlot2, policyWorkerQueue.getWorkers().get(1));
+        Assert.assertEquals("[(setTopologyName1:setBoltId1),(setTopologyName1:setBoltId2)]", policyWorkerQueue.toString());
+
+        PolicyWorkerQueue policyWorkerQueue1 = new PolicyWorkerQueue();
+        policyWorkerQueue1.setWorkers(workers);
+
+        Assert.assertTrue(policyWorkerQueue.equals(policyWorkerQueue1));
+        Assert.assertTrue(policyWorkerQueue.hashCode() == policyWorkerQueue1.hashCode());
+
+        StreamSortSpec streamSortSpec = new StreamSortSpec();
+        streamSortSpec.setWindowPeriod("PT10S");
+        StreamPartition streamPartition = new StreamPartition();
+        List<String> columns = new ArrayList<>();
+        columns.add("jobId");
+        streamPartition.setColumns(columns);
+        streamPartition.setSortSpec(streamSortSpec);
+        streamPartition.setStreamId("test");
+        streamPartition.setType(StreamPartition.Type.GROUPBY);
+        policyWorkerQueue1.setPartition(streamPartition);
+
+        Assert.assertFalse(policyWorkerQueue.equals(policyWorkerQueue1));
+        Assert.assertFalse(policyWorkerQueue.hashCode() == policyWorkerQueue1.hashCode());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategyTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategyTest.java
new file mode 100644
index 0000000..c416a49
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategyTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model;
+
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class StreamRepartitionStrategyTest {
+
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    @Test
+    public void testStreamRepartitionStrategy() {
+        thrown.expect(NullPointerException.class);
+        StreamRepartitionStrategy streamRepartitionStrategy = new StreamRepartitionStrategy();
+        streamRepartitionStrategy.hashCode();
+    }
+
+    @Test
+    public void testStreamRepartitionStrategy1() {
+        thrown.expect(NullPointerException.class);
+        StreamRepartitionStrategy streamRepartitionStrategy = new StreamRepartitionStrategy();
+        streamRepartitionStrategy.equals(streamRepartitionStrategy);
+    }
+
+    @Test
+    public void testStreamRepartitionStrategy2() {
+
+        StreamSortSpec streamSortSpec = new StreamSortSpec();
+        streamSortSpec.setWindowPeriod("PT10S");
+        StreamPartition streamPartition = new StreamPartition();
+        List<String> columns = new ArrayList<>();
+        columns.add("jobId");
+        streamPartition.setColumns(columns);
+        streamPartition.setSortSpec(streamSortSpec);
+        streamPartition.setStreamId("test");
+        streamPartition.setType(StreamPartition.Type.GROUPBY);
+
+
+        StreamRepartitionStrategy streamRepartitionStrategy = new StreamRepartitionStrategy();
+        Assert.assertEquals(null, streamRepartitionStrategy.getPartition());
+        Assert.assertEquals(0, streamRepartitionStrategy.getNumTotalParticipatingRouterBolts());
+        Assert.assertEquals(0, streamRepartitionStrategy.getStartSequence());
+        streamRepartitionStrategy.setPartition(streamPartition);
+        StreamRepartitionStrategy streamRepartitionStrategy1 = new StreamRepartitionStrategy();
+        streamRepartitionStrategy1.setPartition(streamPartition);
+
+        Assert.assertTrue(streamRepartitionStrategy.equals(streamRepartitionStrategy1));
+        Assert.assertTrue(streamRepartitionStrategy.hashCode() == streamRepartitionStrategy1.hashCode());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRouterSpecTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRouterSpecTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRouterSpecTest.java
new file mode 100644
index 0000000..88e72cb
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/StreamRouterSpecTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class StreamRouterSpecTest {
+    @Test
+    public void testStreamRouterSpec() {
+        StreamRouterSpec streamRouterSpec = new StreamRouterSpec();
+        Assert.assertEquals(null, streamRouterSpec.getPartition());
+        Assert.assertEquals(null, streamRouterSpec.getStreamId());
+        Assert.assertTrue(streamRouterSpec.getTargetQueue().isEmpty());
+
+        List<WorkSlot> workers = new ArrayList<>();
+        WorkSlot workSlot1 = new WorkSlot("setTopologyName1", "setBoltId1");
+        WorkSlot workSlot2 = new WorkSlot("setTopologyName1", "setBoltId2");
+        workers.add(workSlot1);
+        workers.add(workSlot2);
+        PolicyWorkerQueue policyWorkerQueue = new PolicyWorkerQueue(workers);
+        streamRouterSpec.addQueue(policyWorkerQueue);
+        streamRouterSpec.setStreamId("streamRouterSpec");
+
+        Assert.assertEquals("streamRouterSpec", streamRouterSpec.getStreamId());
+        Assert.assertEquals(1, streamRouterSpec.getTargetQueue().size());
+        Assert.assertEquals(2, streamRouterSpec.getTargetQueue().get(0).getWorkers().size());
+
+        StreamRouterSpec streamRouterSpec1 = new StreamRouterSpec();
+        streamRouterSpec1.addQueue(policyWorkerQueue);
+        streamRouterSpec1.setStreamId("streamRouterSpec1");
+
+        Assert.assertFalse(streamRouterSpec.equals(streamRouterSpec1));
+
+        streamRouterSpec1.setStreamId("streamRouterSpec");
+
+        Assert.assertTrue(streamRouterSpec.equals(streamRouterSpec1));
+        Assert.assertTrue(streamRouterSpec.hashCode() == streamRouterSpec1.hashCode());
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadataTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadataTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadataTest.java
new file mode 100644
index 0000000..8bbfc41
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadataTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.*;
+
+public class Tuple2StreamMetadataTest {
+    @Test
+    public void testTuple2StreamMetadata() {
+        Tuple2StreamMetadata metadata = new Tuple2StreamMetadata();
+        Set<String> activeStreamNames = new HashSet<>(); // typed rather than raw, so element types are checked
+        activeStreamNames.add("defaultStringStream");
+        metadata.setStreamNameSelectorCls("org.apache.eagle.alert.engine.scheme.PlainStringStreamNameSelector");
+        metadata.setStreamNameSelectorProp(new Properties());
+        metadata.getStreamNameSelectorProp().put("userProvidedStreamName", "defaultStringStream");
+        metadata.setActiveStreamNames(activeStreamNames);
+        metadata.setTimestampColumn("timestamp");
+        // Second instance, populated with exactly the same values as the first.
+        Tuple2StreamMetadata metadata1 = new Tuple2StreamMetadata();
+        Set<String> activeStreamNames1 = new HashSet<>();
+        activeStreamNames1.add("defaultStringStream");
+        metadata1.setStreamNameSelectorCls("org.apache.eagle.alert.engine.scheme.PlainStringStreamNameSelector");
+        metadata1.setStreamNameSelectorProp(new Properties());
+        metadata1.getStreamNameSelectorProp().put("userProvidedStreamName", "defaultStringStream");
+        metadata1.setActiveStreamNames(activeStreamNames1);
+        metadata1.setTimestampColumn("timestamp");
+        // Same field values, yet the two instances are expected to be unequal and to hash differently.
+        Assert.assertFalse(metadata == metadata1);
+        Assert.assertFalse(metadata.equals(metadata1));
+        Assert.assertFalse(metadata.hashCode() == metadata1.hashCode());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/WorkSlotTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/WorkSlotTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/WorkSlotTest.java
new file mode 100644
index 0000000..48ee73b
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/WorkSlotTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class WorkSlotTest {
+    @Test
+    public void testWorkSlot() {
+        WorkSlot viaSetters = new WorkSlot(); // no-arg construction: both fields are expected to be null
+        Assert.assertEquals("(null:null)", viaSetters.toString());
+        Assert.assertNull(viaSetters.getBoltId());
+        Assert.assertNull(viaSetters.getTopologyName());
+        viaSetters.setBoltId("setBoltId");
+        viaSetters.setTopologyName("setTopologyName");
+        Assert.assertEquals("(setTopologyName:setBoltId)", viaSetters.toString());
+        Assert.assertEquals("setBoltId", viaSetters.getBoltId());
+        Assert.assertEquals("setTopologyName", viaSetters.getTopologyName());
+        // The two-argument constructor takes (topologyName, boltId) and must match the setter-built slot.
+        WorkSlot viaConstructor = new WorkSlot("setTopologyName", "setBoltId");
+        Assert.assertEquals("(setTopologyName:setBoltId)", viaConstructor.toString());
+        Assert.assertEquals("setBoltId", viaConstructor.getBoltId());
+        Assert.assertEquals("setTopologyName", viaConstructor.getTopologyName());
+        Assert.assertTrue(viaConstructor.equals(viaSetters));
+        Assert.assertEquals(viaConstructor.hashCode(), viaSetters.hashCode());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStreamTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStreamTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStreamTest.java
new file mode 100644
index 0000000..a2c0d6e
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStreamTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model.internal;
+
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+public class MonitoredStreamTest {
+
+    @Test
+    public void testMonitoredStream() {
+        // Fixture: a group holding one GROUPBY partition on "jobId" with a "PT10S" window period.
+        StreamSortSpec sortSpec = new StreamSortSpec();
+        sortSpec.setWindowPeriod("PT10S");
+        List<String> groupByColumns = new ArrayList<>();
+        groupByColumns.add("jobId");
+        StreamPartition partition = new StreamPartition();
+        partition.setColumns(groupByColumns);
+        partition.setSortSpec(sortSpec);
+        partition.setStreamId("test");
+        partition.setType(StreamPartition.Type.GROUPBY);
+        StreamGroup group = new StreamGroup();
+        group.addStreamPartition(partition);
+        List<WorkSlot> slots = new ArrayList<>();
+        slots.add(new WorkSlot("setTopologyName", "setBoltId"));
+        StreamWorkSlotQueue queue = new StreamWorkSlotQueue(group, false, new HashMap<>(), slots);
+
+        // A fresh stream has no version and no queues, and exposes the group it was built from.
+        MonitoredStream stream = new MonitoredStream(group);
+        Assert.assertNull(stream.getVersion());
+        Assert.assertTrue(stream.getQueues().isEmpty());
+        Assert.assertEquals(group, stream.getStreamGroup());
+        stream.addQueues(queue);
+        Assert.assertEquals(queue, stream.getQueues().get(0));
+
+        // Equality with a peer on the same group is asserted while only one side holds a queue ...
+        MonitoredStream peer = new MonitoredStream(group);
+        Assert.assertTrue(stream.equals(peer));
+        Assert.assertEquals(stream.hashCode(), peer.hashCode());
+
+        // ... and again once that queue has been removed.
+        stream.removeQueue(queue);
+        Assert.assertTrue(stream.getQueues().isEmpty());
+        Assert.assertTrue(stream.equals(peer));
+        Assert.assertEquals(stream.hashCode(), peer.hashCode());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignmentTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignmentTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignmentTest.java
new file mode 100644
index 0000000..1491c77
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignmentTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model.internal;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PolicyAssignmentTest {
+    @Test
+    public void testPolicyAssignment() {
+        PolicyAssignment assignment = new PolicyAssignment("policy", "queue");
+        Assert.assertEquals("policy", assignment.getPolicyName());
+        Assert.assertEquals("queue", assignment.getQueueId());
+        Assert.assertNull(assignment.getVersion());
+        Assert.assertEquals("PolicyAssignment of policy policy, queueId queue, version null !", assignment.toString());
+
+        // Instances built from the same arguments must still be distinct, unequal, and hash differently.
+        Assert.assertFalse(assignment.equals(new PolicyAssignment("policy", "queue")));
+        Assert.assertNotSame(assignment, new PolicyAssignment("policy", "queue"));
+        Assert.assertFalse(assignment.hashCode() == new PolicyAssignment("policy", "queue").hashCode());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamGroupTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamGroupTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamGroupTest.java
new file mode 100644
index 0000000..d0f0189
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamGroupTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordination.model.internal;
+
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class StreamGroupTest {
+    @Test
+    public void testStreamGroup() {
+        StreamGroup streamGroup = new StreamGroup();
+        Assert.assertEquals("StreamGroup partitions=: [] ", streamGroup.toString());
+        Assert.assertEquals("SG[]", streamGroup.getStreamId());
+        // Add one fully populated GROUPBY partition; the group's stream id is then expected to be "SG[test-]".
+        StreamSortSpec streamSortSpec = new StreamSortSpec();
+        streamSortSpec.setWindowPeriod("PT10S");
+        StreamPartition streamPartition = new StreamPartition();
+        List<String> columns = new ArrayList<>();
+        columns.add("jobId");
+        streamPartition.setColumns(columns);
+        streamPartition.setSortSpec(streamSortSpec);
+        streamPartition.setStreamId("test");
+        streamPartition.setType(StreamPartition.Type.GROUPBY);
+        streamGroup.addStreamPartition(streamPartition);
+        Assert.assertEquals("SG[test-]", streamGroup.getStreamId());
+        Assert.assertEquals("StreamGroup partitions=: [StreamPartition[streamId=test,type=GROUPBY,columns=[jobId],sortSpec=[StreamSortSpec[windowPeriod=PT10S,windowMargin=30000]]]] ", streamGroup.toString());
+        // The partition already in the group is renamed and added again, so both entries are expected to read "test1".
+        List<StreamPartition> streamPartitions = new ArrayList<>();
+        streamPartition.setStreamId("test1");
+        streamPartitions.add(streamPartition);
+        streamGroup.addStreamPartitions(streamPartitions);
+        Assert.assertEquals("SG[test1-test1-]", streamGroup.getStreamId());
+
+        // A partition with only a stream id set; the expected toString shows type=null, columns=[] and sortSpec=[null].
+        streamPartitions = new ArrayList<>();
+        StreamPartition streamPartition1 = new StreamPartition();
+        streamPartition1.setStreamId("test2");
+        streamPartitions.add(streamPartition1);
+        streamGroup.addStreamPartitions(streamPartitions);
+        Assert.assertEquals("SG[test1-test1-test2-]", streamGroup.getStreamId());
+        Assert.assertEquals("StreamGroup partitions=: [StreamPartition[streamId=test1,type=GROUPBY,columns=[jobId],sortSpec=[StreamSortSpec[windowPeriod=PT10S,windowMargin=30000]]], StreamPartition[streamId=test1,type=GROUPBY,columns=[jobId],sortSpec=[StreamSortSpec[windowPeriod=PT10S,windowMargin=30000]]], StreamPartition[streamId=test2,type=null,columns=[],sortSpec=[null]]] ", streamGroup.toString());
+        // A group populated from the first group's partitions is expected to be equal and to share its hash code.
+        StreamGroup streamGroup1 = new StreamGroup();
+        streamGroup1.addStreamPartitions(streamGroup.getStreamPartitions());
+        Assert.assertTrue(streamGroup.equals(streamGroup1));
+        Assert.assertTrue(streamGroup.hashCode() == streamGroup1.hashCode());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueueTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueueTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueueTest.java
new file mode 100644
index 0000000..bc2f74e
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueueTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordination.model.internal;
+
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+public class StreamWorkSlotQueueTest {
+    @Test
+    public void testStreamWorkSlotQueue() {
+        StreamSortSpec sortSpec = new StreamSortSpec();
+        sortSpec.setWindowPeriod("PT10S");
+        List<String> groupByColumns = new ArrayList<>();
+        groupByColumns.add("jobId");
+        StreamPartition partition = new StreamPartition();
+        partition.setColumns(groupByColumns);
+        partition.setSortSpec(sortSpec);
+        partition.setStreamId("test");
+        partition.setType(StreamPartition.Type.GROUPBY);
+        StreamGroup group = new StreamGroup();
+        group.addStreamPartition(partition);
+        WorkSlot slot = new WorkSlot("setTopologyName", "setBoltId");
+        List<WorkSlot> slots = new ArrayList<>();
+        slots.add(slot);
+        StreamWorkSlotQueue queue = new StreamWorkSlotQueue(group, false, new HashMap<>(), slots);
+        // Expectations for a queue built from the single-partition group and one work slot.
+        Assert.assertTrue(queue.getQueueId().startsWith("SG[test-]"));
+        Assert.assertTrue(queue.getDedicateOption().isEmpty());
+        Assert.assertEquals(0, queue.getNumberOfGroupBolts());
+        Assert.assertEquals(1, queue.getQueueSize());
+        Assert.assertTrue(queue.getTopoGroupStartIndex().isEmpty());
+        Assert.assertEquals(-1, queue.getTopologyGroupStartIndex(""));
+        Assert.assertEquals(slot, queue.getWorkingSlots().get(0));
+        // A second queue built from the very same arguments must not be equal to the first.
+        StreamWorkSlotQueue twin = new StreamWorkSlotQueue(group, false, new HashMap<>(), slots);
+        Assert.assertFalse(queue.equals(twin));
+        Assert.assertNotSame(queue, twin);
+        Assert.assertFalse(queue.hashCode() == twin.hashCode());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/TopologyTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/TopologyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/TopologyTest.java
new file mode 100644
index 0000000..760657a
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/coordination/model/internal/TopologyTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.coordination.model.internal;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TopologyTest {
+    @Test
+    public void testTopology() {
+        Topology first = new Topology("test", 2, 3); // asserted below: 2 group bolts, 3 alert bolts
+        Assert.assertNull(first.getClusterName());
+        Assert.assertEquals("test", first.getName());
+        Assert.assertNull(first.getPubBoltId());
+        Assert.assertNull(first.getSpoutId());
+        Assert.assertEquals(0, first.getAlertBoltIds().size());
+        Assert.assertEquals(1, first.getAlertParallelism());
+        Assert.assertEquals(0, first.getGroupNodeIds().size());
+        Assert.assertEquals(1, first.getGroupParallelism());
+        Assert.assertEquals(3, first.getNumOfAlertBolt());
+        Assert.assertEquals(2, first.getNumOfGroupBolt());
+        Assert.assertEquals(0, first.getNumOfPublishBolt());
+        Assert.assertEquals(1, first.getNumOfSpout());
+        Assert.assertEquals(1, first.getSpoutParallelism());
+
+        // A second topology with identical constructor arguments must remain distinct and unequal.
+        Topology second = new Topology("test", 2, 3);
+        Assert.assertFalse(second.equals(first));
+        Assert.assertFalse(second.hashCode() == first.hashCode());
+        Assert.assertNotSame(second, first);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/OverrideDeduplicatorSpecTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/OverrideDeduplicatorSpecTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/OverrideDeduplicatorSpecTest.java
new file mode 100644
index 0000000..cc84c56
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/OverrideDeduplicatorSpecTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.coordinator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class OverrideDeduplicatorSpecTest {
+
+    @Test
+    public void testOverrideDeduplicatorSpec() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("kafka_broker", "localhost:9092");
+        properties.put("topic", "TEST_TOPIC_NAME");
+        OverrideDeduplicatorSpec overrideDeduplicatorSpec = new OverrideDeduplicatorSpec();
+        overrideDeduplicatorSpec.setClassName("testClass");
+        overrideDeduplicatorSpec.setProperties(properties);
+
+        OverrideDeduplicatorSpec overrideDeduplicatorSpec1 = new OverrideDeduplicatorSpec();
+        overrideDeduplicatorSpec1.setClassName("testClass");
+        overrideDeduplicatorSpec1.setProperties(properties);
+        // Same class name and properties: distinct instances that must be equal with equal hash codes.
+        Assert.assertFalse(overrideDeduplicatorSpec1 == overrideDeduplicatorSpec);
+        Assert.assertTrue(overrideDeduplicatorSpec1.equals(overrideDeduplicatorSpec));
+        Assert.assertTrue(overrideDeduplicatorSpec1.hashCode() == overrideDeduplicatorSpec.hashCode());
+        // A different class name alone must break equality.
+        overrideDeduplicatorSpec1.setClassName("testClass1");
+
+        Assert.assertFalse(overrideDeduplicatorSpec1 == overrideDeduplicatorSpec);
+        Assert.assertFalse(overrideDeduplicatorSpec1.equals(overrideDeduplicatorSpec));
+        Assert.assertFalse(overrideDeduplicatorSpec1.hashCode() == overrideDeduplicatorSpec.hashCode());
+        // Restore the class name and swap in a properties map that lacks the "topic" entry.
+        overrideDeduplicatorSpec1.setClassName("testClass");
+        Map<String, String> properties1 = new HashMap<>();
+        properties1.put("kafka_broker", "localhost:9092");
+        overrideDeduplicatorSpec1.setProperties(properties1);
+
+        Assert.assertFalse(overrideDeduplicatorSpec1 == overrideDeduplicatorSpec);
+        Assert.assertFalse(overrideDeduplicatorSpec1.equals(overrideDeduplicatorSpec));
+        Assert.assertFalse(overrideDeduplicatorSpec1.hashCode() == overrideDeduplicatorSpec.hashCode());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java
new file mode 100644
index 0000000..7acb4f7
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinitionTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.coordinator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+
+public class PolicyDefinitionTest {
+
+    @Test
+    public void testPolicyInnerDefinition() {
+        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+        def.setValue("test");
+        def.setType("siddhi");
+        def.setHandlerClass("setHandlerClass");
+        def.setProperties(new HashMap<>());
+        def.setOutputStreams(Arrays.asList("outputStream"));
+        def.setInputStreams(Arrays.asList("inputStream"));
+        Assert.assertEquals("{type=\"siddhi\",value=\"test\", inputStreams=\"[inputStream]\", outputStreams=\"[outputStream]\" }", def.toString());
+        // def1 mirrors def field for field.
+        PolicyDefinition.Definition def1 = new PolicyDefinition.Definition();
+        def1.setValue("test");
+        def1.setType("siddhi");
+        def1.setHandlerClass("setHandlerClass");
+        def1.setProperties(new HashMap<>());
+        def1.setOutputStreams(Arrays.asList("outputStream"));
+        def1.setInputStreams(Arrays.asList("inputStream"));
+
+        Assert.assertFalse(def == def1);
+        Assert.assertTrue(def.equals(def1));
+        Assert.assertTrue(def.hashCode() == def1.hashCode());
+        // Changing only inputStreams must break equality.
+        def1.setInputStreams(Arrays.asList("inputStream1"));
+
+        Assert.assertFalse(def.equals(def1));
+        Assert.assertTrue(def.hashCode() == def1.hashCode()); // NOTE: allowed by the hashCode contract (unequal objects may collide), but suggests hashCode() ignores inputStreams - TODO confirm
+
+    }
+
+    @Test
+    public void testPolicyDefinition() {
+        PolicyDefinition pd = new PolicyDefinition();
+        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+        def.setValue("test");
+        def.setType("siddhi");
+        def.setHandlerClass("setHandlerClass");
+        def.setProperties(new HashMap<>());
+        def.setOutputStreams(Arrays.asList("outputStream"));
+        def.setInputStreams(Arrays.asList("inputStream"));
+        pd.setDefinition(def);
+        pd.setInputStreams(Arrays.asList("inputStream")); // easily confused with PolicyDefinition.Definition's inputStreams
+        pd.setOutputStreams(Arrays.asList("outputStream")); // easily confused with PolicyDefinition.Definition's outputStreams
+        pd.setName("policyName");
+        pd.setDescription(String.format("Test policy for stream %s", "streamName"));
+
+        StreamPartition sp = new StreamPartition();
+        sp.setStreamId("streamName");
+        sp.setColumns(Arrays.asList("host"));
+        sp.setType(StreamPartition.Type.GROUPBY);
+        pd.addPartition(sp);
+        Assert.assertEquals("{name=\"policyName\",definition={type=\"siddhi\",value=\"test\", inputStreams=\"[inputStream]\", outputStreams=\"[outputStream]\" }}", pd.toString());
+        // pd1 is built with exactly the same values as pd.
+        PolicyDefinition pd1 = new PolicyDefinition();
+        PolicyDefinition.Definition def1 = new PolicyDefinition.Definition();
+        def1.setValue("test");
+        def1.setType("siddhi");
+        def1.setHandlerClass("setHandlerClass");
+        def1.setProperties(new HashMap<>());
+        def1.setOutputStreams(Arrays.asList("outputStream"));
+        def1.setInputStreams(Arrays.asList("inputStream"));
+        pd1.setDefinition(def1);
+        pd1.setInputStreams(Arrays.asList("inputStream")); // easily confused with PolicyDefinition.Definition's inputStreams
+        pd1.setOutputStreams(Arrays.asList("outputStream")); // easily confused with PolicyDefinition.Definition's outputStreams
+        pd1.setName("policyName");
+        pd1.setDescription(String.format("Test policy for stream %s", "streamName"));
+
+        StreamPartition sp1 = new StreamPartition();
+        sp1.setStreamId("streamName");
+        sp1.setColumns(Arrays.asList("host"));
+        sp1.setType(StreamPartition.Type.GROUPBY);
+        pd1.addPartition(sp1);
+
+        // Identical content: equal with the same hash code; a differing partition stream id then breaks both.
+        Assert.assertFalse(pd == pd1);
+        Assert.assertTrue(pd.equals(pd1));
+        Assert.assertTrue(pd.hashCode() == pd1.hashCode());
+        sp1.setStreamId("streamName1");
+
+        Assert.assertFalse(pd == pd1);
+        Assert.assertFalse(pd.equals(pd1));
+        Assert.assertFalse(pd.hashCode() == pd1.hashCode());
+        // Restore the partition's stream id and change only the definition's output streams.
+        sp1.setStreamId("streamName");
+        def1.setOutputStreams(Arrays.asList("outputStream1"));
+
+        Assert.assertFalse(pd == pd1);
+        Assert.assertFalse(pd.equals(pd1));
+
+        Assert.assertTrue(pd.hashCode() == pd1.hashCode()); // NOTE: allowed by the hashCode contract (unequal objects may collide), but suggests the definition's outputStreams do not feed hashCode() - TODO confirm
+
+    }
+
+    @Test
+    public void testPolicyDefinitionEqualByPolicyStatus() {
+        PolicyDefinition.Definition definition = new PolicyDefinition.Definition();
+        PolicyDefinition policy1 = new PolicyDefinition();
+        policy1.setName("policy1");
+        policy1.setDefinition(definition);
+
+        PolicyDefinition policy2 = new PolicyDefinition();
+        policy2.setName("policy1");
+        policy2.setPolicyStatus(PolicyDefinition.PolicyStatus.DISABLED);
+        policy2.setDefinition(definition);
+
+        PolicyDefinition policy3 = new PolicyDefinition();
+        policy3.setName("policy1");
+        policy3.setDefinition(definition);
+        // policy3 is configured like policy1; policy2 differs only by its explicitly set DISABLED status.
+        Assert.assertTrue(policy1.equals(policy3));
+        Assert.assertFalse(policy1.equals(policy2));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTest.java
new file mode 100644
index 0000000..494d8ca
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.coordinator;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.*;
+
+public class PublishmentTest {
+    /**
+     * Verifies toString() and that equals()/hashCode() notice deduplicator and stream id changes.
+     */
+    @Test
+    public void testPublishment() {
+        Map<String, Object> properties = new HashMap<>();
+        properties.put("kafka_broker", "localhost:9092");
+        properties.put("topic", "TEST_TOPIC_NAME");
+        List<Map<String, Object>> kafkaClientConfig = new ArrayList<>();
+        kafkaClientConfig.add(ImmutableMap.of("name", "producer.type", "value", "sync"));
+        properties.put("kafka_client_config", kafkaClientConfig);
+
+        String policyName = createPolicy("testStream", "testPolicy").getName();
+        Publishment publishment = createKafkaPublishment(policyName, properties);
+        Assert.assertEquals("Publishment[name:testAsyncPublishment,type:org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher,policyId:[testPolicy],properties:{kafka_client_config=[{name=producer.type, value=sync}], topic=TEST_TOPIC_NAME, kafka_broker=localhost:9092}", publishment.toString());
+
+        // A second instance built from the same values must be equal without being the same object.
+        Publishment publishment1 = createKafkaPublishment(policyName, properties);
+        Assert.assertTrue(publishment.equals(publishment1));
+        Assert.assertTrue(publishment.hashCode() == publishment1.hashCode());
+        Assert.assertFalse(publishment == publishment1);
+
+        // A different override deduplicator class name must break equality and change the hash.
+        publishment1.getOverrideDeduplicator().setClassName("testClass1");
+        Assert.assertFalse(publishment.equals(publishment1));
+        Assert.assertFalse(publishment.hashCode() == publishment1.hashCode());
+        Assert.assertFalse(publishment == publishment1);
+
+        // Restore the class name, then give only publishment1 two stream ids.
+        publishment1.getOverrideDeduplicator().setClassName("testClass");
+        publishment1.setStreamIds(Arrays.asList("streamid1", "streamid2"));
+        Assert.assertFalse(publishment.equals(publishment1));
+        Assert.assertFalse(publishment.hashCode() == publishment1.hashCode());
+        Assert.assertFalse(publishment == publishment1);
+    }
+
+    /** Builds the Kafka publishment used above; every call creates its own OverrideDeduplicatorSpec. */
+    private Publishment createKafkaPublishment(String policyName, Map<String, Object> properties) {
+        OverrideDeduplicatorSpec overrideDeduplicatorSpec = new OverrideDeduplicatorSpec();
+        overrideDeduplicatorSpec.setClassName("testClass");
+
+        Publishment publishment = new Publishment();
+        publishment.setName("testAsyncPublishment");
+        publishment.setType("org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher");
+        publishment.setPolicyIds(Arrays.asList(policyName));
+        publishment.setDedupIntervalMin("PT0M");
+        publishment.setOverrideDeduplicator(overrideDeduplicatorSpec);
+        publishment.setSerializer("org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer");
+        publishment.setProperties(properties);
+        return publishment;
+    }
+
+    /** Builds a "siddhi" policy named policyName with one GROUPBY partition on column "host" of streamName. */
+    private PolicyDefinition createPolicy(String streamName, String policyName) {
+        PolicyDefinition pd = new PolicyDefinition();
+        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+        // expression, something like "PT5S,dynamic,1,host"
+        def.setValue("test");
+        def.setType("siddhi");
+        pd.setDefinition(def);
+        pd.setInputStreams(Arrays.asList("inputStream"));
+        pd.setOutputStreams(Arrays.asList("outputStream"));
+        pd.setName(policyName);
+        pd.setDescription(String.format("Test policy for stream %s", streamName));
+
+        StreamPartition sp = new StreamPartition();
+        sp.setStreamId(streamName);
+        sp.setColumns(Arrays.asList("host"));
+        sp.setType(StreamPartition.Type.GROUPBY);
+        pd.addPartition(sp);
+        return pd;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java
new file mode 100644
index 0000000..91f9cf8
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/PublishmentTypeTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.coordinator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PublishmentTypeTest {
+    /** Verifies equals()/hashCode() for two identically configured types and after one type value changes. */
+    @Test
+    public void testPublishmentType() {
+        PublishmentType publishmentType = createKafkaType();
+        PublishmentType publishmentType1 = createKafkaType();
+        // An object of an unrelated class (here a String) must never compare equal.
+        Assert.assertFalse(publishmentType.equals(""));
+        Assert.assertFalse(publishmentType == publishmentType1);
+        Assert.assertTrue(publishmentType.equals(publishmentType1));
+        Assert.assertTrue(publishmentType.hashCode() == publishmentType1.hashCode());
+
+        publishmentType1.setType("JMS");
+        Assert.assertFalse(publishmentType.equals(publishmentType1));
+        Assert.assertFalse(publishmentType.hashCode() == publishmentType1.hashCode());
+    }
+
+    private PublishmentType createKafkaType() {
+        PublishmentType publishmentType = new PublishmentType();
+        publishmentType.setType("KAFKA");
+        publishmentType.setClassName("setClassName");
+        publishmentType.setDescription("setDescription");
+        publishmentType.setFields("setFields");
+        return publishmentType;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamColumnTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamColumnTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamColumnTest.java
new file mode 100644
index 0000000..ccc7717
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamColumnTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.coordinator;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+
+
+public class StreamColumnTest {
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    /** Runs the builder chain name -> type -> defaultValue -> required -> build shared by every test below. */
+    private static StreamColumn buildColumn(String name, StreamColumn.Type type, String defaultValue, boolean required) {
+        return new StreamColumn.Builder().name(name).type(type).defaultValue(defaultValue).required(required).build();
+    }
+
+    @Test
+    public void testStreamStringColumn() {
+        StreamColumn stringColumn = buildColumn("NAMEyhd", StreamColumn.Type.STRING, "EAGLEyhd", true);
+        stringColumn.setNodataExpression("PT1M,dynamic,1,NAMEyhd");
+        Assert.assertEquals("StreamColumn=name[NAMEyhd], type=[string], defaultValue=[EAGLEyhd], required=[true], nodataExpression=[PT1M,dynamic,1,NAMEyhd]", stringColumn.toString());
+        Assert.assertTrue(stringColumn.getDefaultValue() instanceof String);
+    }
+
+    @Test
+    public void testStreamLongColumn() {
+        thrown.expect(NumberFormatException.class);
+        buildColumn("salary", StreamColumn.Type.LONG, "eagle", true);
+    }
+
+    @Test
+    public void testStreamLongColumn1() {
+        StreamColumn longColumn = buildColumn("salary", StreamColumn.Type.LONG, "0", true);
+        longColumn.setNodataExpression("PT1M,dynamic,1,salary");
+        Assert.assertEquals("StreamColumn=name[salary], type=[long], defaultValue=[0], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", longColumn.toString());
+        Assert.assertTrue(longColumn.getDefaultValue() instanceof Long);
+    }
+
+    @Test
+    public void testStreamDoubleColumn() {
+        thrown.expect(NumberFormatException.class);
+        buildColumn("salary", StreamColumn.Type.DOUBLE, "eagle", true);
+    }
+
+    @Test
+    public void testStreamDoubleColumn1() {
+        StreamColumn positive = buildColumn("salary", StreamColumn.Type.DOUBLE, "0.1", true);
+        positive.setNodataExpression("PT1M,dynamic,1,salary");
+        Assert.assertEquals("StreamColumn=name[salary], type=[double], defaultValue=[0.1], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", positive.toString());
+        StreamColumn negative = buildColumn("salary", StreamColumn.Type.DOUBLE, "-0.1", true);
+        negative.setNodataExpression("PT1M,dynamic,1,salary");
+        Assert.assertEquals("StreamColumn=name[salary], type=[double], defaultValue=[-0.1], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", negative.toString());
+        // The integral literal "1" is expected to print as 1.0.
+        StreamColumn integral = buildColumn("salary", StreamColumn.Type.DOUBLE, "1", true);
+        integral.setNodataExpression("PT1M,dynamic,1,salary");
+        Assert.assertEquals("StreamColumn=name[salary], type=[double], defaultValue=[1.0], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", integral.toString());
+        Assert.assertTrue(integral.getDefaultValue() instanceof Double);
+    }
+
+    @Test
+    public void testStreamFloatColumn() {
+        thrown.expect(NumberFormatException.class);
+        buildColumn("salary", StreamColumn.Type.FLOAT, "eagle", true);
+    }
+
+    @Test
+    public void testStreamFloatColumn1() {
+        StreamColumn positive = buildColumn("salary", StreamColumn.Type.FLOAT, "0.1", true);
+        positive.setNodataExpression("PT1M,dynamic,1,salary");
+        Assert.assertEquals("StreamColumn=name[salary], type=[float], defaultValue=[0.1], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", positive.toString());
+        StreamColumn negative = buildColumn("salary", StreamColumn.Type.FLOAT, "-0.1", true);
+        negative.setNodataExpression("PT1M,dynamic,1,salary");
+        Assert.assertEquals("StreamColumn=name[salary], type=[float], defaultValue=[-0.1], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", negative.toString());
+        // The integral literal "1" is expected to print as 1.0.
+        StreamColumn integral = buildColumn("salary", StreamColumn.Type.FLOAT, "1", true);
+        integral.setNodataExpression("PT1M,dynamic,1,salary");
+        Assert.assertEquals("StreamColumn=name[salary], type=[float], defaultValue=[1.0], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", integral.toString());
+        Assert.assertTrue(integral.getDefaultValue() instanceof Float);
+    }
+
+    @Test
+    public void testStreamIntColumn() {
+        thrown.expect(NumberFormatException.class);
+        buildColumn("salary", StreamColumn.Type.INT, "eagle", true);
+    }
+
+    @Test
+    public void testStreamIntColumn1() {
+        thrown.expect(NumberFormatException.class);
+        buildColumn("salary", StreamColumn.Type.INT, "0.1", true);
+    }
+
+    @Test
+    public void testStreamIntColumn2() {
+        StreamColumn one = buildColumn("salary", StreamColumn.Type.INT, "1", true);
+        one.setNodataExpression("PT1M,dynamic,1,salary");
+        Assert.assertEquals("StreamColumn=name[salary], type=[int], defaultValue=[1], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", one.toString());
+        StreamColumn zero = buildColumn("salary", StreamColumn.Type.INT, "0", true);
+        zero.setNodataExpression("PT1M,dynamic,1,salary");
+        Assert.assertEquals("StreamColumn=name[salary], type=[int], defaultValue=[0], required=[true], nodataExpression=[PT1M,dynamic,1,salary]", zero.toString());
+        Assert.assertTrue(zero.getDefaultValue() instanceof Integer);
+    }
+
+    @Test
+    public void testStreamBoolColumn() {
+        StreamColumn fromWord = buildColumn("isYhd", StreamColumn.Type.BOOL, "eagle", false);
+        fromWord.setNodataExpression("PT1M,dynamic,1,isYhd");
+        Assert.assertEquals("StreamColumn=name[isYhd], type=[bool], defaultValue=[false], required=[false], nodataExpression=[PT1M,dynamic,1,isYhd]", fromWord.toString());
+        StreamColumn fromOne = buildColumn("isYhd", StreamColumn.Type.BOOL, "1", true);
+        fromOne.setNodataExpression("PT1M,dynamic,1,isYhd");
+        Assert.assertEquals("StreamColumn=name[isYhd], type=[bool], defaultValue=[false], required=[true], nodataExpression=[PT1M,dynamic,1,isYhd]", fromOne.toString());
+        StreamColumn fromZero = buildColumn("isYhd", StreamColumn.Type.BOOL, "0", true);
+        fromZero.setNodataExpression("PT1M,dynamic,1,isYhd");
+        Assert.assertEquals("StreamColumn=name[isYhd], type=[bool], defaultValue=[false], required=[true], nodataExpression=[PT1M,dynamic,1,isYhd]", fromZero.toString());
+        StreamColumn fromTrue = buildColumn("isYhd", StreamColumn.Type.BOOL, "True", true);
+        fromTrue.setNodataExpression("PT1M,dynamic,1,isYhd");
+        Assert.assertEquals("StreamColumn=name[isYhd], type=[bool], defaultValue=[true], required=[true], nodataExpression=[PT1M,dynamic,1,isYhd]", fromTrue.toString());
+        Assert.assertTrue(fromTrue.getDefaultValue() instanceof Boolean);
+    }
+
+    @Test
+    public void testStreamObjectColumn() {
+        thrown.expect(IllegalArgumentException.class);
+        buildColumn("name", StreamColumn.Type.OBJECT, "eagle", true);
+    }
+
+    @Test
+    public void testStreamObjectColumn1() {
+        StreamColumn objectColumn = buildColumn("name", StreamColumn.Type.OBJECT, "{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}", true);
+        objectColumn.setNodataExpression("PT1M,dynamic,1,name");
+        Assert.assertEquals("StreamColumn=name[name], type=[object], defaultValue=[{name=heap.COMMITTED, Value=175636480}], required=[true], nodataExpression=[PT1M,dynamic,1,name]", objectColumn.toString());
+        Assert.assertTrue(objectColumn.getDefaultValue() instanceof HashMap);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2b61cef5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java
new file mode 100644
index 0000000..b5015cd
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.coordinator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class StreamDefinitionTest {
+    /** Exercises toString(), getColumnIndex() and copy() of a StreamDefinition holding five columns. */
+    @Test
+    public void testStreamDefinition() {
+        StreamDefinition original = new StreamDefinition();
+        Assert.assertEquals("StreamDefinition[streamId=null, dataSource=null, description=null, validate=false, timeseries=false, columns=[]", original.toString());
+
+        List<StreamColumn> columns = new ArrayList<>();
+        columns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
+        columns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
+        columns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
+        columns.add(new StreamColumn.Builder().name("data").type(StreamColumn.Type.LONG).build());
+        columns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
+        original.setColumns(columns);
+        String populated = "StreamDefinition[streamId=null, dataSource=null, description=null, validate=false, timeseries=false, columns=[StreamColumn=name[name], type=[string], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[host], type=[string], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[flag], type=[bool], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[data], type=[long], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[value], type=[double], defaultValue=[null], required=[false], nodataExpression=[null]]";
+        Assert.assertEquals(3, original.getColumnIndex("data"));
+        Assert.assertEquals(-1, original.getColumnIndex("DATA"));
+        Assert.assertEquals(-1, original.getColumnIndex("isYhd"));
+        Assert.assertEquals(populated, original.toString());
+        // The copy prints identically but is expected to differ by equals(), identity and hashCode().
+        StreamDefinition duplicate = original.copy();
+        Assert.assertEquals(populated, duplicate.toString());
+        Assert.assertFalse(duplicate.equals(original));
+        Assert.assertFalse(duplicate == original);
+        Assert.assertFalse(duplicate.hashCode() == original.hashCode());
+    }
+}


Mime
View raw message