eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [08/18] incubator-eagle git commit: [EAGLE-530] Fix checkstyle problems on eagle-alert module and enable failOnViolation
Date Thu, 08 Sep 2016 07:15:06 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClock.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClock.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClock.java
index 0881e35..13e60d6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClock.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClock.java
@@ -17,29 +17,26 @@
 package org.apache.eagle.alert.engine.sorter;
 
 /**
- * The time clock per stream
- *
- * Should be thread-safe between getTime and moveForward
- *
- * By default, we currently simple support event timestamp now
+ * The time clock per stream should be thread-safe between getTime and moveForward.
+ * By default, only event timestamps are currently supported.
  */
 public interface StreamTimeClock {
     /**
-     * Get stream id
+     * Get stream id.
      *
      * @return stream id
      */
     String getStreamId();
 
     /**
-     * Get current time
+     * Get current time.
      *
      * @return current timestamp value
      */
     long getTime();
 
     /**
-     * @param timestamp move forward current time to given timestamp
+     * @param timestamp move forward current time to given timestamp.
      */
     void moveForward(long timestamp);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockListener.java
index f7e463d..b88f66e 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockListener.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockListener.java
@@ -18,10 +18,11 @@ package org.apache.eagle.alert.engine.sorter;
 
 public interface StreamTimeClockListener {
     /**
-     * @see StreamWindow
+     * StreamTimeClockListener onTick callback.
      *
      * @param streamTime
      * @param globalSystemTime
+     * @see StreamWindow
      */
     void onTick(StreamTimeClock streamTime, long globalSystemTime);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockManager.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockManager.java
index 29b13eb..08878fd 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockManager.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockManager.java
@@ -19,27 +19,22 @@ package org.apache.eagle.alert.engine.sorter;
 import java.io.Serializable;
 
 /**
- *
  * By default, we could keep the current time clock in memory,
  * Eventually we may need to consider the global time synchronization across all nodes
  *
- * TODO: maybe need to synchronize time clock globally
+ * <p>TODO: maybe need to synchronize time clock globally</p>
  *
- * 1) When to initialize window according to start time
+ * <p>1) When to initialize window according to start time
  * 2) When to close expired window according to current time
- * 3) Automatically tick periodically as the single place for control lock
- *
+ * 3) Automatically tick periodically as the single place for control lock.</p>
  */
-public interface StreamTimeClockManager extends StreamTimeClockTrigger, Serializable{
+public interface StreamTimeClockManager extends StreamTimeClockTrigger, Serializable {
     /**
-     * @return StreamTimeClock instance
+     * @return StreamTimeClock instance.
      */
     StreamTimeClock createStreamTimeClock(String streamId);
 
     StreamTimeClock getStreamTimeClock(String streamId);
 
-    /**
-     * @param streamId
-     */
     void removeStreamTimeClock(String streamId);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockTrigger.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockTrigger.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockTrigger.java
index a0cd184..494ef05 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockTrigger.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockTrigger.java
@@ -20,37 +20,32 @@ package org.apache.eagle.alert.engine.sorter;
 /**
  * Possible implementation:
  *
- * 1) EventTimeClockTrigger (by default)
- * 2) SystemTimeClockTrigger
+ * <p>1) EventTimeClockTrigger (by default).
+ * 2) SystemTimeClockTrigger.</p>
  */
 public interface StreamTimeClockTrigger {
     /**
-     * @param streamId stream id to listen to
-     * @param listener to watch on streamId
+     * @param streamId stream id to listen to.
+     * @param listener to watch on streamId.
      */
     void registerListener(String streamId, StreamTimeClockListener listener);
 
-    /**
-     *
-     * @param streamClock
-     * @param listener
-     */
     void registerListener(StreamTimeClock streamClock, StreamTimeClockListener listener);
 
     /**
-     * @param listener listener to remove
+     * @param listener listener to remove.
      */
     void removeListener(StreamTimeClockListener listener);
 
     /**
-     * Trigger tick of all listeners on certain stream
+     * Trigger tick of all listeners on certain stream.
      *
      * @param streamId stream id
      */
     void triggerTickOn(String streamId);
 
     /**
-     * Update time per new event time on stream
+     * Update time per new event time on stream.
      *
      * @param streamId
      * @param timestamp

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindow.java
index 2500122..c30f00f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindow.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindow.java
@@ -23,91 +23,81 @@ import org.apache.eagle.alert.engine.model.PartitionedEvent;
  * <h2>Tumbling Window instead Sliding Window</h2>
  * We could have time overlap to sort out-of-ordered stream,
  * but each window should never have events overlap, otherwise will have logic problem.
- *
- *
  * <h2>Ingestion Time Policy</h2>
  * Different notions of time, namely processing time, event time, and ingestion time.
- *
  * <ol>
  * <li>
- *  In processing time, windows are defined with respect to the wall clock of the machine that builds and processes a window, i.e., a one minute processing time window collects elements for exactly one minute.
+ * In processing time, windows are defined with respect to the wall clock of the machine that builds and processes a window,
+ * i.e., a one minute processing time window collects elements for exactly one minute.
  * </li>
  * <li>
- * In event time, windows are defined with respect to timestamps that are attached to each event record. This is common for many types of events, such as log entries, sensor data, etc, where the timestamp usually represents the time at which the event occurred. Event time has several benefits over processing time. First of all, it decouples the program semantics from the actual serving speed of the source and the processing performance of system. Hence you can process historic data, which is served at maximum speed, and continuously produced data with the same program. It also prevents semantically incorrect results in case of backpressure or delays due to failure recovery. Second, event time windows compute correct results, even if events arrive out-of-order of their timestamp which is common if a data stream gathers events from distributed sources.
+ * In event time, windows are defined with respect to timestamps that are attached to each event record. This is common for
+ * many types of events, such as log entries, sensor data, etc, where the timestamp usually represents the time at which the
+ * event occurred. Event time has several benefits over processing time. First of all, it decouples the program semantics
+ * from the actual serving speed of the source and the processing performance of system.
+ * Hence you can process historic data, which is served at maximum speed, and continuously produced data with the same program.
+ * It also prevents semantically incorrect results in case of backpressure or delays due to failure recovery.
+ *
+ * Second, event time windows compute correct results, even if events arrive out-of-order of their timestamp which is common
+ * if a data stream gathers events from distributed sources.
  * </li>
  * <li>
- * Ingestion time is a hybrid of processing and event time. It assigns wall clock timestamps to records as soon as they arrive in the system (at the source) and continues processing with event time semantics based on the attached timestamps.
+ * Ingestion time is a hybrid of processing and event time. It assigns wall clock timestamps to records as soon as they arrive
+ * in the system (at the source) and continues processing with event time semantics based on the attached timestamps.
  * </li>
  * </ol>
  */
 public interface StreamWindow extends StreamTimeClockListener {
     /**
-     * @return Created timestamp
+     * @return Created timestamp.
      */
     long createdTime();
 
     /**
-     * Get start time
-     *
-     * @return
+     * Get start time.
      */
     long startTime();
 
     long margin();
 
     /**
-     * @return reject timestamp < rejectTime()
+     * @return reject timestamp &lt; rejectTime().
      */
     long rejectTime();
 
     /**
-     * Get end time
-     *
-     * @return
+     * Get end time.
      */
     long endTime();
 
     /**
-     * @param timestamp event time
-     * @return true/false in boolean
+     * @param timestamp event time.
+     * @return true/false in boolean.
      */
     boolean accept(long timestamp);
 
     /**
-     * Window is expired
+     * Window is expired.
      *
      * @return whether window is expired
      */
     boolean expired();
 
     /**
-     * @return whether window is alive
+     * @return whether window is alive.
      */
     boolean alive();
 
-    /**
-     *
-     * @param event
-     */
     boolean add(PartitionedEvent event);
 
-//    /**
-//     * @param collector Drain to output collector
-//     */
-//    void flush(PartitionedEventCollector collector);
-
     void flush();
 
     /**
-     * Close window
+     * Close window.
      */
     void close();
 
     void register(PartitionedEventCollector collector);
 
-    /**
-     *
-     * @return
-     */
     int size();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowManager.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowManager.java
index a5f43aa..efa1014 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowManager.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowManager.java
@@ -21,43 +21,38 @@ import java.util.Collection;
 
 /**
  * TODO: Reuse existing expired window to avoid recreating new windows again and again
- *
- * Single stream window manager
+ * <p>Single stream window manager.</p>
  */
 public interface StreamWindowManager extends StreamTimeClockListener, Closeable {
 
     /**
-     * @param initialTime
-     * @return
+     * addNewWindow.
      */
     StreamWindow addNewWindow(long initialTime);
 
     /**
-     * @param window
+     * removeWindow.
      */
     void removeWindow(StreamWindow window);
 
     /**
-     * @param window
-     * @return
+     * hasWindow.
+     *
+     * @return if has window
      */
     boolean hasWindow(StreamWindow window);
 
     /**
-     * @param timestamp time
-     * @return whether window exists for time
+     * @param timestamp time.
+     * @return whether window exists for time.
      */
     boolean hasWindowFor(long timestamp);
 
     /**
-     * @return Internal collection for performance optimization
+     * @return Internal collection for performance optimization.
      */
     Collection<StreamWindow> getWindows();
 
-    /**
-     * @param timestamp
-     * @return
-     */
     StreamWindow getWindowFor(long timestamp);
 
     boolean reject(long timestamp);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowRepository.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowRepository.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowRepository.java
index 3e035c5..7a503e1 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowRepository.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowRepository.java
@@ -16,60 +16,58 @@
  */
 package org.apache.eagle.alert.engine.sorter;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
 import org.apache.eagle.alert.engine.sorter.impl.StreamSortedWindowInMapDB;
 import org.apache.eagle.alert.engine.sorter.impl.StreamSortedWindowOnHeap;
+import com.google.common.base.Preconditions;
 import org.mapdb.DB;
 import org.mapdb.DBMaker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
 
 /**
- *
  * ===== Benchmark Result Report =====<br/><br/>
  *
- * Num. Operation   Type                            Time<br/>
+ * <p>Num. Operation   Type                            Time<br/>
  * ---- ---------   ----                            ----<br/>
- * 1000	FlushTime	DIRECT_MEMORY            	:	55<br/>
- * 1000	FlushTime	FILE_RAF                 	:	63<br/>
- * 1000	FlushTime	MEMORY                   	:	146<br/>
- * 1000	FlushTime	ONHEAP                   	:	17<br/>
- * 1000	InsertTime	DIRECT_MEMORY           	:	68<br/>
- * 1000	InsertTime	FILE_RAF                	:	223<br/>
- * 1000	InsertTime	MEMORY                  	:	273<br/>
- * 1000	InsertTime	ONHEAP                  	:	20<br/>
- * 10000	FlushTime	DIRECT_MEMORY           	:	551<br/>
- * 10000	FlushTime	FILE_RAF                	:	668<br/>
- * 10000	FlushTime	MEMORY                  	:	643<br/>
- * 10000	FlushTime	ONHEAP                  	:	5<br/>
- * 10000	InsertTime	DIRECT_MEMORY          	:	446<br/>
- * 10000	InsertTime	FILE_RAF               	:	2095<br/>
- * 10000	InsertTime	MEMORY                 	:	784<br/>
- * 10000	InsertTime	ONHEAP                 	:	29<br/>
- * 100000	FlushTime	DIRECT_MEMORY          	:	6139<br/>
- * 100000	FlushTime	FILE_RAF               	:	6237<br/>
- * 100000	FlushTime	MEMORY                 	:	6238<br/>
- * 100000	FlushTime	ONHEAP                 	:	18<br/>
- * 100000	InsertTime	DIRECT_MEMORY         	:	4499<br/>
- * 100000	InsertTime	FILE_RAF              	:	22343<br/>
- * 100000	InsertTime	MEMORY                	:	4962<br/>
- * 100000	InsertTime	ONHEAP                	:	107<br/>
- * 1000000	FlushTime	DIRECT_MEMORY         	:	61356<br/>
- * 1000000	FlushTime	FILE_RAF              	:	63025<br/>
- * 1000000	FlushTime	MEMORY                	:	61380<br/>
- * 1000000	FlushTime	ONHEAP                	:	47<br/>
- * 1000000	InsertTime	DIRECT_MEMORY        	:	43637<br/>
- * 1000000	InsertTime	FILE_RAF             	:	464481<br/>
- * 1000000	InsertTime	MEMORY               	:	44367<br/>
- * 1000000	InsertTime	ONHEAP               	:	2040<br/>
- *
+ * 1000    FlushTime    DIRECT_MEMORY                :    55<br/>
+ * 1000    FlushTime    FILE_RAF                     :    63<br/>
+ * 1000    FlushTime    MEMORY                       :    146<br/>
+ * 1000    FlushTime    ONHEAP                       :    17<br/>
+ * 1000    InsertTime    DIRECT_MEMORY               :    68<br/>
+ * 1000    InsertTime    FILE_RAF                    :    223<br/>
+ * 1000    InsertTime    MEMORY                      :    273<br/>
+ * 1000    InsertTime    ONHEAP                      :    20<br/>
+ * 10000    FlushTime    DIRECT_MEMORY               :    551<br/>
+ * 10000    FlushTime    FILE_RAF                    :    668<br/>
+ * 10000    FlushTime    MEMORY                      :    643<br/>
+ * 10000    FlushTime    ONHEAP                      :    5<br/>
+ * 10000    InsertTime    DIRECT_MEMORY              :    446<br/>
+ * 10000    InsertTime    FILE_RAF                   :    2095<br/>
+ * 10000    InsertTime    MEMORY                     :    784<br/>
+ * 10000    InsertTime    ONHEAP                     :    29<br/>
+ * 100000    FlushTime    DIRECT_MEMORY              :    6139<br/>
+ * 100000    FlushTime    FILE_RAF                   :    6237<br/>
+ * 100000    FlushTime    MEMORY                     :    6238<br/>
+ * 100000    FlushTime    ONHEAP                     :    18<br/>
+ * 100000    InsertTime    DIRECT_MEMORY             :    4499<br/>
+ * 100000    InsertTime    FILE_RAF                  :    22343<br/>
+ * 100000    InsertTime    MEMORY                    :    4962<br/>
+ * 100000    InsertTime    ONHEAP                    :    107<br/>
+ * 1000000    FlushTime    DIRECT_MEMORY             :    61356<br/>
+ * 1000000    FlushTime    FILE_RAF                  :    63025<br/>
+ * 1000000    FlushTime    MEMORY                    :    61380<br/>
+ * 1000000    FlushTime    ONHEAP                    :    47<br/>
+ * 1000000    InsertTime    DIRECT_MEMORY            :    43637<br/>
+ * 1000000    InsertTime    FILE_RAF                 :    464481<br/>
+ * 1000000    InsertTime    MEMORY                   :    44367<br/>
+ * 1000000    InsertTime    ONHEAP                   :    2040<br/>
+ * </p>
  * @see StreamSortedWindowOnHeap
  * @see org.mapdb.DBMaker
  */
@@ -105,20 +103,21 @@ public class StreamWindowRepository {
         FILE_RAF
     }
 
-    private final static Logger LOG = LoggerFactory.getLogger(StreamWindowRepository.class);
-    private final Map<StorageType,DB> dbPool;
-    private StreamWindowRepository(){
+    private static final Logger LOG = LoggerFactory.getLogger(StreamWindowRepository.class);
+    private final Map<StorageType, DB> dbPool;
+
+    private StreamWindowRepository() {
         dbPool = new HashMap<>();
     }
 
     private static StreamWindowRepository repository;
 
     /**
-     * Close automatically when JVM exists
+     * Close automatically when JVM exits.
      *
      * @return StreamWindowRepository singletonInstance
      */
-    public static StreamWindowRepository getSingletonInstance(){
+    public static StreamWindowRepository getSingletonInstance() {
         synchronized (StreamWindowRepository.class) {
             if (repository == null) {
                 repository = new StreamWindowRepository();
@@ -133,11 +132,11 @@ public class StreamWindowRepository {
         }
     }
 
-    private DB createMapDB(StorageType storageType){
+    private DB createMapDB(StorageType storageType) {
         synchronized (dbPool) {
-            if(!dbPool.containsKey(storageType)){
+            if (!dbPool.containsKey(storageType)) {
                 DB db;
-                switch (storageType){
+                switch (storageType) {
                     case ONHEAP:
                         db = DBMaker.heapDB().closeOnJvmShutdown().make();
                         LOG.info("Create ONHEAP mapdb");
@@ -157,22 +156,22 @@ public class StreamWindowRepository {
                             file.deleteOnExit();
                             Preconditions.checkNotNull(file, "file is null");
                             db = DBMaker.fileDB(file).deleteFilesAfterClose().make();
-                            LOG.info("Created FILE_RAF map file at {}",file.getAbsolutePath());
+                            LOG.info("Created FILE_RAF map file at {}", file.getAbsolutePath());
                         } catch (IOException e) {
                             throw new IllegalStateException(e);
                         }
                         break;
                     default:
-                        throw new IllegalArgumentException("Illegal storage type: "+storageType);
+                        throw new IllegalArgumentException("Illegal storage type: " + storageType);
                 }
-                dbPool.put(storageType,db);
+                dbPool.put(storageType, db);
                 return db;
             }
             return dbPool.get(storageType);
         }
     }
 
-    public StreamWindow createWindow(long start,long end, long margin, StorageType type){
+    public StreamWindow createWindow(long start, long end, long margin, StorageType type) {
         StreamWindow ret;
         switch (type) {
             case ONHEAP:
@@ -180,77 +179,73 @@ public class StreamWindowRepository {
                 break;
             default:
                 ret = new StreamSortedWindowInMapDB(
-                        start,end,margin,
-                        createMapDB(type),
-                        UUID.randomUUID().toString()
+                    start, end, margin,
+                    createMapDB(type),
+                    UUID.randomUUID().toString()
                 );
                 break;
         }
 
-        if(LOG.isDebugEnabled()) LOG.debug("Created new {}, type: {}",ret,type);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Created new {}, type: {}", ret, type);
+        }
         return ret;
     }
 
-    public StreamWindow createWindow(long start,long end, long margin, StreamWindowStrategy strategy){
-        return strategy.createWindow(start,end,margin,this);
+    public StreamWindow createWindow(long start, long end, long margin, StreamWindowStrategy strategy) {
+        return strategy.createWindow(start, end, margin, this);
     }
 
-    public StreamWindow createWindow(long start,long end, long margin){
-        return OnHeapStrategy.INSTANCE.createWindow(start,end,margin,this);
+    public StreamWindow createWindow(long start, long end, long margin) {
+        return OnHeapStrategy.INSTANCE.createWindow(start, end, margin, this);
     }
 
-    public void close(){
-        for(Map.Entry<StorageType,DB> entry:dbPool.entrySet()){
+    public void close() {
+        for (Map.Entry<StorageType, DB> entry : dbPool.entrySet()) {
             entry.getValue().close();
         }
         dbPool.clear();
     }
 
     public interface StreamWindowStrategy {
-        /**
-         *
-         * @param start
-         * @param end
-         * @param margin
-         * @return
-         */
-        StreamWindow createWindow(long start,long end, long margin,StreamWindowRepository repository);
+        StreamWindow createWindow(long start, long end, long margin, StreamWindowRepository repository);
     }
 
-    public static class OnHeapStrategy implements StreamWindowStrategy{
+    public static class OnHeapStrategy implements StreamWindowStrategy {
         public static final OnHeapStrategy INSTANCE = new OnHeapStrategy();
+
         @Override
-        public StreamWindow createWindow(long start, long end, long margin,StreamWindowRepository repository) {
-            return repository.createWindow(start,end,margin,StorageType.ONHEAP);
+        public StreamWindow createWindow(long start, long end, long margin, StreamWindowRepository repository) {
+            return repository.createWindow(start, end, margin, StorageType.ONHEAP);
         }
     }
 
     public static class WindowSizeStrategy implements StreamWindowStrategy {
-        private final static long ONE_HOUR = 3600 * 1000;
-        private final static long FIVE_HOURS = 5* 3600 * 1000;
+        private static final long ONE_HOUR = 3600 * 1000;
+        private static final long FIVE_HOURS = 5 * 3600 * 1000;
         private final long onheapWindowSizeLimit;
         private final long offheapWindowSizeLimit;
 
-        public static WindowSizeStrategy INSTANCE = new WindowSizeStrategy(ONE_HOUR,FIVE_HOURS);
+        public static WindowSizeStrategy INSTANCE = new WindowSizeStrategy(ONE_HOUR, FIVE_HOURS);
 
-        public WindowSizeStrategy(long onheapWindowSizeLimit, long offheapWindowSizeLimit){
+        public WindowSizeStrategy(long onheapWindowSizeLimit, long offheapWindowSizeLimit) {
             this.offheapWindowSizeLimit = offheapWindowSizeLimit;
             this.onheapWindowSizeLimit = onheapWindowSizeLimit;
 
-            if(this.offheapWindowSizeLimit <this.onheapWindowSizeLimit){
-                throw new IllegalStateException("offheapWindowSizeLimit "+this.offheapWindowSizeLimit +" < onheapWindowSizeLimit "+this.onheapWindowSizeLimit);
+            if (this.offheapWindowSizeLimit < this.onheapWindowSizeLimit) {
+                throw new IllegalStateException("offheapWindowSizeLimit " + this.offheapWindowSizeLimit + " < onheapWindowSizeLimit " + this.onheapWindowSizeLimit);
             }
         }
 
         @Override
-        public StreamWindow createWindow(long start, long end, long margin,StreamWindowRepository repository) {
+        public StreamWindow createWindow(long start, long end, long margin, StreamWindowRepository repository) {
             long windowLength = end - start;
-            if(windowLength <= onheapWindowSizeLimit){
-                return repository.createWindow(start,end,margin, StreamWindowRepository.StorageType.ONHEAP);
-            }else if(windowLength > onheapWindowSizeLimit & windowLength <= offheapWindowSizeLimit){
-                return repository.createWindow(start,end,margin, StreamWindowRepository.StorageType.DIRECT_MEMORY);
-            }else {
-                return repository.createWindow(start,end,margin, StreamWindowRepository.StorageType.FILE_RAF);
+            if (windowLength <= onheapWindowSizeLimit) {
+                return repository.createWindow(start, end, margin, StreamWindowRepository.StorageType.ONHEAP);
+            } else if (windowLength > onheapWindowSizeLimit & windowLength <= offheapWindowSizeLimit) {
+                return repository.createWindow(start, end, margin, StreamWindowRepository.StorageType.DIRECT_MEMORY);
+            } else {
+                return repository.createWindow(start, end, margin, StreamWindowRepository.StorageType.FILE_RAF);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/CachedEventGroupSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/CachedEventGroupSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/CachedEventGroupSerializer.java
index 6cdf8a0..73adee6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/CachedEventGroupSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/CachedEventGroupSerializer.java
@@ -16,36 +16,37 @@
  */
 package org.apache.eagle.alert.engine.sorter.impl;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.eagle.alert.engine.coordinator.StreamPartition;
 import org.apache.eagle.alert.engine.model.PartitionedEvent;
 import org.apache.eagle.alert.engine.model.StreamEvent;
 import org.apache.eagle.alert.engine.utils.SerializableUtils;
+import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.jetbrains.annotations.NotNull;
 import org.mapdb.DataInput2;
 import org.mapdb.DataOutput2;
 import org.mapdb.serializer.GroupSerializerObjectArray;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
 /**
- * @deprecated performance is worse, should investigate
+ * @deprecated performance is worse, should investigate.
  */
 public class CachedEventGroupSerializer extends GroupSerializerObjectArray<PartitionedEvent[]> {
-    private Map<Integer,StreamPartition> hashCodePartitionDict = new HashMap<>();
-    private void writePartitionedEvent(DataOutput2 out,PartitionedEvent event) throws IOException {
+    private Map<Integer, StreamPartition> hashCodePartitionDict = new HashMap<>();
+
+    private void writePartitionedEvent(DataOutput2 out, PartitionedEvent event) throws IOException {
         out.packLong(event.getPartitionKey());
         int partitionHashCode = 0;
-        if(event.getPartition()!=null) {
+        if (event.getPartition() != null) {
             partitionHashCode = event.getPartition().hashCode();
             if (!hashCodePartitionDict.containsKey(partitionHashCode)) {
                 hashCodePartitionDict.put(partitionHashCode, event.getPartition());
             }
         }
         out.packInt(partitionHashCode);
-        if(event.getEvent()!=null) {
+        if (event.getEvent() != null) {
             byte[] eventBytes = SerializableUtils.serializeToCompressedByteArray(event.getEvent());
             out.packInt(eventBytes.length);
             out.write(eventBytes);
@@ -58,11 +59,11 @@ public class CachedEventGroupSerializer extends GroupSerializerObjectArray<Parti
         PartitionedEvent event = new PartitionedEvent();
         event.setPartitionKey(in.unpackLong());
         int partitionHashCode = in.unpackInt();
-        if(partitionHashCode!=0 && hashCodePartitionDict.containsKey(partitionHashCode)) {
+        if (partitionHashCode != 0 && hashCodePartitionDict.containsKey(partitionHashCode)) {
             event.setPartition(hashCodePartitionDict.get(partitionHashCode));
         }
         int eventBytesLen = in.unpackInt();
-        if(eventBytesLen > 0) {
+        if (eventBytesLen > 0) {
             byte[] eventBytes = new byte[eventBytesLen];
             in.readFully(eventBytes);
             event.setEvent((StreamEvent) SerializableUtils.deserializeFromCompressedByteArray(eventBytes, "Deserialize event from bytes"));
@@ -74,7 +75,7 @@ public class CachedEventGroupSerializer extends GroupSerializerObjectArray<Parti
     public void serialize(DataOutput2 out, PartitionedEvent[] value) throws IOException {
         out.packInt(value.length);
         for (PartitionedEvent event : value) {
-            writePartitionedEvent(out,event);
+            writePartitionedEvent(out, event);
         }
     }
 
@@ -105,9 +106,9 @@ public class CachedEventGroupSerializer extends GroupSerializerObjectArray<Parti
 
     @Override
     public int compare(PartitionedEvent[] o1, PartitionedEvent[] o2) {
-        if(o1.length>0 && o2.length>0) {
+        if (o1.length > 0 && o2.length > 0) {
             return (int) (o1[0].getTimestamp() - o2[0].getTimestamp());
-        }else{
+        } else {
             return 0;
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventGroupSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventGroupSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventGroupSerializer.java
index caa291e..55efcaf 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventGroupSerializer.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventGroupSerializer.java
@@ -16,9 +16,6 @@
  */
 package org.apache.eagle.alert.engine.sorter.impl;
 
-import java.io.IOException;
-import java.util.Comparator;
-
 import org.apache.eagle.alert.engine.model.PartitionedEvent;
 import org.apache.eagle.alert.engine.utils.SerializableUtils;
 import org.jetbrains.annotations.NotNull;
@@ -27,42 +24,37 @@ import org.mapdb.DataOutput2;
 import org.mapdb.Serializer;
 import org.mapdb.serializer.GroupSerializer;
 
+import java.io.IOException;
+import java.util.Comparator;
+
 
 public class PartitionedEventGroupSerializer implements GroupSerializer<PartitionedEvent[]> {
     private static final GroupSerializer<byte[]> delegate = Serializer.BYTE_ARRAY;
 
-    private static PartitionedEvent[] deserialize(byte[] bytes){
-        return (PartitionedEvent[]) SerializableUtils.deserializeFromCompressedByteArray(bytes,"deserialize as stream event");
-    }
-
-    private static byte[] serialize(PartitionedEvent[] events){
-        return SerializableUtils.serializeToCompressedByteArray(events);
-    }
-
     @Override
     public int valueArraySearch(Object keys, PartitionedEvent[] key) {
-        return delegate.valueArraySearch(keys,serialize(key));
+        return delegate.valueArraySearch(keys, serialize(key));
     }
 
     @SuppressWarnings("rawtypes")
     @Override
     public int valueArraySearch(Object keys, PartitionedEvent[] key, Comparator comparator) {
-        return delegate.valueArraySearch(keys,serialize(key),comparator);
+        return delegate.valueArraySearch(keys, serialize(key), comparator);
     }
 
     @Override
     public void valueArraySerialize(DataOutput2 out, Object vals) throws IOException {
-        delegate.valueArraySerialize(out,vals);
+        delegate.valueArraySerialize(out, vals);
     }
 
     @Override
     public Object valueArrayDeserialize(DataInput2 in, int size) throws IOException {
-        return delegate.valueArrayDeserialize(in,size);
+        return delegate.valueArrayDeserialize(in, size);
     }
 
     @Override
     public PartitionedEvent[] valueArrayGet(Object vals, int pos) {
-        return deserialize(delegate.valueArrayGet(vals,pos));
+        return deserialize(delegate.valueArrayGet(vals, pos));
     }
 
     @Override
@@ -77,12 +69,12 @@ public class PartitionedEventGroupSerializer implements GroupSerializer<Partitio
 
     @Override
     public Object valueArrayPut(Object vals, int pos, PartitionedEvent[] newValue) {
-        return delegate.valueArrayPut(vals,pos,serialize(newValue));
+        return delegate.valueArrayPut(vals, pos, serialize(newValue));
     }
 
     @Override
     public Object valueArrayUpdateVal(Object vals, int pos, PartitionedEvent[] newValue) {
-        return delegate.valueArrayUpdateVal(vals,pos,serialize(newValue));
+        return delegate.valueArrayUpdateVal(vals, pos, serialize(newValue));
     }
 
     @Override
@@ -92,21 +84,30 @@ public class PartitionedEventGroupSerializer implements GroupSerializer<Partitio
 
     @Override
     public Object valueArrayCopyOfRange(Object vals, int from, int to) {
-        return delegate.valueArrayCopyOfRange(vals,from,to);
+        return delegate.valueArrayCopyOfRange(vals, from, to);
     }
 
     @Override
     public Object valueArrayDeleteValue(Object vals, int pos) {
-        return delegate.valueArrayDeleteValue(vals,pos);
+        return delegate.valueArrayDeleteValue(vals, pos);
     }
 
     @Override
     public void serialize(@NotNull DataOutput2 out, @NotNull PartitionedEvent[] value) throws IOException {
-        delegate.serialize(out,serialize(value));
+        delegate.serialize(out, serialize(value));
+    }
+
+    private static byte[] serialize(PartitionedEvent[] events) {
+        return SerializableUtils.serializeToCompressedByteArray(events);
     }
 
     @Override
     public PartitionedEvent[] deserialize(@NotNull DataInput2 input, int available) throws IOException {
-        return deserialize(delegate.deserialize(input,available));
+        return deserialize(delegate.deserialize(input, available));
+    }
+
+    private static PartitionedEvent[] deserialize(byte[] bytes) {
+        return (PartitionedEvent[]) SerializableUtils.deserializeFromCompressedByteArray(bytes, "deserialize as stream event");
     }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventTimeOrderingComparator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventTimeOrderingComparator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventTimeOrderingComparator.java
index a9f9f39..5378c67 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventTimeOrderingComparator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventTimeOrderingComparator.java
@@ -16,31 +16,31 @@
  */
 package org.apache.eagle.alert.engine.sorter.impl;
 
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+
 import java.util.Comparator;
 import java.util.Objects;
 
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-
 /**
- * TODO: Stable sorting algorithm for better performance to avoid event resorting with same timestamp?
+ * TODO: Stable sorting algorithm for better performance to avoid event resorting with same timestamp?.
  */
 public class PartitionedEventTimeOrderingComparator implements Comparator<PartitionedEvent> {
     public static final PartitionedEventTimeOrderingComparator INSTANCE = new PartitionedEventTimeOrderingComparator();
 
     @Override
     public int compare(PartitionedEvent o1, PartitionedEvent o2) {
-        if(Objects.equals(o1,o2)){
+        if (Objects.equals(o1, o2)) {
             return 0;
-        }else {
-            if(o1 == null && o2 == null){
+        } else {
+            if (o1 == null && o2 == null) {
                 return 0;
-            }else if(o1 != null && o2 == null){
+            } else if (o1 != null && o2 == null) {
                 return 1;
-            }else if(o1 == null){
+            } else if (o1 == null) {
                 return -1;
             }
             // Unstable Sorting Algorithm
-            if(o1.getTimestamp() <= o2.getTimestamp()){
+            if (o1.getTimestamp() <= o2.getTimestamp()) {
                 return -1;
             } else {
                 return 1;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortWindowHandlerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortWindowHandlerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortWindowHandlerImpl.java
index 7be69e1..0ae84b8 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortWindowHandlerImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortWindowHandlerImpl.java
@@ -16,8 +16,6 @@
  */
 package org.apache.eagle.alert.engine.sorter.impl;
 
-import java.io.IOException;
-
 import org.apache.eagle.alert.engine.PartitionedEventCollector;
 import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
 import org.apache.eagle.alert.engine.model.PartitionedEvent;
@@ -30,8 +28,10 @@ import org.joda.time.Period;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+
 public class StreamSortWindowHandlerImpl implements StreamSortHandler {
-    private final static Logger LOG = LoggerFactory.getLogger(StreamSortWindowHandlerImpl.class);
+    private static final Logger LOG = LoggerFactory.getLogger(StreamSortWindowHandlerImpl.class);
     private StreamWindowManager windowManager;
     private StreamSortSpec streamSortSpecSpec;
     private PartitionedEventCollector outputCollector;
@@ -39,17 +39,17 @@ public class StreamSortWindowHandlerImpl implements StreamSortHandler {
 
     public void prepare(String streamId, StreamSortSpec streamSortSpecSpec, PartitionedEventCollector outputCollector) {
         this.windowManager = new StreamWindowManagerImpl(
-                Period.parse(streamSortSpecSpec.getWindowPeriod()),
-                streamSortSpecSpec.getWindowMargin(),
-                PartitionedEventTimeOrderingComparator.INSTANCE,
-                outputCollector);
+            Period.parse(streamSortSpecSpec.getWindowPeriod()),
+            streamSortSpecSpec.getWindowMargin(),
+            PartitionedEventTimeOrderingComparator.INSTANCE,
+            outputCollector);
         this.streamSortSpecSpec = streamSortSpecSpec;
         this.streamId = streamId;
         this.outputCollector = outputCollector;
     }
 
     /**
-     * Entry point to manage window lifecycle
+     * Entry point to manage window lifecycle.
      *
      * @param event StreamEvent
      */
@@ -57,7 +57,7 @@ public class StreamSortWindowHandlerImpl implements StreamSortHandler {
         final long eventTime = event.getEvent().getTimestamp();
         boolean handled = false;
 
-        synchronized (this.windowManager){
+        synchronized (this.windowManager) {
             for (StreamWindow window : this.windowManager.getWindows()) {
                 if (window.alive() && window.add(event)) {
                     handled = true;
@@ -75,8 +75,8 @@ public class StreamSortWindowHandlerImpl implements StreamSortHandler {
             }
         }
 
-        if(!handled){
-            if(LOG.isDebugEnabled()) {
+        if (!handled) {
+            if (LOG.isDebugEnabled()) {
                 LOG.debug("Drop expired event {}", event);
             }
             outputCollector.drop(event);
@@ -84,7 +84,7 @@ public class StreamSortWindowHandlerImpl implements StreamSortHandler {
     }
 
     @Override
-    public void onTick(StreamTimeClock clock,long globalSystemTime) {
+    public void onTick(StreamTimeClock clock, long globalSystemTime) {
         windowManager.onTick(clock, globalSystemTime);
     }
 
@@ -93,7 +93,7 @@ public class StreamSortWindowHandlerImpl implements StreamSortHandler {
         try {
             windowManager.close();
         } catch (IOException e) {
-            LOG.error("Got exception while closing window manager",e);
+            LOG.error("Got exception while closing window manager", e);
         }
     }
 
@@ -103,10 +103,10 @@ public class StreamSortWindowHandlerImpl implements StreamSortHandler {
     }
 
     @Override
-    public int hashCode(){
-        if(streamSortSpecSpec == null){
+    public int hashCode() {
+        if (streamSortSpecSpec == null) {
             throw new NullPointerException("streamSortSpec is null");
-        }else{
+        } else {
             return streamSortSpecSpec.hashCode();
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowInMapDB.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowInMapDB.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowInMapDB.java
index c1c289d..73a63b4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowInMapDB.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowInMapDB.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,65 +16,56 @@
  */
 package org.apache.eagle.alert.engine.sorter.impl;
 
-import java.util.Arrays;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.lang3.time.StopWatch;
 import org.apache.eagle.alert.engine.PartitionedEventCollector;
 import org.apache.eagle.alert.engine.model.PartitionedEvent;
 import org.apache.eagle.alert.engine.model.StreamEvent;
 import org.apache.eagle.alert.engine.sorter.BaseStreamWindow;
+import org.apache.commons.lang3.time.StopWatch;
 import org.mapdb.BTreeMap;
 import org.mapdb.DB;
 import org.mapdb.Serializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * StreamSortedWindow based on MapDB to support off-heap or disk storage.
- *
  * Stable sorting algorithm
- *
- * <br/><br/>
- *
  * See <a href="http://www.mapdb.org">http://www.mapdb.org</a>
  */
 public class StreamSortedWindowInMapDB extends BaseStreamWindow {
     private final String mapId;
-    private BTreeMap<Long, PartitionedEvent[]> bTreeMap;
-    private final static Logger LOG = LoggerFactory.getLogger(StreamSortedWindowInMapDB.class);
+    private BTreeMap<Long, PartitionedEvent[]> btreeMap;
+    private static final Logger LOG = LoggerFactory.getLogger(StreamSortedWindowInMapDB.class);
     private final AtomicInteger size;
-    private  long replaceOpCount = 0;
-    private final static PartitionedEventGroupSerializer STREAM_EVENT_GROUP_SERIALIZER = new PartitionedEventGroupSerializer();
+    private long replaceOpCount = 0;
+    private static final PartitionedEventGroupSerializer STREAM_EVENT_GROUP_SERIALIZER = new PartitionedEventGroupSerializer();
 
     /**
-     * @param start
-     * @param end
-     * @param margin
-     * @param db
-     * @param mapId physical map id, used to decide whether to reuse or not
+     * @param mapId  physical map id, used to decide whether to reuse or not.
      */
     @SuppressWarnings("unused")
-    public StreamSortedWindowInMapDB(long start, long end, long margin,DB db,String mapId) {
+    public StreamSortedWindowInMapDB(long start, long end, long margin, DB db, String mapId) {
         super(start, end, margin);
         this.mapId = mapId;
         try {
-            bTreeMap = db.<Long, StreamEvent>treeMap(mapId).
-                    keySerializer(Serializer.LONG).
-                    valueSerializer(STREAM_EVENT_GROUP_SERIALIZER).
-                    createOrOpen();
-            LOG.debug("Created BTree map {}",mapId);
-        } catch (Error error){
-            LOG.info("Failed create BTree {}",mapId,error);
+            btreeMap = db.<Long, StreamEvent>treeMap(mapId)
+                .keySerializer(Serializer.LONG)
+                .valueSerializer(STREAM_EVENT_GROUP_SERIALIZER)
+                .createOrOpen();
+            LOG.debug("Created BTree map {}", mapId);
+        } catch (Error error) {
+            LOG.info("Failed create BTree {}", mapId, error);
         }
         size = new AtomicInteger(0);
     }
 
     /**
      * Assumed: most of adding operation will do putting only and few require replacing.
-     *
      * <ol>
- *     <li>
+     * <li>
      * First of all, always try to put with created event directly
      * </li>
      * <li>
@@ -82,31 +73,34 @@ public class StreamSortedWindowInMapDB extends BaseStreamWindow {
      * replace operation will cause more consumption
      * </li>
      * </ol>
+     *
      * @param event coming-in event
      * @return whether success
      */
     @Override
     public synchronized boolean add(PartitionedEvent event) {
         long timestamp = event.getEvent().getTimestamp();
-        if(accept(timestamp)) {
-            boolean absent = bTreeMap.putIfAbsentBoolean(timestamp, new PartitionedEvent[]{event});
+        if (accept(timestamp)) {
+            boolean absent = btreeMap.putIfAbsentBoolean(timestamp, new PartitionedEvent[] {event});
             if (!absent) {
                 size.incrementAndGet();
                 return true;
             } else {
-                if(LOG.isDebugEnabled()) LOG.debug("Duplicated timestamp {}, will reduce performance as replacing",timestamp);
-                PartitionedEvent[] oldValue = bTreeMap.get(timestamp);
-                PartitionedEvent[] newValue = oldValue == null ? new PartitionedEvent[1]: Arrays.copyOf(oldValue,oldValue.length+1);
-                newValue[newValue.length-1] = event;
-                PartitionedEvent[] removedValue = bTreeMap.replace(timestamp,newValue);
-                replaceOpCount ++;
-                if(replaceOpCount % 1000 == 0){
-                    LOG.warn("Too many events ({}) with overlap timestamp, may reduce insertion performance",replaceOpCount);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Duplicated timestamp {}, will reduce performance as replacing", timestamp);
+                }
+                PartitionedEvent[] oldValue = btreeMap.get(timestamp);
+                PartitionedEvent[] newValue = oldValue == null ? new PartitionedEvent[1] : Arrays.copyOf(oldValue, oldValue.length + 1);
+                newValue[newValue.length - 1] = event;
+                PartitionedEvent[] removedValue = btreeMap.replace(timestamp, newValue);
+                replaceOpCount++;
+                if (replaceOpCount % 1000 == 0) {
+                    LOG.warn("Too many events ({}) with overlap timestamp, may reduce insertion performance", replaceOpCount);
                 }
-                if(removedValue!=null) {
+                if (removedValue != null) {
                     size.incrementAndGet();
                 } else {
-                    throw new IllegalStateException("Failed to replace key "+timestamp+" with "+newValue.length+" entities array to replace old "+oldValue.length+" entities array");
+                    throw new IllegalStateException("Failed to replace key " + timestamp + " with " + newValue.length + " entities array to replace old " + oldValue.length + " entities array");
                 }
                 return true;
             }
@@ -119,12 +113,12 @@ public class StreamSortedWindowInMapDB extends BaseStreamWindow {
     protected synchronized void flush(PartitionedEventCollector collector) {
         StopWatch stopWatch = new StopWatch();
         stopWatch.start();
-        bTreeMap.valueIterator().forEachRemaining((events)->{
-            for(PartitionedEvent event:events){
+        btreeMap.valueIterator().forEachRemaining((events) -> {
+            for (PartitionedEvent event : events) {
                 collector.emit(event);
             }
         });
-        bTreeMap.clear();
+        btreeMap.clear();
         replaceOpCount = 0;
         stopWatch.stop();
         LOG.info("Flushed {} events in {} ms", size, stopWatch.getTime());
@@ -132,10 +126,10 @@ public class StreamSortedWindowInMapDB extends BaseStreamWindow {
     }
 
     @Override
-    public synchronized void close(){
+    public synchronized void close() {
         super.close();
-        bTreeMap.close();
-        LOG.info("Closed {}",this.mapId);
+        btreeMap.close();
+        LOG.info("Closed {}", this.mapId);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowOnHeap.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowOnHeap.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowOnHeap.java
index d3b1d7d..ed000f1 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowOnHeap.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowOnHeap.java
@@ -16,33 +16,33 @@
  */
 package org.apache.eagle.alert.engine.sorter.impl;
 
-import java.util.Comparator;
-
-import org.apache.commons.lang3.time.StopWatch;
 import org.apache.eagle.alert.engine.PartitionedEventCollector;
 import org.apache.eagle.alert.engine.model.PartitionedEvent;
 import org.apache.eagle.alert.engine.sorter.BaseStreamWindow;
+
+import com.google.common.collect.TreeMultiset;
+import org.apache.commons.lang3.time.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.TreeMultiset;
+import java.util.Comparator;
 
 public class StreamSortedWindowOnHeap extends BaseStreamWindow {
-    private final static Logger LOG = LoggerFactory.getLogger(StreamSortedWindowOnHeap.class);
+    private static final Logger LOG = LoggerFactory.getLogger(StreamSortedWindowOnHeap.class);
     private final TreeMultiset<PartitionedEvent> treeMultisetCache;
 
     /**
-     * @param start start time
-     * @param end end time
-     * @param margin margin time
+     * @param start  start time.
+     * @param end    end time.
+     * @param margin margin time.
      */
-    public StreamSortedWindowOnHeap(long start, long end, long margin, Comparator<PartitionedEvent> comparator ){
-        super(start,end,margin);
+    public StreamSortedWindowOnHeap(long start, long end, long margin, Comparator<PartitionedEvent> comparator) {
+        super(start, end, margin);
         treeMultisetCache = TreeMultiset.create(comparator);
     }
 
-    public StreamSortedWindowOnHeap(long start, long end, long margin){
-        this(start,end,margin,new PartitionedEventTimeOrderingComparator());
+    public StreamSortedWindowOnHeap(long start, long end, long margin) {
+        this(start, end, margin, new PartitionedEventTimeOrderingComparator());
     }
 
     @Override
@@ -52,7 +52,9 @@ public class StreamSortedWindowOnHeap extends BaseStreamWindow {
                 treeMultisetCache.add(partitionedEvent);
                 return true;
             } else {
-                if(LOG.isDebugEnabled()) LOG.debug("{} is not acceptable, ignored", partitionedEvent);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("{} is not acceptable, ignored", partitionedEvent);
+                }
                 return false;
             }
         }
@@ -67,7 +69,7 @@ public class StreamSortedWindowOnHeap extends BaseStreamWindow {
             int size = treeMultisetCache.size();
             treeMultisetCache.clear();
             stopWatch.stop();
-            LOG.info("Flushed {} events in {} ms from {}", size, stopWatch.getTime(),this.toString());
+            LOG.info("Flushed {} events in {} ms from {}", size, stopWatch.getTime(), this.toString());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockInLocalMemory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockInLocalMemory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockInLocalMemory.java
index 91a4a37..0c44074 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockInLocalMemory.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockInLocalMemory.java
@@ -16,33 +16,33 @@
  */
 package org.apache.eagle.alert.engine.sorter.impl;
 
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.eagle.alert.engine.sorter.StreamTimeClock;
 import org.apache.eagle.alert.utils.DateTimeUtil;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 
 /**
- * In memory thread-safe time clock service
- *
+ * In memory thread-safe time clock service.
  * TODO: maybe need to synchronize time clock globally, how to?
  */
 public class StreamTimeClockInLocalMemory implements StreamTimeClock {
     private final AtomicLong currentTime;
     private final String streamId;
 
-    public StreamTimeClockInLocalMemory(String streamId,long initialTime){
+    public StreamTimeClockInLocalMemory(String streamId, long initialTime) {
         this.streamId = streamId;
         this.currentTime = new AtomicLong(initialTime);
     }
-    public StreamTimeClockInLocalMemory(String streamId){
-        this(streamId,0L);
+
+    public StreamTimeClockInLocalMemory(String streamId) {
+        this(streamId, 0L);
     }
 
     @Override
-    public void moveForward(long timestamp){
-        if(timestamp < currentTime.get()){
-            throw new IllegalArgumentException(timestamp +" < "+currentTime.get()+", should not move time back");
+    public void moveForward(long timestamp) {
+        if (timestamp < currentTime.get()) {
+            throw new IllegalArgumentException(timestamp + " < " + currentTime.get() + ", should not move time back");
         }
         this.currentTime.set(timestamp);
     }
@@ -59,6 +59,6 @@ public class StreamTimeClockInLocalMemory implements StreamTimeClock {
 
     @Override
     public String toString() {
-        return String.format("StreamClock[streamId=%s, now=%s]",streamId, DateTimeUtil.millisecondsToHumanDateWithMilliseconds(currentTime.get()));
+        return String.format("StreamClock[streamId=%s, now=%s]", streamId, DateTimeUtil.millisecondsToHumanDateWithMilliseconds(currentTime.get()));
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockManagerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockManagerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockManagerImpl.java
index 741d7f0..49378db 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockManagerImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockManagerImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,12 +16,6 @@
  */
 package org.apache.eagle.alert.engine.sorter.impl;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.eagle.alert.engine.sorter.StreamTimeClock;
 import org.apache.eagle.alert.engine.sorter.StreamTimeClockListener;
 import org.apache.eagle.alert.engine.sorter.StreamTimeClockManager;
@@ -29,56 +23,56 @@ import org.apache.eagle.alert.utils.DateTimeUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
+
 public final class StreamTimeClockManagerImpl implements StreamTimeClockManager {
     private static final long serialVersionUID = -2770823821511195343L;
-    private final static Logger LOG = LoggerFactory.getLogger(StreamTimeClockManagerImpl.class);
-    private final Map<String,StreamTimeClock> streamIdTimeClockMap;
+    private static final Logger LOG = LoggerFactory.getLogger(StreamTimeClockManagerImpl.class);
+    private final Map<String, StreamTimeClock> streamIdTimeClockMap;
     private Timer timer;
 
-    private final Map<StreamTimeClockListener,String> listenerStreamIdMap;
-    private final static AtomicInteger num = new AtomicInteger();
+    private final Map<StreamTimeClockListener, String> listenerStreamIdMap;
+    private static final AtomicInteger num = new AtomicInteger();
 
-    public StreamTimeClockManagerImpl(){
+    public StreamTimeClockManagerImpl() {
         listenerStreamIdMap = new HashMap<>();
         streamIdTimeClockMap = new HashMap<>();
-        timer = new Timer("StreamScheduler-"+num.getAndIncrement());
+        timer = new Timer("StreamScheduler-" + num.getAndIncrement());
         timer.schedule(new TimerTask() {
             @Override
             public void run() {
                 // Make sure the timer tick happens one by one
-                    triggerTickOnAll();
+                triggerTickOnAll();
             }
-        },1000,1000);
+        }, 1000, 1000);
     }
 
     /**
-     *
      * By default, we could keep the current time clock in memory,
      * Eventually we may need to consider the global time synchronization across all nodes
-     *
      * 1) When to initialize window according to start time
      * 2) When to close expired window according to current time
      *
-     * @return StreamTimeClock instance
+     * @return StreamTimeClock instance.
      */
     @Override
-    public StreamTimeClock createStreamTimeClock(String streamId){
+    public StreamTimeClock createStreamTimeClock(String streamId) {
         synchronized (streamIdTimeClockMap) {
             if (!streamIdTimeClockMap.containsKey(streamId)) {
                 StreamTimeClock instance = new StreamTimeClockInLocalMemory(streamId);
                 LOG.info("Created {}", instance);
                 streamIdTimeClockMap.put(streamId, instance);
             } else {
-                LOG.warn("TimeClock for stream already existss: "+streamIdTimeClockMap.get(streamId));
+                LOG.warn("TimeClock for stream already existss: " + streamIdTimeClockMap.get(streamId));
             }
             return streamIdTimeClockMap.get(streamId);
         }
     }
 
-    /**
-     * @param streamId
-     * @return
-     */
     @Override
     public StreamTimeClock getStreamTimeClock(String streamId) {
         synchronized (streamIdTimeClockMap) {
@@ -90,11 +84,8 @@ public final class StreamTimeClockManagerImpl implements StreamTimeClockManager
         }
     }
 
-    /**
-     * @param streamId
-     */
     @Override
-    public void removeStreamTimeClock(String streamId){
+    public void removeStreamTimeClock(String streamId) {
         synchronized (streamIdTimeClockMap) {
             if (streamIdTimeClockMap.containsKey(streamId)) {
                 streamIdTimeClockMap.remove(streamId);
@@ -106,10 +97,11 @@ public final class StreamTimeClockManagerImpl implements StreamTimeClockManager
     }
 
     @Override
-    public void registerListener(String streamId,StreamTimeClockListener listener) {
+    public void registerListener(String streamId, StreamTimeClockListener listener) {
         synchronized (listenerStreamIdMap) {
-            if (listenerStreamIdMap.containsKey(listener))
+            if (listenerStreamIdMap.containsKey(listener)) {
                 throw new IllegalArgumentException("Duplicated listener: " + listener.toString());
+            }
             LOG.info("Register {} on {}", listener, streamId);
             listenerStreamIdMap.put(listener, streamId);
         }
@@ -117,7 +109,7 @@ public final class StreamTimeClockManagerImpl implements StreamTimeClockManager
 
     @Override
     public void registerListener(StreamTimeClock streamClock, StreamTimeClockListener listener) {
-        registerListener(streamClock.getStreamId(),listener);
+        registerListener(streamClock.getStreamId(), listener);
     }
 
     @Override
@@ -134,27 +126,32 @@ public final class StreamTimeClockManagerImpl implements StreamTimeClockManager
                 count++;
             }
         }
-        if (LOG.isDebugEnabled()) LOG.debug("Triggered {} time-clock listeners on stream {}", count, streamId);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Triggered {} time-clock listeners on stream {}", count, streamId);
+        }
     }
 
-    private static long getCurrentSystemTime(){
+    private static long getCurrentSystemTime() {
         return System.currentTimeMillis();
     }
 
     @Override
     public void onTimeUpdate(String streamId, long timestamp) {
         StreamTimeClock timeClock = getStreamTimeClock(streamId);
-        if(timeClock == null)
+        if (timeClock == null) {
             return;
+        }
         // Trigger time clock only when time moves forward
-        if(timestamp >= timeClock.getTime()) {
+        if (timestamp >= timeClock.getTime()) {
             timeClock.moveForward(timestamp);
-            if(LOG.isDebugEnabled()) LOG.debug("Tick on stream {} with latest time {}",streamId, DateTimeUtil.millisecondsToHumanDateWithMilliseconds(timeClock.getTime()));
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Tick on stream {} with latest time {}", streamId, DateTimeUtil.millisecondsToHumanDateWithMilliseconds(timeClock.getTime()));
+            }
             triggerTickOn(streamId);
         }
     }
 
-    private void triggerTickOnAll(){
+    private void triggerTickOnAll() {
         synchronized (listenerStreamIdMap) {
             for (Map.Entry<StreamTimeClockListener, String> entry : listenerStreamIdMap.entrySet()) {
                 triggerTickOn(entry.getValue());
@@ -166,6 +163,6 @@ public final class StreamTimeClockManagerImpl implements StreamTimeClockManager
     public void close() {
         timer.cancel();
         triggerTickOnAll();
-        LOG.info("Closed StreamTimeClockManager {}",this);
+        LOG.info("Closed StreamTimeClockManager {}", this);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/60f9642e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamWindowManagerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamWindowManagerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamWindowManagerImpl.java
index 4e1212f..e8921a2 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamWindowManagerImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamWindowManagerImpl.java
@@ -16,13 +16,6 @@
  */
 package org.apache.eagle.alert.engine.sorter.impl;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.List;
-import java.util.TreeMap;
-
-import org.apache.commons.lang3.time.StopWatch;
 import org.apache.eagle.alert.engine.PartitionedEventCollector;
 import org.apache.eagle.alert.engine.model.PartitionedEvent;
 import org.apache.eagle.alert.engine.sorter.StreamTimeClock;
@@ -31,13 +24,17 @@ import org.apache.eagle.alert.engine.sorter.StreamWindowManager;
 import org.apache.eagle.alert.engine.sorter.StreamWindowRepository;
 import org.apache.eagle.alert.utils.DateTimeUtil;
 import org.apache.eagle.alert.utils.TimePeriodUtils;
+
+import org.apache.commons.lang3.time.StopWatch;
 import org.joda.time.Period;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.*;
+
 public class StreamWindowManagerImpl implements StreamWindowManager {
-    private final static Logger LOG = LoggerFactory.getLogger(StreamWindowManagerImpl.class);
-    private final TreeMap<Long,StreamWindow> windowBuckets;
+    private static final Logger LOG = LoggerFactory.getLogger(StreamWindowManagerImpl.class);
+    private final TreeMap<Long, StreamWindow> windowBuckets;
     private final PartitionedEventCollector collector;
     private final Period windowPeriod;
     private final long windowMargin;
@@ -45,7 +42,7 @@ public class StreamWindowManagerImpl implements StreamWindowManager {
     private final Comparator<PartitionedEvent> comparator;
     private long rejectTime;
 
-    public StreamWindowManagerImpl(Period windowPeriod, long windowMargin, Comparator<PartitionedEvent> comparator, PartitionedEventCollector collector){
+    public StreamWindowManagerImpl(Period windowPeriod, long windowMargin, Comparator<PartitionedEvent> comparator, PartitionedEventCollector collector) {
         this.windowBuckets = new TreeMap<>();
         this.windowPeriod = windowPeriod;
         this.windowMargin = windowMargin;
@@ -64,12 +61,14 @@ public class StreamWindowManagerImpl implements StreamWindowManager {
                 addWindow(window);
                 return window;
             } else {
-                throw new IllegalStateException("Failed to create new window, as " + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(initialTime) + " is too late, only allow timestamp after " + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(rejectTime));
+                throw new IllegalStateException("Failed to create new window, as "
+                    + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(initialTime) + " is too late, only allow timestamp after "
+                    + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(rejectTime));
             }
         }
     }
 
-    private void addWindow(StreamWindow window){
+    private void addWindow(StreamWindow window) {
         if (!windowBuckets.containsKey(window.startTime())) {
             windowBuckets.put(window.startTime(), window);
         } else {
@@ -121,37 +120,41 @@ public class StreamWindowManagerImpl implements StreamWindowManager {
     }
 
     @Override
-    public void onTick(StreamTimeClock clock,long globalSystemTime) {
+    public void onTick(StreamTimeClock clock, long globalSystemTime) {
         synchronized (windowBuckets) {
             List<StreamWindow> toRemoved = new ArrayList<>();
             List<StreamWindow> aliveWindow = new ArrayList<>();
 
             for (StreamWindow windowBucket : windowBuckets.values()) {
                 windowBucket.onTick(clock, globalSystemTime);
-                if (windowBucket.rejectTime() > rejectTime) rejectTime = windowBucket.rejectTime();
+                if (windowBucket.rejectTime() > rejectTime) {
+                    rejectTime = windowBucket.rejectTime();
+                }
             }
             for (StreamWindow windowBucket : windowBuckets.values()) {
-                if (windowBucket.expired() || windowBucket.endTime() <=rejectTime) {
+                if (windowBucket.expired() || windowBucket.endTime() <= rejectTime) {
                     toRemoved.add(windowBucket);
                 } else {
                     aliveWindow.add(windowBucket);
                 }
             }
-            toRemoved.forEach(this::CloseAndRemoveWindow);
-            if (toRemoved.size() > 0) LOG.info("Windows: {} alive = {}, {} expired = {}", aliveWindow.size(), aliveWindow, toRemoved.size(), toRemoved);
+            toRemoved.forEach(this::closeAndRemoveWindow);
+            if (toRemoved.size() > 0) {
+                LOG.info("Windows: {} alive = {}, {} expired = {}", aliveWindow.size(), aliveWindow, toRemoved.size(), toRemoved);
+            }
         }
     }
 
-    private void CloseAndRemoveWindow(StreamWindow windowBucket){
+    private void closeAndRemoveWindow(StreamWindow windowBucket) {
         StopWatch stopWatch = new StopWatch();
         stopWatch.start();
-        CloseWindow(windowBucket);
+        closeWindow(windowBucket);
         removeWindow(windowBucket);
         stopWatch.stop();
-        LOG.info("Removed {} in {} ms",windowBucket,stopWatch.getTime());
+        LOG.info("Removed {} in {} ms", windowBucket, stopWatch.getTime());
     }
 
-    private void CloseWindow(StreamWindow windowBucket){
+    private void closeWindow(StreamWindow windowBucket) {
         windowBucket.close();
     }
 
@@ -163,7 +166,7 @@ public class StreamWindowManagerImpl implements StreamWindowManager {
             int count = 0;
             for (StreamWindow windowBucket : getWindows()) {
                 count++;
-                CloseWindow(windowBucket);
+                closeWindow(windowBucket);
             }
             windowBuckets.clear();
             stopWatch.stop();


Mime
View raw message