eagle-commits mailing list archives

From: h..@apache.org
Subject: [06/18] incubator-eagle git commit: [EAGLE-530] Fix checkstyle problems on eagle-alert module and enable failOnViolation
Date: Thu, 08 Sep 2016 07:15:04 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration3.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration3.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration3.java
index cb8cda9..41dbe97 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration3.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration3.java
@@ -16,23 +16,20 @@
  */
 package org.apache.eagle.alert.engine.e2e;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
+import backtype.storm.utils.Utils;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import org.apache.eagle.alert.engine.UnitTopologyMain;
 import org.apache.eagle.alert.utils.KafkaEmbedded;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import backtype.storm.utils.Utils;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  * @since May 10, 2016
- *
  */
 public class Integration3 {
 
@@ -56,12 +53,12 @@ public class Integration3 {
      * Assumption:
      * <p>
      * start metadata service 8080 /rest
-     * 
+     * <p>
      * <pre>
      * user@kafka-host:~$ $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic syslog_events
      * </pre>
      * <p>
-     * 
+     *
      * @throws InterruptedException
      */
     @Test

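Most of the hunks above are import reordering: third-party packages (backtype.storm, com.typesafe, org.apache, org.junit) now come first and the java.* group moves to the end, which is what the import-order rule this commit enables appears to require. A minimal sketch of the resulting layout, using an illustrative class that is not part of the commit:

    package org.apache.eagle.alert.engine.e2e;

    // Third-party imports first, sorted within the group.
    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;
    import org.junit.Test;

    // The java.* group follows after a single blank line.
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ImportOrderSketch {
        @Test
        public void loadsConfigAndThreadPool() {
            Config config = ConfigFactory.load();
            ExecutorService executors = Executors.newFixedThreadPool(1);
            System.out.println(config.hasPath("topology"));
            executors.shutdown();
        }
    }
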
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration4NoDataAlert.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration4NoDataAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration4NoDataAlert.java
index 667d241..59af759 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration4NoDataAlert.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration4NoDataAlert.java
@@ -16,19 +16,17 @@
  */
 package org.apache.eagle.alert.engine.e2e;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
+import backtype.storm.utils.Utils;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import org.apache.eagle.alert.engine.UnitTopologyMain;
 import org.apache.eagle.alert.utils.KafkaEmbedded;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import backtype.storm.utils.Utils;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  * Since 6/29/16.
@@ -51,8 +49,9 @@ public class Integration4NoDataAlert {
             kafka.shutdown();
         }
     }
+
     @Test
-    public void testTriggerNoData() throws Exception{
+    public void testTriggerNoData() throws Exception {
         System.setProperty("config.resource", "/nodata/application-nodata.conf");
         ConfigFactory.invalidateCaches();
         Config config = ConfigFactory.load();
@@ -71,9 +70,10 @@ public class Integration4NoDataAlert {
         });
 
         // wait 20 seconds for topology to bring up
-        try{
+        try {
             Thread.sleep(20000);
-        }catch(Exception ex){}
+        } catch (Exception ex) {
+        }
 
         // send mock data
         executors.submit(() -> {

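The rest of this file's changes are whitespace only: a blank line before the @Test method, a space between control-flow keywords and their braces, and the empty catch body moved onto its own line. A small sketch of that brace style, assuming the same 20-second startup wait as the test above (the class name is illustrative):

    final class TopologyWaitSketch {
        private TopologyWaitSketch() {
        }

        static void waitForTopologyStartup() {
            // wait 20 seconds for the topology to come up, as in the integration test
            try {
                Thread.sleep(20000);
            } catch (Exception ex) {
                // deliberately swallowed, matching the test's behaviour
            }
        }
    }
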
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java
index 4d039d4..e656c31 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java
@@ -49,8 +49,9 @@ public class Integration5AbsenceAlert {
             kafka.shutdown();
         }
     }
+
     @Test
-    public void testTriggerAbsenceAlert() throws Exception{
+    public void testTriggerAbsenceAlert() throws Exception {
         System.setProperty("config.resource", "/absence/application-absence.conf");
         ConfigFactory.invalidateCaches();
         Config config = ConfigFactory.load();
@@ -69,9 +70,10 @@ public class Integration5AbsenceAlert {
         });
 
         // wait 20 seconds for topology to bring up
-        try{
+        try {
             Thread.sleep(20000);
-        }catch(Exception ex){}
+        } catch (Exception ex) {
+        }
 
         // send mock data
         executors.submit(() -> {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceClientImpTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceClientImpTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceClientImpTest.java
index c044da0..86de439 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceClientImpTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceClientImpTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.eagle.alert.engine.e2e;
 
-import java.util.List;
-
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
 import org.apache.eagle.alert.coordination.model.internal.Topology;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
@@ -30,12 +30,12 @@ import org.apache.eagle.alert.service.MetadataServiceClientImpl;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
+import java.util.List;
 
 public class MetadataServiceClientImpTest {
 
-    @Test @Ignore
+    @Test
+    @Ignore
     public void test() {
         System.out.println("loading metadatas...");
         try {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient1.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient1.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient1.java
index 6bff94b..acb8ff8 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient1.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient1.java
@@ -16,9 +16,9 @@
  */
 package org.apache.eagle.alert.engine.e2e;
 
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicLong;
-
+import backtype.storm.utils.Utils;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.eagle.alert.utils.JsonUtils;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -26,14 +26,11 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import backtype.storm.utils.Utils;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * @since May 9, 2016
- *
  */
 public class SampleClient1 {
     @SuppressWarnings("unused")
@@ -42,7 +39,7 @@ public class SampleClient1 {
     private static final String PERFMON_CPU_STREAM = "perfmon_cpu_stream";
     private static final String PERFMON_MEM_STREAM = "perfmon_mem_stream";
 
-//    private static int hostIndx = 1;
+    //    private static int hostIndx = 1;
     private static String hostTemp = "host-000%d.datacenter.corp.com";
 
     /**
@@ -88,7 +85,7 @@ public class SampleClient1 {
         Pair<Long, String> pair = createEntity(base, stream, hostIndex);
         base = pair.getKey();
         ProducerRecord<String, String> record = new ProducerRecord<String, String>("perfmon_metrics",
-                pair.getRight());
+            pair.getRight());
         proceduer.send(record);
         return base;
     }

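The ProducerRecord hunk narrows the continuation indent of the wrapped constructor argument from eight spaces to four; the same rule is applied to wrapped method parameters in SampleClient2 below. A hedged sketch of the wrapping style, reusing the perfmon_metrics topic from the diff (the helper class itself is illustrative):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    final class ContinuationIndentSketch {
        private ContinuationIndentSketch() {
        }

        static void send(KafkaProducer<String, String> producer, String payload) {
            // Wrapped arguments now sit at a four-space continuation indent.
            ProducerRecord<String, String> record = new ProducerRecord<String, String>("perfmon_metrics",
                payload);
            producer.send(record);
        }
    }
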
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient2.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient2.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient2.java
index ad0079c..8a2a639 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient2.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient2.java
@@ -16,25 +16,22 @@
  */
 package org.apache.eagle.alert.engine.e2e;
 
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
-
+import backtype.storm.utils.Utils;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.eagle.alert.utils.JsonUtils;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 
-import backtype.storm.utils.Utils;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * @since May 10, 2016
- *
  */
 public class SampleClient2 {
-    
+
 //    private static final Logger LOG = LoggerFactory.getLogger(SampleClient2.class);
 
     public static class LogEntity {
@@ -62,7 +59,7 @@ public class SampleClient2 {
         AtomicLong base1 = new AtomicLong(System.currentTimeMillis());
         AtomicLong base2 = new AtomicLong(System.currentTimeMillis());
         AtomicLong count = new AtomicLong();
-        
+
         Config config = ConfigFactory.load();
 
         try (KafkaProducer<String, String> proceduer = SampleClient1.createProceduer(config)) {
@@ -86,7 +83,7 @@ public class SampleClient2 {
     }
 
     private static void sendMetric(AtomicLong base1, AtomicLong base2, AtomicLong count,
-            KafkaProducer<String, String> proceduer, int i) {
+                                   KafkaProducer<String, String> proceduer, int i) {
         {
             Pair<Long, String> pair = createLogEntity(base1, i);
             ProducerRecord<String, String> logRecord = new ProducerRecord<>("eslogs", pair.getRight());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient3.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient3.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient3.java
index 3f16d1b..80fac3d 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient3.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient3.java
@@ -16,26 +16,23 @@
  */
 package org.apache.eagle.alert.engine.e2e;
 
-import java.util.Properties;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import org.apache.kafka.clients.producer.KafkaProducer;
+
+import java.util.Properties;
 
 /**
  * @since Jun 12, 2016
- *
  */
 public class SampleClient3 {
-    
-    
 
-    @SuppressWarnings({ "unchecked", "rawtypes" })
+
+    @SuppressWarnings( {"unchecked", "rawtypes"})
     public static void main(String[] args) throws Exception {
         System.setProperty("config.resource", "/e2e/application-e2e.conf");
         ConfigFactory.invalidateCaches();
-        
+
         Config config = ConfigFactory.load();
         KafkaProducer producer = createByteProceduer(config);
 
@@ -52,7 +49,7 @@ public class SampleClient3 {
 //        }
     }
 
-//    private static SherlockEvent createEvent(TimeSeriesDataSchemaManager manager) throws Exception {
+    //    private static SherlockEvent createEvent(TimeSeriesDataSchemaManager manager) throws Exception {
 //
 //        SherlockEventBuilder builder = SherlockEvent.newBuilder();
 //        builder.setEpochMillis(System.currentTimeMillis());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient4NoDataAlert.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient4NoDataAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient4NoDataAlert.java
index f0e0d80..2123571 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient4NoDataAlert.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient4NoDataAlert.java
@@ -16,30 +16,29 @@
  */
 package org.apache.eagle.alert.engine.e2e;
 
-import java.util.Properties;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import backtype.storm.utils.Utils;
-
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
 
 /**
  * Since 6/29/16.
  */
-@SuppressWarnings({ "rawtypes", "unchecked"})
+@SuppressWarnings( {"rawtypes", "unchecked"})
 public class SampleClient4NoDataAlert {
     private static final Logger LOG = LoggerFactory.getLogger(SampleClient4NoDataAlert.class);
     private static long currentTimestamp = 1467240000000L;
     private static long interval = 3000L;
     private static volatile boolean host1Muted = false;
     private static volatile boolean host2Muted = false;
+
     public static void main(String[] args) throws Exception {
         System.setProperty("config.resource", "/nodata/application-nodata.conf");
         ConfigFactory.invalidateCaches();
@@ -49,12 +48,12 @@ public class SampleClient4NoDataAlert {
         ProducerRecord record = null;
         Thread x = new MuteThread();
         x.start();
-        while(true) {
-            if(!host1Muted) {
+        while (true) {
+            if (!host1Muted) {
                 record = new ProducerRecord("noDataAlertTopic", createEvent("host1"));
                 producer.send(record);
             }
-            if(!host2Muted) {
+            if (!host2Muted) {
                 record = new ProducerRecord("noDataAlertTopic", createEvent("host2"));
                 producer.send(record);
             }
@@ -65,9 +64,9 @@ public class SampleClient4NoDataAlert {
         }
     }
 
-    private static class MuteThread extends Thread{
+    private static class MuteThread extends Thread {
         @Override
-        public void run(){
+        public void run() {
             try {
                 // sleep 10 seconds
                 Thread.sleep(10000);
@@ -89,13 +88,13 @@ public class SampleClient4NoDataAlert {
                 Thread.sleep(70000);
                 LOG.info("unmute host2");
                 host2Muted = false;
-            }catch(Exception ex){
+            } catch (Exception ex) {
                 ex.printStackTrace();
             }
         }
     }
 
-    private static class NoDataEvent{
+    private static class NoDataEvent {
         @JsonProperty
         long timestamp;
         @JsonProperty
@@ -103,20 +102,20 @@ public class SampleClient4NoDataAlert {
         @JsonProperty
         double value;
 
-        public String toString(){
+        public String toString() {
             return "timestamp=" + timestamp + ",host=" + host + ",value=" + value;
         }
     }
 
-    private static String createEvent(String host) throws Exception{
+    private static String createEvent(String host) throws Exception {
         NoDataEvent e = new NoDataEvent();
         long expectTS = currentTimestamp + interval;
         // adjust back 1 second random
-        long adjust = Math.round(2*Math.random());
-        e.timestamp = expectTS-adjust;
+        long adjust = Math.round(2 * Math.random());
+        e.timestamp = expectTS - adjust;
         e.host = host;
         e.value = 25.6;
-        LOG.info("sending event {} ",  e);
+        LOG.info("sending event {} ", e);
         ObjectMapper mapper = new ObjectMapper();
         String value = mapper.writeValueAsString(e);
         return value;

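The arithmetic hunks here only add whitespace around binary operators (2 * Math.random(), expectTS - adjust). A one-method sketch of the same timestamp adjustment these sample clients perform, written in the corrected style (the class is illustrative, not from the commit):

    final class TimestampSketch {
        private TimestampSketch() {
        }

        static long adjustedTimestamp(long currentTimestamp, long interval) {
            long expectTS = currentTimestamp + interval;
            // adjust back up to a second at random, with spaces around every operator
            long adjust = Math.round(2 * Math.random());
            return expectTS - adjust;
        }
    }
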
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient5AbsenceAlert.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient5AbsenceAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient5AbsenceAlert.java
index 0256324..533e486 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient5AbsenceAlert.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient5AbsenceAlert.java
@@ -16,7 +16,6 @@
  */
 package org.apache.eagle.alert.engine.e2e;
 
-import backtype.storm.utils.Utils;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.typesafe.config.Config;
@@ -35,6 +34,7 @@ public class SampleClient5AbsenceAlert {
     private static final Logger LOG = LoggerFactory.getLogger(SampleClient5AbsenceAlert.class);
     private static long currentTimestamp = 1467240000000L;
     private static long interval = 3000L;
+
     public static void main(String[] args) throws Exception {
         System.setProperty("config.resource", "/absence/application-absence.conf");
         ConfigFactory.invalidateCaches();
@@ -50,7 +50,7 @@ public class SampleClient5AbsenceAlert {
         producer.send(record);
     }
 
-    private static class AbsenceEvent{
+    private static class AbsenceEvent {
         @JsonProperty
         long timestamp;
         @JsonProperty
@@ -58,20 +58,20 @@ public class SampleClient5AbsenceAlert {
         @JsonProperty
         String status;
 
-        public String toString(){
+        public String toString() {
             return "timestamp=" + timestamp + ",jobID=" + jobID + ",status=" + status;
         }
     }
 
-    private static String createEvent(String jobID) throws Exception{
+    private static String createEvent(String jobID) throws Exception {
         AbsenceEvent e = new AbsenceEvent();
         long expectTS = currentTimestamp + interval;
         // adjust back 1 second random
-        long adjust = Math.round(2*Math.random());
-        e.timestamp = expectTS-adjust;
+        long adjust = Math.round(2 * Math.random());
+        e.timestamp = expectTS - adjust;
         e.jobID = jobID;
         e.status = "running";
-        LOG.info("sending event {} ",  e);
+        LOG.info("sending event {} ", e);
         ObjectMapper mapper = new ObjectMapper();
         String value = mapper.writeValueAsString(e);
         return value;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java
index 7bf4931..f356931 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java
@@ -16,52 +16,52 @@
  */
 package org.apache.eagle.alert.engine.evaluator;
 
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
+import backtype.storm.task.IOutputCollector;
+import backtype.storm.task.OutputCollector;
 import org.apache.eagle.alert.engine.evaluator.impl.AlertBoltOutputCollectorThreadSafeWrapper;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.junit.Assert;
 import org.junit.Test;
 
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.task.OutputCollector;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 
 public class AlertBoltOutputCollectorThreadSafeWrapperTest {
     @Test
-    public void testThreadSafeAlertBoltOutputCollector(){
+    public void testThreadSafeAlertBoltOutputCollector() {
         MockedStormAlertOutputCollector stormOutputCollector = new MockedStormAlertOutputCollector(null);
         AlertBoltOutputCollectorThreadSafeWrapper alertBoltOutputCollectorWrapper = new AlertBoltOutputCollectorThreadSafeWrapper(stormOutputCollector);
         alertBoltOutputCollectorWrapper.emit(create("mockAlert_1"));
         alertBoltOutputCollectorWrapper.emit(create("mockAlert_2"));
-        Assert.assertEquals(0,stormOutputCollector.getCollected().size());
-        Assert.assertEquals(0,stormOutputCollector.getTupleSize());
+        Assert.assertEquals(0, stormOutputCollector.getCollected().size());
+        Assert.assertEquals(0, stormOutputCollector.getTupleSize());
         alertBoltOutputCollectorWrapper.flush();
-        Assert.assertEquals(2,stormOutputCollector.getCollected().size());
-        Assert.assertEquals(2,stormOutputCollector.getTupleSize());
+        Assert.assertEquals(2, stormOutputCollector.getCollected().size());
+        Assert.assertEquals(2, stormOutputCollector.getTupleSize());
         alertBoltOutputCollectorWrapper.emit(create("mockAlert_3"));
-        Assert.assertEquals(2,stormOutputCollector.getCollected().size());
-        Assert.assertEquals(2,stormOutputCollector.getTupleSize());
+        Assert.assertEquals(2, stormOutputCollector.getCollected().size());
+        Assert.assertEquals(2, stormOutputCollector.getTupleSize());
         alertBoltOutputCollectorWrapper.flush();
         alertBoltOutputCollectorWrapper.flush();
         alertBoltOutputCollectorWrapper.flush();
-        Assert.assertEquals(3,stormOutputCollector.getCollected().size());
-        Assert.assertEquals(3,stormOutputCollector.getTupleSize());
+        Assert.assertEquals(3, stormOutputCollector.getCollected().size());
+        Assert.assertEquals(3, stormOutputCollector.getTupleSize());
     }
 
-    private AlertStreamEvent create(String streamId){
+    private AlertStreamEvent create(String streamId) {
         AlertStreamEvent alert = new AlertStreamEvent();
         alert.setCreatedBy(this.toString());
         alert.setCreatedTime(System.currentTimeMillis());
-        alert.setData(new Object[]{"field_1",2,"field_3"});
+        alert.setData(new Object[] {"field_1", 2, "field_3"});
         alert.setStreamId(streamId);
         return alert;
     }
 
     private class MockedStormAlertOutputCollector extends OutputCollector {
-        private final Map<Object,List<Object>> collected;
+        private final Map<Object, List<Object>> collected;
+
         MockedStormAlertOutputCollector(IOutputCollector delegate) {
             super(delegate);
             collected = new HashMap<>();
@@ -69,19 +69,20 @@ public class AlertBoltOutputCollectorThreadSafeWrapperTest {
 
         @Override
         public List<Integer> emit(String streamId, List<Object> tuple) {
-            if(!collected.containsKey(tuple.get(0))){
-                collected.put(tuple.get(0),new LinkedList<>());
+            if (!collected.containsKey(tuple.get(0))) {
+                collected.put(tuple.get(0), new LinkedList<>());
             }
             collected.get(tuple.get(0)).add(tuple);
             return null;
         }
-        Map<Object,List<Object>> getCollected(){
+
+        Map<Object, List<Object>> getCollected() {
             return collected;
         }
 
-        int getTupleSize(){
+        int getTupleSize() {
             int size = 0;
-            for(List<Object> alerts:collected.values()){
+            for (List<Object> alerts : collected.values()) {
                 size += alerts.size();
             }
             return size;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/PoilcyExtendedTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/PoilcyExtendedTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/PoilcyExtendedTest.java
index 3f653d4..b9e3385 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/PoilcyExtendedTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/PoilcyExtendedTest.java
@@ -36,7 +36,7 @@ public class PoilcyExtendedTest {
     @Test
     public void test() throws Exception {
         ArrayNode arrayNode = (ArrayNode)
-                mapper.readTree(PoilcyExtendedTest.class.getResourceAsStream("/extend_policy.json"));
+            mapper.readTree(PoilcyExtendedTest.class.getResourceAsStream("/extend_policy.json"));
         Assert.assertEquals(1, arrayNode.size());
         for (JsonNode node : arrayNode) {
             PolicyDefinition definition = mapper.readValue(node, PolicyDefinition.class);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java
index 140b72d..476d71f 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java
@@ -16,15 +16,7 @@
  */
 package org.apache.eagle.alert.engine.evaluator;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
+import backtype.storm.metric.api.MultiCountMetric;
 import org.apache.eagle.alert.engine.Collector;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
@@ -39,14 +31,16 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import backtype.storm.metric.api.MultiCountMetric;
+import java.util.*;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 public class SiddhiCEPPolicyEventHandlerTest {
     private final static Logger LOG = LoggerFactory.getLogger(SiddhiCEPPolicyEventHandlerTest.class);
 
-    private Map<String, StreamDefinition> createDefinition(String ... streamIds) {
+    private Map<String, StreamDefinition> createDefinition(String... streamIds) {
         Map<String, StreamDefinition> sds = new HashMap<>();
-        for(String streamId:streamIds) {
+        for (String streamId : streamIds) {
             // construct StreamDefinition
             StreamDefinition sd = MockSampleMetadataFactory.createSampleStreamDefinition(streamId);
             sds.put(streamId, sd);
@@ -60,23 +54,23 @@ public class SiddhiCEPPolicyEventHandlerTest {
         SiddhiPolicyHandler handler;
         MockStreamCollector collector;
 
-        handler = new SiddhiPolicyHandler(createDefinition("sampleStream_1","sampleStream_2"), 0);
+        handler = new SiddhiPolicyHandler(createDefinition("sampleStream_1", "sampleStream_2"), 0);
         collector = new MockStreamCollector();
         PolicyDefinition policyDefinition = MockSampleMetadataFactory.createSingleMetricSamplePolicy();
         PolicyHandlerContext context = new PolicyHandlerContext();
         context.setPolicyDefinition(policyDefinition);
         context.setPolicyCounter(new MultiCountMetric());
         context.setPolicyEvaluator(new PolicyGroupEvaluatorImpl("evalutorId"));
-        handler.prepare(collector,context);
-        StreamEvent event = StreamEvent.Builder()
-                .schema(MockSampleMetadataFactory.createSampleStreamDefinition("sampleStream_1"))
-                .streamId("sampleStream_1")
-                .timestamep(System.currentTimeMillis())
-                .attributes(new HashMap<String,Object>(){{
-                    put("name","cpu");
-                    put("value",60.0);
-                    put("bad","bad column value");
-                }}).build();
+        handler.prepare(collector, context);
+        StreamEvent event = StreamEvent.builder()
+            .schema(MockSampleMetadataFactory.createSampleStreamDefinition("sampleStream_1"))
+            .streamId("sampleStream_1")
+            .timestamep(System.currentTimeMillis())
+            .attributes(new HashMap<String, Object>() {{
+                put("name", "cpu");
+                put("value", 60.0);
+                put("bad", "bad column value");
+            }}).build();
         handler.send(event);
         handler.close();
     }
@@ -84,24 +78,24 @@ public class SiddhiCEPPolicyEventHandlerTest {
     @SuppressWarnings("serial")
     @Test
     public void testWithTwoStreamJoinPolicy() throws Exception {
-        Map<String,StreamDefinition> ssd = createDefinition("sampleStream_1","sampleStream_2");
+        Map<String, StreamDefinition> ssd = createDefinition("sampleStream_1", "sampleStream_2");
 
         PolicyDefinition policyDefinition = new PolicyDefinition();
         policyDefinition.setName("SampleJoinPolicyForTest");
-        policyDefinition.setInputStreams(Arrays.asList("sampleStream_1","sampleStream_2"));
+        policyDefinition.setInputStreams(Arrays.asList("sampleStream_1", "sampleStream_2"));
         policyDefinition.setOutputStreams(Collections.singletonList("joinedStream"));
         policyDefinition.setDefinition(new PolicyDefinition.Definition(PolicyStreamHandlers.SIDDHI_ENGINE,
-                "from sampleStream_1#window.length(10) as left " +
+            "from sampleStream_1#window.length(10) as left " +
                 "join sampleStream_2#window.length(10) as right " +
                 "on left.name == right.name and left.value == right.value " +
-                "select left.timestamp,left.name,left.value "+
+                "select left.timestamp,left.name,left.value " +
                 "insert into joinedStream"));
         policyDefinition.setPartitionSpec(Collections.singletonList(MockSampleMetadataFactory.createSampleStreamGroupbyPartition("sampleStream_1", Collections.singletonList("name"))));
         SiddhiPolicyHandler handler;
         Semaphore mutex = new Semaphore(0);
         List<AlertStreamEvent> alerts = new ArrayList<>(0);
         Collector<AlertStreamEvent> collector = (event) -> {
-            LOG.info("Collected {}",event);
+            LOG.info("Collected {}", event);
             Assert.assertTrue(event != null);
             alerts.add(event);
             mutex.release();
@@ -112,54 +106,54 @@ public class SiddhiCEPPolicyEventHandlerTest {
         context.setPolicyDefinition(policyDefinition);
         context.setPolicyCounter(new MultiCountMetric());
         context.setPolicyEvaluator(new PolicyGroupEvaluatorImpl("evalutorId"));
-        handler.prepare(collector,context);
+        handler.prepare(collector, context);
 
 
         long ts_1 = System.currentTimeMillis();
-        long ts_2 = System.currentTimeMillis()+1;
-
-        handler.send(StreamEvent.Builder()
-                .schema(ssd.get("sampleStream_1"))
-                .streamId("sampleStream_1")
-                .timestamep(ts_1)
-                .attributes(new HashMap<String,Object>(){{
-                    put("name","cpu");
-                    put("value",60.0);
-                    put("bad","bad column value");
-                }}).build());
-
-        handler.send(StreamEvent.Builder()
-                .schema(ssd.get("sampleStream_2"))
-                .streamId("sampleStream_2")
-                .timestamep(ts_2)
-                .attributes(new HashMap<String,Object>(){{
-                    put("name","cpu");
-                    put("value",61.0);
-                }}).build());
-
-        handler.send(StreamEvent.Builder()
-                .schema(ssd.get("sampleStream_2"))
-                .streamId("sampleStream_2")
-                .timestamep(ts_2)
-                .attributes(new HashMap<String,Object>(){{
-                    put("name","disk");
-                    put("value",60.0);
-                }}).build());
-
-        handler.send(StreamEvent.Builder()
-                .schema(ssd.get("sampleStream_2"))
-                .streamId("sampleStream_2")
-                .timestamep(ts_2)
-                .attributes(new HashMap<String,Object>(){{
-                    put("name","cpu");
-                    put("value",60.0);
-                }}).build());
+        long ts_2 = System.currentTimeMillis() + 1;
+
+        handler.send(StreamEvent.builder()
+            .schema(ssd.get("sampleStream_1"))
+            .streamId("sampleStream_1")
+            .timestamep(ts_1)
+            .attributes(new HashMap<String, Object>() {{
+                put("name", "cpu");
+                put("value", 60.0);
+                put("bad", "bad column value");
+            }}).build());
+
+        handler.send(StreamEvent.builder()
+            .schema(ssd.get("sampleStream_2"))
+            .streamId("sampleStream_2")
+            .timestamep(ts_2)
+            .attributes(new HashMap<String, Object>() {{
+                put("name", "cpu");
+                put("value", 61.0);
+            }}).build());
+
+        handler.send(StreamEvent.builder()
+            .schema(ssd.get("sampleStream_2"))
+            .streamId("sampleStream_2")
+            .timestamep(ts_2)
+            .attributes(new HashMap<String, Object>() {{
+                put("name", "disk");
+                put("value", 60.0);
+            }}).build());
+
+        handler.send(StreamEvent.builder()
+            .schema(ssd.get("sampleStream_2"))
+            .streamId("sampleStream_2")
+            .timestamep(ts_2)
+            .attributes(new HashMap<String, Object>() {{
+                put("name", "cpu");
+                put("value", 60.0);
+            }}).build());
 
         handler.close();
 
-        Assert.assertTrue("Should get result in 5 s",mutex.tryAcquire(5, TimeUnit.SECONDS));
-        Assert.assertEquals(1,alerts.size());
-        Assert.assertEquals("joinedStream",alerts.get(0).getStreamId());
-        Assert.assertEquals("cpu",alerts.get(0).getData()[1]);
+        Assert.assertTrue("Should get result in 5 s", mutex.tryAcquire(5, TimeUnit.SECONDS));
+        Assert.assertEquals(1, alerts.size());
+        Assert.assertEquals("joinedStream", alerts.get(0).getStreamId());
+        Assert.assertEquals("cpu", alerts.get(0).getData()[1]);
     }
 }
\ No newline at end of file

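Besides formatting, this file picks up the rename of the StreamEvent static factory from Builder() to builder(), in line with checkstyle's lowerCamelCase method-name convention. A generic sketch of that convention on a hypothetical builder; StreamEvent's real fields and API are not reproduced here:

    final class Metric {
        private final String name;
        private final double value;

        private Metric(Builder b) {
            this.name = b.name;
            this.value = b.value;
        }

        // "builder()" rather than "Builder()", so the MethodName check passes.
        static Builder builder() {
            return new Builder();
        }

        static final class Builder {
            private String name;
            private double value;

            Builder name(String name) {
                this.name = name;
                return this;
            }

            Builder value(double value) {
                this.value = value;
                return this;
            }

            Metric build() {
                return new Metric(this);
            }
        }
    }
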
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java
index d127116..4efcc28 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/integration/MockMetadataServiceClient.java
@@ -16,9 +16,6 @@
  */
 package org.apache.eagle.alert.engine.integration;
 
-import java.io.IOException;
-import java.util.List;
-
 import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
 import org.apache.eagle.alert.coordination.model.ScheduleState;
 import org.apache.eagle.alert.coordination.model.SpoutSpec;
@@ -29,11 +26,14 @@ import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
 import org.apache.eagle.alert.service.IMetadataServiceClient;
 
+import java.io.IOException;
+import java.util.List;
+
 @SuppressWarnings("serial")
 public class MockMetadataServiceClient implements IMetadataServiceClient {
 
     @Override
-    public List<SpoutSpec>  listSpoutMetadata() {
+    public List<SpoutSpec> listSpoutMetadata() {
         return null;
     }
 
@@ -79,7 +79,7 @@ public class MockMetadataServiceClient implements IMetadataServiceClient {
 
     @Override
     public void addScheduleState(ScheduleState state) {
-        
+
     }
 
     @Override
@@ -89,7 +89,7 @@ public class MockMetadataServiceClient implements IMetadataServiceClient {
 
     @Override
     public void addStreamingCluster(StreamingCluster cluster) {
-        
+
     }
 
     @Override
@@ -99,7 +99,7 @@ public class MockMetadataServiceClient implements IMetadataServiceClient {
 
     @Override
     public void addTopology(Topology t) {
-        
+
     }
 
     @Override
@@ -109,7 +109,7 @@ public class MockMetadataServiceClient implements IMetadataServiceClient {
 
     @Override
     public void addPolicy(PolicyDefinition policy) {
-        
+
     }
 
     @Override
@@ -119,7 +119,7 @@ public class MockMetadataServiceClient implements IMetadataServiceClient {
 
     @Override
     public void addStreamDefinition(StreamDefinition streamDef) {
-        
+
     }
 
     @Override
@@ -129,7 +129,7 @@ public class MockMetadataServiceClient implements IMetadataServiceClient {
 
     @Override
     public void addDataSource(Kafka2TupleMetadata k2t) {
-        
+
     }
 
     @Override
@@ -139,7 +139,7 @@ public class MockMetadataServiceClient implements IMetadataServiceClient {
 
     @Override
     public void addPublishment(Publishment pub) {
-        
+
     }
 
     @Override
@@ -149,6 +149,6 @@ public class MockMetadataServiceClient implements IMetadataServiceClient {
 
     @Override
     public void clear() {
-        
+
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/metric/MemoryUsageGaugeSetTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/metric/MemoryUsageGaugeSetTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/metric/MemoryUsageGaugeSetTest.java
index 76aef7e..a1a9cdd 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/metric/MemoryUsageGaugeSetTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/metric/MemoryUsageGaugeSetTest.java
@@ -1,15 +1,14 @@
 package org.apache.eagle.alert.engine.metric;
 
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.codahale.metrics.ConsoleReporter;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
 
 
 /**
@@ -36,9 +35,9 @@ public class MemoryUsageGaugeSetTest {
         LOG.info("Starting testJVMMetrics");
         final MetricRegistry metrics = new MetricRegistry();
         ConsoleReporter reporter = ConsoleReporter.forRegistry(metrics)
-                .convertRatesTo(TimeUnit.SECONDS)
-                .convertDurationsTo(TimeUnit.MILLISECONDS)
-                .build();
+            .convertRatesTo(TimeUnit.SECONDS)
+            .convertDurationsTo(TimeUnit.MILLISECONDS)
+            .build();
         metrics.registerAll(new MemoryUsageGaugeSet());
         metrics.register("sample", (Gauge<Double>) () -> 0.1234);
         reporter.start(1, TimeUnit.SECONDS);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockPartitionedCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockPartitionedCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockPartitionedCollector.java
index 74d11d2..09f01e4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockPartitionedCollector.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockPartitionedCollector.java
@@ -1,13 +1,13 @@
 package org.apache.eagle.alert.engine.mock;
 
-import java.util.LinkedList;
-import java.util.List;
-
 import org.apache.eagle.alert.engine.PartitionedEventCollector;
 import org.apache.eagle.alert.engine.model.PartitionedEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.LinkedList;
+import java.util.List;
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -28,7 +28,8 @@ public class MockPartitionedCollector implements PartitionedEventCollector {
     @SuppressWarnings("unused")
     private final static Logger LOG = LoggerFactory.getLogger(MockPartitionedCollector.class);
     private List<PartitionedEvent> cache;
-    public MockPartitionedCollector(){
+
+    public MockPartitionedCollector() {
         cache = new LinkedList<>();
     }
 
@@ -36,15 +37,15 @@ public class MockPartitionedCollector implements PartitionedEventCollector {
         cache.add(event);
     }
 
-    public void clear(){
+    public void clear() {
         cache.clear();
     }
 
-    public List<PartitionedEvent> get(){
+    public List<PartitionedEvent> get() {
         return cache;
     }
 
-    public int size(){
+    public int size() {
         return cache.size();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockSampleMetadataFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockSampleMetadataFactory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockSampleMetadataFactory.java
index 22c13a2..9c9f1eb 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockSampleMetadataFactory.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockSampleMetadataFactory.java
@@ -16,46 +16,40 @@
  */
 package org.apache.eagle.alert.engine.mock;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ThreadLocalRandom;
-
 import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
 import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
 import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinitionNotFoundException;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.apache.eagle.alert.engine.coordinator.*;
 import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandlers;
 import org.apache.eagle.alert.engine.model.PartitionedEvent;
 import org.apache.eagle.alert.engine.model.StreamEvent;
 
+import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
+
 @SuppressWarnings("serial")
 public class MockSampleMetadataFactory {
     private static MockStreamMetadataService mockStreamMetadataServiceInstance = null;
-    public static MockStreamMetadataService createSingletonMetadataServiceWithSample(){
-        if(mockStreamMetadataServiceInstance!=null) return mockStreamMetadataServiceInstance;
+
+    public static MockStreamMetadataService createSingletonMetadataServiceWithSample() {
+        if (mockStreamMetadataServiceInstance != null) {
+            return mockStreamMetadataServiceInstance;
+        }
         mockStreamMetadataServiceInstance = new MockStreamMetadataService();
-        mockStreamMetadataServiceInstance.registerStream("sampleStream",createSampleStreamDefinition("sampleStream"));
-        mockStreamMetadataServiceInstance.registerStream("sampleStream_1",createSampleStreamDefinition("sampleStream_1"));
-        mockStreamMetadataServiceInstance.registerStream("sampleStream_2",createSampleStreamDefinition("sampleStream_2"));
-        mockStreamMetadataServiceInstance.registerStream("sampleStream_3",createSampleStreamDefinition("sampleStream_3"));
-        mockStreamMetadataServiceInstance.registerStream("sampleStream_4",createSampleStreamDefinition("sampleStream_4"));
+        mockStreamMetadataServiceInstance.registerStream("sampleStream", createSampleStreamDefinition("sampleStream"));
+        mockStreamMetadataServiceInstance.registerStream("sampleStream_1", createSampleStreamDefinition("sampleStream_1"));
+        mockStreamMetadataServiceInstance.registerStream("sampleStream_2", createSampleStreamDefinition("sampleStream_2"));
+        mockStreamMetadataServiceInstance.registerStream("sampleStream_3", createSampleStreamDefinition("sampleStream_3"));
+        mockStreamMetadataServiceInstance.registerStream("sampleStream_4", createSampleStreamDefinition("sampleStream_4"));
         return mockStreamMetadataServiceInstance;
     }
 
-    public static StreamDefinition createSampleStreamDefinition(String streamId){
+    public static StreamDefinition createSampleStreamDefinition(String streamId) {
         StreamDefinition sampleStreamDefinition = new StreamDefinition();
         sampleStreamDefinition.setStreamId(streamId);
         sampleStreamDefinition.setTimeseries(true);
         sampleStreamDefinition.setValidate(true);
-        sampleStreamDefinition.setDescription("Schema for "+streamId);
+        sampleStreamDefinition.setDescription("Schema for " + streamId);
         List<StreamColumn> streamColumns = new ArrayList<>();
 
         streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
@@ -73,7 +67,7 @@ public class MockSampleMetadataFactory {
      * @param streamId
      * @return
      */
-    public static StreamSortSpec createSampleStreamSortSpec(String streamId){
+    public static StreamSortSpec createSampleStreamSortSpec(String streamId) {
         StreamSortSpec streamSortSpec = new StreamSortSpec();
 //        streamSortSpec.setColumn("timestamp");
 //        streamSortSpec.setStreamId(streamId);
@@ -82,7 +76,7 @@ public class MockSampleMetadataFactory {
         return streamSortSpec;
     }
 
-    public static StreamSortSpec createSampleStreamSortSpec(String streamId,String period,int margin){
+    public static StreamSortSpec createSampleStreamSortSpec(String streamId, String period, int margin) {
         StreamSortSpec streamSortSpec = new StreamSortSpec();
 //        streamSortSpec.setColumn("timestamp");
 //        streamSortSpec.setStreamId(streamId);
@@ -96,21 +90,21 @@ public class MockSampleMetadataFactory {
      *
      * @return PolicyDefinition[from sampleStream_1[name == "cpu" and value > 50.0] select name, host, flag, value insert into outputStream;]
      */
-    public static PolicyDefinition createSingleMetricSamplePolicy(){
+    public static PolicyDefinition createSingleMetricSamplePolicy() {
         String definePolicy = "from sampleStream_1[name == \"cpu\" and value > 50.0] select name, host, flag, value insert into outputStream;";
         PolicyDefinition policyDefinition = new PolicyDefinition();
         policyDefinition.setName("SamplePolicyForTest");
         policyDefinition.setInputStreams(Arrays.asList("sampleStream_1"));
         policyDefinition.setOutputStreams(Arrays.asList("outputStream"));
         policyDefinition.setDefinition(new PolicyDefinition.Definition(
-                PolicyStreamHandlers.SIDDHI_ENGINE,
-                definePolicy
+            PolicyStreamHandlers.SIDDHI_ENGINE,
+            definePolicy
         ));
-        policyDefinition.setPartitionSpec(Arrays.asList(createSampleStreamGroupbyPartition("sampleStream_1",Arrays.asList("name"))));
+        policyDefinition.setPartitionSpec(Arrays.asList(createSampleStreamGroupbyPartition("sampleStream_1", Arrays.asList("name"))));
         return policyDefinition;
     }
 
-    public static StreamPartition createSampleStreamGroupbyPartition(String streamId, List<String> groupByField){
+    public static StreamPartition createSampleStreamGroupbyPartition(String streamId, List<String> groupByField) {
         StreamPartition streamPartition = new StreamPartition();
         streamPartition.setStreamId(streamId);
         streamPartition.setColumns(new ArrayList<>(groupByField));
@@ -122,19 +116,19 @@ public class MockSampleMetadataFactory {
         return streamPartition;
     }
 
-    public static StreamRouterSpec createSampleStreamRouteSpec(String streamId, String groupByField, List<String> targetEvaluatorIds){
+    public static StreamRouterSpec createSampleStreamRouteSpec(String streamId, String groupByField, List<String> targetEvaluatorIds) {
         List<WorkSlot> slots = Arrays.asList(targetEvaluatorIds.stream().map((t) -> {
             return new WorkSlot("sampleTopology", t);
         }).toArray(WorkSlot[]::new));
         StreamRouterSpec streamRouteSpec = new StreamRouterSpec();
         streamRouteSpec.setStreamId(streamId);
-        streamRouteSpec.setPartition(createSampleStreamGroupbyPartition(streamId,Arrays.asList(groupByField)));
+        streamRouteSpec.setPartition(createSampleStreamGroupbyPartition(streamId, Arrays.asList(groupByField)));
         streamRouteSpec.setTargetQueue(Arrays.asList(new PolicyWorkerQueue(slots)));
         return streamRouteSpec;
     }
 
-    public static StreamRouterSpec createSampleStreamRouteSpec(List<String> targetEvaluatorIds){
-        return createSampleStreamRouteSpec("sampleStream_1","name",targetEvaluatorIds);
+    public static StreamRouterSpec createSampleStreamRouteSpec(List<String> targetEvaluatorIds) {
+        return createSampleStreamRouteSpec("sampleStream_1", "name", targetEvaluatorIds);
     }
 
     /**
@@ -143,25 +137,25 @@ public class MockSampleMetadataFactory {
      * @param targetEvaluatorIds
      * @return
      */
-    public static StreamRouterSpec createRouteSpec_GROUP_sampleStream_1_BY_name(List<String> targetEvaluatorIds){
-        return createSampleStreamRouteSpec("sampleStream_1","name",targetEvaluatorIds);
+    public static StreamRouterSpec createRouteSpec_GROUP_sampleStream_1_BY_name(List<String> targetEvaluatorIds) {
+        return createSampleStreamRouteSpec("sampleStream_1", "name", targetEvaluatorIds);
     }
 
-    public static StreamRouterSpec createRouteSpec_GROUP_sampleStream_2_BY_name(List<String> targetEvaluatorIds){
-        return createSampleStreamRouteSpec("sampleStream_2","name",targetEvaluatorIds);
+    public static StreamRouterSpec createRouteSpec_GROUP_sampleStream_2_BY_name(List<String> targetEvaluatorIds) {
+        return createSampleStreamRouteSpec("sampleStream_2", "name", targetEvaluatorIds);
     }
 
-    public static PartitionedEvent createSimpleStreamEvent()  {
+    public static PartitionedEvent createSimpleStreamEvent() {
         StreamEvent event = null;
         try {
-            event = StreamEvent.Builder()
+            event = StreamEvent.builder()
                 .schema(MockSampleMetadataFactory.createSingletonMetadataServiceWithSample().getStreamDefinition("sampleStream_1"))
                 .streamId("sampleStream_1")
                 .timestamep(System.currentTimeMillis())
-                .attributes(new HashMap<String,Object>(){{
-                    put("name","cpu");
-                    put("value",60.0);
-                    put("unknown","unknown column value");
+                .attributes(new HashMap<String, Object>() {{
+                    put("name", "cpu");
+                    put("value", 60.0);
+                    put("unknown", "unknown column value");
                 }}).build();
         } catch (StreamDefinitionNotFoundException e) {
             e.printStackTrace();
@@ -171,91 +165,91 @@ public class MockSampleMetadataFactory {
         return pEvent;
     }
 
-    private final static String[] SAMPLE_STREAM_NAME_OPTIONS=new String[]{
-            "cpu","memory","disk","network"
+    private final static String[] SAMPLE_STREAM_NAME_OPTIONS = new String[] {
+        "cpu", "memory", "disk", "network"
     };
 
-    private final static String[] SAMPLE_STREAM_HOST_OPTIONS =new String[]{
-            "localhost_1","localhost_2","localhost_3","localhost_4"
+    private final static String[] SAMPLE_STREAM_HOST_OPTIONS = new String[] {
+        "localhost_1", "localhost_2", "localhost_3", "localhost_4"
     };
 
-    private final static Boolean[] SAMPLE_STREAM_FLAG_OPTIONS=new Boolean[]{
-            true,false
+    private final static Boolean[] SAMPLE_STREAM_FLAG_OPTIONS = new Boolean[] {
+        true, false
     };
 
-    private final static Double[] SAMPLE_STREAM_VALUE_OPTIONS=new Double[]{
-            -0.20, 40.4,50.5,60.6,10000.1
+    private final static Double[] SAMPLE_STREAM_VALUE_OPTIONS = new Double[] {
+        -0.20, 40.4, 50.5, 60.6, 10000.1
     };
-    private final static String[] SAMPLE_STREAM_ID_OPTIONS=new String[]{
-            "sampleStream_1","sampleStream_2","sampleStream_3","sampleStream_4",
+    private final static String[] SAMPLE_STREAM_ID_OPTIONS = new String[] {
+        "sampleStream_1", "sampleStream_2", "sampleStream_3", "sampleStream_4",
     };
     private final static Random RANDOM = ThreadLocalRandom.current();
 
-    public static StreamEvent createRandomStreamEvent()  {
+    public static StreamEvent createRandomStreamEvent() {
         return createRandomStreamEvent(SAMPLE_STREAM_ID_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_ID_OPTIONS.length)]);
     }
 
-    public static StreamEvent createRandomStreamEvent(String streamId)  {
-        return createRandomStreamEvent(streamId,System.currentTimeMillis());
+    public static StreamEvent createRandomStreamEvent(String streamId) {
+        return createRandomStreamEvent(streamId, System.currentTimeMillis());
     }
 
-    private final static Long[] TIME_DELTA_OPTIONS = new Long[]{
-            -30000L, -10000L, -5000L, -1000L, 0L, 1000L, 5000L, 10000L, 30000L
+    private final static Long[] TIME_DELTA_OPTIONS = new Long[] {
+        -30000L, -10000L, -5000L, -1000L, 0L, 1000L, 5000L, 10000L, 30000L
     };
 
-    public static StreamEvent createRandomOutOfTimeOrderStreamEvent(String streamId)  {
+    public static StreamEvent createRandomOutOfTimeOrderStreamEvent(String streamId) {
         StreamEvent event = createRandomStreamEvent(streamId);
-        event.setTimestamp(System.currentTimeMillis()+TIME_DELTA_OPTIONS[RANDOM.nextInt(TIME_DELTA_OPTIONS.length)]);
+        event.setTimestamp(System.currentTimeMillis() + TIME_DELTA_OPTIONS[RANDOM.nextInt(TIME_DELTA_OPTIONS.length)]);
         return event;
     }
 
 
-    public static PartitionedEvent createRandomOutOfTimeOrderEventGroupedByName(String streamId)  {
+    public static PartitionedEvent createRandomOutOfTimeOrderEventGroupedByName(String streamId) {
         StreamEvent event = createRandomStreamEvent(streamId);
-        event.setTimestamp(System.currentTimeMillis()+TIME_DELTA_OPTIONS[RANDOM.nextInt(TIME_DELTA_OPTIONS.length)]);
-        return new PartitionedEvent(event,createSampleStreamGroupbyPartition(streamId,Arrays.asList("name")),event.getData()[0].hashCode());
+        event.setTimestamp(System.currentTimeMillis() + TIME_DELTA_OPTIONS[RANDOM.nextInt(TIME_DELTA_OPTIONS.length)]);
+        return new PartitionedEvent(event, createSampleStreamGroupbyPartition(streamId, Arrays.asList("name")), event.getData()[0].hashCode());
     }
 
-    public static PartitionedEvent createPartitionedEventGroupedByName(String streamId,long timestamp)  {
+    public static PartitionedEvent createPartitionedEventGroupedByName(String streamId, long timestamp) {
         StreamEvent event = createRandomStreamEvent(streamId);
         event.setTimestamp(timestamp);
-        return new PartitionedEvent(event,createSampleStreamGroupbyPartition(streamId,Arrays.asList("name")),event.getData()[0].hashCode());
+        return new PartitionedEvent(event, createSampleStreamGroupbyPartition(streamId, Arrays.asList("name")), event.getData()[0].hashCode());
     }
 
-    public static PartitionedEvent createRandomSortedEventGroupedByName(String streamId)  {
+    public static PartitionedEvent createRandomSortedEventGroupedByName(String streamId) {
         StreamEvent event = createRandomStreamEvent(streamId);
         event.setTimestamp(System.currentTimeMillis());
-        return new PartitionedEvent(event,createSampleStreamGroupbyPartition(streamId,Arrays.asList("name")),event.getData()[0].hashCode());
+        return new PartitionedEvent(event, createSampleStreamGroupbyPartition(streamId, Arrays.asList("name")), event.getData()[0].hashCode());
     }
 
-    public static StreamEvent createRandomStreamEvent(String streamId, long timestamp)  {
+    public static StreamEvent createRandomStreamEvent(String streamId, long timestamp) {
         StreamEvent event;
         try {
-            event = StreamEvent.Builder()
-                    .schema(MockSampleMetadataFactory.createSingletonMetadataServiceWithSample().getStreamDefinition(streamId))
-                    .streamId(streamId)
-                    .timestamep(timestamp)
-                    .attributes(new HashMap<String,Object>(){{
-                        put("name",SAMPLE_STREAM_NAME_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_NAME_OPTIONS.length)]);
-                        put("value", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]);
-                        put("host", SAMPLE_STREAM_HOST_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_HOST_OPTIONS.length)]);
-                        put("flag",SAMPLE_STREAM_FLAG_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_FLAG_OPTIONS.length)]);
+            event = StreamEvent.builder()
+                .schema(MockSampleMetadataFactory.createSingletonMetadataServiceWithSample().getStreamDefinition(streamId))
+                .streamId(streamId)
+                .timestamep(timestamp)
+                .attributes(new HashMap<String, Object>() {{
+                    put("name", SAMPLE_STREAM_NAME_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_NAME_OPTIONS.length)]);
+                    put("value", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]);
+                    put("host", SAMPLE_STREAM_HOST_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_HOST_OPTIONS.length)]);
+                    put("flag", SAMPLE_STREAM_FLAG_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_FLAG_OPTIONS.length)]);
 //                        put("value1", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]);
 //                        put("value2", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]);
 //                        put("value3", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]);
 //                        put("value4", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]);
 //                        put("value5", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]);
-                        put("unknown","unknown column value");
-                    }}).build();
+                    put("unknown", "unknown column value");
+                }}).build();
         } catch (StreamDefinitionNotFoundException e) {
-            throw new IllegalStateException(e.getMessage(),e);
+            throw new IllegalStateException(e.getMessage(), e);
         }
         return event;
     }
 
-    public static PartitionedEvent createRandomPartitionedEvent(String streamId, long timestamp)  {
-        StreamEvent event = createRandomStreamEvent(streamId,timestamp);
-        PartitionedEvent partitionedEvent = new PartitionedEvent(event,createSampleStreamGroupbyPartition(streamId,Arrays.asList("name")),event.getData()[0].hashCode());
+    public static PartitionedEvent createRandomPartitionedEvent(String streamId, long timestamp) {
+        StreamEvent event = createRandomStreamEvent(streamId, timestamp);
+        PartitionedEvent partitionedEvent = new PartitionedEvent(event, createSampleStreamGroupbyPartition(streamId, Arrays.asList("name")), event.getData()[0].hashCode());
         return partitionedEvent;
     }
 }
\ No newline at end of file

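For orientation, a minimal usage sketch of the factory methods touched above (illustrative only, not part of the commit; it assumes the method names exactly as they appear in this diff and omits the surrounding test class's package and imports):

    // Sketch: exercising MockSampleMetadataFactory as reformatted in this commit.
    // Assumes StreamEvent / PartitionedEvent from org.apache.eagle.alert.engine.model.
    StreamEvent random = MockSampleMetadataFactory.createRandomStreamEvent("sampleStream_1");

    // Partitioned by the "name" column; the partition key is the hash of the first data field.
    PartitionedEvent partitioned =
        MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", System.currentTimeMillis());
    System.out.println(partitioned.getPartitionKey());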
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamCollector.java
index fa07701..b865422 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamCollector.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamCollector.java
@@ -16,19 +16,20 @@
  */
 package org.apache.eagle.alert.engine.mock;
 
-import java.util.LinkedList;
-import java.util.List;
-
 import org.apache.eagle.alert.engine.Collector;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.LinkedList;
+import java.util.List;
+
 public class MockStreamCollector implements Collector<AlertStreamEvent> {
     @SuppressWarnings("unused")
     private final static Logger LOG = LoggerFactory.getLogger(MockStreamCollector.class);
     private List<AlertStreamEvent> cache;
-    public MockStreamCollector(){
+
+    public MockStreamCollector() {
         cache = new LinkedList<>();
     }
 
@@ -37,15 +38,15 @@ public class MockStreamCollector implements Collector<AlertStreamEvent> {
         // LOG.info("PartitionedEventCollector received: {}",event);
     }
 
-    public void clear(){
+    public void clear() {
         cache.clear();
     }
 
-    public List<AlertStreamEvent> get(){
+    public List<AlertStreamEvent> get() {
         return cache;
     }
 
-    public int size(){
+    public int size() {
         return cache.size();
     }
 }
\ No newline at end of file

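A quick sketch of how the collector above is typically driven in a test (illustrative only; the emit(AlertStreamEvent) call is an assumption based on the Collector<AlertStreamEvent> interface it implements, and alertEvent stands in for an event built elsewhere):

    MockStreamCollector collector = new MockStreamCollector();
    collector.emit(alertEvent);                    // assumed Collector#emit; alertEvent is hypothetical
    assert collector.size() == 1;                  // events accumulate in an in-memory list
    collector.get().forEach(System.out::println);  // inspect collected alerts
    collector.clear();                             // reset between test cases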
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamMetadataService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamMetadataService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamMetadataService.java
index 2119ecd..86fb426 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamMetadataService.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamMetadataService.java
@@ -16,24 +16,24 @@
  */
 package org.apache.eagle.alert.engine.mock;
 
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinitionNotFoundException;
 
-public class MockStreamMetadataService{
-    private final Map<String,StreamDefinition> streamSchemaMap = new HashMap<>();
+import java.util.HashMap;
+import java.util.Map;
+
+public class MockStreamMetadataService {
+    private final Map<String, StreamDefinition> streamSchemaMap = new HashMap<>();
 
     public StreamDefinition getStreamDefinition(String streamId) throws StreamDefinitionNotFoundException {
-        if(streamSchemaMap.containsKey(streamId)) {
+        if (streamSchemaMap.containsKey(streamId)) {
             return streamSchemaMap.get(streamId);
-        }else {
+        } else {
             throw new StreamDefinitionNotFoundException(streamId);
         }
     }
 
-    public void registerStream(String streamId, StreamDefinition schema){
-        streamSchemaMap.put(streamId,schema);
+    public void registerStream(String streamId, StreamDefinition schema) {
+        streamSchemaMap.put(streamId, schema);
     }
 }
\ No newline at end of file

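A short sketch of the register/lookup contract shown in the diff above (illustrative only; sampleDefinition stands in for a StreamDefinition built by the surrounding test code):

    MockStreamMetadataService metadataService = new MockStreamMetadataService();
    metadataService.registerStream("sampleStream_1", sampleDefinition);

    // Registered streams resolve to their schema ...
    StreamDefinition schema = metadataService.getStreamDefinition("sampleStream_1");

    // ... while unknown stream ids throw StreamDefinitionNotFoundException.
    try {
        metadataService.getStreamDefinition("missingStream");
    } catch (StreamDefinitionNotFoundException expected) {
        // expected for unregistered streams
    }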
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamReceiver.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamReceiver.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamReceiver.java
index 1e44c2a..9ab4c24 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamReceiver.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamReceiver.java
@@ -1,22 +1,21 @@
 package org.apache.eagle.alert.engine.mock;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.utils.AlertConstants;
-import org.apache.eagle.alert.utils.StreamIdConversion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichSpout;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.utils.AlertConstants;
+import org.apache.eagle.alert.utils.StreamIdConversion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
@@ -39,9 +38,10 @@ public class MockStreamReceiver extends BaseRichSpout {
     private final static Logger LOG = LoggerFactory.getLogger(MockStreamReceiver.class);
     private SpoutOutputCollector collector;
     private List<String> outputStreamIds;
-    public MockStreamReceiver(int partition){
+
+    public MockStreamReceiver(int partition) {
         outputStreamIds = new ArrayList<>(partition);
-        for(int i=0;i<partition;i++){
+        for (int i = 0; i < partition; i++) {
             outputStreamIds.add(StreamIdConversion.generateStreamIdByPartition(i));
         }
     }
@@ -53,7 +53,8 @@ public class MockStreamReceiver extends BaseRichSpout {
     }
 
     @Override
-    public void close() {}
+    public void close() {
+    }
 
     /**
      * This unit test is not to mock the end2end logic of correlation spout,
@@ -62,18 +63,18 @@ public class MockStreamReceiver extends BaseRichSpout {
     @Override
     public void nextTuple() {
         PartitionedEvent event = MockSampleMetadataFactory.createRandomOutOfTimeOrderEventGroupedByName("sampleStream_1");
-        LOG.info("Receive {}",event);
+        LOG.info("Receive {}", event);
         collector.emit(outputStreamIds.get(
-                // group by the first field in event i.e. name
-                (int) (event.getPartitionKey() % outputStreamIds.size())),
-                Collections.singletonList(event));
+            // group by the first field in event i.e. name
+            (int) (event.getPartitionKey() % outputStreamIds.size())),
+            Collections.singletonList(event));
         Utils.sleep(500);
     }
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        for(String streamId:outputStreamIds) {
-            declarer.declareStream(streamId,new Fields(AlertConstants.FIELD_0));
+        for (String streamId : outputStreamIds) {
+            declarer.declareStream(streamId, new Fields(AlertConstants.FIELD_0));
         }
     }
 }
\ No newline at end of file

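The spout above fans events out by partition key; a minimal sketch of that routing rule (illustrative only, mirroring nextTuple as shown in the diff; partitionCount and event are assumptions of the caller's context):

    // One output stream per partition, named via StreamIdConversion.generateStreamIdByPartition(i).
    MockStreamReceiver spout = new MockStreamReceiver(4);

    // Routing rule from nextTuple: events whose "name" hashes alike go to the same output stream.
    int partitionCount = 4;
    int targetIndex = (int) (event.getPartitionKey() % partitionCount);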
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeBatchWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeBatchWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeBatchWindow.java
index 72ef02b..5142b76 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeBatchWindow.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeBatchWindow.java
@@ -16,11 +16,6 @@
  */
 package org.apache.eagle.alert.engine.nodata;
 
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.times;
-
 import org.apache.eagle.alert.engine.evaluator.nodata.DistinctValuesInTimeBatchWindow;
 import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyTimeBatchHandler;
 import org.apache.eagle.alert.engine.model.StreamEvent;
@@ -28,62 +23,65 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.*;
+
 public class TestDistinctValuesInTimeBatchWindow {
 
-	private static final String inputStream = "testInputStream";
+    private static final String inputStream = "testInputStream";
+
+    private NoDataPolicyTimeBatchHandler handler;
 
-	private NoDataPolicyTimeBatchHandler handler;
+    @Before
+    public void setup() {
+        handler = mock(NoDataPolicyTimeBatchHandler.class);
+    }
 
-	@Before
-	public void setup() {
-		handler = mock(NoDataPolicyTimeBatchHandler.class);
-	}
+    @After
+    public void teardown() {
+    }
 
-	@After
-	public void teardown() {
-	}
+    @Test
+    public void testNormal() throws Exception {
+        // wisb is null since it is dynamic mode
+        DistinctValuesInTimeBatchWindow window = new DistinctValuesInTimeBatchWindow(handler, 5 * 1000, null);
 
-	@Test
-	public void testNormal() throws Exception {
-		// wisb is null since it is dynamic mode
-		DistinctValuesInTimeBatchWindow window = new DistinctValuesInTimeBatchWindow(handler, 5 * 1000, null);
+        long now = System.currentTimeMillis();
 
-		long now = System.currentTimeMillis();
+        // handler.compareAndEmit(anyObject(), anyObject(), anyObject());
 
-		// handler.compareAndEmit(anyObject(), anyObject(), anyObject());
+        // event time
+        sendEventToWindow(window, now, "host1", 95.5);
 
-		// event time
-		sendEventToWindow(window, now, "host1", 95.5);
+        Thread.sleep(6000);
 
-		Thread.sleep(6000);
+        sendEventToWindow(window, now, "host1", 91.0);
+        sendEventToWindow(window, now, "host2", 95.5);
+        sendEventToWindow(window, now, "host2", 97.1);
 
-		sendEventToWindow(window, now, "host1", 91.0);
-		sendEventToWindow(window, now, "host2", 95.5);
-		sendEventToWindow(window, now, "host2", 97.1);
+        Thread.sleep(3000);
 
-		Thread.sleep(3000);
+        sendEventToWindow(window, now, "host1", 90.7);
 
-		sendEventToWindow(window, now, "host1", 90.7);
+        Thread.sleep(4000);
 
-		Thread.sleep(4000);
+        sendEventToWindow(window, now, "host1", 90.7);
 
-		sendEventToWindow(window, now, "host1", 90.7);
-		
-		Thread.sleep(3000);
+        Thread.sleep(3000);
 
-		verify(handler, times(3)).compareAndEmit(anyObject(), anyObject(), anyObject());
-	}
+        verify(handler, times(3)).compareAndEmit(anyObject(), anyObject(), anyObject());
+    }
 
-	private void sendEventToWindow(DistinctValuesInTimeBatchWindow window, long ts, String host, double value) {
-		window.send(buildStreamEvent(ts, host, value), host, ts);
-	}
+    private void sendEventToWindow(DistinctValuesInTimeBatchWindow window, long ts, String host, double value) {
+        window.send(buildStreamEvent(ts, host, value), host, ts);
+    }
 
-	private StreamEvent buildStreamEvent(long ts, String host, double value) {
-		StreamEvent e = new StreamEvent();
-		e.setData(new Object[] { ts, host, value });
-		e.setStreamId(inputStream);
-		e.setTimestamp(ts);
-		return e;
-	}
+    private StreamEvent buildStreamEvent(long ts, String host, double value) {
+        StreamEvent e = new StreamEvent();
+        e.setData(new Object[] {ts, host, value});
+        e.setStreamId(inputStream);
+        e.setTimestamp(ts);
+        return e;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java
index 27744a4..d23abcb 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java
@@ -16,21 +16,21 @@
  */
 package org.apache.eagle.alert.engine.nodata;
 
+import org.apache.eagle.alert.engine.evaluator.nodata.DistinctValuesInTimeWindow;
+import org.junit.Test;
+
 import java.util.Iterator;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
-import org.apache.eagle.alert.engine.evaluator.nodata.DistinctValuesInTimeWindow;
-import org.junit.Test;
-
 /**
  * Since 6/28/16.
  */
 public class TestDistinctValuesInTimeWindow {
     @Test
-    public void test(){
-        DistinctValuesInTimeWindow window = new DistinctValuesInTimeWindow(60*1000);
+    public void test() {
+        DistinctValuesInTimeWindow window = new DistinctValuesInTimeWindow(60 * 1000);
         window.send("1", 0);
         window.send("2", 1000);
         window.send("3", 1000);
@@ -42,9 +42,9 @@ public class TestDistinctValuesInTimeWindow {
     }
 
     @Test
-    public void testSort(){
+    public void testSort() {
         SortedMap<DistinctValuesInTimeWindow.ValueAndTime, DistinctValuesInTimeWindow.ValueAndTime> timeSortedMap =
-                new TreeMap<>(new DistinctValuesInTimeWindow.ValueAndTimeComparator());
+            new TreeMap<>(new DistinctValuesInTimeWindow.ValueAndTimeComparator());
         DistinctValuesInTimeWindow.ValueAndTime vt1 = new DistinctValuesInTimeWindow.ValueAndTime("1", 0);
         timeSortedMap.put(vt1, vt1);
         DistinctValuesInTimeWindow.ValueAndTime vt2 = new DistinctValuesInTimeWindow.ValueAndTime("2", 1000);
@@ -55,7 +55,7 @@ public class TestDistinctValuesInTimeWindow {
         DistinctValuesInTimeWindow.ValueAndTime vt4 = new DistinctValuesInTimeWindow.ValueAndTime("1", 30000);
         timeSortedMap.put(vt4, vt4);
         Iterator<?> it = timeSortedMap.entrySet().iterator();
-        while(it.hasNext()){
+        while (it.hasNext()) {
             System.out.println(it.next());
         }
         timeSortedMap.remove(new DistinctValuesInTimeWindow.ValueAndTime("2", 1000));
@@ -63,8 +63,8 @@ public class TestDistinctValuesInTimeWindow {
         timeSortedMap.put(vt5, vt5);
         DistinctValuesInTimeWindow.ValueAndTime vt6 = new DistinctValuesInTimeWindow.ValueAndTime("1", 62000);
         timeSortedMap.put(vt6, vt6);
-         it = timeSortedMap.entrySet().iterator();
-        while(it.hasNext()){
+        it = timeSortedMap.entrySet().iterator();
+        while (it.hasNext()) {
             System.out.println(it.next());
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestEventTable.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestEventTable.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestEventTable.java
index 4149e17..79b939c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestEventTable.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestEventTable.java
@@ -28,13 +28,13 @@ import org.wso2.siddhi.core.util.EventPrinter;
  */
 public class TestEventTable {
     @Test
-    public void test() throws Exception{
+    public void test() throws Exception {
         ExecutionPlanRuntime runtime = new SiddhiManager().createExecutionPlanRuntime(
-                        "define stream expectStream (key string, src string);"+
-                                "define stream appearStream (key string, src string);"+
-                                "define table expectTable (key string, src string);"+
-                        "from expectStream insert into expectTable;" +
-                                "from appearStream[(expectTable.key==key) in expectTable] insert into outputStream;"
+            "define stream expectStream (key string, src string);" +
+                "define stream appearStream (key string, src string);" +
+                "define table expectTable (key string, src string);" +
+                "from expectStream insert into expectTable;" +
+                "from appearStream[(expectTable.key==key) in expectTable] insert into outputStream;"
         );
 
         runtime.addCallback("outputStream", new StreamCallback() {
@@ -45,9 +45,9 @@ public class TestEventTable {
         });
 
         runtime.start();
-        runtime.getInputHandler("expectStream").send(System.currentTimeMillis(), new Object[]{"host1","expectStream"});
+        runtime.getInputHandler("expectStream").send(System.currentTimeMillis(), new Object[] {"host1", "expectStream"});
         Thread.sleep(2000);
-        runtime.getInputHandler("appearStream").send(System.currentTimeMillis(), new Object[]{"host2","expectStream"});
+        runtime.getInputHandler("appearStream").send(System.currentTimeMillis(), new Object[] {"host2", "expectStream"});
         Thread.sleep(2000);
     }
 }

