eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [30/46] incubator-eagle git commit: [EAGLE-325] Initialize next-gen alert engine code on branch-0.5
Date Thu, 02 Jun 2016 07:08:09 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowManager.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowManager.java
new file mode 100644
index 0000000..a5f43aa
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowManager.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter;
+
+import java.io.Closeable;
+import java.util.Collection;
+
+/**
+ * TODO: Reuse existing expired window to avoid recreating new windows again and again
+ *
+ * Single stream window manager
+ */
+public interface StreamWindowManager extends StreamTimeClockListener, Closeable {
+
+    /**
+     * @param initialTime
+     * @return
+     */
+    StreamWindow addNewWindow(long initialTime);
+
+    /**
+     * @param window
+     */
+    void removeWindow(StreamWindow window);
+
+    /**
+     * @param window
+     * @return
+     */
+    boolean hasWindow(StreamWindow window);
+
+    /**
+     * @param timestamp time
+     * @return whether window exists for time
+     */
+    boolean hasWindowFor(long timestamp);
+
+    /**
+     * @return Internal collection for performance optimization
+     */
+    Collection<StreamWindow> getWindows();
+
+    /**
+     * @param timestamp
+     * @return
+     */
+    StreamWindow getWindowFor(long timestamp);
+
+    boolean reject(long timestamp);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowRepository.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowRepository.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowRepository.java
new file mode 100755
index 0000000..3e035c5
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowRepository.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.eagle.alert.engine.sorter.impl.StreamSortedWindowInMapDB;
+import org.apache.eagle.alert.engine.sorter.impl.StreamSortedWindowOnHeap;
+import org.mapdb.DB;
+import org.mapdb.DBMaker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ *
+ * ===== Benchmark Result Report =====<br/><br/>
+ *
+ * Num. Operation   Type                            Time<br/>
+ * ---- ---------   ----                            ----<br/>
+ * 1000	FlushTime	DIRECT_MEMORY            	:	55<br/>
+ * 1000	FlushTime	FILE_RAF                 	:	63<br/>
+ * 1000	FlushTime	MEMORY                   	:	146<br/>
+ * 1000	FlushTime	ONHEAP                   	:	17<br/>
+ * 1000	InsertTime	DIRECT_MEMORY           	:	68<br/>
+ * 1000	InsertTime	FILE_RAF                	:	223<br/>
+ * 1000	InsertTime	MEMORY                  	:	273<br/>
+ * 1000	InsertTime	ONHEAP                  	:	20<br/>
+ * 10000	FlushTime	DIRECT_MEMORY           	:	551<br/>
+ * 10000	FlushTime	FILE_RAF                	:	668<br/>
+ * 10000	FlushTime	MEMORY                  	:	643<br/>
+ * 10000	FlushTime	ONHEAP                  	:	5<br/>
+ * 10000	InsertTime	DIRECT_MEMORY          	:	446<br/>
+ * 10000	InsertTime	FILE_RAF               	:	2095<br/>
+ * 10000	InsertTime	MEMORY                 	:	784<br/>
+ * 10000	InsertTime	ONHEAP                 	:	29<br/>
+ * 100000	FlushTime	DIRECT_MEMORY          	:	6139<br/>
+ * 100000	FlushTime	FILE_RAF               	:	6237<br/>
+ * 100000	FlushTime	MEMORY                 	:	6238<br/>
+ * 100000	FlushTime	ONHEAP                 	:	18<br/>
+ * 100000	InsertTime	DIRECT_MEMORY         	:	4499<br/>
+ * 100000	InsertTime	FILE_RAF              	:	22343<br/>
+ * 100000	InsertTime	MEMORY                	:	4962<br/>
+ * 100000	InsertTime	ONHEAP                	:	107<br/>
+ * 1000000	FlushTime	DIRECT_MEMORY         	:	61356<br/>
+ * 1000000	FlushTime	FILE_RAF              	:	63025<br/>
+ * 1000000	FlushTime	MEMORY                	:	61380<br/>
+ * 1000000	FlushTime	ONHEAP                	:	47<br/>
+ * 1000000	InsertTime	DIRECT_MEMORY        	:	43637<br/>
+ * 1000000	InsertTime	FILE_RAF             	:	464481<br/>
+ * 1000000	InsertTime	MEMORY               	:	44367<br/>
+ * 1000000	InsertTime	ONHEAP               	:	2040<br/>
+ *
+ * @see StreamSortedWindowOnHeap
+ * @see org.mapdb.DBMaker
+ */
+public class StreamWindowRepository {
+    public enum StorageType {
+        /**
+         * Creates new in-memory database which stores all data on heap without serialization.
+         * This mode should be very fast, but data will affect Garbage PartitionedEventCollector the same way as traditional Java Collections.
+         */
+        ONHEAP,
+
+        /**
+         * Creates new in-memory database. Changes are lost after JVM exits.
+         * This option serializes data into {@code byte[]},
+         * so they are not affected by Garbage PartitionedEventCollector.
+         */
+        MEMORY,
+
+        /**
+         * <p>
+         * Creates new in-memory database. Changes are lost after JVM exits.
+         * </p><p>
+         * This will use {@code DirectByteBuffer} outside of HEAP, so Garbage Collector is not affected
+         * You should increase ammount of direct memory with
+         * {@code -XX:MaxDirectMemorySize=10G} JVM param
+         * </p>
+         */
+        DIRECT_MEMORY,
+
+        /**
+         * By default use File.createTempFile("streamwindows","temp")
+         */
+        FILE_RAF
+    }
+
+    private final static Logger LOG = LoggerFactory.getLogger(StreamWindowRepository.class);
+    private final Map<StorageType,DB> dbPool;
+    private StreamWindowRepository(){
+        dbPool = new HashMap<>();
+    }
+
+    private static StreamWindowRepository repository;
+
+    /**
+     * Close automatically when JVM exists
+     *
+     * @return StreamWindowRepository singletonInstance
+     */
+    public static StreamWindowRepository getSingletonInstance(){
+        synchronized (StreamWindowRepository.class) {
+            if (repository == null) {
+                repository = new StreamWindowRepository();
+                Runtime.getRuntime().addShutdownHook(new Thread() {
+                    @Override
+                    public void run() {
+                        repository.close();
+                    }
+                });
+            }
+            return repository;
+        }
+    }
+
+    private DB createMapDB(StorageType storageType){
+        synchronized (dbPool) {
+            if(!dbPool.containsKey(storageType)){
+                DB db;
+                switch (storageType){
+                    case ONHEAP:
+                        db = DBMaker.heapDB().closeOnJvmShutdown().make();
+                        LOG.info("Create ONHEAP mapdb");
+                        break;
+                    case MEMORY:
+                        db = DBMaker.memoryDB().closeOnJvmShutdown().make();
+                        LOG.info("Create MEMORY mapdb");
+                        break;
+                    case DIRECT_MEMORY:
+                        db = DBMaker.memoryDirectDB().closeOnJvmShutdown().make();
+                        LOG.info("Create DIRECT_MEMORY mapdb");
+                        break;
+                    case FILE_RAF:
+                        try {
+                            File file = File.createTempFile("window-", ".map");
+                            file.delete();
+                            file.deleteOnExit();
+                            Preconditions.checkNotNull(file, "file is null");
+                            db = DBMaker.fileDB(file).deleteFilesAfterClose().make();
+                            LOG.info("Created FILE_RAF map file at {}",file.getAbsolutePath());
+                        } catch (IOException e) {
+                            throw new IllegalStateException(e);
+                        }
+                        break;
+                    default:
+                        throw new IllegalArgumentException("Illegal storage type: "+storageType);
+                }
+                dbPool.put(storageType,db);
+                return db;
+            }
+            return dbPool.get(storageType);
+        }
+    }
+
+    public StreamWindow createWindow(long start,long end, long margin, StorageType type){
+        StreamWindow ret;
+        switch (type) {
+            case ONHEAP:
+                ret = new StreamSortedWindowOnHeap(start, end, margin);
+                break;
+            default:
+                ret = new StreamSortedWindowInMapDB(
+                        start,end,margin,
+                        createMapDB(type),
+                        UUID.randomUUID().toString()
+                );
+                break;
+        }
+
+        if(LOG.isDebugEnabled()) LOG.debug("Created new {}, type: {}",ret,type);
+        return ret;
+    }
+
+    public StreamWindow createWindow(long start,long end, long margin, StreamWindowStrategy strategy){
+        return strategy.createWindow(start,end,margin,this);
+    }
+
+    public StreamWindow createWindow(long start,long end, long margin){
+        return OnHeapStrategy.INSTANCE.createWindow(start,end,margin,this);
+    }
+
+    public void close(){
+        for(Map.Entry<StorageType,DB> entry:dbPool.entrySet()){
+            entry.getValue().close();
+        }
+        dbPool.clear();
+    }
+
+    public interface StreamWindowStrategy {
+        /**
+         *
+         * @param start
+         * @param end
+         * @param margin
+         * @return
+         */
+        StreamWindow createWindow(long start,long end, long margin,StreamWindowRepository repository);
+    }
+
+    public static class OnHeapStrategy implements StreamWindowStrategy{
+        public static final OnHeapStrategy INSTANCE = new OnHeapStrategy();
+        @Override
+        public StreamWindow createWindow(long start, long end, long margin,StreamWindowRepository repository) {
+            return repository.createWindow(start,end,margin,StorageType.ONHEAP);
+        }
+    }
+
+    public static class WindowSizeStrategy implements StreamWindowStrategy {
+        private final static long ONE_HOUR = 3600 * 1000;
+        private final static long FIVE_HOURS = 5* 3600 * 1000;
+        private final long onheapWindowSizeLimit;
+        private final long offheapWindowSizeLimit;
+
+        public static WindowSizeStrategy INSTANCE = new WindowSizeStrategy(ONE_HOUR,FIVE_HOURS);
+
+        public WindowSizeStrategy(long onheapWindowSizeLimit, long offheapWindowSizeLimit){
+            this.offheapWindowSizeLimit = offheapWindowSizeLimit;
+            this.onheapWindowSizeLimit = onheapWindowSizeLimit;
+
+            if(this.offheapWindowSizeLimit <this.onheapWindowSizeLimit){
+                throw new IllegalStateException("offheapWindowSizeLimit "+this.offheapWindowSizeLimit +" < onheapWindowSizeLimit "+this.onheapWindowSizeLimit);
+            }
+        }
+
+        @Override
+        public StreamWindow createWindow(long start, long end, long margin,StreamWindowRepository repository) {
+            long windowLength = end - start;
+            if(windowLength <= onheapWindowSizeLimit){
+                return repository.createWindow(start,end,margin, StreamWindowRepository.StorageType.ONHEAP);
+            }else if(windowLength > onheapWindowSizeLimit & windowLength <= offheapWindowSizeLimit){
+                return repository.createWindow(start,end,margin, StreamWindowRepository.StorageType.DIRECT_MEMORY);
+            }else {
+                return repository.createWindow(start,end,margin, StreamWindowRepository.StorageType.FILE_RAF);
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/CachedEventGroupSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/CachedEventGroupSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/CachedEventGroupSerializer.java
new file mode 100644
index 0000000..6cdf8a0
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/CachedEventGroupSerializer.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.engine.utils.SerializableUtils;
+import org.jetbrains.annotations.NotNull;
+import org.mapdb.DataInput2;
+import org.mapdb.DataOutput2;
+import org.mapdb.serializer.GroupSerializerObjectArray;
+
+/**
+ * @deprecated performance is worse, should investigate
+ */
+public class CachedEventGroupSerializer extends GroupSerializerObjectArray<PartitionedEvent[]> {
+    private Map<Integer,StreamPartition> hashCodePartitionDict = new HashMap<>();
+    private void writePartitionedEvent(DataOutput2 out,PartitionedEvent event) throws IOException {
+        out.packLong(event.getPartitionKey());
+        int partitionHashCode = 0;
+        if(event.getPartition()!=null) {
+            partitionHashCode = event.getPartition().hashCode();
+            if (!hashCodePartitionDict.containsKey(partitionHashCode)) {
+                hashCodePartitionDict.put(partitionHashCode, event.getPartition());
+            }
+        }
+        out.packInt(partitionHashCode);
+        if(event.getEvent()!=null) {
+            byte[] eventBytes = SerializableUtils.serializeToCompressedByteArray(event.getEvent());
+            out.packInt(eventBytes.length);
+            out.write(eventBytes);
+        } else {
+            out.packInt(0);
+        }
+    }
+
+    private PartitionedEvent readPartitionedEvent(DataInput2 in) throws IOException {
+        PartitionedEvent event = new PartitionedEvent();
+        event.setPartitionKey(in.unpackLong());
+        int partitionHashCode = in.unpackInt();
+        if(partitionHashCode!=0 && hashCodePartitionDict.containsKey(partitionHashCode)) {
+            event.setPartition(hashCodePartitionDict.get(partitionHashCode));
+        }
+        int eventBytesLen = in.unpackInt();
+        if(eventBytesLen > 0) {
+            byte[] eventBytes = new byte[eventBytesLen];
+            in.readFully(eventBytes);
+            event.setEvent((StreamEvent) SerializableUtils.deserializeFromCompressedByteArray(eventBytes, "Deserialize event from bytes"));
+        }
+        return event;
+    }
+
+    @Override
+    public void serialize(DataOutput2 out, PartitionedEvent[] value) throws IOException {
+        out.packInt(value.length);
+        for (PartitionedEvent event : value) {
+            writePartitionedEvent(out,event);
+        }
+    }
+
+    @Override
+    public PartitionedEvent[] deserialize(DataInput2 in, int available) throws IOException {
+        final int size = in.unpackInt();
+        PartitionedEvent[] ret = new PartitionedEvent[size];
+        for (int i = 0; i < size; i++) {
+            ret[i] = readPartitionedEvent(in);
+        }
+        return ret;
+    }
+
+    @Override
+    public boolean isTrusted() {
+        return true;
+    }
+
+    @Override
+    public boolean equals(PartitionedEvent[] a1, PartitionedEvent[] a2) {
+        return a1[0].getTimestamp() == a2[0].getTimestamp();
+    }
+
+    @Override
+    public int hashCode(@NotNull PartitionedEvent[] events, int seed) {
+        return new HashCodeBuilder().append(events).toHashCode();
+    }
+
+    @Override
+    public int compare(PartitionedEvent[] o1, PartitionedEvent[] o2) {
+        if(o1.length>0 && o2.length>0) {
+            return (int) (o1[0].getTimestamp() - o2[0].getTimestamp());
+        }else{
+            return 0;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventGroupSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventGroupSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventGroupSerializer.java
new file mode 100644
index 0000000..caa291e
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventGroupSerializer.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter.impl;
+
+import java.io.IOException;
+import java.util.Comparator;
+
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.utils.SerializableUtils;
+import org.jetbrains.annotations.NotNull;
+import org.mapdb.DataInput2;
+import org.mapdb.DataOutput2;
+import org.mapdb.Serializer;
+import org.mapdb.serializer.GroupSerializer;
+
+
+public class PartitionedEventGroupSerializer implements GroupSerializer<PartitionedEvent[]> {
+    private static final GroupSerializer<byte[]> delegate = Serializer.BYTE_ARRAY;
+
+    private static PartitionedEvent[] deserialize(byte[] bytes){
+        return (PartitionedEvent[]) SerializableUtils.deserializeFromCompressedByteArray(bytes,"deserialize as stream event");
+    }
+
+    private static byte[] serialize(PartitionedEvent[] events){
+        return SerializableUtils.serializeToCompressedByteArray(events);
+    }
+
+    @Override
+    public int valueArraySearch(Object keys, PartitionedEvent[] key) {
+        return delegate.valueArraySearch(keys,serialize(key));
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public int valueArraySearch(Object keys, PartitionedEvent[] key, Comparator comparator) {
+        return delegate.valueArraySearch(keys,serialize(key),comparator);
+    }
+
+    @Override
+    public void valueArraySerialize(DataOutput2 out, Object vals) throws IOException {
+        delegate.valueArraySerialize(out,vals);
+    }
+
+    @Override
+    public Object valueArrayDeserialize(DataInput2 in, int size) throws IOException {
+        return delegate.valueArrayDeserialize(in,size);
+    }
+
+    @Override
+    public PartitionedEvent[] valueArrayGet(Object vals, int pos) {
+        return deserialize(delegate.valueArrayGet(vals,pos));
+    }
+
+    @Override
+    public int valueArraySize(Object vals) {
+        return delegate.valueArraySize(vals);
+    }
+
+    @Override
+    public Object valueArrayEmpty() {
+        return delegate.valueArrayEmpty();
+    }
+
+    @Override
+    public Object valueArrayPut(Object vals, int pos, PartitionedEvent[] newValue) {
+        return delegate.valueArrayPut(vals,pos,serialize(newValue));
+    }
+
+    @Override
+    public Object valueArrayUpdateVal(Object vals, int pos, PartitionedEvent[] newValue) {
+        return delegate.valueArrayUpdateVal(vals,pos,serialize(newValue));
+    }
+
+    @Override
+    public Object valueArrayFromArray(Object[] objects) {
+        return delegate.valueArrayFromArray(objects);
+    }
+
+    @Override
+    public Object valueArrayCopyOfRange(Object vals, int from, int to) {
+        return delegate.valueArrayCopyOfRange(vals,from,to);
+    }
+
+    @Override
+    public Object valueArrayDeleteValue(Object vals, int pos) {
+        return delegate.valueArrayDeleteValue(vals,pos);
+    }
+
+    @Override
+    public void serialize(@NotNull DataOutput2 out, @NotNull PartitionedEvent[] value) throws IOException {
+        delegate.serialize(out,serialize(value));
+    }
+
+    @Override
+    public PartitionedEvent[] deserialize(@NotNull DataInput2 input, int available) throws IOException {
+        return deserialize(delegate.deserialize(input,available));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventTimeOrderingComparator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventTimeOrderingComparator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventTimeOrderingComparator.java
new file mode 100644
index 0000000..a9f9f39
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventTimeOrderingComparator.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter.impl;
+
+import java.util.Comparator;
+import java.util.Objects;
+
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+
+/**
+ * TODO: Stable sorting algorithm for better performance to avoid event resorting with same timestamp?
+ */
+public class PartitionedEventTimeOrderingComparator implements Comparator<PartitionedEvent> {
+    public static final PartitionedEventTimeOrderingComparator INSTANCE = new PartitionedEventTimeOrderingComparator();
+
+    @Override
+    public int compare(PartitionedEvent o1, PartitionedEvent o2) {
+        if(Objects.equals(o1,o2)){
+            return 0;
+        }else {
+            if(o1 == null && o2 == null){
+                return 0;
+            }else if(o1 != null && o2 == null){
+                return 1;
+            }else if(o1 == null){
+                return -1;
+            }
+            // Unstable Sorting Algorithm
+            if(o1.getTimestamp() <= o2.getTimestamp()){
+                return -1;
+            } else {
+                return 1;
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortWindowHandlerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortWindowHandlerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortWindowHandlerImpl.java
new file mode 100644
index 0000000..7be69e1
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortWindowHandlerImpl.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter.impl;
+
+import java.io.IOException;
+
+import org.apache.eagle.alert.engine.PartitionedEventCollector;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.router.StreamSortHandler;
+import org.apache.eagle.alert.engine.sorter.StreamTimeClock;
+import org.apache.eagle.alert.engine.sorter.StreamWindow;
+import org.apache.eagle.alert.engine.sorter.StreamWindowManager;
+import org.apache.eagle.alert.utils.DateTimeUtil;
+import org.joda.time.Period;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StreamSortWindowHandlerImpl implements StreamSortHandler {
+    private final static Logger LOG = LoggerFactory.getLogger(StreamSortWindowHandlerImpl.class);
+    private StreamWindowManager windowManager;
+    private StreamSortSpec streamSortSpecSpec;
+    private PartitionedEventCollector outputCollector;
+    private String streamId;
+
+    public void prepare(String streamId, StreamSortSpec streamSortSpecSpec, PartitionedEventCollector outputCollector) {
+        this.windowManager = new StreamWindowManagerImpl(
+                Period.parse(streamSortSpecSpec.getWindowPeriod()),
+                streamSortSpecSpec.getWindowMargin(),
+                PartitionedEventTimeOrderingComparator.INSTANCE,
+                outputCollector);
+        this.streamSortSpecSpec = streamSortSpecSpec;
+        this.streamId = streamId;
+        this.outputCollector = outputCollector;
+    }
+
+    /**
+     * Entry point to manage window lifecycle
+     *
+     * @param event StreamEvent
+     */
+    public void nextEvent(PartitionedEvent event) {
+        final long eventTime = event.getEvent().getTimestamp();
+        boolean handled = false;
+
+        synchronized (this.windowManager){
+            for (StreamWindow window : this.windowManager.getWindows()) {
+                if (window.alive() && window.add(event)) {
+                    handled = true;
+                }
+            }
+
+            // No window found for the event but not too late being rejected
+            if (!handled && !windowManager.reject(eventTime)) {
+                // later then all events, create later window
+                StreamWindow window = windowManager.addNewWindow(eventTime);
+                if (window.add(event)) {
+                    LOG.info("Created {} of {} at {}", window, this.streamId, DateTimeUtil.millisecondsToHumanDateWithMilliseconds(eventTime));
+                    handled = true;
+                }
+            }
+        }
+
+        if(!handled){
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Drop expired event {}", event);
+            }
+            outputCollector.drop(event);
+        }
+    }
+
+    @Override
+    public void onTick(StreamTimeClock clock,long globalSystemTime) {
+        windowManager.onTick(clock, globalSystemTime);
+    }
+
+    @Override
+    public void close() {
+        try {
+            windowManager.close();
+        } catch (IOException e) {
+            LOG.error("Got exception while closing window manager",e);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return super.toString();
+    }
+
+    @Override
+    public int hashCode(){
+        if(streamSortSpecSpec == null){
+            throw new NullPointerException("streamSortSpec is null");
+        }else{
+            return streamSortSpecSpec.hashCode();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowInMapDB.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowInMapDB.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowInMapDB.java
new file mode 100644
index 0000000..c1c289d
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowInMapDB.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter.impl;
+
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.lang3.time.StopWatch;
+import org.apache.eagle.alert.engine.PartitionedEventCollector;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.engine.sorter.BaseStreamWindow;
+import org.mapdb.BTreeMap;
+import org.mapdb.DB;
+import org.mapdb.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * StreamSortedWindow based on MapDB to support off-heap or disk storage.
+ *
+ * Stable sorting algorithm
+ *
+ * <br/><br/>
+ *
+ * See <a href="http://www.mapdb.org">http://www.mapdb.org</a>
+ */
+public class StreamSortedWindowInMapDB extends BaseStreamWindow {
+    private final String mapId;
+    private BTreeMap<Long, PartitionedEvent[]> bTreeMap;
+    private final static Logger LOG = LoggerFactory.getLogger(StreamSortedWindowInMapDB.class);
+    private final AtomicInteger size;
+    private  long replaceOpCount = 0;
+    private final static PartitionedEventGroupSerializer STREAM_EVENT_GROUP_SERIALIZER = new PartitionedEventGroupSerializer();
+
+    /**
+     * @param start
+     * @param end
+     * @param margin
+     * @param db
+     * @param mapId physical map id, used to decide whether to reuse or not
+     */
+    @SuppressWarnings("unused")
+    public StreamSortedWindowInMapDB(long start, long end, long margin,DB db,String mapId) {
+        super(start, end, margin);
+        this.mapId = mapId;
+        try {
+            bTreeMap = db.<Long, StreamEvent>treeMap(mapId).
+                    keySerializer(Serializer.LONG).
+                    valueSerializer(STREAM_EVENT_GROUP_SERIALIZER).
+                    createOrOpen();
+            LOG.debug("Created BTree map {}",mapId);
+        } catch (Error error){
+            LOG.info("Failed create BTree {}",mapId,error);
+        }
+        size = new AtomicInteger(0);
+    }
+
+    /**
+     * Assumed: most of adding operation will do putting only and few require replacing.
+     *
+     * <ol>
+ *     <li>
+     * First of all, always try to put with created event directly
+     * </li>
+     * <li>
+     * If not absent (key already exists), then append and replace,
+     * replace operation will cause more consumption
+     * </li>
+     * </ol>
+     * @param event coming-in event
+     * @return whether success
+     */
+    @Override
+    public synchronized boolean add(PartitionedEvent event) {
+        long timestamp = event.getEvent().getTimestamp();
+        if(accept(timestamp)) {
+            boolean absent = bTreeMap.putIfAbsentBoolean(timestamp, new PartitionedEvent[]{event});
+            if (!absent) {
+                size.incrementAndGet();
+                return true;
+            } else {
+                if(LOG.isDebugEnabled()) LOG.debug("Duplicated timestamp {}, will reduce performance as replacing",timestamp);
+                PartitionedEvent[] oldValue = bTreeMap.get(timestamp);
+                PartitionedEvent[] newValue = oldValue == null ? new PartitionedEvent[1]: Arrays.copyOf(oldValue,oldValue.length+1);
+                newValue[newValue.length-1] = event;
+                PartitionedEvent[] removedValue = bTreeMap.replace(timestamp,newValue);
+                replaceOpCount ++;
+                if(replaceOpCount % 1000 == 0){
+                    LOG.warn("Too many events ({}) with overlap timestamp, may reduce insertion performance",replaceOpCount);
+                }
+                if(removedValue!=null) {
+                    size.incrementAndGet();
+                } else {
+                    throw new IllegalStateException("Failed to replace key "+timestamp+" with "+newValue.length+" entities array to replace old "+oldValue.length+" entities array");
+                }
+                return true;
+            }
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    protected synchronized void flush(PartitionedEventCollector collector) {
+        StopWatch stopWatch = new StopWatch();
+        stopWatch.start();
+        bTreeMap.valueIterator().forEachRemaining((events)->{
+            for(PartitionedEvent event:events){
+                collector.emit(event);
+            }
+        });
+        bTreeMap.clear();
+        replaceOpCount = 0;
+        stopWatch.stop();
+        LOG.info("Flushed {} events in {} ms", size, stopWatch.getTime());
+        size.set(0);
+    }
+
+    @Override
+    public synchronized void close(){
+        super.close();
+        bTreeMap.close();
+        LOG.info("Closed {}",this.mapId);
+    }
+
+    @Override
+    public synchronized int size() {
+        return size.get();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowOnHeap.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowOnHeap.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowOnHeap.java
new file mode 100644
index 0000000..d3b1d7d
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowOnHeap.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter.impl;
+
+import java.util.Comparator;
+
+import org.apache.commons.lang3.time.StopWatch;
+import org.apache.eagle.alert.engine.PartitionedEventCollector;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.sorter.BaseStreamWindow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.TreeMultiset;
+
+public class StreamSortedWindowOnHeap extends BaseStreamWindow {
+    private final static Logger LOG = LoggerFactory.getLogger(StreamSortedWindowOnHeap.class);
+    private final TreeMultiset<PartitionedEvent> treeMultisetCache;
+
+    /**
+     * @param start start time
+     * @param end end time
+     * @param margin margin time
+     */
+    public StreamSortedWindowOnHeap(long start, long end, long margin, Comparator<PartitionedEvent> comparator ){
+        super(start,end,margin);
+        treeMultisetCache = TreeMultiset.create(comparator);
+    }
+
+    public StreamSortedWindowOnHeap(long start, long end, long margin){
+        this(start,end,margin,new PartitionedEventTimeOrderingComparator());
+    }
+
+    @Override
+    public boolean add(PartitionedEvent partitionedEvent) {
+        synchronized (treeMultisetCache) {
+            if (accept(partitionedEvent.getEvent().getTimestamp())) {
+                treeMultisetCache.add(partitionedEvent);
+                return true;
+            } else {
+                if(LOG.isDebugEnabled()) LOG.debug("{} is not acceptable, ignored", partitionedEvent);
+                return false;
+            }
+        }
+    }
+
+    @Override
+    protected void flush(PartitionedEventCollector collector) {
+        synchronized (treeMultisetCache) {
+            StopWatch stopWatch = new StopWatch();
+            stopWatch.start();
+            treeMultisetCache.forEach(collector::emit);
+            int size = treeMultisetCache.size();
+            treeMultisetCache.clear();
+            stopWatch.stop();
+            LOG.info("Flushed {} events in {} ms from {}", size, stopWatch.getTime(),this.toString());
+        }
+    }
+
+    @Override
+    public int size() {
+        synchronized (treeMultisetCache) {
+            return treeMultisetCache.size();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockInLocalMemory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockInLocalMemory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockInLocalMemory.java
new file mode 100644
index 0000000..91a4a37
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockInLocalMemory.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter.impl;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.eagle.alert.engine.sorter.StreamTimeClock;
+import org.apache.eagle.alert.utils.DateTimeUtil;
+
+
+/**
+ * In memory thread-safe time clock service
+ *
+ * TODO: maybe need to synchronize time clock globally, how to?
+ */
+public class StreamTimeClockInLocalMemory implements StreamTimeClock {
+    private final AtomicLong currentTime;
+    private final String streamId;
+
+    public StreamTimeClockInLocalMemory(String streamId,long initialTime){
+        this.streamId = streamId;
+        this.currentTime = new AtomicLong(initialTime);
+    }
+    public StreamTimeClockInLocalMemory(String streamId){
+        this(streamId,0L);
+    }
+
+    @Override
+    public void moveForward(long timestamp){
+        if(timestamp < currentTime.get()){
+            throw new IllegalArgumentException(timestamp +" < "+currentTime.get()+", should not move time back");
+        }
+        this.currentTime.set(timestamp);
+    }
+
+    @Override
+    public String getStreamId() {
+        return streamId;
+    }
+
+    @Override
+    public long getTime() {
+        return currentTime.get();
+    }
+
+    @Override
+    public String toString() {
+        return String.format("StreamClock[streamId=%s, now=%s]",streamId, DateTimeUtil.millisecondsToHumanDateWithMilliseconds(currentTime.get()));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockManagerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockManagerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockManagerImpl.java
new file mode 100644
index 0000000..741d7f0
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockManagerImpl.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.eagle.alert.engine.sorter.StreamTimeClock;
+import org.apache.eagle.alert.engine.sorter.StreamTimeClockListener;
+import org.apache.eagle.alert.engine.sorter.StreamTimeClockManager;
+import org.apache.eagle.alert.utils.DateTimeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class StreamTimeClockManagerImpl implements StreamTimeClockManager {
+    private static final long serialVersionUID = -2770823821511195343L;
+    private final static Logger LOG = LoggerFactory.getLogger(StreamTimeClockManagerImpl.class);
+    private final Map<String,StreamTimeClock> streamIdTimeClockMap;
+    private Timer timer;
+
+    private final Map<StreamTimeClockListener,String> listenerStreamIdMap;
+    private final static AtomicInteger num = new AtomicInteger();
+
+    public StreamTimeClockManagerImpl(){
+        listenerStreamIdMap = new HashMap<>();
+        streamIdTimeClockMap = new HashMap<>();
+        timer = new Timer("StreamScheduler-"+num.getAndIncrement());
+        timer.schedule(new TimerTask() {
+            @Override
+            public void run() {
+                // Make sure the timer tick happens one by one
+                    triggerTickOnAll();
+            }
+        },1000,1000);
+    }
+
+    /**
+     *
+     * By default, we could keep the current time clock in memory,
+     * Eventually we may need to consider the global time synchronization across all nodes
+     *
+     * 1) When to initialize window according to start time
+     * 2) When to close expired window according to current time
+     *
+     * @return StreamTimeClock instance
+     */
+    @Override
+    public StreamTimeClock createStreamTimeClock(String streamId){
+        synchronized (streamIdTimeClockMap) {
+            if (!streamIdTimeClockMap.containsKey(streamId)) {
+                StreamTimeClock instance = new StreamTimeClockInLocalMemory(streamId);
+                LOG.info("Created {}", instance);
+                streamIdTimeClockMap.put(streamId, instance);
+            } else {
+                LOG.warn("TimeClock for stream already existss: "+streamIdTimeClockMap.get(streamId));
+            }
+            return streamIdTimeClockMap.get(streamId);
+        }
+    }
+
+    /**
+     * @param streamId
+     * @return
+     */
+    @Override
+    public StreamTimeClock getStreamTimeClock(String streamId) {
+        synchronized (streamIdTimeClockMap) {
+            if (!streamIdTimeClockMap.containsKey(streamId)) {
+                LOG.warn("TimeClock for stream {} is not initialized before being called, create now", streamId);
+                return createStreamTimeClock(streamId);
+            }
+            return streamIdTimeClockMap.get(streamId);
+        }
+    }
+
+    /**
+     * @param streamId
+     */
+    @Override
+    public void removeStreamTimeClock(String streamId){
+        synchronized (streamIdTimeClockMap) {
+            if (streamIdTimeClockMap.containsKey(streamId)) {
+                streamIdTimeClockMap.remove(streamId);
+                LOG.info("Removed TimeClock for stream {}: {}", streamId, streamIdTimeClockMap.get(streamId));
+            } else {
+                LOG.warn("No TimeClock found for stream {}, nothing to remove", streamId);
+            }
+        }
+    }
+
+    @Override
+    public void registerListener(String streamId,StreamTimeClockListener listener) {
+        synchronized (listenerStreamIdMap) {
+            if (listenerStreamIdMap.containsKey(listener))
+                throw new IllegalArgumentException("Duplicated listener: " + listener.toString());
+            LOG.info("Register {} on {}", listener, streamId);
+            listenerStreamIdMap.put(listener, streamId);
+        }
+    }
+
+    @Override
+    public void registerListener(StreamTimeClock streamClock, StreamTimeClockListener listener) {
+        registerListener(streamClock.getStreamId(),listener);
+    }
+
+    @Override
+    public void removeListener(StreamTimeClockListener listener) {
+        listenerStreamIdMap.remove(listener);
+    }
+
+    @Override
+    public synchronized void triggerTickOn(String streamId) {
+        int count = 0;
+        for (Map.Entry<StreamTimeClockListener, String> entry : listenerStreamIdMap.entrySet()) {
+            if (entry.getValue().equals(streamId)) {
+                entry.getKey().onTick(streamIdTimeClockMap.get(streamId), getCurrentSystemTime());
+                count++;
+            }
+        }
+        if (LOG.isDebugEnabled()) LOG.debug("Triggered {} time-clock listeners on stream {}", count, streamId);
+    }
+
+    private static long getCurrentSystemTime(){
+        return System.currentTimeMillis();
+    }
+
+    @Override
+    public void onTimeUpdate(String streamId, long timestamp) {
+        StreamTimeClock timeClock = getStreamTimeClock(streamId);
+        if(timeClock == null)
+            return;
+        // Trigger time clock only when time moves forward
+        if(timestamp >= timeClock.getTime()) {
+            timeClock.moveForward(timestamp);
+            if(LOG.isDebugEnabled()) LOG.debug("Tick on stream {} with latest time {}",streamId, DateTimeUtil.millisecondsToHumanDateWithMilliseconds(timeClock.getTime()));
+            triggerTickOn(streamId);
+        }
+    }
+
+    private void triggerTickOnAll(){
+        synchronized (listenerStreamIdMap) {
+            for (Map.Entry<StreamTimeClockListener, String> entry : listenerStreamIdMap.entrySet()) {
+                triggerTickOn(entry.getValue());
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        timer.cancel();
+        triggerTickOnAll();
+        LOG.info("Closed StreamTimeClockManager {}",this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamWindowManagerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamWindowManagerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamWindowManagerImpl.java
new file mode 100644
index 0000000..4e1212f
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamWindowManagerImpl.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.apache.commons.lang3.time.StopWatch;
+import org.apache.eagle.alert.engine.PartitionedEventCollector;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.sorter.StreamTimeClock;
+import org.apache.eagle.alert.engine.sorter.StreamWindow;
+import org.apache.eagle.alert.engine.sorter.StreamWindowManager;
+import org.apache.eagle.alert.engine.sorter.StreamWindowRepository;
+import org.apache.eagle.alert.utils.DateTimeUtil;
+import org.apache.eagle.alert.utils.TimePeriodUtils;
+import org.joda.time.Period;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StreamWindowManagerImpl implements StreamWindowManager {
+    private final static Logger LOG = LoggerFactory.getLogger(StreamWindowManagerImpl.class);
+    private final TreeMap<Long,StreamWindow> windowBuckets;
+    private final PartitionedEventCollector collector;
+    private final Period windowPeriod;
+    private final long windowMargin;
+    @SuppressWarnings("unused")
+    private final Comparator<PartitionedEvent> comparator;
+    private long rejectTime;
+
+    public StreamWindowManagerImpl(Period windowPeriod, long windowMargin, Comparator<PartitionedEvent> comparator, PartitionedEventCollector collector){
+        this.windowBuckets = new TreeMap<>();
+        this.windowPeriod = windowPeriod;
+        this.windowMargin = windowMargin;
+        this.collector = collector;
+        this.comparator = comparator;
+    }
+
+    @Override
+    public StreamWindow addNewWindow(long initialTime) {
+        synchronized (windowBuckets) {
+            if (!reject(initialTime)) {
+                Long windowStartTime = TimePeriodUtils.formatMillisecondsByPeriod(initialTime, windowPeriod);
+                Long windowEndTime = windowStartTime + TimePeriodUtils.getMillisecondsOfPeriod(windowPeriod);
+                StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(windowStartTime, windowEndTime, windowMargin);
+                window.register(collector);
+                addWindow(window);
+                return window;
+            } else {
+                throw new IllegalStateException("Failed to create new window, as " + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(initialTime) + " is too late, only allow timestamp after " + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(rejectTime));
+            }
+        }
+    }
+
+    private void addWindow(StreamWindow window){
+        if (!windowBuckets.containsKey(window.startTime())) {
+            windowBuckets.put(window.startTime(), window);
+        } else {
+            throw new IllegalArgumentException("Duplicated " + window.toString());
+        }
+    }
+
+    @Override
+    public void removeWindow(StreamWindow window) {
+        synchronized (windowBuckets) {
+            windowBuckets.remove(window.startTime());
+        }
+    }
+
+    @Override
+    public boolean hasWindow(StreamWindow window) {
+        synchronized (windowBuckets) {
+            return windowBuckets.containsKey(window.startTime());
+        }
+    }
+
+    @Override
+    public boolean hasWindowFor(long timestamp) {
+        return getWindowFor(timestamp) != null;
+    }
+
+    @Override
+    public Collection<StreamWindow> getWindows() {
+        synchronized (windowBuckets) {
+            return windowBuckets.values();
+        }
+    }
+
+    @Override
+    public StreamWindow getWindowFor(long timestamp) {
+        synchronized (windowBuckets) {
+            for (StreamWindow windowBucket : windowBuckets.values()) {
+                if (timestamp >= windowBucket.startTime() && timestamp < windowBucket.endTime()) {
+                    return windowBucket;
+                }
+            }
+            return null;
+        }
+    }
+
+    @Override
+    public boolean reject(long timestamp) {
+        return timestamp < rejectTime;
+    }
+
+    @Override
+    public void onTick(StreamTimeClock clock,long globalSystemTime) {
+        synchronized (windowBuckets) {
+            List<StreamWindow> toRemoved = new ArrayList<>();
+            List<StreamWindow> aliveWindow = new ArrayList<>();
+
+            for (StreamWindow windowBucket : windowBuckets.values()) {
+                windowBucket.onTick(clock, globalSystemTime);
+                if (windowBucket.rejectTime() > rejectTime) rejectTime = windowBucket.rejectTime();
+            }
+            for (StreamWindow windowBucket : windowBuckets.values()) {
+                if (windowBucket.expired() || windowBucket.endTime() <=rejectTime) {
+                    toRemoved.add(windowBucket);
+                } else {
+                    aliveWindow.add(windowBucket);
+                }
+            }
+            toRemoved.forEach(this::CloseAndRemoveWindow);
+            if (toRemoved.size() > 0) LOG.info("Windows: {} alive = {}, {} expired = {}", aliveWindow.size(), aliveWindow, toRemoved.size(), toRemoved);
+        }
+    }
+
+    private void CloseAndRemoveWindow(StreamWindow windowBucket){
+        StopWatch stopWatch = new StopWatch();
+        stopWatch.start();
+        CloseWindow(windowBucket);
+        removeWindow(windowBucket);
+        stopWatch.stop();
+        LOG.info("Removed {} in {} ms",windowBucket,stopWatch.getTime());
+    }
+
+    private void CloseWindow(StreamWindow windowBucket){
+        windowBucket.close();
+    }
+
+    public void close() {
+        synchronized (windowBuckets) {
+            LOG.debug("Closing");
+            StopWatch stopWatch = new StopWatch();
+            stopWatch.start();
+            int count = 0;
+            for (StreamWindow windowBucket : getWindows()) {
+                count++;
+                CloseWindow(windowBucket);
+            }
+            windowBuckets.clear();
+            stopWatch.stop();
+            LOG.info("Closed {} windows in {} ms", count, stopWatch.getTime());
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
new file mode 100644
index 0000000..cd23405
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
@@ -0,0 +1,380 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.eagle.alert.engine.spout;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.SpoutSpec;
+import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
+import org.apache.eagle.alert.engine.coordinator.MetadataType;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.router.SpoutSpecListener;
+import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
+import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
+import org.apache.eagle.alert.engine.serialization.Serializers;
+import org.apache.eagle.alert.utils.AlertConstants;
+import org.apache.eagle.alert.utils.StreamIdConversion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import storm.kafka.BrokerHosts;
+import storm.kafka.KafkaSpoutMetric;
+import storm.kafka.KafkaSpoutWrapper;
+import storm.kafka.SpoutConfig;
+import storm.kafka.ZkHosts;
+import backtype.storm.spout.SchemeAsMultiScheme;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+
+import com.typesafe.config.Config;
+
+/**
+ * wrap KafkaSpout to provide parallel processing of messages for multiple Kafka topics
+ *
+ * 1. onNewConfig() is interface for outside to update new metadata. Upon new metadata, this class will calculate if there is any new topic, removed topic or
+ *    updated topic
+ *
+ */
+public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener,SerializationMetadataProvider {
+    private static final long serialVersionUID = -5280723341236671580L;
+    private static final Logger LOG  = LoggerFactory.getLogger(CorrelationSpout.class);
+
+    public static final String DEFAULT_STORM_KAFKA_TRANSACTION_ZK_ROOT = "/consumers";
+    public static final String DEFAULT_STORM_KAFKA_TRANSACTION_ZK_RELATIVE_PATH = "/eagle_consumer";
+
+    // topic to KafkaSpoutWrapper
+    private volatile Map<String, KafkaSpoutWrapper> kafkaSpoutList = new HashMap<>();
+    private int numOfRouterBolts;
+
+    private SpoutSpec cachedSpoutSpec;
+
+    private transient KafkaSpoutMetric kafkaSpoutMetric;
+
+    @SuppressWarnings("rawtypes")
+    private Map conf;
+    private TopologyContext context;
+    private SpoutOutputCollector collector;
+    private final Config config;
+    private String topologyId;
+    private String spoutName;
+    private String routeBoltName;
+    @SuppressWarnings("unused")
+    private int taskIndex;
+    private IMetadataChangeNotifyService changeNotifyService;
+    private PartitionedEventSerializer serializer;
+    private volatile Map<String, StreamDefinition> sds;
+
+    /**
+     * FIXME one single changeNotifyService may have issues as possibly multiple spout tasks will register themselves and initialize service
+     * @param config
+     * @param topologyId
+     * @param changeNotifyService
+     * @param numOfRouterBolts
+     */
+    public CorrelationSpout(Config config, String topologyId, IMetadataChangeNotifyService changeNotifyService, int numOfRouterBolts){
+        this(config, topologyId, changeNotifyService, numOfRouterBolts, AlertConstants.DEFAULT_SPOUT_NAME, AlertConstants.DEFAULT_ROUTERBOLT_NAME);
+    }
+    /**
+     *
+     * @param config
+     * @param topologyId used for distinguishing kafka offset for different topologies
+     * @param numOfRouterBolts used for generating streamId and routing
+     * @param spoutName used for generating streamId between spout and router bolt
+     * @param routerBoltName used for generating streamId between spout and router bolt
+     */
+    public CorrelationSpout(Config config, String topologyId, IMetadataChangeNotifyService changeNotifyService, int numOfRouterBolts, String spoutName, String routerBoltName){
+        this.config = config;
+        this.topologyId = topologyId;
+        this.changeNotifyService = changeNotifyService;
+        this.numOfRouterBolts = numOfRouterBolts;
+        this.spoutName = spoutName;
+        this.routeBoltName = routerBoltName;
+    }
+
+    public String getSpoutName(){
+        return spoutName;
+    }
+
+    public String getRouteBoltName(){
+        return routeBoltName;
+    }
+
+    /**
+     * the only output field is for StreamEvent
+     * @param declarer
+     */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        for (int i = 0; i < numOfRouterBolts; i++) {
+            String streamId = StreamIdConversion.generateStreamIdBetween(spoutName, routeBoltName + i);
+            declarer.declareStream(streamId, new Fields(AlertConstants.FIELD_0));
+            LOG.info("declare stream between spout and streamRouterBolt " + streamId);
+        }
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("open method invoked");
+        }
+        this.conf = conf;
+        this.context = context;
+        this.collector = collector;
+        this.taskIndex = context.getThisTaskIndex();
+
+        // initialize an empty SpoutSpec
+        cachedSpoutSpec = new SpoutSpec(topologyId, new HashMap<>(), new HashMap<>(), new HashMap<>());
+
+        changeNotifyService.registerListener(this);
+        changeNotifyService.init(config, MetadataType.SPOUT);
+
+        // register KafkaSpout metric
+        kafkaSpoutMetric = new KafkaSpoutMetric();
+        context.registerMetric("kafkaSpout", kafkaSpoutMetric, 60);
+
+        this.serializer = Serializers.newPartitionedEventSerializer(this);
+    }
+
+    @Override
+    public void onSpoutSpecChange(SpoutSpec spec, Map<String, StreamDefinition> sds) {
+        LOG.info("new metadata is updated " + spec);
+        try{
+            onReload(spec, sds);
+        }catch(Exception ex){
+            LOG.error("error applying new SpoutSpec", ex);
+        }
+    }
+
+    @Override
+    public void nextTuple() {
+        for (KafkaSpoutWrapper wrapper : kafkaSpoutList.values()) {
+            wrapper.nextTuple();
+        }
+    }
+
+    /**
+     * find the correct wrapper to do ack that means msgId should be mapped to
+     * wrapper
+     *
+     * @param msgId
+     */
+    @Override
+    public void ack(Object msgId) {
+        // decode and get topic
+        KafkaMessageIdWrapper id = (KafkaMessageIdWrapper) msgId;
+        KafkaSpoutWrapper spout = kafkaSpoutList.get(id.topic);
+        spout.ack(id.id);
+    }
+
+    @Override
+    public void fail(Object msgId) {
+        // decode and get topic
+        KafkaMessageIdWrapper id = (KafkaMessageIdWrapper) msgId;
+        LOG.error("Failing message {}, with topic {}", msgId, id.topic);
+        KafkaSpoutWrapper spout = kafkaSpoutList.get(id.topic);
+        spout.fail(id.id);
+    }
+
+    @Override
+    public void deactivate() {
+        System.out.println("deactivate");
+        for (KafkaSpoutWrapper wrapper : kafkaSpoutList.values()) {
+            wrapper.deactivate();
+        }
+    }
+
+    @Override
+    public void close() {
+        System.out.println("close");
+        for (KafkaSpoutWrapper wrapper : kafkaSpoutList.values()) {
+            wrapper.close();
+        }
+    }
+    
+    private List<String> getTopics(SpoutSpec spoutSpec) {
+        List<String> meta = new ArrayList<String>();
+        for (Kafka2TupleMetadata entry : spoutSpec.getKafka2TupleMetadataMap().values()) {
+            meta.add(entry.getTopic());
+        }
+        return meta;
+    }
+
+    @SuppressWarnings("unchecked")
+    public void onReload(final SpoutSpec newMeta, Map<String, StreamDefinition> sds) throws Exception {
+        // calculate topic create/remove/update
+        List<String> topics = getTopics(newMeta);
+        List<String> cachedTopcies = getTopics(cachedSpoutSpec);
+        Collection<String> newTopics = CollectionUtils.subtract(topics, cachedTopcies);
+        Collection<String> removeTopics = CollectionUtils.subtract(cachedTopcies, topics);
+        Collection<String> updateTopics = CollectionUtils.intersection(topics, cachedTopcies);
+
+        LOG.info("Topics were added={}, removed={}, modified={}", newTopics, removeTopics, updateTopics);
+
+        // build lookup table for scheme
+        Map<String, String> newSchemaName = new HashMap<String, String>();
+        for (Kafka2TupleMetadata ds : newMeta.getKafka2TupleMetadataMap().values()) {
+            newSchemaName.put(ds.getTopic(), ds.getSchemeCls());
+        }
+
+        // copy and swap
+        Map<String, KafkaSpoutWrapper> newKafkaSpoutList = new HashMap<>(this.kafkaSpoutList);
+        // iterate new topics and then create KafkaSpout
+        for(String topic : newTopics){
+            KafkaSpoutWrapper wrapper = newKafkaSpoutList.get(topic);
+            if (wrapper != null) {
+                LOG.warn(MessageFormat.format("try to create new topic {0}, but found in the active spout list, this may indicate some inconsistency", topic));
+                continue;
+            }
+            KafkaSpoutWrapper newWrapper = createKafkaSpout(conf, context, collector, topic, newSchemaName.get(topic), newMeta, sds);
+            newKafkaSpoutList.put(topic, newWrapper);
+        }
+        // iterate remove topics and then close KafkaSpout
+        for(String topic : removeTopics){
+            KafkaSpoutWrapper wrapper = newKafkaSpoutList.get(topic);
+            if (wrapper == null) {
+                LOG.warn(MessageFormat.format("try to remove topic {0}, but not found in the active spout list, this may indicate some inconsistency", topic));
+                continue;
+            }
+            removeKafkaSpout(wrapper);
+            newKafkaSpoutList.remove(topic);
+        }
+
+        // iterate update topic and then update metadata
+        for(String topic : updateTopics){
+            KafkaSpoutWrapper spoutWrapper = newKafkaSpoutList.get(topic);
+            if (spoutWrapper == null) {
+                LOG.warn(MessageFormat.format("try to update topic {0}, but not found in the active spout list, this may indicate some inconsistency", topic));
+                continue;
+            }
+            spoutWrapper.update(newMeta, sds);
+        }
+
+        // swap
+        this.cachedSpoutSpec = newMeta;
+        this.kafkaSpoutList = newKafkaSpoutList;
+        this.sds = sds;
+    }
+
+    /**
+     * make this method protected to make sure unit test can work well
+     * Q: Where to persist consumer state, i.e. what offset has been consumed for each topic and partition
+     * A: stormKafkaTransactionZkPath + "/" + consumerId + "/" + topic + "/" + topologyId + "/" + partitionId
+     * Note1: PartitionManager.committedPath for composing zkState path,  _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition.getId();
+     * consumerId by default is EagleConsumer unless it is specified by "stormKafkaEagleConsumer"
+     * Note2: put topologyId as part of zkState because one topic by design can be consumed by multiple topologies so one topology needs to know
+     * processed offset for itself
+     *
+     * TODO: Should avoid use Config.get in deep calling stack, should generate config bean as early as possible
+     *
+     * @param conf
+     * @param context
+     * @param collector
+     * @param topic
+     * @param spoutSpec
+     * @return
+     */
+    @SuppressWarnings("rawtypes")
+    protected KafkaSpoutWrapper createKafkaSpout(Map conf, TopologyContext context, SpoutOutputCollector collector, final String topic,
+                                                 String schemeClsName, SpoutSpec spoutSpec, Map<String, StreamDefinition> sds) throws Exception{
+        String kafkaBrokerZkQuorum = config.getString("spout.kafkaBrokerZkQuorum");
+        BrokerHosts hosts = new ZkHosts(kafkaBrokerZkQuorum);
+        String transactionZkRoot = DEFAULT_STORM_KAFKA_TRANSACTION_ZK_ROOT;
+        if(config.hasPath("spout.stormKafkaTransactionZkPath")) {
+            transactionZkRoot = config.getString("spout.stormKafkaTransactionZkPath");
+        }
+        // write partition offset etc. into zkRoot+id, see PartitionManager.committedPath
+        String zkStateTransactionRelPath = DEFAULT_STORM_KAFKA_TRANSACTION_ZK_RELATIVE_PATH;
+        if(config.hasPath("spout.stormKafkaEagleConsumer")){
+            zkStateTransactionRelPath = config.getString("spout.stormKafkaEagleConsumer");
+        }
+        SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, transactionZkRoot, zkStateTransactionRelPath + "/" + topic + "/" + topologyId);
+        // transaction zkServers
+        boolean stormKafkaUseSameZkQuorumWithKafkaBroker = config.getBoolean("spout.stormKafkaUseSameZkQuorumWithKafkaBroker");
+        if(stormKafkaUseSameZkQuorumWithKafkaBroker){
+            ZkServerPortUtils utils = new ZkServerPortUtils(kafkaBrokerZkQuorum);
+            spoutConfig.zkServers = utils.getZkHosts();
+            spoutConfig.zkPort = utils.getZkPort();
+        }else{
+            ZkServerPortUtils utils = new ZkServerPortUtils(config.getString("spout.stormKafkaTransactionZkQuorum"));
+            spoutConfig.zkServers = utils.getZkHosts();
+            spoutConfig.zkPort = utils.getZkPort();
+        }
+        // transaction update interval
+        spoutConfig.stateUpdateIntervalMs = config.getLong("spout.stormKafkaStateUpdateIntervalMs");
+        // Kafka fetch size
+        spoutConfig.fetchSizeBytes = config.getInt("spout.stormKafkaFetchSizeBytes");
+        // "startOffsetTime" is for test usage, prod should not use this
+        if (config.hasPath("spout.stormKafkaStartOffsetTime")) {
+            spoutConfig.startOffsetTime = config.getInt("spout.stormKafkaStartOffsetTime");
+        }
+
+        spoutConfig.scheme = new SchemeAsMultiScheme(SchemeBuilder.buildFromClsName(schemeClsName, topic));
+        KafkaSpoutWrapper wrapper = new KafkaSpoutWrapper(spoutConfig, kafkaSpoutMetric);
+        SpoutOutputCollectorWrapper collectorWrapper = new SpoutOutputCollectorWrapper(this, collector, topic, spoutSpec, numOfRouterBolts, sds,this.serializer);
+        wrapper.open(conf, context, collectorWrapper);
+        return wrapper;
+    }
+
+    @Override
+    public StreamDefinition getStreamDefinition(String streamId) {
+        return sds.get(streamId);
+    }
+
+    /**
+     * utility to get list of zkServers and zkPort.(It is assumed that zkPort is same for all zkServers as storm-kafka library requires this though it is not efficient)
+     */
+    private static class ZkServerPortUtils{
+        private List<String> zkHosts = new ArrayList<>();
+        private Integer zkPort;
+        public ZkServerPortUtils(String zkQuorum){
+            String[] zkConnections = zkQuorum.split(",");
+            for (String zkConnection : zkConnections) {
+                zkHosts.add(zkConnection.split(":")[0]);
+            }
+            zkPort = Integer.valueOf(zkConnections[0].split(":")[1]);
+        }
+
+        public List<String> getZkHosts(){
+            return zkHosts;
+        }
+
+        public Integer getZkPort(){
+            return zkPort;
+        }
+    }
+
+    protected void removeKafkaSpout(KafkaSpoutWrapper wrapper){
+        try {
+            wrapper.close();
+        } catch (Exception e) {
+            LOG.error("Close wrapper failed. Ignore and continue!", e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java
new file mode 100644
index 0000000..a2c9219
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java
@@ -0,0 +1,43 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.eagle.alert.engine.spout;
+
+import java.util.Properties;
+
+import kafka.admin.AdminUtils;
+import kafka.utils.ZKStringSerializer$;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.slf4j.Logger;
+
+/**
+ * normally this is used in unit test for convenience
+ */
+public class CreateTopicUtils {
+    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(CreateTopicUtils.class);
+    private static final int partitions = 2;
+    private static final int replicationFactor = 1;
+    public static void ensureTopicReady(String zkQuorum, String topic){
+        ZkClient zkClient = new ZkClient(zkQuorum, 10000, 10000, ZKStringSerializer$.MODULE$);
+        if(!AdminUtils.topicExists(zkClient, topic)) {
+            LOG.info("create topic " + topic + " with partitions " + partitions + ", and replicationFactor " + replicationFactor);
+            AdminUtils.createTopic(zkClient, topic, partitions, replicationFactor, new Properties());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/ISpoutSpecLCM.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/ISpoutSpecLCM.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/ISpoutSpecLCM.java
new file mode 100644
index 0000000..23e94c3
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/ISpoutSpecLCM.java
@@ -0,0 +1,41 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.spout;
+
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.SpoutSpec;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+
+/**
+ * topic to stream metadata lifecycle method
+ * one topic may spawn multiple streams, the metadata change includes
+ * 1. add/remove stream
+ * 2. for a specific stream, groupingstrategy is changed
+ *    ex1, this stream has more alert bolts than before, then this spout would take more traffic
+ *    ex2, this stream has less alert bolts than before, then this spout would take less traffic
+ */
+public interface ISpoutSpecLCM {
+    /**
+     * stream metadata is used for SPOUT to filter traffic and route traffic to following groupby bolts.
+     * @param metadata
+     */
+    void update(SpoutSpec metadata, Map<String, StreamDefinition> sds);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java
new file mode 100644
index 0000000..c786c01
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java
@@ -0,0 +1,42 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.eagle.alert.engine.spout;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Created on 2/18/16.
+ */
+public class KafkaMessageIdWrapper {
+    public Object id;
+    public KafkaMessageIdWrapper(Object o){
+        this.id = o;
+    }
+    public String topic;
+    private final static ObjectMapper objectMapper = new ObjectMapper();
+
+    public String toString(){
+        try {
+            return String.format("KafkaMessageIdWrapper[topic=%s, id=%s]", topic, objectMapper.writeValueAsString(id));
+        } catch (JsonProcessingException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java
new file mode 100644
index 0000000..223f1b5
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java
@@ -0,0 +1,35 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.spout;
+
+import backtype.storm.spout.Scheme;
+
+
+/**
+ * All Scheme implementations should have the following conditions
+ * 1) implement Scheme interface
+ * 2) has one constructor with topic name as parameter
+ */
+public class SchemeBuilder {
+    public static Scheme buildFromClsName(String clsName, String topic) throws Exception{
+        Object o = Class.forName(clsName).getConstructor(String.class).newInstance(topic);
+        return (Scheme)o;
+    }
+}



Mime
View raw message