eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ralp...@apache.org
Subject [09/19] incubator-eagle git commit: EAGLE-324: Init branch-v0.5
Date Wed, 01 Jun 2016 05:56:24 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java
new file mode 100644
index 0000000..a94604c
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.serialization;
+
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.serialization.impl.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class Serializers {
+    private final static Map<StreamColumn.Type,Serializer<?>> COLUMN_TYPE_SER_MAPPING = new HashMap<>();
+
+    public static <T> void register(StreamColumn.Type type,Serializer<T> serializer){
+        if(COLUMN_TYPE_SER_MAPPING.containsKey(type)){
+            throw new IllegalArgumentException("Duplicated column type: "+type);
+        }
+        COLUMN_TYPE_SER_MAPPING.put(type,serializer);
+    }
+
+    public static <T> Serializer<T> getColumnSerializer(StreamColumn.Type type){
+        if(COLUMN_TYPE_SER_MAPPING.containsKey(type)){
+            return (Serializer<T>) COLUMN_TYPE_SER_MAPPING.get(type);
+        }else{
+            throw new IllegalArgumentException("Serializer of type: "+type+" not found");
+        }
+    }
+
+    public static PartitionedEventSerializer newPartitionedEventSerializer(SerializationMetadataProvider metadataProvider){
+        return new PartitionedEventSerializerImpl(metadataProvider);
+    }
+
+    static {
+        register(StreamColumn.Type.STRING,new StringSerializer());
+        register(StreamColumn.Type.INT,new IntegerSerializer());
+        register(StreamColumn.Type.LONG,new LongSerializer());
+        register(StreamColumn.Type.FLOAT,new FloatSerializer());
+        register(StreamColumn.Type.DOUBLE,new DoubleSerializer());
+        register(StreamColumn.Type.BOOL,new BooleanSerializer());
+        register(StreamColumn.Type.OBJECT,new JavaObjectSerializer());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java
new file mode 100644
index 0000000..1e90569
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java
@@ -0,0 +1,35 @@
+package org.apache.eagle.alert.engine.serialization.impl;
+
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class BooleanSerializer implements Serializer<Boolean> {
+    @Override
+    public void serialize(Boolean value, DataOutput dataOutput) throws IOException {
+        dataOutput.writeBoolean(value);
+    }
+
+    @Override
+    public Boolean deserialize(DataInput dataInput) throws IOException {
+        return dataInput.readBoolean();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java
new file mode 100644
index 0000000..ad5f53c
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java
@@ -0,0 +1,35 @@
+package org.apache.eagle.alert.engine.serialization.impl;
+
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class DoubleSerializer implements Serializer<Double> {
+    @Override
+    public void serialize(Double value, DataOutput dataOutput) throws IOException {
+        dataOutput.writeDouble(value);
+    }
+
+    @Override
+    public Double deserialize(DataInput dataInput) throws IOException {
+        return dataInput.readDouble();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java
new file mode 100644
index 0000000..18089a9
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java
@@ -0,0 +1,35 @@
+package org.apache.eagle.alert.engine.serialization.impl;
+
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class FloatSerializer implements Serializer<Float> {
+    @Override
+    public void serialize(Float value, DataOutput dataOutput) throws IOException {
+        dataOutput.writeFloat(value);
+    }
+
+    @Override
+    public Float deserialize(DataInput dataInput) throws IOException {
+        return dataInput.readFloat();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java
new file mode 100644
index 0000000..d2473a9
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java
@@ -0,0 +1,35 @@
+package org.apache.eagle.alert.engine.serialization.impl;
+
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class IntegerSerializer implements Serializer<Integer> {
+    @Override
+    public void serialize(Integer value, DataOutput dataOutput) throws IOException {
+        dataOutput.writeInt(value);
+    }
+
+    @Override
+    public Integer deserialize(DataInput dataInput) throws IOException {
+        return dataInput.readInt();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java
new file mode 100644
index 0000000..76d2294
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.serialization.impl;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+public class JavaObjectSerializer implements Serializer<Object> {
+    @Override
+    public void serialize(Object value, DataOutput dataOutput) throws IOException {
+        byte[] bytes = SerializationUtils.serialize((Serializable) value);
+        dataOutput.writeInt(bytes.length);
+        dataOutput.write(bytes);
+    }
+
+    @Override
+    public Object deserialize(DataInput dataInput) throws IOException {
+        int len = dataInput.readInt();
+        byte[] bytes = new byte[len];
+        dataInput.readFully(bytes);
+        return SerializationUtils.deserialize(bytes);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
new file mode 100644
index 0000000..8d85c76
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
@@ -0,0 +1,35 @@
+package org.apache.eagle.alert.engine.serialization.impl;
+
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class LongSerializer implements Serializer<Long> {
+    @Override
+    public void serialize(Long value, DataOutput dataOutput) throws IOException {
+        dataOutput.writeLong(value);
+    }
+
+    @Override
+    public Long deserialize(DataInput dataInput) throws IOException {
+        return dataInput.readLong();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java
new file mode 100644
index 0000000..714920e
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.serialization.impl;
+
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
+import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
+import org.apache.eagle.alert.engine.serialization.Serializer;
+import org.apache.eagle.alert.engine.utils.CompressionUtils;
+
+import java.io.*;
+
+/**
+ * Stream Metadata Cached Serializer
+ *
+ * Performance:
+ *
+ * 1) VS Kryo Direct: reduce 73.4% space (bytes) and 42.5 % time (ms).
+ * 2) VS Java Native: reduce 92.5% space (bytes) and 94.2% time (ms)
+ *
+ * Tips:
+ *
+ * 1) Without-compression performs better than with compression for small event
+ *
+ * TODO: Cache Partition would save little space but almost half of serialization time, how to balance the performance?
+ *
+ * @see PartitionedEvent
+ */
+public class PartitionedEventSerializerImpl implements Serializer<PartitionedEvent>, PartitionedEventSerializer {
+    private final StreamEventSerializer streamEventSerializer;
+    private final Serializer<StreamPartition> streamPartitionSerializer;
+    private final boolean compress;
+
+    /**
+     * @param serializationMetadataProvider metadata provider
+     * @param compress false by default
+     */
+    public PartitionedEventSerializerImpl(SerializationMetadataProvider serializationMetadataProvider,boolean compress) {
+        this.streamEventSerializer = new StreamEventSerializer(serializationMetadataProvider);
+        this.streamPartitionSerializer = StreamPartitionSerializer.INSTANCE;
+        this.compress = compress;
+    }
+
+    public PartitionedEventSerializerImpl(SerializationMetadataProvider serializationMetadataProvider) {
+        this.streamEventSerializer = new StreamEventSerializer(serializationMetadataProvider);
+        this.streamPartitionSerializer = StreamPartitionSerializer.INSTANCE;
+        this.compress = false;
+    }
+
+    @Override
+    public void serialize(PartitionedEvent entity, DataOutput dataOutput) throws IOException {
+        dataOutput.writeLong(entity.getPartitionKey());
+        streamEventSerializer.serialize(entity.getEvent(), dataOutput);
+        streamPartitionSerializer.serialize(entity.getPartition(), dataOutput);
+    }
+
+    @Override
+    public PartitionedEvent deserialize(DataInput dataInput) throws IOException {
+        PartitionedEvent event = new PartitionedEvent();
+        event.setPartitionKey(dataInput.readLong());
+        StreamEvent streamEvent = streamEventSerializer.deserialize(dataInput);
+        StreamPartition partition = streamPartitionSerializer.deserialize(dataInput);
+        event.setEvent(streamEvent);
+        partition.setStreamId(streamEvent.getStreamId());
+        event.setPartition(partition);
+        return event;
+    }
+
+    @Override
+    public byte[] serialize(PartitionedEvent entity) throws IOException {
+        ByteArrayDataOutput dataOutput = ByteStreams.newDataOutput();
+        this.serialize(entity,dataOutput);
+        return compress ? CompressionUtils.compress(dataOutput.toByteArray()):dataOutput.toByteArray();
+    }
+
+    @Override
+    public PartitionedEvent deserialize(byte[] bytes) throws IOException {
+        return this.deserialize(ByteStreams.newDataInput(compress ? CompressionUtils.decompress(bytes):bytes));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
new file mode 100644
index 0000000..d166ced
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
@@ -0,0 +1,87 @@
+package org.apache.eagle.alert.engine.serialization.impl;
+
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
+import org.apache.eagle.alert.engine.serialization.Serializer;
+import org.apache.eagle.alert.engine.serialization.Serializers;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.BitSet;
+
+/**
+ * @see StreamEvent
+ */
+public class StreamEventSerializer implements Serializer<StreamEvent> {
+    private final SerializationMetadataProvider serializationMetadataProvider;
+
+    public StreamEventSerializer(SerializationMetadataProvider serializationMetadataProvider){
+        this.serializationMetadataProvider = serializationMetadataProvider;
+    }
+
+    /**
+     *
+     * @param objects
+     * @return
+     */
+    private BitSet isNullBitSet(Object[] objects){
+        BitSet bitSet = new BitSet();
+        int i = 0;
+        for(Object obj:objects){
+            bitSet.set(i,obj == null);
+            i++;
+        }
+        return bitSet;
+    }
+
+    @Override
+    public void serialize(StreamEvent event, DataOutput dataOutput) throws IOException {
+        dataOutput.writeUTF(event.getStreamId());
+        dataOutput.writeLong(event.getTimestamp());
+        if(event.getData() == null || event.getData().length == 0){
+            dataOutput.writeInt(0);
+        }else{
+            BitSet isNullIndex = isNullBitSet(event.getData());
+            byte[] isNullBytes = isNullIndex.toByteArray();
+            dataOutput.writeInt(isNullBytes.length);
+            dataOutput.write(isNullBytes);
+            int i =0;
+            StreamDefinition definition = serializationMetadataProvider.getStreamDefinition(event.getStreamId());
+            if(definition == null) throw new IOException("StreamDefinition not found: "+event.getStreamId());
+            if(event.getData().length != definition.getColumns().size()){
+                throw new IOException("Event :"+event+" doesn't match with schema: "+definition);
+            }
+            for(StreamColumn column:definition.getColumns()){
+                if(!isNullIndex.get(i)) {
+                    Serializers.getColumnSerializer(column.getType()).serialize(event.getData()[i],dataOutput);
+                }
+                i ++;
+            }
+        }
+    }
+
+    @Override
+    public StreamEvent deserialize(DataInput dataInput) throws IOException {
+        StreamEvent event = new StreamEvent();
+        event.setStreamId(dataInput.readUTF());
+        StreamDefinition definition = serializationMetadataProvider.getStreamDefinition(event.getStreamId());
+        event.setTimestamp(dataInput.readLong());
+        int isNullBytesLen = dataInput.readInt();
+        byte[] isNullBytes = new byte[isNullBytesLen];
+        dataInput.readFully(isNullBytes);
+        BitSet isNullIndex = BitSet.valueOf(isNullBytes);
+        Object[] attributes = new Object[definition.getColumns().size()];
+        int i = 0;
+        for(StreamColumn column:definition.getColumns()){
+            if(!isNullIndex.get(i)) {
+                attributes[i] = Serializers.getColumnSerializer(column.getType()).deserialize(dataInput);
+            }
+            i ++;
+        }
+        event.setData(attributes);
+        return event;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java
new file mode 100644
index 0000000..4aa25b1
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java
@@ -0,0 +1,83 @@
+package org.apache.eagle.alert.engine.serialization.impl;
+
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
+import java.io.*;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.*;
+
+/**
+ * Don't serialize streamId
+ *
+ * @see StreamPartition
+ */
+public class StreamPartitionDigestSerializer implements Serializer<StreamPartition> {
+    public final static StreamPartitionDigestSerializer INSTANCE = new StreamPartitionDigestSerializer();
+
+    private final Map<DigestBytes,StreamPartition> checkSumPartitionMap = new HashMap<>();
+    private final Map<StreamPartition,DigestBytes> partitionCheckSumMap = new HashMap<>();
+
+    @Override
+    public void serialize(StreamPartition partition, DataOutput dataOutput) throws IOException {
+        DigestBytes checkSum = partitionCheckSumMap.get(partition);
+        if(checkSum == null){
+            try {
+                checkSum = digestCheckSum(partition);
+                partitionCheckSumMap.put(partition,checkSum);
+                checkSumPartitionMap.put(checkSum,partition);
+            } catch (NoSuchAlgorithmException e) {
+                throw new IOException(e);
+            }
+        }
+        dataOutput.writeInt(checkSum.size());
+        dataOutput.write(checkSum.toByteArray());
+    }
+
+    @Override
+    public StreamPartition deserialize(DataInput dataInput) throws IOException {
+        int checkSumLen = dataInput.readInt();
+        byte[] checksum = new byte[checkSumLen];
+        dataInput.readFully(checksum);
+        StreamPartition partition = checkSumPartitionMap.get(new DigestBytes(checksum));
+        if(partition == null){
+            throw new IOException("Illegal partition checksum: "+checksum);
+        }
+        return partition;
+    }
+
+    private class DigestBytes {
+        private final byte[] data;
+
+        public DigestBytes(byte[] bytes){
+            this.data = bytes;
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            return other instanceof DigestBytes && Arrays.equals(data, ((DigestBytes) other).data);
+        }
+
+        @Override
+        public int hashCode() {
+            return Arrays.hashCode(data);
+        }
+        public int size(){
+            return data.length;
+        }
+        public byte[] toByteArray(){
+            return data;
+        }
+    }
+
+    private DigestBytes digestCheckSum(Object obj) throws IOException, NoSuchAlgorithmException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        oos.writeObject(obj);
+        oos.close();
+        MessageDigest m = MessageDigest.getInstance("SHA1");
+        m.update(baos.toByteArray());
+        return new DigestBytes(m.digest());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java
new file mode 100644
index 0000000..b92c301
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java
@@ -0,0 +1,64 @@
+package org.apache.eagle.alert.engine.serialization.impl;
+
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Don't serialize streamId
+ *
+ * @see StreamPartition
+ */
+public class StreamPartitionSerializer implements Serializer<StreamPartition> {
+    public final static StreamPartitionSerializer INSTANCE = new StreamPartitionSerializer();
+
+    @Override
+    public void serialize(StreamPartition partition, DataOutput dataOutput) throws IOException {
+        dataOutput.writeUTF(partition.getType().toString());
+        if(partition.getColumns() == null || partition.getColumns().size() == 0){
+            dataOutput.writeInt(0);
+        } else {
+            dataOutput.writeInt(partition.getColumns().size());
+            for(String column:partition.getColumns()){
+                dataOutput.writeUTF(column);
+            }
+        }
+        if(partition.getSortSpec() == null){
+            dataOutput.writeByte(0);
+        }else {
+            dataOutput.writeByte(1);
+            dataOutput.writeUTF(partition.getSortSpec().getWindowPeriod());
+            dataOutput.writeInt(partition.getSortSpec().getWindowMargin());
+        }
+    }
+
+    @Override
+    public StreamPartition deserialize(DataInput dataInput) throws IOException {
+        StreamPartition partition = new StreamPartition();
+        partition.setType(StreamPartition.Type.locate(dataInput.readUTF()));
+        int colSize = dataInput.readInt();
+        if(colSize>0){
+            List<String> columns = new ArrayList<>(colSize);
+            for(int i=0;i<colSize;i++){
+                columns.add(dataInput.readUTF());
+            }
+            partition.setColumns(columns);
+        }
+        if(dataInput.readByte() == 1){
+            String period = dataInput.readUTF();
+            int margin = dataInput.readInt();
+
+            StreamSortSpec sortSpec = new StreamSortSpec();
+            sortSpec.setWindowPeriod(period);
+            sortSpec.setWindowMargin(margin);
+            partition.setSortSpec(sortSpec);
+        }
+        return partition;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java
new file mode 100644
index 0000000..2a1541a
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java
@@ -0,0 +1,35 @@
+package org.apache.eagle.alert.engine.serialization.impl;
+
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class StringSerializer implements Serializer<String> {
+    @Override
+    public void serialize(String value, DataOutput dataOutput) throws IOException {
+        dataOutput.writeUTF(value);
+    }
+
+    @Override
+    public String deserialize(DataInput dataInput) throws IOException {
+        return dataInput.readUTF();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java
new file mode 100644
index 0000000..b705564
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java
@@ -0,0 +1,122 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.eagle.alert.engine.siddhi.extension;
+
+import java.util.LinkedList;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.config.ExecutionPlanContext;
+import org.wso2.siddhi.core.executor.ExpressionExecutor;
+import org.wso2.siddhi.core.query.selector.attribute.aggregator.AttributeAggregator;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.definition.Attribute.Type;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * @since Apr 1, 2016
+ *
+ */
+public class AttributeCollectAggregator extends AttributeAggregator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AttributeCollectAggregator.class);
+
+    private LinkedList<Object> value;
+
+    public AttributeCollectAggregator() {
+        value = new LinkedList<Object>();
+    }
+
+    @Override
+    public void start() {
+    }
+
+    @Override
+    public void stop() {
+    }
+
+    @Override
+    public Object[] currentState() {
+        return value.toArray();
+    }
+
+    @Override
+    public void restoreState(Object[] arg0) {
+        value = new LinkedList<Object>();
+        if (arg0 != null) {
+            for (Object o : arg0) {
+                value.add(o);
+            }
+        }
+    }
+
+    @Override
+    public Type getReturnType() {
+        return Attribute.Type.OBJECT;
+    }
+
+    @Override
+    protected void init(ExpressionExecutor[] arg0, ExecutionPlanContext arg1) {
+        // TODO: Support max of elements?
+    }
+
+    @Override
+    public Object processAdd(Object arg0) {
+        value.add(arg0);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("processAdd: current values are : " + value);
+        }
+        return ImmutableList.copyOf(value);
+    }
+
+    @Override
+    public Object processAdd(Object[] arg0) {
+        value.add(arg0);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("processAdd: current values are : " + value);
+        }
+        return ImmutableList.copyOf(value);
+    }
+
+    // / NOTICE: non O(1)
+    @Override
+    public Object processRemove(Object arg0) {
+        value.remove(arg0);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("processRemove: current values are : " + value);
+        }
+        return ImmutableList.copyOf(value);
+    }
+
+    // / NOTICE: non O(1)
+    @Override
+    public Object processRemove(Object[] arg0) {
+        value.remove(arg0);
+        LOG.info("processRemove: current values are : " + value);
+        return ImmutableList.copyOf(value);
+    }
+
+    @Override
+    public Object reset() {
+        value.clear();
+        return value;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/BaseStreamWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/BaseStreamWindow.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/BaseStreamWindow.java
new file mode 100644
index 0000000..38c5c30
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/BaseStreamWindow.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.alert.engine.PartitionedEventCollector;
+import org.apache.eagle.alert.utils.DateTimeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TODO: Make sure thread-safe
+ * TODO: Leverage Off-Heap Memory to persist append-only events collection
+ */
+public abstract class BaseStreamWindow implements StreamWindow {
+    private final long endTime;
+    private final long startTime;
+    private final long margin;
+    private final AtomicBoolean expired;
+    private final long createdTime;
+    private final static Logger LOG = LoggerFactory.getLogger(BaseStreamWindow.class);
+    private PartitionedEventCollector collector;
+    private final AtomicLong lastFlushedStreamTime;
+    private final AtomicLong lastFlushedSystemTime;
+
+    /**
+     * @param startTime
+     * @param endTime
+     * @param marginTime
+     */
+    public BaseStreamWindow(long startTime, long endTime, long marginTime){
+        if(startTime >= endTime){
+            throw new IllegalArgumentException("startTime: "+startTime+" >= endTime: "+endTime+", expected: startTime < endTime");
+        }
+        if(marginTime > endTime - startTime){
+            throw new IllegalArgumentException("marginTime: "+marginTime+" > endTime: "+endTime+" - startTime "+startTime+", expected: marginTime < endTime - startTime");
+        }
+        this.startTime = startTime;
+        this.endTime = endTime;
+        this.margin = marginTime;
+        this.expired = new AtomicBoolean(false);
+        this.createdTime = System.currentTimeMillis();
+        this.lastFlushedStreamTime = new AtomicLong(0);
+        this.lastFlushedSystemTime = new AtomicLong(this.createdTime);
+    }
+
+    @Override
+    public void register(PartitionedEventCollector collector) {
+        if(this.collector!=null)
+            throw new IllegalArgumentException("Duplicated collector error");
+        this.collector = collector;
+    }
+
+    @Override
+    public long createdTime() {
+        return createdTime;
+    }
+
+    public long startTime() {
+        return this.startTime;
+    }
+
+    @Override
+    public long rejectTime(){
+        return this.lastFlushedStreamTime.get();
+    }
+
+    @Override
+    public long margin() {
+        return this.margin;
+    }
+
+    public long endTime() {
+        return this.endTime;
+    }
+
+    public boolean accept(final long eventTime) {
+        return !expired() && eventTime >= startTime && eventTime < endTime
+                && eventTime >= lastFlushedStreamTime.get(); // dropped
+    }
+
+    public boolean expired() {
+        return expired.get();
+    }
+
+    @Override
+    public boolean alive() {
+        return !expired.get();
+    }
+
+    /**
+     *
+     *
+     * Expire when
+     * 1) If stream time >= endTime + marginTime, then flush and expire
+     * 2) If systemTime - flushedTime > endTime - startTime + marginTime && streamTime >= endTime, then flush and expire.
+     * 3) If systemTime - flushedTime > endTime - startTime + marginTime && streamTime < endTime, then flush but not expire.
+     * 4) else do nothing
+     *
+     *  @param clock stream time clock
+     *  @param globalSystemTime system time clock
+     */
+    @Override
+    public synchronized void onTick(StreamTimeClock clock,long globalSystemTime) {
+        if(!expired()) {
+            if(clock.getTime() >= endTime + margin){
+                LOG.info("Expiring {} at stream time:{}, latency:{}, window: {}",clock.getStreamId(),DateTimeUtil.millisecondsToHumanDateWithMilliseconds(clock.getTime()),globalSystemTime - lastFlushedSystemTime.get(),this);
+                lastFlushedStreamTime.set(clock.getTime());
+                lastFlushedSystemTime.set(globalSystemTime);
+                flush();
+                expired.set(true);
+            } else if(globalSystemTime - lastFlushedSystemTime.get() >=  endTime + margin - startTime && size() > 0){
+                LOG.info("Flushing {} at system time: {}, stream time: {}, latency: {}, window: {}",clock.getStreamId(),DateTimeUtil.millisecondsToHumanDateWithMilliseconds(globalSystemTime),DateTimeUtil.millisecondsToHumanDateWithMilliseconds(clock.getTime()),globalSystemTime - lastFlushedSystemTime.get(),this);
+                lastFlushedStreamTime.set(clock.getTime());
+                lastFlushedSystemTime.set(globalSystemTime);
+                flush();
+                if(lastFlushedStreamTime.get()>=this.endTime) expired.set(true);
+            }
+        } else {
+            LOG.warn("Window has already expired, should not tick: {}",this.toString());
+        }
+    }
+
+    public void close() {
+        flush();
+        expired.set(true);
+    }
+
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder().append(startTime).append(endTime).append(margin).build();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if(obj !=null && obj instanceof BaseStreamWindow) {
+            BaseStreamWindow another = (BaseStreamWindow) obj;
+            return another.startTime == this.startTime && another.endTime == this.endTime && another.margin == this.margin;
+        }
+        return false;
+    }
+
+    @Override
+    public void flush(){
+        if(this.collector == null) throw new NullPointerException("Collector is not given before window flush");
+        this.flush(collector);
+    }
+
+    /**
+     *
+     * @param collector
+     * @return max timestamp
+     */
+    protected abstract void flush(PartitionedEventCollector collector);
+
+    @Override
+    public String toString() {
+        return String.format("StreamWindow[period=[%s,%s), margin=%s ms, size=%s, reject=%s]",
+                DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.startTime),
+                DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.endTime),
+                this.margin,
+                size(),
+                this.rejectTime() == 0 ? DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.startTime): DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.rejectTime())
+                );
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClock.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClock.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClock.java
new file mode 100644
index 0000000..0881e35
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClock.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter;
+
+/**
+ * The time clock per stream
+ *
+ * Should be thread-safe between getTime and moveForward
+ *
+ * By default, we currently simple support event timestamp now
+ */
+public interface StreamTimeClock {
+    /**
+     * Get stream id
+     *
+     * @return stream id
+     */
+    String getStreamId();
+
+    /**
+     * Get current time
+     *
+     * @return current timestamp value
+     */
+    long getTime();
+
+    /**
+     * @param timestamp move forward current time to given timestamp
+     */
+    void moveForward(long timestamp);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockListener.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockListener.java
new file mode 100644
index 0000000..f7e463d
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockListener.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter;
+
+public interface StreamTimeClockListener {
+    /**
+     * @see StreamWindow
+     *
+     * @param streamTime
+     * @param globalSystemTime
+     */
+    void onTick(StreamTimeClock streamTime, long globalSystemTime);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockManager.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockManager.java
new file mode 100644
index 0000000..29b13eb
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockManager.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter;
+
+import java.io.Serializable;
+
+/**
+ *
+ * By default, we could keep the current time clock in memory,
+ * Eventually we may need to consider the global time synchronization across all nodes
+ *
+ * TODO: maybe need to synchronize time clock globally
+ *
+ * 1) When to initialize window according to start time
+ * 2) When to close expired window according to current time
+ * 3) Automatically tick periodically as the single place for control lock
+ *
+ */
+public interface StreamTimeClockManager extends StreamTimeClockTrigger, Serializable{
+    /**
+     * @return StreamTimeClock instance
+     */
+    StreamTimeClock createStreamTimeClock(String streamId);
+
+    StreamTimeClock getStreamTimeClock(String streamId);
+
+    /**
+     * @param streamId
+     */
+    void removeStreamTimeClock(String streamId);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockTrigger.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockTrigger.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockTrigger.java
new file mode 100644
index 0000000..a0cd184
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockTrigger.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter;
+
+
+/**
+ * Possible implementation:
+ *
+ * 1) EventTimeClockTrigger (by default)
+ * 2) SystemTimeClockTrigger
+ */
+public interface StreamTimeClockTrigger {
+    /**
+     * @param streamId stream id to listen to
+     * @param listener to watch on streamId
+     */
+    void registerListener(String streamId, StreamTimeClockListener listener);
+
+    /**
+     *
+     * @param streamClock
+     * @param listener
+     */
+    void registerListener(StreamTimeClock streamClock, StreamTimeClockListener listener);
+
+    /**
+     * @param listener listener to remove
+     */
+    void removeListener(StreamTimeClockListener listener);
+
+    /**
+     * Trigger tick of all listeners on certain stream
+     *
+     * @param streamId stream id
+     */
+    void triggerTickOn(String streamId);
+
+    /**
+     * Update time per new event time on stream
+     *
+     * @param streamId
+     * @param timestamp
+     */
+    void onTimeUpdate(String streamId, long timestamp);
+
+    void close();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindow.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindow.java
new file mode 100644
index 0000000..2500122
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindow.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter;
+
+import org.apache.eagle.alert.engine.PartitionedEventCollector;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+
+/**
+ * <h2>Tumbling Window instead Sliding Window</h2>
+ * We could have time overlap to sort out-of-ordered stream,
+ * but each window should never have events overlap, otherwise will have logic problem.
+ *
+ *
+ * <h2>Ingestion Time Policy</h2>
+ * Different notions of time, namely processing time, event time, and ingestion time.
+ *
+ * <ol>
+ * <li>
+ *  In processing time, windows are defined with respect to the wall clock of the machine that builds and processes a window, i.e., a one minute processing time window collects elements for exactly one minute.
+ * </li>
+ * <li>
+ * In event time, windows are defined with respect to timestamps that are attached to each event record. This is common for many types of events, such as log entries, sensor data, etc, where the timestamp usually represents the time at which the event occurred. Event time has several benefits over processing time. First of all, it decouples the program semantics from the actual serving speed of the source and the processing performance of system. Hence you can process historic data, which is served at maximum speed, and continuously produced data with the same program. It also prevents semantically incorrect results in case of backpressure or delays due to failure recovery. Second, event time windows compute correct results, even if events arrive out-of-order of their timestamp which is common if a data stream gathers events from distributed sources.
+ * </li>
+ * <li>
+ * Ingestion time is a hybrid of processing and event time. It assigns wall clock timestamps to records as soon as they arrive in the system (at the source) and continues processing with event time semantics based on the attached timestamps.
+ * </li>
+ * </ol>
+ */
+public interface StreamWindow extends StreamTimeClockListener {
+    /**
+     * @return Created timestamp
+     */
+    long createdTime();
+
+    /**
+     * Get start time
+     *
+     * @return
+     */
+    long startTime();
+
+    long margin();
+
+    /**
+     * @return reject timestamp < rejectTime()
+     */
+    long rejectTime();
+
+    /**
+     * Get end time
+     *
+     * @return
+     */
+    long endTime();
+
+    /**
+     * @param timestamp event time
+     * @return true/false in boolean
+     */
+    boolean accept(long timestamp);
+
+    /**
+     * Window is expired
+     *
+     * @return whether window is expired
+     */
+    boolean expired();
+
+    /**
+     * @return whether window is alive
+     */
+    boolean alive();
+
+    /**
+     *
+     * @param event
+     */
+    boolean add(PartitionedEvent event);
+
+//    /**
+//     * @param collector Drain to output collector
+//     */
+//    void flush(PartitionedEventCollector collector);
+
+    void flush();
+
+    /**
+     * Close window
+     */
+    void close();
+
+    void register(PartitionedEventCollector collector);
+
+    /**
+     *
+     * @return
+     */
+    int size();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowManager.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowManager.java
new file mode 100644
index 0000000..a5f43aa
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowManager.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter;
+
+import java.io.Closeable;
+import java.util.Collection;
+
+/**
+ * TODO: Reuse existing expired window to avoid recreating new windows again and again
+ *
+ * Single stream window manager
+ */
+public interface StreamWindowManager extends StreamTimeClockListener, Closeable {
+
+    /**
+     * @param initialTime
+     * @return
+     */
+    StreamWindow addNewWindow(long initialTime);
+
+    /**
+     * @param window
+     */
+    void removeWindow(StreamWindow window);
+
+    /**
+     * @param window
+     * @return
+     */
+    boolean hasWindow(StreamWindow window);
+
+    /**
+     * @param timestamp time
+     * @return whether window exists for time
+     */
+    boolean hasWindowFor(long timestamp);
+
+    /**
+     * @return Internal collection for performance optimization
+     */
+    Collection<StreamWindow> getWindows();
+
+    /**
+     * @param timestamp
+     * @return
+     */
+    StreamWindow getWindowFor(long timestamp);
+
+    boolean reject(long timestamp);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowRepository.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowRepository.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowRepository.java
new file mode 100755
index 0000000..3e035c5
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindowRepository.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.eagle.alert.engine.sorter.impl.StreamSortedWindowInMapDB;
+import org.apache.eagle.alert.engine.sorter.impl.StreamSortedWindowOnHeap;
+import org.mapdb.DB;
+import org.mapdb.DBMaker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ *
+ * ===== Benchmark Result Report =====<br/><br/>
+ *
+ * Num. Operation   Type                            Time<br/>
+ * ---- ---------   ----                            ----<br/>
+ * 1000	FlushTime	DIRECT_MEMORY            	:	55<br/>
+ * 1000	FlushTime	FILE_RAF                 	:	63<br/>
+ * 1000	FlushTime	MEMORY                   	:	146<br/>
+ * 1000	FlushTime	ONHEAP                   	:	17<br/>
+ * 1000	InsertTime	DIRECT_MEMORY           	:	68<br/>
+ * 1000	InsertTime	FILE_RAF                	:	223<br/>
+ * 1000	InsertTime	MEMORY                  	:	273<br/>
+ * 1000	InsertTime	ONHEAP                  	:	20<br/>
+ * 10000	FlushTime	DIRECT_MEMORY           	:	551<br/>
+ * 10000	FlushTime	FILE_RAF                	:	668<br/>
+ * 10000	FlushTime	MEMORY                  	:	643<br/>
+ * 10000	FlushTime	ONHEAP                  	:	5<br/>
+ * 10000	InsertTime	DIRECT_MEMORY          	:	446<br/>
+ * 10000	InsertTime	FILE_RAF               	:	2095<br/>
+ * 10000	InsertTime	MEMORY                 	:	784<br/>
+ * 10000	InsertTime	ONHEAP                 	:	29<br/>
+ * 100000	FlushTime	DIRECT_MEMORY          	:	6139<br/>
+ * 100000	FlushTime	FILE_RAF               	:	6237<br/>
+ * 100000	FlushTime	MEMORY                 	:	6238<br/>
+ * 100000	FlushTime	ONHEAP                 	:	18<br/>
+ * 100000	InsertTime	DIRECT_MEMORY         	:	4499<br/>
+ * 100000	InsertTime	FILE_RAF              	:	22343<br/>
+ * 100000	InsertTime	MEMORY                	:	4962<br/>
+ * 100000	InsertTime	ONHEAP                	:	107<br/>
+ * 1000000	FlushTime	DIRECT_MEMORY         	:	61356<br/>
+ * 1000000	FlushTime	FILE_RAF              	:	63025<br/>
+ * 1000000	FlushTime	MEMORY                	:	61380<br/>
+ * 1000000	FlushTime	ONHEAP                	:	47<br/>
+ * 1000000	InsertTime	DIRECT_MEMORY        	:	43637<br/>
+ * 1000000	InsertTime	FILE_RAF             	:	464481<br/>
+ * 1000000	InsertTime	MEMORY               	:	44367<br/>
+ * 1000000	InsertTime	ONHEAP               	:	2040<br/>
+ *
+ * @see StreamSortedWindowOnHeap
+ * @see org.mapdb.DBMaker
+ */
+public class StreamWindowRepository {
+    public enum StorageType {
+        /**
+         * Creates new in-memory database which stores all data on heap without serialization.
+         * This mode should be very fast, but data will affect Garbage PartitionedEventCollector the same way as traditional Java Collections.
+         */
+        ONHEAP,
+
+        /**
+         * Creates new in-memory database. Changes are lost after JVM exits.
+         * This option serializes data into {@code byte[]},
+         * so they are not affected by Garbage PartitionedEventCollector.
+         */
+        MEMORY,
+
+        /**
+         * <p>
+         * Creates new in-memory database. Changes are lost after JVM exits.
+         * </p><p>
+         * This will use {@code DirectByteBuffer} outside of HEAP, so Garbage Collector is not affected
+         * You should increase ammount of direct memory with
+         * {@code -XX:MaxDirectMemorySize=10G} JVM param
+         * </p>
+         */
+        DIRECT_MEMORY,
+
+        /**
+         * By default use File.createTempFile("streamwindows","temp")
+         */
+        FILE_RAF
+    }
+
+    private final static Logger LOG = LoggerFactory.getLogger(StreamWindowRepository.class);
+    private final Map<StorageType,DB> dbPool;
+    private StreamWindowRepository(){
+        dbPool = new HashMap<>();
+    }
+
+    private static StreamWindowRepository repository;
+
+    /**
+     * Close automatically when JVM exists
+     *
+     * @return StreamWindowRepository singletonInstance
+     */
+    public static StreamWindowRepository getSingletonInstance(){
+        synchronized (StreamWindowRepository.class) {
+            if (repository == null) {
+                repository = new StreamWindowRepository();
+                Runtime.getRuntime().addShutdownHook(new Thread() {
+                    @Override
+                    public void run() {
+                        repository.close();
+                    }
+                });
+            }
+            return repository;
+        }
+    }
+
+    private DB createMapDB(StorageType storageType){
+        synchronized (dbPool) {
+            if(!dbPool.containsKey(storageType)){
+                DB db;
+                switch (storageType){
+                    case ONHEAP:
+                        db = DBMaker.heapDB().closeOnJvmShutdown().make();
+                        LOG.info("Create ONHEAP mapdb");
+                        break;
+                    case MEMORY:
+                        db = DBMaker.memoryDB().closeOnJvmShutdown().make();
+                        LOG.info("Create MEMORY mapdb");
+                        break;
+                    case DIRECT_MEMORY:
+                        db = DBMaker.memoryDirectDB().closeOnJvmShutdown().make();
+                        LOG.info("Create DIRECT_MEMORY mapdb");
+                        break;
+                    case FILE_RAF:
+                        try {
+                            File file = File.createTempFile("window-", ".map");
+                            file.delete();
+                            file.deleteOnExit();
+                            Preconditions.checkNotNull(file, "file is null");
+                            db = DBMaker.fileDB(file).deleteFilesAfterClose().make();
+                            LOG.info("Created FILE_RAF map file at {}",file.getAbsolutePath());
+                        } catch (IOException e) {
+                            throw new IllegalStateException(e);
+                        }
+                        break;
+                    default:
+                        throw new IllegalArgumentException("Illegal storage type: "+storageType);
+                }
+                dbPool.put(storageType,db);
+                return db;
+            }
+            return dbPool.get(storageType);
+        }
+    }
+
+    public StreamWindow createWindow(long start,long end, long margin, StorageType type){
+        StreamWindow ret;
+        switch (type) {
+            case ONHEAP:
+                ret = new StreamSortedWindowOnHeap(start, end, margin);
+                break;
+            default:
+                ret = new StreamSortedWindowInMapDB(
+                        start,end,margin,
+                        createMapDB(type),
+                        UUID.randomUUID().toString()
+                );
+                break;
+        }
+
+        if(LOG.isDebugEnabled()) LOG.debug("Created new {}, type: {}",ret,type);
+        return ret;
+    }
+
+    public StreamWindow createWindow(long start,long end, long margin, StreamWindowStrategy strategy){
+        return strategy.createWindow(start,end,margin,this);
+    }
+
+    public StreamWindow createWindow(long start,long end, long margin){
+        return OnHeapStrategy.INSTANCE.createWindow(start,end,margin,this);
+    }
+
+    public void close(){
+        for(Map.Entry<StorageType,DB> entry:dbPool.entrySet()){
+            entry.getValue().close();
+        }
+        dbPool.clear();
+    }
+
+    public interface StreamWindowStrategy {
+        /**
+         *
+         * @param start
+         * @param end
+         * @param margin
+         * @return
+         */
+        StreamWindow createWindow(long start,long end, long margin,StreamWindowRepository repository);
+    }
+
+    public static class OnHeapStrategy implements StreamWindowStrategy{
+        public static final OnHeapStrategy INSTANCE = new OnHeapStrategy();
+        @Override
+        public StreamWindow createWindow(long start, long end, long margin,StreamWindowRepository repository) {
+            return repository.createWindow(start,end,margin,StorageType.ONHEAP);
+        }
+    }
+
+    public static class WindowSizeStrategy implements StreamWindowStrategy {
+        private final static long ONE_HOUR = 3600 * 1000;
+        private final static long FIVE_HOURS = 5* 3600 * 1000;
+        private final long onheapWindowSizeLimit;
+        private final long offheapWindowSizeLimit;
+
+        public static WindowSizeStrategy INSTANCE = new WindowSizeStrategy(ONE_HOUR,FIVE_HOURS);
+
+        public WindowSizeStrategy(long onheapWindowSizeLimit, long offheapWindowSizeLimit){
+            this.offheapWindowSizeLimit = offheapWindowSizeLimit;
+            this.onheapWindowSizeLimit = onheapWindowSizeLimit;
+
+            if(this.offheapWindowSizeLimit <this.onheapWindowSizeLimit){
+                throw new IllegalStateException("offheapWindowSizeLimit "+this.offheapWindowSizeLimit +" < onheapWindowSizeLimit "+this.onheapWindowSizeLimit);
+            }
+        }
+
+        @Override
+        public StreamWindow createWindow(long start, long end, long margin,StreamWindowRepository repository) {
+            long windowLength = end - start;
+            if(windowLength <= onheapWindowSizeLimit){
+                return repository.createWindow(start,end,margin, StreamWindowRepository.StorageType.ONHEAP);
+            }else if(windowLength > onheapWindowSizeLimit & windowLength <= offheapWindowSizeLimit){
+                return repository.createWindow(start,end,margin, StreamWindowRepository.StorageType.DIRECT_MEMORY);
+            }else {
+                return repository.createWindow(start,end,margin, StreamWindowRepository.StorageType.FILE_RAF);
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/CachedEventGroupSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/CachedEventGroupSerializer.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/CachedEventGroupSerializer.java
new file mode 100644
index 0000000..6cdf8a0
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/CachedEventGroupSerializer.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.engine.utils.SerializableUtils;
+import org.jetbrains.annotations.NotNull;
+import org.mapdb.DataInput2;
+import org.mapdb.DataOutput2;
+import org.mapdb.serializer.GroupSerializerObjectArray;
+
+/**
+ * @deprecated performance is worse, should investigate
+ */
+public class CachedEventGroupSerializer extends GroupSerializerObjectArray<PartitionedEvent[]> {
+    private Map<Integer,StreamPartition> hashCodePartitionDict = new HashMap<>();
+    private void writePartitionedEvent(DataOutput2 out,PartitionedEvent event) throws IOException {
+        out.packLong(event.getPartitionKey());
+        int partitionHashCode = 0;
+        if(event.getPartition()!=null) {
+            partitionHashCode = event.getPartition().hashCode();
+            if (!hashCodePartitionDict.containsKey(partitionHashCode)) {
+                hashCodePartitionDict.put(partitionHashCode, event.getPartition());
+            }
+        }
+        out.packInt(partitionHashCode);
+        if(event.getEvent()!=null) {
+            byte[] eventBytes = SerializableUtils.serializeToCompressedByteArray(event.getEvent());
+            out.packInt(eventBytes.length);
+            out.write(eventBytes);
+        } else {
+            out.packInt(0);
+        }
+    }
+
+    private PartitionedEvent readPartitionedEvent(DataInput2 in) throws IOException {
+        PartitionedEvent event = new PartitionedEvent();
+        event.setPartitionKey(in.unpackLong());
+        int partitionHashCode = in.unpackInt();
+        if(partitionHashCode!=0 && hashCodePartitionDict.containsKey(partitionHashCode)) {
+            event.setPartition(hashCodePartitionDict.get(partitionHashCode));
+        }
+        int eventBytesLen = in.unpackInt();
+        if(eventBytesLen > 0) {
+            byte[] eventBytes = new byte[eventBytesLen];
+            in.readFully(eventBytes);
+            event.setEvent((StreamEvent) SerializableUtils.deserializeFromCompressedByteArray(eventBytes, "Deserialize event from bytes"));
+        }
+        return event;
+    }
+
+    @Override
+    public void serialize(DataOutput2 out, PartitionedEvent[] value) throws IOException {
+        out.packInt(value.length);
+        for (PartitionedEvent event : value) {
+            writePartitionedEvent(out,event);
+        }
+    }
+
+    @Override
+    public PartitionedEvent[] deserialize(DataInput2 in, int available) throws IOException {
+        final int size = in.unpackInt();
+        PartitionedEvent[] ret = new PartitionedEvent[size];
+        for (int i = 0; i < size; i++) {
+            ret[i] = readPartitionedEvent(in);
+        }
+        return ret;
+    }
+
+    @Override
+    public boolean isTrusted() {
+        return true;
+    }
+
+    @Override
+    public boolean equals(PartitionedEvent[] a1, PartitionedEvent[] a2) {
+        return a1[0].getTimestamp() == a2[0].getTimestamp();
+    }
+
+    @Override
+    public int hashCode(@NotNull PartitionedEvent[] events, int seed) {
+        return new HashCodeBuilder().append(events).toHashCode();
+    }
+
+    @Override
+    public int compare(PartitionedEvent[] o1, PartitionedEvent[] o2) {
+        if(o1.length>0 && o2.length>0) {
+            return (int) (o1[0].getTimestamp() - o2[0].getTimestamp());
+        }else{
+            return 0;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventGroupSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventGroupSerializer.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventGroupSerializer.java
new file mode 100644
index 0000000..caa291e
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventGroupSerializer.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter.impl;
+
+import java.io.IOException;
+import java.util.Comparator;
+
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.utils.SerializableUtils;
+import org.jetbrains.annotations.NotNull;
+import org.mapdb.DataInput2;
+import org.mapdb.DataOutput2;
+import org.mapdb.Serializer;
+import org.mapdb.serializer.GroupSerializer;
+
+
+public class PartitionedEventGroupSerializer implements GroupSerializer<PartitionedEvent[]> {
+    private static final GroupSerializer<byte[]> delegate = Serializer.BYTE_ARRAY;
+
+    private static PartitionedEvent[] deserialize(byte[] bytes){
+        return (PartitionedEvent[]) SerializableUtils.deserializeFromCompressedByteArray(bytes,"deserialize as stream event");
+    }
+
+    private static byte[] serialize(PartitionedEvent[] events){
+        return SerializableUtils.serializeToCompressedByteArray(events);
+    }
+
+    @Override
+    public int valueArraySearch(Object keys, PartitionedEvent[] key) {
+        return delegate.valueArraySearch(keys,serialize(key));
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public int valueArraySearch(Object keys, PartitionedEvent[] key, Comparator comparator) {
+        return delegate.valueArraySearch(keys,serialize(key),comparator);
+    }
+
+    @Override
+    public void valueArraySerialize(DataOutput2 out, Object vals) throws IOException {
+        delegate.valueArraySerialize(out,vals);
+    }
+
+    @Override
+    public Object valueArrayDeserialize(DataInput2 in, int size) throws IOException {
+        return delegate.valueArrayDeserialize(in,size);
+    }
+
+    @Override
+    public PartitionedEvent[] valueArrayGet(Object vals, int pos) {
+        return deserialize(delegate.valueArrayGet(vals,pos));
+    }
+
+    @Override
+    public int valueArraySize(Object vals) {
+        return delegate.valueArraySize(vals);
+    }
+
+    @Override
+    public Object valueArrayEmpty() {
+        return delegate.valueArrayEmpty();
+    }
+
+    @Override
+    public Object valueArrayPut(Object vals, int pos, PartitionedEvent[] newValue) {
+        return delegate.valueArrayPut(vals,pos,serialize(newValue));
+    }
+
+    @Override
+    public Object valueArrayUpdateVal(Object vals, int pos, PartitionedEvent[] newValue) {
+        return delegate.valueArrayUpdateVal(vals,pos,serialize(newValue));
+    }
+
+    @Override
+    public Object valueArrayFromArray(Object[] objects) {
+        return delegate.valueArrayFromArray(objects);
+    }
+
+    @Override
+    public Object valueArrayCopyOfRange(Object vals, int from, int to) {
+        return delegate.valueArrayCopyOfRange(vals,from,to);
+    }
+
+    @Override
+    public Object valueArrayDeleteValue(Object vals, int pos) {
+        return delegate.valueArrayDeleteValue(vals,pos);
+    }
+
+    @Override
+    public void serialize(@NotNull DataOutput2 out, @NotNull PartitionedEvent[] value) throws IOException {
+        delegate.serialize(out,serialize(value));
+    }
+
+    @Override
+    public PartitionedEvent[] deserialize(@NotNull DataInput2 input, int available) throws IOException {
+        return deserialize(delegate.deserialize(input,available));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventTimeOrderingComparator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventTimeOrderingComparator.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventTimeOrderingComparator.java
new file mode 100644
index 0000000..a9f9f39
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventTimeOrderingComparator.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter.impl;
+
+import java.util.Comparator;
+import java.util.Objects;
+
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+
+/**
+ * TODO: Stable sorting algorithm for better performance to avoid event resorting with same timestamp?
+ */
+public class PartitionedEventTimeOrderingComparator implements Comparator<PartitionedEvent> {
+    public static final PartitionedEventTimeOrderingComparator INSTANCE = new PartitionedEventTimeOrderingComparator();
+
+    @Override
+    public int compare(PartitionedEvent o1, PartitionedEvent o2) {
+        if(Objects.equals(o1,o2)){
+            return 0;
+        }else {
+            if(o1 == null && o2 == null){
+                return 0;
+            }else if(o1 != null && o2 == null){
+                return 1;
+            }else if(o1 == null){
+                return -1;
+            }
+            // Unstable Sorting Algorithm
+            if(o1.getTimestamp() <= o2.getTimestamp()){
+                return -1;
+            } else {
+                return 1;
+            }
+        }
+    }
+}
\ No newline at end of file



Mime
View raw message