eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h..@apache.org
Subject [31/46] incubator-eagle git commit: [EAGLE-325] Initialize next-gen alert engine code on branch-0.5
Date Thu, 02 Jun 2016 07:08:10 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java
new file mode 100644
index 0000000..a3487d3
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java
@@ -0,0 +1,71 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.scheme;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+
+import backtype.storm.spout.Scheme;
+import backtype.storm.tuple.Fields;
+
+/**
+ * Storm {@link Scheme} that expects each message to be a flat JSON object.
+ *
+ * <p>A successfully parsed message is returned as the two values {@code (topic, map)}:
+ * the topic passed to the constructor and the {@link Map} Jackson produced from the payload.
+ */
+public class JsonScheme implements Scheme {
+    private static final long serialVersionUID = -8352896475656975577L;
+    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(JsonScheme.class);
+    private static final ObjectMapper mapper = new ObjectMapper();
+
+    private final String topic;
+
+    /**
+     * @param topic value placed first in every list returned by {@link #deserialize(byte[])}
+     */
+    public JsonScheme(String topic) {
+        this.topic = topic;
+    }
+
+    @Override
+    public Fields getOutputFields() {
+        return new Fields("f1");
+    }
+
+    /**
+     * Parses the payload as JSON.
+     *
+     * @param ser raw message bytes, may be {@code null}
+     * @return {@code [topic, parsedMap]}, or {@code null} when {@code ser} is {@code null}
+     *         or cannot be parsed as JSON (the failure is logged, not thrown)
+     */
+    @Override
+    @SuppressWarnings("rawtypes")
+    public List<Object> deserialize(byte[] ser) {
+        if (ser == null) {
+            LOG.debug("Content is null, ignore");
+            return null;
+        }
+        try {
+            Map map = mapper.readValue(ser, Map.class);
+            return Arrays.asList(topic, map);
+        } catch (IOException e) {
+            // Decoding with a Charset constant cannot throw, so no nested try/catch is needed
+            // just to log the offending payload.
+            LOG.error("Failed to deserialize as JSON: {}",
+                    new String(ser, java.nio.charset.StandardCharsets.UTF_8), e);
+            return null;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java
new file mode 100644
index 0000000..1182e3f
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java
@@ -0,0 +1,74 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.scheme;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.eagle.alert.coordination.model.StreamNameSelector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A strategy to get stream name from message tuple.
+ *
+ * <p>Resolution order: the user-provided stream name when configured; otherwise the configured
+ * format applied to the values of the configured tuple fields; otherwise a fixed default name.
+ *
+ * Since 5/5/16.
+ */
+public class JsonStringStreamNameSelector implements StreamNameSelector {
+    private final static Logger LOG = LoggerFactory.getLogger(JsonStringStreamNameSelector.class);
+    private final static String USER_PROVIDED_STREAM_NAME_PROPERTY = "userProvidedStreamName";
+    private final static String FIELD_NAMES_TO_INFER_STREAM_NAME_PROPERTY = "fieldNamesToInferStreamName";
+    private final static String STREAM_NAME_FORMAT = "streamNameFormat";
+
+    private final String userProvidedStreamName;
+    private final String[] fieldNamesToInferStreamName;
+    private final String streamNameFormat;
+
+    /**
+     * Reads the selector configuration.
+     *
+     * @param prop properties holding the optional keys {@code userProvidedStreamName},
+     *             {@code fieldNamesToInferStreamName} (comma separated) and {@code streamNameFormat}
+     */
+    public JsonStringStreamNameSelector(Properties prop) {
+        userProvidedStreamName = prop.getProperty(USER_PROVIDED_STREAM_NAME_PROPERTY);
+        String fields = prop.getProperty(FIELD_NAMES_TO_INFER_STREAM_NAME_PROPERTY);
+        // Tolerate whitespace around the separators ("a, b") so tuple lookups use the bare names.
+        fieldNamesToInferStreamName = (fields == null) ? null : fields.trim().split("\\s*,\\s*");
+        streamNameFormat = prop.getProperty(STREAM_NAME_FORMAT);
+        // The format only matters when no explicit stream name was supplied; warning otherwise
+        // would report a mis-configuration that does not exist.
+        if (userProvidedStreamName == null && streamNameFormat == null) {
+            LOG.warn("no stream name format found, this might cause default stream name be used which is dis-encouraged. Possibly this is a mis-configuration.");
+        }
+    }
+
+    /**
+     * @param tuple message fields keyed by name
+     * @return the user-provided name, else the format applied to the configured field values
+     *         (a field missing from the tuple contributes {@code null}), else
+     *         {@code "defaultStringStream"}
+     */
+    @Override
+    public String getStreamName(Map<String, Object> tuple) {
+        if (userProvidedStreamName != null) {
+            return userProvidedStreamName;
+        }
+        if (fieldNamesToInferStreamName != null && streamNameFormat != null) {
+            Object[] args = new Object[fieldNamesToInferStreamName.length];
+            for (int i = 0; i < args.length; i++) {
+                args[i] = tuple.get(fieldNamesToInferStreamName[i]);
+            }
+            return String.format(streamNameFormat, args);
+        }
+        LOG.debug("can not find the stream name for data source. Use the default stream, possibly this means mis-configuration of datasource!");
+        return "defaultStringStream";
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java
new file mode 100644
index 0000000..89d2e76
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java
@@ -0,0 +1,67 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.scheme;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.spout.Scheme;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+/**
+ * Storm {@link Scheme} used for parsing plain string messages.
+ *
+ * <p>Each message is returned as the two values {@code (topic, map)}, where the map holds the
+ * UTF-8 decoded payload under {@code "value"} and the decode time under {@code "timestamp"}.
+ */
+public class PlainStringScheme implements Scheme {
+    private static final long serialVersionUID = 5969724968671646310L;
+    private static final Logger LOG = LoggerFactory.getLogger(PlainStringScheme.class);
+    private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8;
+    public static final String STRING_SCHEME_KEY = "str";
+
+    private final String topic;
+
+    /**
+     * @param topic value placed first in every list returned by {@link #deserialize(byte[])}
+     */
+    public PlainStringScheme(String topic){
+        this.topic = topic;
+    }
+
+    /**
+     * Decodes the bytes as UTF-8 text.
+     *
+     * @param buff bytes to decode; must not be {@code null}
+     * @return the decoded string
+     */
+    public static String deserializeString(byte[] buff) {
+        return new String(buff, UTF8_CHARSET);
+    }
+
+    @Override
+    public Fields getOutputFields() {
+        return new Fields(STRING_SCHEME_KEY);
+    }
+
+    /**
+     * Wraps the decoded payload together with the current wall-clock time.
+     *
+     * @param ser raw message bytes, may be {@code null}
+     * @return {@code [topic, {value, timestamp}]}, or {@code null} when {@code ser} is
+     *         {@code null} (same convention as {@code JsonScheme} in this package)
+     */
+    @Override
+    public List<Object> deserialize(byte[] ser) {
+        if (ser == null) {
+            // Avoid the NullPointerException new String(null, charset) would raise.
+            LOG.debug("Content is null, ignore");
+            return null;
+        }
+        Map<String, Object> m = new HashMap<>();
+        m.put("value", deserializeString(ser));
+        m.put("timestamp", System.currentTimeMillis());
+        return new Values(topic, m);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java
new file mode 100644
index 0000000..61ec943
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java
@@ -0,0 +1,49 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.scheme;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.eagle.alert.coordination.model.StreamNameSelector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Stream name selector that answers with one fixed name for every tuple: the configured
+ * {@code userProvidedStreamName} property, or {@code "defaultStringStream"} when it is absent.
+ *
+ * Since 5/3/16.
+ */
+public class PlainStringStreamNameSelector implements StreamNameSelector {
+    @SuppressWarnings("unused")
+    private static final Logger LOG = LoggerFactory.getLogger(PlainStringStreamNameSelector.class);
+    private final static String USER_PROVIDED_STREAM_NAME_PROPERTY = "userProvidedStreamName";
+    private static final String DEFAULT_STRING_STREAM_NAME = "defaultStringStream";
+
+    private String streamName;
+
+    /**
+     * @param prop properties that may carry the {@code userProvidedStreamName} key
+     */
+    public PlainStringStreamNameSelector(Properties prop){
+        final String configured = prop.getProperty(USER_PROVIDED_STREAM_NAME_PROPERTY);
+        streamName = (configured != null) ? configured : DEFAULT_STRING_STREAM_NAME;
+    }
+
+    /**
+     * @param tuple ignored; the name does not depend on the message content
+     * @return the stream name resolved at construction time
+     */
+    @Override
+    public String getStreamName(Map<String, Object> tuple) {
+        return streamName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java
new file mode 100644
index 0000000..5ba1080
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.serialization;
+
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.engine.serialization.impl.StreamEventSerializer;
+import org.apache.eagle.alert.engine.serialization.impl.StreamPartitionDigestSerializer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Serializer for {@link PartitionedEvent} that writes, in order, the partition key, the stream
+ * event and the stream partition (the latter through {@link StreamPartitionDigestSerializer}).
+ *
+ * TODO: Seems the complexity doesn't bring enough performance improvement
+ *
+ * @see PartitionedEvent
+ */
+@Deprecated
+public class PartitionedEventDigestSerializer implements Serializer<PartitionedEvent> {
+    private final Serializer<StreamEvent> streamEventSerializer;
+    private final Serializer<StreamPartition> streamPartitionSerializer;
+
+    /**
+     * @param serializationMetadataProvider handed to the {@link StreamEventSerializer} created here
+     */
+    public PartitionedEventDigestSerializer(SerializationMetadataProvider serializationMetadataProvider){
+        this.streamPartitionSerializer = StreamPartitionDigestSerializer.INSTANCE;
+        this.streamEventSerializer = new StreamEventSerializer(serializationMetadataProvider);
+    }
+
+    /**
+     * Writes partition key, event and partition to {@code dataOutput}, in that order.
+     */
+    @Override
+    public void serialize(PartitionedEvent entity, DataOutput dataOutput) throws IOException {
+        final long partitionKey = entity.getPartitionKey();
+        dataOutput.writeLong(partitionKey);
+
+        final StreamEvent payload = entity.getEvent();
+        streamEventSerializer.serialize(payload, dataOutput);
+
+        final StreamPartition streamPartition = entity.getPartition();
+        streamPartitionSerializer.serialize(streamPartition, dataOutput);
+    }
+
+    /**
+     * Reads the fields in the order {@link #serialize} wrote them. The restored partition gets
+     * its stream id overwritten with the one carried by the restored event.
+     */
+    @Override
+    public PartitionedEvent deserialize(DataInput dataInput) throws IOException {
+        final PartitionedEvent restored = new PartitionedEvent();
+
+        final long partitionKey = dataInput.readLong();
+        restored.setPartitionKey(partitionKey);
+
+        final StreamEvent payload = streamEventSerializer.deserialize(dataInput);
+        restored.setEvent(payload);
+
+        final StreamPartition streamPartition = streamPartitionSerializer.deserialize(dataInput);
+        // NOTE(review): presumably the digest form does not carry the stream id itself - confirm.
+        streamPartition.setStreamId(payload.getStreamId());
+        restored.setPartition(streamPartition);
+
+        return restored;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java
new file mode 100644
index 0000000..c518e40
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.serialization;
+
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+
+import java.io.IOException;
+
+public interface PartitionedEventSerializer {
+    /**
+     * Converts a partitioned event into its byte-array form.
+     * @param entity the event to serialize
+     * @return the serialized bytes
+     * @throws IOException declared for implementations that fail while writing
+     */
+    byte[] serialize(PartitionedEvent entity) throws IOException;
+
+    /**
+     * Rebuilds a partitioned event from its byte-array form.
+     * @param bytes the serialized bytes
+     * @return the deserialized event
+     * @throws IOException declared for implementations that fail while reading
+     */
+    PartitionedEvent deserialize(byte[] bytes) throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java
new file mode 100644
index 0000000..71b274d
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.serialization;
+
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+
+/**
+ * Integration interface that supplies stream definitions, looked up by stream id, to serializers.
+ */
+public interface SerializationMetadataProvider {
+    /**
+     * @param streamId identifier of the stream whose definition is requested
+     * @return StreamDefinition or null if not exist
+     */
+    StreamDefinition getStreamDefinition(String streamId);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java
new file mode 100644
index 0000000..c2f87d0
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.serialization;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/** Writes values of type {@code V} to a {@link DataOutput} and reads them back from a {@link DataInput}. */ public interface Serializer<V> {
+    /** Writes {@code value} to {@code dataOutput}. */ void serialize(V value,DataOutput dataOutput) throws IOException;
+    /** Reads one value from {@code dataInput} and returns it. */ V deserialize(DataInput dataInput) throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java
new file mode 100644
index 0000000..a94604c
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.serialization;
+
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.serialization.impl.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Static registry mapping each {@link StreamColumn.Type} to the {@link Serializer} used for
+ * values of that column type, plus a factory for {@link PartitionedEventSerializer}.
+ */
+public class Serializers {
+    private final static Map<StreamColumn.Type,Serializer<?>> COLUMN_TYPE_SER_MAPPING = new HashMap<>();
+
+    // Built-in registrations; placed after the map field so the map exists when this runs.
+    static {
+        register(StreamColumn.Type.STRING,new StringSerializer());
+        register(StreamColumn.Type.INT,new IntegerSerializer());
+        register(StreamColumn.Type.LONG,new LongSerializer());
+        register(StreamColumn.Type.FLOAT,new FloatSerializer());
+        register(StreamColumn.Type.DOUBLE,new DoubleSerializer());
+        register(StreamColumn.Type.BOOL,new BooleanSerializer());
+        register(StreamColumn.Type.OBJECT,new JavaObjectSerializer());
+    }
+
+    /**
+     * Adds a serializer for a column type.
+     *
+     * @throws IllegalArgumentException if the type already has a registration
+     */
+    public static <T> void register(StreamColumn.Type type,Serializer<T> serializer){
+        final boolean alreadyRegistered = COLUMN_TYPE_SER_MAPPING.containsKey(type);
+        if (alreadyRegistered) {
+            throw new IllegalArgumentException("Duplicated column type: "+type);
+        }
+        COLUMN_TYPE_SER_MAPPING.put(type,serializer);
+    }
+
+    /**
+     * Looks up the serializer registered for a column type.
+     *
+     * @throws IllegalArgumentException if nothing is registered for the type
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> Serializer<T> getColumnSerializer(StreamColumn.Type type){
+        if (!COLUMN_TYPE_SER_MAPPING.containsKey(type)) {
+            throw new IllegalArgumentException("Serializer of type: "+type+" not found");
+        }
+        final Serializer<?> registered = COLUMN_TYPE_SER_MAPPING.get(type);
+        // The caller chooses T; the registry cannot verify it, hence the unchecked cast.
+        return (Serializer<T>) registered;
+    }
+
+    /**
+     * @return a new {@link PartitionedEventSerializerImpl} backed by {@code metadataProvider}
+     */
+    public static PartitionedEventSerializer newPartitionedEventSerializer(SerializationMetadataProvider metadataProvider){
+        return new PartitionedEventSerializerImpl(metadataProvider);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java
new file mode 100644
index 0000000..1e90569
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java
@@ -0,0 +1,35 @@
+package org.apache.eagle.alert.engine.serialization.impl;
+
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class BooleanSerializer implements Serializer<Boolean> {
+    /** Writes {@code value} with {@link DataOutput#writeBoolean(boolean)}; {@code value} must not be null. */
+    @Override
+    public void serialize(Boolean value, DataOutput dataOutput) throws IOException {
+        final boolean primitive = value.booleanValue();
+        dataOutput.writeBoolean(primitive);
+    }
+
+    /** Reads one boolean with {@link DataInput#readBoolean()} and returns it boxed. */
+    @Override
+    public Boolean deserialize(DataInput dataInput) throws IOException {
+        final boolean primitive = dataInput.readBoolean();
+        return Boolean.valueOf(primitive);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java
new file mode 100644
index 0000000..ad5f53c
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java
@@ -0,0 +1,35 @@
+package org.apache.eagle.alert.engine.serialization.impl;
+
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class DoubleSerializer implements Serializer<Double> {
+    /** Writes {@code value} with {@link DataOutput#writeDouble(double)}; {@code value} must not be null. */
+    @Override
+    public void serialize(Double value, DataOutput dataOutput) throws IOException {
+        final double primitive = value.doubleValue();
+        dataOutput.writeDouble(primitive);
+    }
+
+    /** Reads one double with {@link DataInput#readDouble()} and returns it boxed. */
+    @Override
+    public Double deserialize(DataInput dataInput) throws IOException {
+        final double primitive = dataInput.readDouble();
+        return Double.valueOf(primitive);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java
new file mode 100644
index 0000000..18089a9
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java
@@ -0,0 +1,35 @@
+package org.apache.eagle.alert.engine.serialization.impl;
+
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * {@link Serializer} for {@link Float} column values, delegating to
+ * {@link DataOutput#writeFloat(float)} and {@link DataInput#readFloat()}.
+ */
+public class FloatSerializer implements Serializer<Float> {
+    /**
+     * Writes the given value as a primitive float.
+     *
+     * @param value      value to write; it is unboxed, so {@code null} raises a NullPointerException
+     * @param dataOutput sink that receives the encoded value
+     * @throws IOException if the underlying output fails
+     */
+    @Override
+    public void serialize(Float value, DataOutput dataOutput) throws IOException {
+        final float primitive = value.floatValue();
+        dataOutput.writeFloat(primitive);
+    }
+
+    /**
+     * Reads one primitive float and returns it boxed.
+     *
+     * @param dataInput source positioned at a value written by {@link #serialize}
+     * @return the decoded value
+     * @throws IOException if the underlying input fails
+     */
+    @Override
+    public Float deserialize(DataInput dataInput) throws IOException {
+        final float primitive = dataInput.readFloat();
+        return Float.valueOf(primitive);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java
new file mode 100644
index 0000000..d2473a9
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java
@@ -0,0 +1,35 @@
+package org.apache.eagle.alert.engine.serialization.impl;
+
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * {@link Serializer} for {@link Integer} column values, delegating to
+ * {@link DataOutput#writeInt(int)} and {@link DataInput#readInt()}.
+ */
+public class IntegerSerializer implements Serializer<Integer> {
+    /**
+     * Writes the given value as a primitive int.
+     *
+     * @param value      value to write; it is unboxed, so {@code null} raises a NullPointerException
+     * @param dataOutput sink that receives the encoded value
+     * @throws IOException if the underlying output fails
+     */
+    @Override
+    public void serialize(Integer value, DataOutput dataOutput) throws IOException {
+        final int primitive = value.intValue();
+        dataOutput.writeInt(primitive);
+    }
+
+    /**
+     * Reads one primitive int and returns it boxed.
+     *
+     * @param dataInput source positioned at a value written by {@link #serialize}
+     * @return the decoded value
+     * @throws IOException if the underlying input fails
+     */
+    @Override
+    public Integer deserialize(DataInput dataInput) throws IOException {
+        final int primitive = dataInput.readInt();
+        return Integer.valueOf(primitive);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java
new file mode 100644
index 0000000..76d2294
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.serialization.impl;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * {@link Serializer} that encodes arbitrary objects with Java native serialization through
+ * commons-lang3 {@link SerializationUtils}. Each value is framed as an int byte count followed by
+ * the serialized payload.
+ *
+ * NOTE(review): {@link #deserialize} runs Java native deserialization on whatever bytes it is given;
+ * confirm the input is always produced by a trusted peer.
+ */
+public class JavaObjectSerializer implements Serializer<Object> {
+    /**
+     * Serializes {@code value} and writes it length-prefixed.
+     *
+     * @param value      object to write; must be {@code null} or implement {@link Serializable}
+     * @param dataOutput sink that receives the length prefix and the payload
+     * @throws IOException if {@code value} is not serializable or the underlying output fails
+     */
+    @Override
+    public void serialize(Object value, DataOutput dataOutput) throws IOException {
+        // Report an unsupported value as an I/O failure instead of letting the cast below
+        // surface as a bare ClassCastException.
+        if (value != null && !(value instanceof Serializable)) {
+            throw new IOException("Value is not java.io.Serializable: " + value.getClass().getName());
+        }
+        byte[] bytes = SerializationUtils.serialize((Serializable) value);
+        dataOutput.writeInt(bytes.length);
+        dataOutput.write(bytes);
+    }
+
+    /**
+     * Reads one length-prefixed payload and deserializes it.
+     *
+     * @param dataInput source positioned at a value written by {@link #serialize}
+     * @return the decoded object
+     * @throws IOException if the length prefix is negative or the underlying input fails
+     */
+    @Override
+    public Object deserialize(DataInput dataInput) throws IOException {
+        int len = dataInput.readInt();
+        // A negative prefix can only come from corrupt input; fail before allocating.
+        if (len < 0) {
+            throw new IOException("Illegal serialized object length: " + len);
+        }
+        byte[] bytes = new byte[len];
+        dataInput.readFully(bytes);
+        return SerializationUtils.deserialize(bytes);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
new file mode 100644
index 0000000..8d85c76
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java
@@ -0,0 +1,35 @@
+package org.apache.eagle.alert.engine.serialization.impl;
+
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * {@link Serializer} for {@link Long} column values, delegating to
+ * {@link DataOutput#writeLong(long)} and {@link DataInput#readLong()}.
+ */
+public class LongSerializer implements Serializer<Long> {
+    /**
+     * Writes the given value as a primitive long.
+     *
+     * @param value      value to write; it is unboxed, so {@code null} raises a NullPointerException
+     * @param dataOutput sink that receives the encoded value
+     * @throws IOException if the underlying output fails
+     */
+    @Override
+    public void serialize(Long value, DataOutput dataOutput) throws IOException {
+        final long primitive = value.longValue();
+        dataOutput.writeLong(primitive);
+    }
+
+    /**
+     * Reads one primitive long and returns it boxed.
+     *
+     * @param dataInput source positioned at a value written by {@link #serialize}
+     * @return the decoded value
+     * @throws IOException if the underlying input fails
+     */
+    @Override
+    public Long deserialize(DataInput dataInput) throws IOException {
+        final long primitive = dataInput.readLong();
+        return Long.valueOf(primitive);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java
new file mode 100644
index 0000000..714920e
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.serialization.impl;
+
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
+import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
+import org.apache.eagle.alert.engine.serialization.Serializer;
+import org.apache.eagle.alert.engine.utils.CompressionUtils;
+
+import java.io.*;
+
+/**
+ * Stream-metadata-cached serializer for {@link PartitionedEvent}.
+ *
+ * <p>Performance, as reported by the original author:
+ * <ol>
+ *   <li>Versus Kryo direct: 73.4% less space (bytes) and 42.5% less time (ms).</li>
+ *   <li>Versus Java native: 92.5% less space (bytes) and 94.2% less time (ms).</li>
+ * </ol>
+ *
+ * <p>Tip: for small events, running without compression performs better than with compression.
+ *
+ * <p>TODO: Caching the partition would save little space but almost half of the serialization time;
+ * how should the two be balanced?
+ *
+ * @see PartitionedEvent
+ */
+public class PartitionedEventSerializerImpl implements Serializer<PartitionedEvent>, PartitionedEventSerializer {
+    private final StreamEventSerializer streamEventSerializer;
+    private final Serializer<StreamPartition> streamPartitionSerializer;
+    private final boolean compress;
+
+    /**
+     * Creates a serializer with explicit control over compression of the byte-array form.
+     *
+     * @param serializationMetadataProvider metadata provider handed to the event serializer
+     * @param compress whether {@link #serialize(PartitionedEvent)} / {@link #deserialize(byte[])}
+     *                 compress and decompress the payload; false by default
+     */
+    public PartitionedEventSerializerImpl(SerializationMetadataProvider serializationMetadataProvider,boolean compress) {
+        this.compress = compress;
+        this.streamPartitionSerializer = StreamPartitionSerializer.INSTANCE;
+        this.streamEventSerializer = new StreamEventSerializer(serializationMetadataProvider);
+    }
+
+    /**
+     * Creates a serializer with compression switched off.
+     *
+     * @param serializationMetadataProvider metadata provider handed to the event serializer
+     */
+    public PartitionedEventSerializerImpl(SerializationMetadataProvider serializationMetadataProvider) {
+        this(serializationMetadataProvider, false);
+    }
+
+    /**
+     * Writes the partition key, then the event, then the partition, in that order.
+     * Compression is never applied on this streaming path.
+     */
+    @Override
+    public void serialize(PartitionedEvent entity, DataOutput dataOutput) throws IOException {
+        dataOutput.writeLong(entity.getPartitionKey());
+        this.streamEventSerializer.serialize(entity.getEvent(), dataOutput);
+        this.streamPartitionSerializer.serialize(entity.getPartition(), dataOutput);
+    }
+
+    /**
+     * Reads the partition key, event and partition in the order written by
+     * {@link #serialize(PartitionedEvent, DataOutput)}; the partition's stream id is
+     * filled in from the decoded event.
+     */
+    @Override
+    public PartitionedEvent deserialize(DataInput dataInput) throws IOException {
+        // Pull the three sections off the wire first, then assemble the result.
+        final long partitionKey = dataInput.readLong();
+        final StreamEvent decodedEvent = this.streamEventSerializer.deserialize(dataInput);
+        final StreamPartition decodedPartition = this.streamPartitionSerializer.deserialize(dataInput);
+        decodedPartition.setStreamId(decodedEvent.getStreamId());
+
+        final PartitionedEvent result = new PartitionedEvent();
+        result.setPartitionKey(partitionKey);
+        result.setEvent(decodedEvent);
+        result.setPartition(decodedPartition);
+        return result;
+    }
+
+    /**
+     * Serializes the event into a fresh byte array, compressing it when this instance
+     * was configured with {@code compress = true}.
+     */
+    @Override
+    public byte[] serialize(PartitionedEvent entity) throws IOException {
+        final ByteArrayDataOutput buffer = ByteStreams.newDataOutput();
+        serialize(entity, buffer);
+        if (compress) {
+            return CompressionUtils.compress(buffer.toByteArray());
+        }
+        return buffer.toByteArray();
+    }
+
+    /**
+     * Decodes an event from a byte array produced by {@link #serialize(PartitionedEvent)},
+     * decompressing it first when this instance was configured with {@code compress = true}.
+     */
+    @Override
+    public PartitionedEvent deserialize(byte[] bytes) throws IOException {
+        if (compress) {
+            return deserialize(ByteStreams.newDataInput(CompressionUtils.decompress(bytes)));
+        }
+        return deserialize(ByteStreams.newDataInput(bytes));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
new file mode 100644
index 0000000..f6264e4
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.serialization.impl;
+
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
+import org.apache.eagle.alert.engine.serialization.Serializer;
+import org.apache.eagle.alert.engine.serialization.Serializers;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.BitSet;
+
+/**
+ * @see StreamEvent
+ */
+public class StreamEventSerializer implements Serializer<StreamEvent> {
+    private final SerializationMetadataProvider serializationMetadataProvider;
+
+    public StreamEventSerializer(SerializationMetadataProvider serializationMetadataProvider){
+        this.serializationMetadataProvider = serializationMetadataProvider;
+    }
+
+    /**
+     *
+     * @param objects
+     * @return
+     */
+    private BitSet isNullBitSet(Object[] objects){
+        BitSet bitSet = new BitSet();
+        int i = 0;
+        for(Object obj:objects){
+            bitSet.set(i,obj == null);
+            i++;
+        }
+        return bitSet;
+    }
+
+    @Override
+    public void serialize(StreamEvent event, DataOutput dataOutput) throws IOException {
+        dataOutput.writeUTF(event.getStreamId());
+        dataOutput.writeLong(event.getTimestamp());
+        if(event.getData() == null || event.getData().length == 0){
+            dataOutput.writeInt(0);
+        }else{
+            BitSet isNullIndex = isNullBitSet(event.getData());
+            byte[] isNullBytes = isNullIndex.toByteArray();
+            dataOutput.writeInt(isNullBytes.length);
+            dataOutput.write(isNullBytes);
+            int i =0;
+            StreamDefinition definition = serializationMetadataProvider.getStreamDefinition(event.getStreamId());
+            if(definition == null) throw new IOException("StreamDefinition not found: "+event.getStreamId());
+            if(event.getData().length != definition.getColumns().size()){
+                throw new IOException("Event :"+event+" doesn't match with schema: "+definition);
+            }
+            for(StreamColumn column:definition.getColumns()){
+                if(!isNullIndex.get(i)) {
+                    Serializers.getColumnSerializer(column.getType()).serialize(event.getData()[i],dataOutput);
+                }
+                i ++;
+            }
+        }
+    }
+
+    @Override
+    public StreamEvent deserialize(DataInput dataInput) throws IOException {
+        StreamEvent event = new StreamEvent();
+        event.setStreamId(dataInput.readUTF());
+        StreamDefinition definition = serializationMetadataProvider.getStreamDefinition(event.getStreamId());
+        event.setTimestamp(dataInput.readLong());
+        int isNullBytesLen = dataInput.readInt();
+        byte[] isNullBytes = new byte[isNullBytesLen];
+        dataInput.readFully(isNullBytes);
+        BitSet isNullIndex = BitSet.valueOf(isNullBytes);
+        Object[] attributes = new Object[definition.getColumns().size()];
+        int i = 0;
+        for(StreamColumn column:definition.getColumns()){
+            if(!isNullIndex.get(i)) {
+                attributes[i] = Serializers.getColumnSerializer(column.getType()).deserialize(dataInput);
+            }
+            i ++;
+        }
+        event.setData(attributes);
+        return event;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java
new file mode 100644
index 0000000..67bf899
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.serialization.impl;
+
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
+import java.io.*;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.*;
+
+/**
+ * {@link Serializer} that writes a {@link StreamPartition} as the SHA1 digest of its
+ * Java-serialized form rather than the partition's fields; the streamId is therefore never
+ * written explicitly.
+ *
+ * <p>The digest-to-partition mapping is held only in this instance's in-memory maps and is filled
+ * by {@link #serialize}. {@link #deserialize} can consequently only resolve partitions that this
+ * same instance has serialized before.
+ *
+ * <p>NOTE(review): {@link #INSTANCE} is shared and the lookup maps are plain {@link HashMap}s;
+ * confirm it is never used from several threads at once.
+ *
+ * @see StreamPartition
+ */
+public class StreamPartitionDigestSerializer implements Serializer<StreamPartition> {
+    public final static StreamPartitionDigestSerializer INSTANCE = new StreamPartitionDigestSerializer();
+
+    private final Map<DigestBytes,StreamPartition> checkSumPartitionMap = new HashMap<>();
+    private final Map<StreamPartition,DigestBytes> partitionCheckSumMap = new HashMap<>();
+
+    /**
+     * Writes the partition's digest, length-prefixed, computing and caching it on first use.
+     *
+     * @param partition  partition to write
+     * @param dataOutput sink that receives the digest length and digest bytes
+     * @throws IOException if the digest cannot be computed or the underlying output fails
+     */
+    @Override
+    public void serialize(StreamPartition partition, DataOutput dataOutput) throws IOException {
+        DigestBytes checkSum = partitionCheckSumMap.get(partition);
+        if(checkSum == null){
+            try {
+                checkSum = digestCheckSum(partition);
+                partitionCheckSumMap.put(partition,checkSum);
+                checkSumPartitionMap.put(checkSum,partition);
+            } catch (NoSuchAlgorithmException e) {
+                throw new IOException(e);
+            }
+        }
+        dataOutput.writeInt(checkSum.size());
+        dataOutput.write(checkSum.toByteArray());
+    }
+
+    /**
+     * Reads a length-prefixed digest and returns the partition cached for it.
+     *
+     * @param dataInput source positioned at a digest written by {@link #serialize}
+     * @return the cached partition instance registered for the digest
+     * @throws IOException if the length is negative, the digest is unknown to this instance, or
+     *                     the underlying input fails
+     */
+    @Override
+    public StreamPartition deserialize(DataInput dataInput) throws IOException {
+        int checkSumLen = dataInput.readInt();
+        if(checkSumLen < 0){
+            throw new IOException("Illegal partition checksum length: "+checkSumLen);
+        }
+        byte[] checksum = new byte[checkSumLen];
+        dataInput.readFully(checksum);
+        StreamPartition partition = checkSumPartitionMap.get(new DigestBytes(checksum));
+        if(partition == null){
+            // Print the bytes themselves; concatenating the array would only show its identity hash.
+            throw new IOException("Illegal partition checksum: "+Arrays.toString(checksum));
+        }
+        return partition;
+    }
+
+    /**
+     * Value wrapper giving a digest byte array content-based equals/hashCode so it can be a map key.
+     * Declared static because it needs no reference to the enclosing serializer.
+     */
+    private static final class DigestBytes {
+        private final byte[] data;
+
+        public DigestBytes(byte[] bytes){
+            this.data = bytes;
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            return other instanceof DigestBytes && Arrays.equals(data, ((DigestBytes) other).data);
+        }
+
+        @Override
+        public int hashCode() {
+            return Arrays.hashCode(data);
+        }
+        public int size(){
+            return data.length;
+        }
+        /** Returns the wrapped array itself (not a copy). */
+        public byte[] toByteArray(){
+            return data;
+        }
+    }
+
+    /**
+     * Java-serializes {@code obj} and returns the SHA1 digest of the resulting bytes.
+     *
+     * @param obj object to digest
+     * @return wrapper around the digest bytes
+     * @throws IOException if Java serialization of {@code obj} fails
+     * @throws NoSuchAlgorithmException if no SHA1 {@link MessageDigest} is available
+     */
+    private DigestBytes digestCheckSum(Object obj) throws IOException, NoSuchAlgorithmException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        // try-with-resources closes (and flushes) the object stream even if writeObject fails.
+        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+            oos.writeObject(obj);
+        }
+        MessageDigest m = MessageDigest.getInstance("SHA1");
+        m.update(baos.toByteArray());
+        return new DigestBytes(m.digest());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java
new file mode 100644
index 0000000..e0bb171
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.serialization.impl;
+
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Field-by-field {@link Serializer} for {@link StreamPartition}: writes the partition type, the
+ * column names and the optional sort spec. The streamId is not serialized.
+ *
+ * @see StreamPartition
+ */
+public class StreamPartitionSerializer implements Serializer<StreamPartition> {
+    public final static StreamPartitionSerializer INSTANCE = new StreamPartitionSerializer();
+
+    /** Marker byte written when the partition has no sort spec. */
+    private static final int SORT_SPEC_ABSENT = 0;
+    /** Marker byte written immediately before a sort spec's fields. */
+    private static final int SORT_SPEC_PRESENT = 1;
+
+    /**
+     * Writes type (UTF), column count (int), each column name (UTF), a sort-spec marker byte and,
+     * when the marker is 1, the window period (UTF) and window margin (int).
+     *
+     * @param partition  partition to write
+     * @param dataOutput sink that receives the encoded partition
+     * @throws IOException if the underlying output fails
+     */
+    @Override
+    public void serialize(StreamPartition partition, DataOutput dataOutput) throws IOException {
+        dataOutput.writeUTF(partition.getType().toString());
+
+        // A null column list is encoded exactly like an empty one: a count of zero.
+        final boolean hasColumns = partition.getColumns() != null && !partition.getColumns().isEmpty();
+        if (hasColumns) {
+            dataOutput.writeInt(partition.getColumns().size());
+            for (String columnName : partition.getColumns()) {
+                dataOutput.writeUTF(columnName);
+            }
+        } else {
+            dataOutput.writeInt(0);
+        }
+
+        if (partition.getSortSpec() != null) {
+            dataOutput.writeByte(SORT_SPEC_PRESENT);
+            dataOutput.writeUTF(partition.getSortSpec().getWindowPeriod());
+            dataOutput.writeInt(partition.getSortSpec().getWindowMargin());
+        } else {
+            dataOutput.writeByte(SORT_SPEC_ABSENT);
+        }
+    }
+
+    /**
+     * Reads a partition in the layout produced by {@link #serialize}. Columns are set only when the
+     * stored count is positive, and the sort spec only when the marker byte is 1.
+     *
+     * @param dataInput source positioned at the start of an encoded partition
+     * @return a new partition without a streamId
+     * @throws IOException if the underlying input fails
+     */
+    @Override
+    public StreamPartition deserialize(DataInput dataInput) throws IOException {
+        final StreamPartition result = new StreamPartition();
+
+        final String typeName = dataInput.readUTF();
+        result.setType(StreamPartition.Type.locate(typeName));
+
+        final int columnCount = dataInput.readInt();
+        if (columnCount > 0) {
+            final List<String> columnNames = new ArrayList<>(columnCount);
+            int remaining = columnCount;
+            while (remaining-- > 0) {
+                columnNames.add(dataInput.readUTF());
+            }
+            result.setColumns(columnNames);
+        }
+
+        final boolean sortSpecFollows = dataInput.readByte() == SORT_SPEC_PRESENT;
+        if (!sortSpecFollows) {
+            return result;
+        }
+
+        final StreamSortSpec sortSpec = new StreamSortSpec();
+        sortSpec.setWindowPeriod(dataInput.readUTF());
+        sortSpec.setWindowMargin(dataInput.readInt());
+        result.setSortSpec(sortSpec);
+        return result;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java
new file mode 100644
index 0000000..2a1541a
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java
@@ -0,0 +1,35 @@
+package org.apache.eagle.alert.engine.serialization.impl;
+
+import org.apache.eagle.alert.engine.serialization.Serializer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * {@link Serializer} for {@link String} column values, delegating to
+ * {@link DataOutput#writeUTF(String)} and {@link DataInput#readUTF()}.
+ */
+public class StringSerializer implements Serializer<String> {
+    /**
+     * Writes the given text with {@code writeUTF}.
+     *
+     * @param value      text to write
+     * @param dataOutput sink that receives the encoded text
+     * @throws IOException if the underlying output fails
+     */
+    @Override
+    public void serialize(String value, DataOutput dataOutput) throws IOException {
+        final String text = value;
+        dataOutput.writeUTF(text);
+    }
+
+    /**
+     * Reads one string with {@code readUTF}.
+     *
+     * @param dataInput source positioned at a value written by {@link #serialize}
+     * @return the decoded text
+     * @throws IOException if the underlying input fails
+     */
+    @Override
+    public String deserialize(DataInput dataInput) throws IOException {
+        final String text = dataInput.readUTF();
+        return text;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java
new file mode 100644
index 0000000..b705564
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregator.java
@@ -0,0 +1,122 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.eagle.alert.engine.siddhi.extension;
+
+import java.util.LinkedList;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.config.ExecutionPlanContext;
+import org.wso2.siddhi.core.executor.ExpressionExecutor;
+import org.wso2.siddhi.core.query.selector.attribute.aggregator.AttributeAggregator;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.definition.Attribute.Type;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Siddhi {@link AttributeAggregator} extension that collects every added attribute value into a
+ * list and returns an immutable snapshot of that list after each add, remove or reset.
+ *
+ * <p>NOTE(review): the snapshot is built with {@link ImmutableList#copyOf}, which rejects null
+ * elements, so a null attribute value would make snapshotting fail -- confirm that Siddhi never
+ * passes null here.
+ *
+ * @since Apr 1, 2016
+ */
+public class AttributeCollectAggregator extends AttributeAggregator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AttributeCollectAggregator.class);
+
+    /** Values collected so far, in insertion order. */
+    private LinkedList<Object> value;
+
+    public AttributeCollectAggregator() {
+        value = new LinkedList<Object>();
+    }
+
+    @Override
+    public void start() {
+        // nothing to start
+    }
+
+    @Override
+    public void stop() {
+        // nothing to stop
+    }
+
+    /**
+     * @return the collected values as an array, in insertion order
+     */
+    @Override
+    public Object[] currentState() {
+        return value.toArray();
+    }
+
+    /**
+     * Replaces the collected values with the given state.
+     *
+     * @param arg0 previously saved state; {@code null} restores an empty collection
+     */
+    @Override
+    public void restoreState(Object[] arg0) {
+        value = new LinkedList<Object>();
+        if (arg0 != null) {
+            for (Object o : arg0) {
+                value.add(o);
+            }
+        }
+    }
+
+    @Override
+    public Type getReturnType() {
+        return Attribute.Type.OBJECT;
+    }
+
+    @Override
+    protected void init(ExpressionExecutor[] arg0, ExecutionPlanContext arg1) {
+        // TODO: Support max of elements?
+    }
+
+    /**
+     * Appends a single value.
+     *
+     * @param arg0 value to collect
+     * @return immutable snapshot of all collected values
+     */
+    @Override
+    public Object processAdd(Object arg0) {
+        value.add(arg0);
+        LOG.debug("processAdd: current values are : {}", value);
+        return snapshot();
+    }
+
+    /**
+     * Appends the given array as ONE element (the array itself, not its members).
+     *
+     * @param arg0 array to collect as a single element
+     * @return immutable snapshot of all collected values
+     */
+    @Override
+    public Object processAdd(Object[] arg0) {
+        value.add(arg0);
+        LOG.debug("processAdd: current values are : {}", value);
+        return snapshot();
+    }
+
+    /**
+     * Removes the first occurrence of the given value, if present.
+     * NOTICE: not O(1), the list is scanned for the element.
+     *
+     * @param arg0 value to remove
+     * @return immutable snapshot of the remaining values
+     */
+    @Override
+    public Object processRemove(Object arg0) {
+        value.remove(arg0);
+        LOG.debug("processRemove: current values are : {}", value);
+        return snapshot();
+    }
+
+    /**
+     * Removes the first occurrence of the given array element, if present.
+     * NOTICE: not O(1), the list is scanned for the element.
+     *
+     * <p>NOTE(review): arrays compare by identity, so only the very same array instance that was
+     * added is removed -- verify that Siddhi hands back the same instance.
+     *
+     * @param arg0 array element to remove
+     * @return immutable snapshot of the remaining values
+     */
+    @Override
+    public Object processRemove(Object[] arg0) {
+        value.remove(arg0);
+        // Logged at DEBUG like the sibling methods; this used to be an unconditional INFO on the hot path.
+        LOG.debug("processRemove: current values are : {}", value);
+        return snapshot();
+    }
+
+    /**
+     * Drops all collected values.
+     *
+     * @return an empty immutable snapshot; the internal mutable list is never handed out
+     */
+    @Override
+    public Object reset() {
+        value.clear();
+        return snapshot();
+    }
+
+    /** Copies the current values so callers never observe later mutations. */
+    private ImmutableList<Object> snapshot() {
+        return ImmutableList.copyOf(value);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/BaseStreamWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/BaseStreamWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/BaseStreamWindow.java
new file mode 100644
index 0000000..38c5c30
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/BaseStreamWindow.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.alert.engine.PartitionedEventCollector;
+import org.apache.eagle.alert.utils.DateTimeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TODO: Make sure thread-safe
+ * TODO: Leverage Off-Heap Memory to persist append-only events collection
+ */
+public abstract class BaseStreamWindow implements StreamWindow {
+    private final long endTime;
+    private final long startTime;
+    private final long margin;
+    private final AtomicBoolean expired;
+    private final long createdTime;
+    private final static Logger LOG = LoggerFactory.getLogger(BaseStreamWindow.class);
+    private PartitionedEventCollector collector;
+    private final AtomicLong lastFlushedStreamTime;
+    private final AtomicLong lastFlushedSystemTime;
+
+    /**
+     * @param startTime
+     * @param endTime
+     * @param marginTime
+     */
+    public BaseStreamWindow(long startTime, long endTime, long marginTime){
+        if(startTime >= endTime){
+            throw new IllegalArgumentException("startTime: "+startTime+" >= endTime: "+endTime+", expected: startTime < endTime");
+        }
+        if(marginTime > endTime - startTime){
+            throw new IllegalArgumentException("marginTime: "+marginTime+" > endTime: "+endTime+" - startTime "+startTime+", expected: marginTime < endTime - startTime");
+        }
+        this.startTime = startTime;
+        this.endTime = endTime;
+        this.margin = marginTime;
+        this.expired = new AtomicBoolean(false);
+        this.createdTime = System.currentTimeMillis();
+        this.lastFlushedStreamTime = new AtomicLong(0);
+        this.lastFlushedSystemTime = new AtomicLong(this.createdTime);
+    }
+
+    @Override
+    public void register(PartitionedEventCollector collector) {
+        if(this.collector!=null)
+            throw new IllegalArgumentException("Duplicated collector error");
+        this.collector = collector;
+    }
+
+    @Override
+    public long createdTime() {
+        return createdTime;
+    }
+
+    public long startTime() {
+        return this.startTime;
+    }
+
+    @Override
+    public long rejectTime(){
+        return this.lastFlushedStreamTime.get();
+    }
+
+    @Override
+    public long margin() {
+        return this.margin;
+    }
+
+    public long endTime() {
+        return this.endTime;
+    }
+
+    public boolean accept(final long eventTime) {
+        return !expired() && eventTime >= startTime && eventTime < endTime
+                && eventTime >= lastFlushedStreamTime.get(); // dropped
+    }
+
+    public boolean expired() {
+        return expired.get();
+    }
+
+    @Override
+    public boolean alive() {
+        return !expired.get();
+    }
+
+    /**
+     *
+     *
+     * Expire when
+     * 1) If stream time >= endTime + marginTime, then flush and expire
+     * 2) If systemTime - flushedTime > endTime - startTime + marginTime && streamTime >= endTime, then flush and expire.
+     * 3) If systemTime - flushedTime > endTime - startTime + marginTime && streamTime < endTime, then flush but not expire.
+     * 4) else do nothing
+     *
+     *  @param clock stream time clock
+     *  @param globalSystemTime system time clock
+     */
+    @Override
+    public synchronized void onTick(StreamTimeClock clock,long globalSystemTime) {
+        if(!expired()) {
+            if(clock.getTime() >= endTime + margin){
+                LOG.info("Expiring {} at stream time:{}, latency:{}, window: {}",clock.getStreamId(),DateTimeUtil.millisecondsToHumanDateWithMilliseconds(clock.getTime()),globalSystemTime - lastFlushedSystemTime.get(),this);
+                lastFlushedStreamTime.set(clock.getTime());
+                lastFlushedSystemTime.set(globalSystemTime);
+                flush();
+                expired.set(true);
+            } else if(globalSystemTime - lastFlushedSystemTime.get() >=  endTime + margin - startTime && size() > 0){
+                LOG.info("Flushing {} at system time: {}, stream time: {}, latency: {}, window: {}",clock.getStreamId(),DateTimeUtil.millisecondsToHumanDateWithMilliseconds(globalSystemTime),DateTimeUtil.millisecondsToHumanDateWithMilliseconds(clock.getTime()),globalSystemTime - lastFlushedSystemTime.get(),this);
+                lastFlushedStreamTime.set(clock.getTime());
+                lastFlushedSystemTime.set(globalSystemTime);
+                flush();
+                if(lastFlushedStreamTime.get()>=this.endTime) expired.set(true);
+            }
+        } else {
+            LOG.warn("Window has already expired, should not tick: {}",this.toString());
+        }
+    }
+
+    public void close() {
+        flush();
+        expired.set(true);
+    }
+
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder().append(startTime).append(endTime).append(margin).build();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if(obj !=null && obj instanceof BaseStreamWindow) {
+            BaseStreamWindow another = (BaseStreamWindow) obj;
+            return another.startTime == this.startTime && another.endTime == this.endTime && another.margin == this.margin;
+        }
+        return false;
+    }
+
+    @Override
+    public void flush(){
+        if(this.collector == null) throw new NullPointerException("Collector is not given before window flush");
+        this.flush(collector);
+    }
+
+    /**
+     *
+     * @param collector
+     * @return max timestamp
+     */
+    protected abstract void flush(PartitionedEventCollector collector);
+
+    @Override
+    public String toString() {
+        return String.format("StreamWindow[period=[%s,%s), margin=%s ms, size=%s, reject=%s]",
+                DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.startTime),
+                DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.endTime),
+                this.margin,
+                size(),
+                this.rejectTime() == 0 ? DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.startTime): DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.rejectTime())
+                );
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClock.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClock.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClock.java
new file mode 100644
index 0000000..0881e35
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClock.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter;
+
+/**
+ * The time clock per stream.
+ *
+ * <p>Should be thread-safe between {@link #getTime()} and {@link #moveForward(long)}.
+ *
+ * <p>By default, only the event timestamp is currently supported.
+ */
+public interface StreamTimeClock {
+    /**
+     * Get stream id.
+     *
+     * @return stream id
+     */
+    String getStreamId();
+
+    /**
+     * Get current time.
+     *
+     * @return current timestamp value
+     */
+    long getTime();
+
+    /**
+     * Move the current time forward to the given timestamp.
+     *
+     * @param timestamp timestamp to move the current time forward to
+     */
+    void moveForward(long timestamp);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockListener.java
new file mode 100644
index 0000000..f7e463d
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockListener.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter;
+
+public interface StreamTimeClockListener {
+    /**
+     * Callback invoked when a tick is triggered for this listener.
+     *
+     * @see StreamWindow
+     *
+     * @param streamTime stream time clock
+     * @param globalSystemTime system time clock value
+     */
+    void onTick(StreamTimeClock streamTime, long globalSystemTime);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockManager.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockManager.java
new file mode 100644
index 0000000..29b13eb
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockManager.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter;
+
+import java.io.Serializable;
+
+/**
+ * Manages the {@link StreamTimeClock} instances per stream id.
+ *
+ * <p>By default, the current time clock could be kept in memory.
+ * Eventually we may need to consider global time synchronization across all nodes.
+ *
+ * <p>TODO: maybe need to synchronize time clock globally
+ *
+ * <ol>
+ * <li>When to initialize window according to start time</li>
+ * <li>When to close expired window according to current time</li>
+ * <li>Automatically tick periodically as the single place for control lock</li>
+ * </ol>
+ */
+public interface StreamTimeClockManager extends StreamTimeClockTrigger, Serializable{
+    /**
+     * Create the time clock for the given stream.
+     *
+     * @param streamId stream id
+     * @return StreamTimeClock instance
+     */
+    StreamTimeClock createStreamTimeClock(String streamId);
+
+    /**
+     * Get the time clock for the given stream.
+     *
+     * @param streamId stream id
+     * @return StreamTimeClock instance for the stream; the result for an unknown stream id is
+     *         not specified by this interface
+     */
+    StreamTimeClock getStreamTimeClock(String streamId);
+
+    /**
+     * Remove the time clock of the given stream.
+     *
+     * @param streamId stream id
+     */
+    void removeStreamTimeClock(String streamId);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockTrigger.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockTrigger.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockTrigger.java
new file mode 100644
index 0000000..a0cd184
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamTimeClockTrigger.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter;
+
+
+/**
+ * Triggers ticks on {@link StreamTimeClockListener}s registered per stream.
+ *
+ * <p>Possible implementations:
+ *
+ * <ol>
+ * <li>EventTimeClockTrigger (by default)</li>
+ * <li>SystemTimeClockTrigger</li>
+ * </ol>
+ */
+public interface StreamTimeClockTrigger {
+    /**
+     * @param streamId stream id to listen to
+     * @param listener to watch on streamId
+     */
+    void registerListener(String streamId, StreamTimeClockListener listener);
+
+    /**
+     * @param streamClock stream time clock to listen to
+     * @param listener to watch on the given stream clock
+     */
+    void registerListener(StreamTimeClock streamClock, StreamTimeClockListener listener);
+
+    /**
+     * @param listener listener to remove
+     */
+    void removeListener(StreamTimeClockListener listener);
+
+    /**
+     * Trigger tick of all listeners on certain stream.
+     *
+     * @param streamId stream id
+     */
+    void triggerTickOn(String streamId);
+
+    /**
+     * Update time per new event time on stream.
+     *
+     * @param streamId stream id whose time is updated
+     * @param timestamp new event time on the stream
+     */
+    void onTimeUpdate(String streamId, long timestamp);
+
+    /**
+     * Close this trigger.
+     */
+    void close();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindow.java
new file mode 100644
index 0000000..2500122
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/sorter/StreamWindow.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.sorter;
+
+import org.apache.eagle.alert.engine.PartitionedEventCollector;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+
+/**
+ * <h2>Tumbling Window instead of Sliding Window</h2>
+ * Windows may overlap in time in order to sort an out-of-order stream,
+ * but windows should never share events, otherwise there will be logic problems.
+ *
+ *
+ * <h2>Ingestion Time Policy</h2>
+ * There are different notions of time, namely processing time, event time, and ingestion time.
+ *
+ * <ol>
+ * <li>
+ *  In processing time, windows are defined with respect to the wall clock of the machine that builds and processes a window, i.e., a one minute processing time window collects elements for exactly one minute.
+ * </li>
+ * <li>
+ * In event time, windows are defined with respect to timestamps that are attached to each event record. This is common for many types of events, such as log entries, sensor data, etc, where the timestamp usually represents the time at which the event occurred. Event time has several benefits over processing time. First of all, it decouples the program semantics from the actual serving speed of the source and the processing performance of system. Hence you can process historic data, which is served at maximum speed, and continuously produced data with the same program. It also prevents semantically incorrect results in case of backpressure or delays due to failure recovery. Second, event time windows compute correct results, even if events arrive out-of-order of their timestamp which is common if a data stream gathers events from distributed sources.
+ * </li>
+ * <li>
+ * Ingestion time is a hybrid of processing and event time. It assigns wall clock timestamps to records as soon as they arrive in the system (at the source) and continues processing with event time semantics based on the attached timestamps.
+ * </li>
+ * </ol>
+ */
+public interface StreamWindow extends StreamTimeClockListener {
+    /**
+     * @return Created timestamp
+     */
+    long createdTime();
+
+    /**
+     * Get start time
+     *
+     * @return start time of the window
+     */
+    long startTime();
+
+    /**
+     * @return margin of the window
+     */
+    long margin();
+
+    /**
+     * @return reject time: events with timestamp &lt; rejectTime() are rejected
+     */
+    long rejectTime();
+
+    /**
+     * Get end time
+     *
+     * @return end time of the window
+     */
+    long endTime();
+
+    /**
+     * @param timestamp event time
+     * @return true if the window accepts an event with the given time, false otherwise
+     */
+    boolean accept(long timestamp);
+
+    /**
+     * Window is expired
+     *
+     * @return whether window is expired
+     */
+    boolean expired();
+
+    /**
+     * @return whether window is alive
+     */
+    boolean alive();
+
+    /**
+     * Add an event to the window.
+     *
+     * @param event event to add
+     * @return presumably whether the event was added -- TODO confirm against implementations
+     */
+    boolean add(PartitionedEvent event);
+
+//    /**
+//     * @param collector Drain to output collector
+//     */
+//    void flush(PartitionedEventCollector collector);
+
+    /**
+     * Flush the window to the collector given via {@link #register(PartitionedEventCollector)}.
+     */
+    void flush();
+
+    /**
+     * Close window
+     */
+    void close();
+
+    /**
+     * Register the collector the window flushes to.
+     *
+     * @param collector output collector
+     */
+    void register(PartitionedEventCollector collector);
+
+    /**
+     * @return size of the window, presumably the number of events currently held -- TODO confirm
+     */
+    int size();
+}
\ No newline at end of file


Mime
View raw message