eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ralp...@apache.org
Subject [17/18] incubator-eagle git commit: EAGLE-324 Init branch-v0.5
Date Wed, 01 Jun 2016 06:00:17 GMT
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRouterSpec.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRouterSpec.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRouterSpec.java
new file mode 100644
index 0000000..773ae56
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRouterSpec.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordination.model;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+
+/**
+ * One RouteSpec means one rule mapping [streamId -> StreamPartition ->
+ * PolicyExecutionQueue]
+ *
+ * Key is StreamPartition
+ */
+public class StreamRouterSpec {
+    private String streamId;
+    private StreamPartition partition; // The meta-data to build
+                                       // StreamPartitioner
+    private List<PolicyWorkerQueue> targetQueue = new ArrayList<PolicyWorkerQueue>();
+
+    public StreamPartition getPartition() {
+        return partition;
+    }
+
+    public void setPartition(StreamPartition partition) {
+        this.partition = partition;
+    }
+
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder().append(this.streamId).append(this.partition).append(targetQueue).build();
+    }
+
+    public List<PolicyWorkerQueue> getTargetQueue() {
+        return targetQueue;
+    }
+
+    public void addQueue(PolicyWorkerQueue queue) {
+        this.targetQueue.add(queue);
+    }
+
+    public String getStreamId() {
+        return streamId;
+    }
+
+    public void setStreamId(String streamId) {
+        this.streamId = streamId;
+    }
+
+    public void setTargetQueue(List<PolicyWorkerQueue> targetQueue) {
+        this.targetQueue = targetQueue;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("StreamRouterSpec[streamId=%s,partition=%s, queue=[%s]]", this.getStreamId(),
+                this.getPartition(), this.getTargetQueue());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java
new file mode 100644
index 0000000..4f16756
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java
@@ -0,0 +1,65 @@
+package org.apache.eagle.alert.coordination.model;
+
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Converts an incoming tuple to a stream.
+ * The incoming tuple consists of 2 fields: topic and map of key/value.
+ * The output stream consists of 4 fields: topic, stream name, timestamp, and map of key/value.
+ */
+public class Tuple2StreamConverter {
+    private static final Logger LOG = LoggerFactory.getLogger(Tuple2StreamConverter.class);
+    private Tuple2StreamMetadata metadata;
+    private StreamNameSelector cachedSelector;
+    public Tuple2StreamConverter(Tuple2StreamMetadata metadata){
+        this.metadata = metadata;
+        try {
+            cachedSelector = (StreamNameSelector)Class.forName(metadata.getStreamNameSelectorCls()).
+                    getConstructor(Properties.class).
+                    newInstance(metadata.getStreamNameSelectorProp());
+        }catch(Exception ex){
+            LOG.error("error initializing StreamNameSelector object", ex);
+            throw new IllegalStateException(ex);
+        }
+    }
+
+    /**
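+     * Assumes the tuple is composed of topic + map of key/value.
+     * @param tuple incoming tuple; element 0 is passed through unchanged, element 1 is the key/value map
+     * @return list of [tuple element 0, stream name, timestamp, key/value map], or null if the stream name is not active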
+     */
+    @SuppressWarnings({ "unchecked" })
+    public List<Object> convert(List<Object> tuple){
+        Map<String, Object> m = (Map<String, Object>)tuple.get(1);
+        String streamName = cachedSelector.getStreamName(m);
+        if(!metadata.getActiveStreamNames().contains(streamName)) {
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("streamName {} is not within activeStreamNames {}", streamName, metadata.getActiveStreamNames());
+            }
+            return null;
+        }
+
+        Object timeObject = m.get(metadata.getTimestampColumn());
+        long timestamp = 0L;
+        if(timeObject instanceof Number){
+            timestamp = ((Number) timeObject).longValue();
+        }else{
+            String timestampFieldValue = (String) m.get(metadata.getTimestampColumn());
+            try {
+                SimpleDateFormat sdf = new SimpleDateFormat(metadata.getTimestampFormat());
+                timestamp = sdf.parse(timestampFieldValue).getTime();
+            } catch (Exception ex) {
+                LOG.error("continue with current timestamp because error happens while parsing timestamp column " + metadata.getTimestampColumn() + " with format " + metadata.getTimestampFormat());
+                timestamp = System.currentTimeMillis();
+            }
+        }
+        return Arrays.asList(tuple.get(0), streamName, timestamp, tuple.get(1));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadata.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadata.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadata.java
new file mode 100644
index 0000000..c37e785
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadata.java
@@ -0,0 +1,71 @@
+package org.apache.eagle.alert.coordination.model;
+
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * @since 4/25/16. This metadata controls how a tuple is transformed to a stream.
+ *        For example, raw data consists of {"metric" : "cpuUsage", "host" :
+ *        "xyz.com", "timestamp" : 1346846400, "value" : "0.9"}; field "metric"
+ *        is used for creating the stream name, so here "cpuUsage" is the stream name.
+ *
+ *        metric could be "cpuUsage", "diskUsage", "memUsage" etc, so
+ *        activeStreamNames are subset of all metric names
+ *
+ *        All other messages which are not one of activeStreamNames will be
+ *        filtered out
+ */
+public class Tuple2StreamMetadata {
+    /**
+     * only messages belonging to activeStreamNames will be kept while
+     * transforming tuple into stream
+     */
+    private Set<String> activeStreamNames = new HashSet<String>();
+    // the specific stream name selector
+    private Properties streamNameSelectorProp;
+    private String streamNameSelectorCls;
+    private String timestampColumn;
+    private String timestampFormat;
+
+    public Set<String> getActiveStreamNames() {
+        return activeStreamNames;
+    }
+
+    public void setActiveStreamNames(Set<String> activeStreamNames) {
+        this.activeStreamNames = activeStreamNames;
+    }
+
+    public Properties getStreamNameSelectorProp() {
+        return streamNameSelectorProp;
+    }
+
+    public void setStreamNameSelectorProp(Properties streamNameSelectorProp) {
+        this.streamNameSelectorProp = streamNameSelectorProp;
+    }
+
+    public String getStreamNameSelectorCls() {
+        return streamNameSelectorCls;
+    }
+
+    public void setStreamNameSelectorCls(String streamNameSelectorCls) {
+        this.streamNameSelectorCls = streamNameSelectorCls;
+    }
+
+    public String getTimestampColumn() {
+        return timestampColumn;
+    }
+
+    public void setTimestampColumn(String timestampColumn) {
+        this.timestampColumn = timestampColumn;
+    }
+
+    public String getTimestampFormat() {
+        return timestampFormat;
+    }
+
+    public void setTimestampFormat(String timestampFormat) {
+        this.timestampFormat = timestampFormat;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedPolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedPolicyDefinition.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedPolicyDefinition.java
new file mode 100644
index 0000000..f4b8ccb
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedPolicyDefinition.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordination.model;
+
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+
+/**
+ * @since May 25, 2016
+ *
+ */
+public class VersionedPolicyDefinition {
+    private String version;
+    private PolicyDefinition definition;
+
+    public VersionedPolicyDefinition() {
+    }
+
+    public VersionedPolicyDefinition(String version, PolicyDefinition def) {
+        this.version = version;
+        this.definition = def;
+    }
+
+    public String getVersion() {
+        return version;
+    }
+
+    public void setVersion(String version) {
+        this.version = version;
+    }
+
+    public PolicyDefinition getDefinition() {
+        return definition;
+    }
+
+    public void setDefinition(PolicyDefinition definition) {
+        this.definition = definition;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedStreamDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedStreamDefinition.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedStreamDefinition.java
new file mode 100644
index 0000000..2770aa1
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedStreamDefinition.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordination.model;
+
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+
+/**
+ * @since May 25, 2016
+ *
+ */
+public class VersionedStreamDefinition {
+    private String version;
+    private StreamDefinition definition;
+
+    public VersionedStreamDefinition() {
+    }
+
+    public VersionedStreamDefinition(String version, StreamDefinition def) {
+        this.version = version;
+        this.definition = def;
+    }
+
+    public String getVersion() {
+        return version;
+    }
+
+    public void setVersion(String version) {
+        this.version = version;
+    }
+
+    public StreamDefinition getDefinition() {
+        return definition;
+    }
+
+    public void setDefinition(StreamDefinition definition) {
+        this.definition = definition;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/WorkSlot.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/WorkSlot.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/WorkSlot.java
new file mode 100644
index 0000000..96016f4
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/WorkSlot.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordination.model;
+
+
+/**
+ * A slot is simply a bolt.
+ */
+public class WorkSlot {
+    public String topologyName;
+    public String boltId;
+
+    public String getTopologyName() {
+        return topologyName;
+    }
+
+    public void setTopologyName(String topologyName) {
+        this.topologyName = topologyName;
+    }
+
+    public String getBoltId() {
+        return boltId;
+    }
+
+    public void setBoltId(String boltId) {
+        this.boltId = boltId;
+    }
+
+    public WorkSlot() {
+
+    }
+
+    public WorkSlot(String topo, String boltId) {
+        this.topologyName = topo;
+        this.boltId = boltId;
+    }
+
+    public String toString() {
+        return "(" + topologyName + ":" + boltId + ")";
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStream.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStream.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStream.java
new file mode 100644
index 0000000..beda896
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStream.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordination.model.internal;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+/**
+ * A monitored stream is the unique data set in the system.
+ * 
+ * It's a combination of a stream and the specific group-by on it.
+ * 
+ * For a correlation stream, it means multiple streams for a given monitored stream.
+ * 
+ * 
+ * @since Apr 27, 2016
+ *
+ */
+public class MonitoredStream {
+
+    private String version;
+
+    // the stream group that this monitored stream stands for
+    private StreamGroup streamGroup = new StreamGroup();
+    private List<StreamWorkSlotQueue> queues = new ArrayList<StreamWorkSlotQueue>();
+    
+    public MonitoredStream() {
+    }
+
+    public MonitoredStream(StreamGroup par) {
+        this.streamGroup = par;
+    }
+
+    public StreamGroup getStreamGroup() {
+        return streamGroup;
+    }
+
+    public List<StreamWorkSlotQueue> getQueues() {
+        return queues;
+    }
+
+    public synchronized void addQueues(StreamWorkSlotQueue queue) {
+        queues.add(queue);
+    }
+
+    public synchronized boolean removeQueue(StreamWorkSlotQueue queue) {
+        return this.queues.remove(queue);
+    }
+
+    public int hashCode() {
+        return new HashCodeBuilder().append(streamGroup).build();
+    }
+
+    public boolean equals(Object other) {
+        if (!(other instanceof MonitoredStream)) {
+            return false;
+        }
+        MonitoredStream o = (MonitoredStream) other;
+        return Objects.equals(streamGroup, o.streamGroup);
+    }
+
+    public String getVersion() {
+        return version;
+    }
+
+    public void setVersion(String version) {
+        this.version = version;
+    }
+
+    public void setQueues(List<StreamWorkSlotQueue> queues) {
+        this.queues = queues;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignment.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignment.java
new file mode 100644
index 0000000..3e956ca
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignment.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordination.model.internal;
+
+/**
+ * monitor metadata
+ * 
+ * @since Apr 27, 2016
+ *
+ */
+public class PolicyAssignment {
+
+    private String version;
+
+    private String policyName;
+    private String queueId;
+
+    public PolicyAssignment() {
+    }
+
+    public PolicyAssignment(String policyName, String queueId) {
+        this.policyName = policyName;
+        this.queueId = queueId;
+    }
+
+    public String getPolicyName() {
+        return policyName;
+    }
+
+    public String getQueueId() {
+        return queueId;
+    }
+
+    public void setPolicyName(String policyName) {
+        this.policyName = policyName;
+    }
+
+    public void setQueueId(String queueId) {
+        this.queueId = queueId;
+    }
+
+    public String getVersion() {
+        return version;
+    }
+
+    public void setVersion(String version) {
+        this.version = version;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("PolicyAssignment of policy %s, queueId %s, version %s !", policyName, queueId, version);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamGroup.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamGroup.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamGroup.java
new file mode 100644
index 0000000..9cbd841
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamGroup.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordination.model.internal;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.base.Objects;
+
+/**
+ * @since May 6, 2016
+ *
+ */
+public class StreamGroup {
+
+    private List<StreamPartition> streamPartitions = new ArrayList<StreamPartition>();
+    
+    public StreamGroup() {
+    }
+    
+    public List<StreamPartition> getStreamPartitions() {
+        return streamPartitions;
+    }
+
+    public void addStreamPartition(StreamPartition sp) {
+        this.streamPartitions.add(sp);
+    }
+
+    public void addStreamPartitions(List<StreamPartition> sps) {
+        this.streamPartitions.addAll(sps);
+    }
+
+    @org.codehaus.jackson.annotate.JsonIgnore
+    @JsonIgnore
+    public String getStreamId() {
+        StringBuilder sb = new StringBuilder("SG[");
+        for (StreamPartition sp : streamPartitions) {
+            sb.append(sp.getStreamId()).append("-");
+        }
+        sb.append("]");
+        return sb.toString();
+    }
+
+    @Override
+    public int hashCode() {
+        // implicitly all partitions in the stream group contribute to the hash code
+        return new HashCodeBuilder().append(streamPartitions).build();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof StreamGroup)) {
+            return false;
+        }
+        StreamGroup o = (StreamGroup) obj;
+        return Objects.equal(this.streamPartitions, o.streamPartitions);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("StreamGroup partitions=: %s ", streamPartitions);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueue.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueue.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueue.java
new file mode 100644
index 0000000..fab6217
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueue.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordination.model.internal;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+/**
+ * A work queue for one given monitored stream.
+ * 
+ * Analogous to Storm's "tasks for a given bolt".
+ * 
+ * @since Apr 27, 2016
+ *
+ */
+public class StreamWorkSlotQueue {
+    private String queueId;
+
+    private final List<WorkSlot> workingSlots = new LinkedList<WorkSlot>();
+    private boolean dedicated;
+    // some dedicated option, like dedicated userId/tenantId/policyId.
+    private Map<String, Object> dedicateOption;
+    
+    private int numberOfGroupBolts;
+    private Map<String, Integer> topoGroupStartIndex = new HashMap<String, Integer>(); 
+
+    public StreamWorkSlotQueue() {
+    }
+    
+    public StreamWorkSlotQueue(StreamGroup par, boolean isDedicated, Map<String, Object> options,
+            List<WorkSlot> slots) {
+        this.queueId = par.getStreamId() + System.currentTimeMillis();// simply generate a queue
+        this.dedicated = isDedicated;
+        dedicateOption = new HashMap<String, Object>();
+        dedicateOption.putAll(options);
+        this.workingSlots.addAll(slots);
+    }
+
+    public Map<String, Object> getDedicateOption() {
+        return dedicateOption;
+    }
+
+    public void setDedicateOption(Map<String, Object> dedicateOption) {
+        this.dedicateOption = dedicateOption;
+    }
+
+    public List<WorkSlot> getWorkingSlots() {
+        return workingSlots;
+    }
+
+    public boolean isDedicated() {
+        return dedicated;
+    }
+
+    public void setDedicated(boolean dedicated) {
+        this.dedicated = dedicated;
+    }
+
+    @org.codehaus.jackson.annotate.JsonIgnore
+    @JsonIgnore
+    public int getQueueSize() {
+        return workingSlots.size();
+    }
+
+//    @org.codehaus.jackson.annotate.JsonIgnore
+//    @JsonIgnore
+//    public void placePolicy(PolicyDefinition pd) {
+//        policies.add(pd.getName());
+//    }
+
+    public int getNumberOfGroupBolts() {
+        return numberOfGroupBolts;
+    }
+
+    public void setNumberOfGroupBolts(int numberOfGroupBolts) {
+        this.numberOfGroupBolts = numberOfGroupBolts;
+    }
+
+    public Map<String, Integer> getTopoGroupStartIndex() {
+        return topoGroupStartIndex;
+    }
+
+    public void setTopoGroupStartIndex(Map<String, Integer> topoGroupStartIndex) {
+        this.topoGroupStartIndex = topoGroupStartIndex;
+    }
+
+    @org.codehaus.jackson.annotate.JsonIgnore
+    @JsonIgnore
+    public int getTopologyGroupStartIndex(String topo) {
+        if (topoGroupStartIndex.containsKey(topo)) {
+            return this.topoGroupStartIndex.get(topo);
+        }
+        return -1;
+    }
+
+    public String getQueueId() {
+        return queueId;
+    }
+
+    public void setQueueId(String queueId) {
+        this.queueId = queueId;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/Topology.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/Topology.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/Topology.java
new file mode 100644
index 0000000..189e2a5
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/Topology.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordination.model.internal;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Describes one unit topology.
+ * <p>
+ * Logically, a unit topology consists of S spouts, G group-by bolts and A
+ * alert bolts; normally S = 1. Physically, each spout is composed of s spout
+ * nodes, each group-by bolt of g group-by nodes, and each alert bolt of a
+ * alert nodes.
+ *
+ * @since Mar 24, 2016
+ */
+public class Topology {
+
+    private String name;
+    // number of logical nodes
+    private int numOfSpout;
+    private int numOfAlertBolt;
+    private int numOfGroupBolt;
+    private int numOfPublishBolt;
+    private String spoutId;
+    private String pubBoltId;
+    @Deprecated
+    private Set<String> groupNodeIds;
+    @Deprecated
+    private Set<String> alertBoltIds;
+
+    // number of physical nodes for each logic bolt
+    private int spoutParallelism = 1;
+    private int groupParallelism = 1;
+    private int alertParallelism = 1;
+
+    private String clusterName;
+
+    /**
+     * Creates a topology with no name, zero counts, {@code null} id sets and
+     * every parallelism set to 1.
+     */
+    public Topology() {
+    }
+
+    /**
+     * Creates a named topology with one spout and the given bolt counts.
+     *
+     * @param name  topology name
+     * @param group number of group-by bolts; also the initial capacity of the
+     *              group node id set
+     * @param alert number of alert bolts; also the initial capacity of the
+     *              alert bolt id set
+     */
+    public Topology(String name, int group, int alert) {
+        this.name = name;
+        this.numOfSpout = 1;
+        this.numOfGroupBolt = group;
+        this.numOfAlertBolt = alert;
+        // The counts only pre-size the sets; both start out empty.
+        this.groupNodeIds = new HashSet<>(group);
+        this.alertBoltIds = new HashSet<>(alert);
+        // Parallelism values keep their field initializers (1).
+    }
+
+    /** @return the topology name */
+    public String getName() {
+        return this.name;
+    }
+
+    /** @param name the topology name to store */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /** @return the number of logical spouts */
+    public int getNumOfSpout() {
+        return this.numOfSpout;
+    }
+
+    /** @param numOfSpout the number of logical spouts to store */
+    public void setNumOfSpout(int numOfSpout) {
+        this.numOfSpout = numOfSpout;
+    }
+
+    /** @return the number of logical alert bolts */
+    public int getNumOfAlertBolt() {
+        return this.numOfAlertBolt;
+    }
+
+    /** @param numOfAlertBolt the number of logical alert bolts to store */
+    public void setNumOfAlertBolt(int numOfAlertBolt) {
+        this.numOfAlertBolt = numOfAlertBolt;
+    }
+
+    /** @return the number of logical group-by bolts */
+    public int getNumOfGroupBolt() {
+        return this.numOfGroupBolt;
+    }
+
+    /** @param numOfGroupBolt the number of logical group-by bolts to store */
+    public void setNumOfGroupBolt(int numOfGroupBolt) {
+        this.numOfGroupBolt = numOfGroupBolt;
+    }
+
+    /** @return the spout id */
+    public String getSpoutId() {
+        return this.spoutId;
+    }
+
+    /** @param spoutId the spout id to store */
+    public void setSpoutId(String spoutId) {
+        this.spoutId = spoutId;
+    }
+
+    /** @return the publish bolt id */
+    public String getPubBoltId() {
+        return this.pubBoltId;
+    }
+
+    /** @param pubBoltId the publish bolt id to store */
+    public void setPubBoltId(String pubBoltId) {
+        this.pubBoltId = pubBoltId;
+    }
+
+    /** @return the internal set of group node ids (backing field is deprecated) */
+    public Set<String> getGroupNodeIds() {
+        return this.groupNodeIds;
+    }
+
+    /** @param groupNodeIds the set of group node ids to store, kept by reference */
+    public void setGroupNodeIds(Set<String> groupNodeIds) {
+        this.groupNodeIds = groupNodeIds;
+    }
+
+    /** @return the internal set of alert bolt ids (backing field is deprecated) */
+    public Set<String> getAlertBoltIds() {
+        return this.alertBoltIds;
+    }
+
+    /** @param alertBoltIds the set of alert bolt ids to store, kept by reference */
+    public void setAlertBoltIds(Set<String> alertBoltIds) {
+        this.alertBoltIds = alertBoltIds;
+    }
+
+    /** @return the number of logical publish bolts */
+    public int getNumOfPublishBolt() {
+        return this.numOfPublishBolt;
+    }
+
+    /** @param numOfPublishBolt the number of logical publish bolts to store */
+    public void setNumOfPublishBolt(int numOfPublishBolt) {
+        this.numOfPublishBolt = numOfPublishBolt;
+    }
+
+    /** @return the number of physical nodes per spout */
+    public int getSpoutParallelism() {
+        return this.spoutParallelism;
+    }
+
+    /** @param spoutParallelism the number of physical nodes per spout */
+    public void setSpoutParallelism(int spoutParallelism) {
+        this.spoutParallelism = spoutParallelism;
+    }
+
+    /** @return the number of physical nodes per group-by bolt */
+    public int getGroupParallelism() {
+        return this.groupParallelism;
+    }
+
+    /** @param groupParallelism the number of physical nodes per group-by bolt */
+    public void setGroupParallelism(int groupParallelism) {
+        this.groupParallelism = groupParallelism;
+    }
+
+    /** @return the number of physical nodes per alert bolt */
+    public int getAlertParallelism() {
+        return this.alertParallelism;
+    }
+
+    /** @param alertParallelism the number of physical nodes per alert bolt */
+    public void setAlertParallelism(int alertParallelism) {
+        this.alertParallelism = alertParallelism;
+    }
+
+    /** @return the cluster name */
+    public String getClusterName() {
+        return this.clusterName;
+    }
+
+    /** @param clusterName the cluster name to store */
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/IStreamCodec.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/IStreamCodec.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/IStreamCodec.java
new file mode 100644
index 0000000..3272d28
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/IStreamCodec.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.codec;
+
+import org.apache.eagle.alert.engine.model.StreamEvent;
+
+/**
+ * Contract for translating between raw byte arrays and {@link StreamEvent}
+ * objects. The wire format is left to each implementation.
+ *
+ * @since Apr 5, 2016
+ */
+public interface IStreamCodec {
+
+    /**
+     * Builds a {@link StreamEvent} from its byte representation.
+     *
+     * @param contents the bytes to decode
+     * @return the decoded event
+     */
+    StreamEvent decode(byte[] contents);
+
+    /**
+     * Produces the byte representation of a {@link StreamEvent}.
+     *
+     * @param tuple the event to encode
+     * @return the encoded bytes
+     */
+    byte[] encode(StreamEvent tuple);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/SherlockEventCodec.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/SherlockEventCodec.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/SherlockEventCodec.java
new file mode 100644
index 0000000..49726e9
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/SherlockEventCodec.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.codec;
+
+import org.apache.eagle.alert.engine.model.StreamEvent;
+
+/**
+ * {@link IStreamCodec} placeholder for events. Neither method is implemented
+ * yet: both ignore their argument and return {@code null}.
+ *
+ * @since Apr 5, 2016
+ */
+public class SherlockEventCodec implements IStreamCodec {
+
+    /**
+     * Not implemented.
+     *
+     * @param contents ignored
+     * @return always {@code null}
+     */
+    @Override
+    public StreamEvent decode(byte[] contents) {
+        // TODO: implement decoding; callers currently always receive null.
+        return null;
+    }
+
+    /**
+     * Not implemented.
+     *
+     * @param tuple ignored
+     * @return always {@code null}
+     */
+    @Override
+    public byte[] encode(StreamEvent tuple) {
+        // TODO: implement encoding; callers currently always receive null.
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/SherlockMetricCodec.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/SherlockMetricCodec.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/SherlockMetricCodec.java
new file mode 100644
index 0000000..0e875d8
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/SherlockMetricCodec.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.codec;
+
+import org.apache.eagle.alert.engine.model.StreamEvent;
+
+/**
+ * {@link IStreamCodec} placeholder for metrics. Neither method is implemented
+ * yet: both ignore their argument and return {@code null}.
+ *
+ * @since Apr 5, 2016
+ */
+public class SherlockMetricCodec implements IStreamCodec {
+
+    /**
+     * Not implemented.
+     *
+     * @param contents ignored
+     * @return always {@code null}
+     */
+    @Override
+    public StreamEvent decode(byte[] contents) {
+        // TODO: implement decoding; callers currently always receive null.
+        return null;
+    }
+
+    /**
+     * Not implemented.
+     *
+     * @param tuple ignored
+     * @return always {@code null}
+     */
+    @Override
+    public byte[] encode(StreamEvent tuple) {
+        // TODO: implement encoding; callers currently always receive null.
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
new file mode 100644
index 0000000..f97eb2b
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.coordinator;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+/**
+ * Serializable description of a policy: its name, description, input and
+ * output stream names, a typed {@link Definition}, and the partition spec.
+ * <p>
+ * {@link #equals(Object)} and {@link #hashCode()} cover every field except
+ * {@code parallelismHint}. The three list fields are compared as unordered
+ * collections (same elements with the same cardinality), and the hash code is
+ * computed order-independently to match. Any field may be {@code null}.
+ *
+ * @since Apr 5, 2016
+ */
+public class PolicyDefinition implements Serializable{
+    private static final long serialVersionUID = 377581499339572414L;
+    // unique identifier
+    private String name;
+    private String description;
+    private List<String> inputStreams = new ArrayList<String>();
+    private List<String> outputStreams = new ArrayList<String>();
+
+    private Definition definition;
+
+    // one stream only have one partition in one policy, since we don't support stream alias
+    private List<StreamPartition> partitionSpec = new ArrayList<StreamPartition>();
+
+    // runtime configuration for policy, these are user-invisible
+    private int parallelismHint = 1;
+
+    /** @return the policy name */
+    public String getName() {
+        return name;
+    }
+
+    /** @param name the policy name to store */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /** @return the policy description */
+    public String getDescription() {
+        return description;
+    }
+
+    /** @param description the policy description to store */
+    public void setDescription(String description) {
+        this.description = description;
+    }
+
+    /** @return the internal list of input stream names (not a copy) */
+    public List<String> getInputStreams() {
+        return inputStreams;
+    }
+
+    /** @param inputStreams the list of input stream names, kept by reference */
+    public void setInputStreams(List<String> inputStreams) {
+        this.inputStreams = inputStreams;
+    }
+
+    /** @return the internal list of output stream names (not a copy) */
+    public List<String> getOutputStreams() {
+        return outputStreams;
+    }
+
+    /** @param outputStreams the list of output stream names, kept by reference */
+    public void setOutputStreams(List<String> outputStreams) {
+        this.outputStreams = outputStreams;
+    }
+
+    /** @return the policy definition, possibly {@code null} */
+    public Definition getDefinition() {
+        return definition;
+    }
+
+    /** @param definition the policy definition to store */
+    public void setDefinition(Definition definition) {
+        this.definition = definition;
+    }
+
+    /** @return the internal list of stream partitions (not a copy) */
+    public List<StreamPartition> getPartitionSpec() {
+        return partitionSpec;
+    }
+
+    /** @param partitionSpec the list of stream partitions, kept by reference */
+    public void setPartitionSpec(List<StreamPartition> partitionSpec) {
+        this.partitionSpec = partitionSpec;
+    }
+
+    /**
+     * Appends one partition to the partition spec.
+     *
+     * @param par the partition to add
+     */
+    public void addPartition(StreamPartition par) {
+        this.partitionSpec.add(par);
+    }
+
+    /** @return the parallelism hint (defaults to 1) */
+    public int getParallelismHint() {
+        return parallelismHint;
+    }
+
+    /** @param parallelism the parallelism hint to store */
+    public void setParallelismHint(int parallelism) {
+        this.parallelismHint = parallelism;
+    }
+
+    /**
+     * Hashes the same fields that {@link #equals(Object)} compares. The list
+     * fields contribute an order-independent hash so that two policies whose
+     * lists differ only in element order, and are therefore equal, also share
+     * a hash code. {@code parallelismHint} is intentionally excluded.
+     */
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder().
+                append(name).
+                append(description).
+                append(unorderedHash(inputStreams)).
+                append(unorderedHash(outputStreams)).
+                append(definition).
+                append(unorderedHash(partitionSpec)).
+                build();
+    }
+
+    /**
+     * Compares name, description, definition and the three list fields.
+     * Lists are equal when they hold the same elements with the same
+     * cardinality, in any order. {@code null} fields are tolerated and only
+     * match {@code null}. {@code parallelismHint} is intentionally excluded.
+     */
+    @Override
+    public boolean equals(Object that){
+        if (that == this) {
+            return true;
+        }
+        if (!(that instanceof PolicyDefinition)) {
+            return false;
+        }
+        PolicyDefinition another = (PolicyDefinition) that;
+        return Objects.equals(another.name, this.name)
+                && Objects.equals(another.description, this.description)
+                && sameElements(another.inputStreams, this.inputStreams)
+                && sameElements(another.outputStreams, this.outputStreams)
+                && Objects.equals(another.definition, this.definition)
+                && sameElements(another.partitionSpec, this.partitionSpec);
+    }
+
+    /** Null-tolerant, order-insensitive comparison of two lists. */
+    private static boolean sameElements(List<?> left, List<?> right) {
+        if (left == right) {
+            return true;
+        }
+        if (left == null || right == null) {
+            return false;
+        }
+        return CollectionUtils.isEqualCollection(left, right);
+    }
+
+    /** Order-independent hash: the sum of the element hash codes, 0 for null. */
+    private static int unorderedHash(List<?> items) {
+        if (items == null) {
+            return 0;
+        }
+        int sum = 0;
+        for (Object item : items) {
+            sum += Objects.hashCode(item);
+        }
+        return sum;
+    }
+
+    /**
+     * A policy body expressed as a {@code type} / {@code value} pair of
+     * strings. Both fields are public and may be {@code null}.
+     */
+    public static class Definition implements Serializable{
+        private static final long serialVersionUID = -622366527887848346L;
+
+        public String type;
+        public String value;
+
+        /**
+         * @param type  the definition type
+         * @param value the definition value
+         */
+        public Definition(String type,String value){
+            this.type = type;
+            this.value = value;
+        }
+
+        /** Creates a definition whose type and value are both {@code null}. */
+        public Definition() {
+            this.type = null;
+            this.value = null;
+        }
+
+        @Override
+        public int hashCode() {
+            return new HashCodeBuilder().append(type).append(value).build();
+        }
+
+        /** Null-tolerant comparison of {@code type} and {@code value}. */
+        @Override
+        public boolean equals(Object that){
+            if (that == this) {
+                return true;
+            }
+            if (!(that instanceof Definition)) {
+                return false;
+            }
+            Definition another = (Definition) that;
+            return Objects.equals(another.type, this.type)
+                    && Objects.equals(another.value, this.value);
+        }
+
+        /** @return the definition type */
+        public String getType() {
+            return type;
+        }
+
+        /** @param type the definition type to store */
+        public void setType(String type) {
+            this.type = type;
+        }
+
+        /** @return the definition value */
+        public String getValue() {
+            return value;
+        }
+
+        /** @param value the definition value to store */
+        public void setValue(String value) {
+            this.value = value;
+        }
+
+        /** @return {@code {type="...",value="..."}} with balanced braces */
+        @Override
+        public String toString() {
+            return String.format("{type=\"%s\",value=\"%s\"}",type,value);
+        }
+    }
+
+    /**
+     * @return {@code {name="...",definition=...}}; a {@code null} definition
+     *         is rendered as {@code null} instead of raising an exception
+     */
+    @Override
+    public String toString() {
+        // Let the formatter call toString() so a null definition is tolerated.
+        return String.format("{name=\"%s\",definition=%s}",this.getName(),this.getDefinition());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
new file mode 100644
index 0000000..d8b4f28
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.coordinator;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Mutable bean describing a publishment: a name, a type, the ids of the
+ * policies it applies to, a de-duplication interval (kept as a string) and
+ * free-form string properties.
+ * <p>
+ * {@link #equals(Object)} and {@link #hashCode()} cover all five fields and
+ * tolerate {@code null} in any of them.
+ *
+ * @since Apr 11, 2016
+ */
+public class Publishment {
+
+    private String name;
+    private String type;
+    private List<String> policyIds;
+    private String dedupIntervalMin;
+    private Map<String, String> properties;
+
+    /** @return the publishment name */
+    public String getName() {
+        return name;
+    }
+
+    /** @param name the publishment name to store */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /** @return the publishment type */
+    public String getType() {
+        return type;
+    }
+
+    /** @param type the publishment type to store */
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    /** @return the internal list of policy ids (not a copy), possibly {@code null} */
+    public List<String> getPolicyIds() {
+        return policyIds;
+    }
+
+    /** @param policyIds the list of policy ids, kept by reference */
+    public void setPolicyIds(List<String> policyIds) {
+        this.policyIds = policyIds;
+    }
+
+    /** @return the de-duplication interval as stored */
+    public String getDedupIntervalMin() {
+        return dedupIntervalMin;
+    }
+
+    /** @param dedupIntervalMin the de-duplication interval to store */
+    public void setDedupIntervalMin(String dedupIntervalMin) {
+        this.dedupIntervalMin = dedupIntervalMin;
+    }
+
+    /** @return the internal properties map (not a copy), possibly {@code null} */
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+    /** @param properties the properties map, kept by reference */
+    public void setProperties(Map<String, String> properties) {
+        this.properties = properties;
+    }
+
+    /**
+     * Field-by-field comparison. Every field, including {@code properties},
+     * goes through {@link Objects#equals(Object, Object)}, so an instance
+     * whose properties were never set can still be compared.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj instanceof Publishment) {
+            Publishment p = (Publishment) obj;
+            return (Objects.equals(name, p.getName()) &&
+                    Objects.equals(type, p.getType()) &&
+                    Objects.equals(dedupIntervalMin, p.getDedupIntervalMin()) &&
+                    Objects.equals(policyIds, p.getPolicyIds()) &&
+                    Objects.equals(properties, p.getProperties()));
+        }
+        return false;
+    }
+
+    /** Hashes the same five fields that {@link #equals(Object)} compares. */
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder()
+                .append(name)
+                .append(type)
+                .append(dedupIntervalMin)
+                .append(policyIds)
+                .append(properties)
+                .build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
new file mode 100644
index 0000000..f34b971
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.coordinator;
+
+import java.util.Objects;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+/**
+ * Mutable bean describing a kind of publishment through four strings: a type,
+ * a class name, a description and a fields value. Equality and hashing cover
+ * all four and tolerate {@code null}.
+ */
+public class PublishmentType {
+
+    private String type;
+    private String className;
+    private String description;
+    private String fields;
+
+    /** @return the type */
+    public String getType() {
+        return this.type;
+    }
+
+    /** @param type the type to store */
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    /** @return the class name */
+    public String getClassName() {
+        return this.className;
+    }
+
+    /** @param className the class name to store */
+    public void setClassName(String className) {
+        this.className = className;
+    }
+
+    /** @return the description */
+    public String getDescription() {
+        return this.description;
+    }
+
+    /** @param description the description to store */
+    public void setDescription(String description) {
+        this.description = description;
+    }
+
+    /** @return the fields value */
+    public String getFields() {
+        return this.fields;
+    }
+
+    /** @param fields the fields value to store */
+    public void setFields(String fields) {
+        this.fields = fields;
+    }
+
+    /**
+     * Compares class name, type, description and fields, in that order, and
+     * stops at the first mismatch.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof PublishmentType)) {
+            return false;
+        }
+        final PublishmentType other = (PublishmentType) obj;
+        if (!Objects.equals(className, other.getClassName())) {
+            return false;
+        }
+        if (!Objects.equals(type, other.type)) {
+            return false;
+        }
+        if (!Objects.equals(description, other.getDescription())) {
+            return false;
+        }
+        return Objects.equals(fields, other.getFields());
+    }
+
+    /** Hashes class name, type, description and fields, in that order. */
+    @Override
+    public int hashCode() {
+        final HashCodeBuilder hash = new HashCodeBuilder();
+        hash.append(className);
+        hash.append(type);
+        hash.append(description);
+        hash.append(fields);
+        return hash.build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
new file mode 100644
index 0000000..dc44571
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.coordinator;
+
+import java.io.Serializable;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+
+/**
+ * Serializable description of one column of a stream: its name, value type,
+ * default value, whether it is required, and a free-text description.
+ */
+public class StreamColumn implements Serializable{
+    private static final long serialVersionUID = -5457861313624389106L;
+    private String name;
+    private Type type;
+    private Object defaultValue;
+    private boolean required;
+    private String description;
+
+    /** @return a one-line summary of name, type, default value and required flag */
+    public String toString(){
+        return String.format("StreamColumn=name[%s], type=[%s], defaultValue=[%s], required=[%s]", name, type, defaultValue, required);
+    }
+
+    /** @return the column name */
+    public String getName() {
+        return name;
+    }
+
+    /** @param name the column name to store */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /** @return the column type */
+    public Type getType() {
+        return type;
+    }
+
+    /** @param type the column type to store */
+    public void setType(Type type) {
+        this.type = type;
+    }
+
+    /** @return the default value, possibly {@code null} */
+    public Object getDefaultValue() {
+        return defaultValue;
+    }
+
+    /** @param defaultValue the default value to store */
+    public void setDefaultValue(Object defaultValue) {
+        this.defaultValue = defaultValue;
+    }
+
+    /** @return whether the column is required */
+    public boolean isRequired() {
+        return required;
+    }
+
+    /** @param required whether the column is required */
+    public void setRequired(boolean required) {
+        this.required = required;
+    }
+
+    /** @return the column description */
+    public String getDescription() {
+        return description;
+    }
+
+    /** @param description the column description to store */
+    public void setDescription(String description) {
+        this.description = description;
+    }
+
+
+    /**
+     * Supported column value types. Each constant carries a lower-case text
+     * form, returned by {@link #toString()} and accepted by
+     * {@link #getEnumFromValue(String)}.
+     */
+    public enum Type implements Serializable{
+        STRING("string"), INT("int"), LONG("long"), FLOAT("float"), DOUBLE("double"), BOOL("bool"), OBJECT("object");
+
+        private final String name;
+
+        Type(String name){
+            this.name = name;
+        }
+
+        /** @return the lower-case text form of this type */
+        @Override
+        public String toString() {
+            return name;
+        }
+
+        /**
+         * Resolves a type from its text form (exact, case-sensitive match).
+         *
+         * @param value the text form, e.g. {@code "string"}
+         * @return the matching constant
+         * @throws IllegalArgumentException if no constant has that text form;
+         *         the message names the rejected value
+         */
+        @JsonCreator
+        public static Type getEnumFromValue(String value) {
+            for (Type testEnum : values()) {
+                if (testEnum.name.equals(value)) {
+                    return testEnum;
+                }
+            }
+            // Name the offending value so a bad stream definition is easy to trace.
+            throw new IllegalArgumentException("Unknown stream column type: " + value);
+        }
+    }
+
+    /**
+     * Fluent helper that fills in a single {@link StreamColumn}.
+     * {@link #build()} hands back that same instance on every call, so a
+     * builder should not be reused for a second column.
+     */
+    public static class Builder {
+        private StreamColumn column;
+
+        public Builder(){
+            column = new StreamColumn();
+        }
+
+        /** @param name the column name; returns this builder */
+        public Builder name(String name){
+            column.setName(name);
+            return this;
+        }
+
+        /** @param type the column type; returns this builder */
+        public Builder type(Type type){
+            column.setType(type);
+            return this;
+        }
+
+        /** @param defaultValue the default value; returns this builder */
+        public Builder defaultValue(Object defaultValue){
+            column.setDefaultValue(defaultValue);
+            return this;
+        }
+
+        /** @param required whether the column is required; returns this builder */
+        public Builder required(boolean required){
+            column.setRequired(required);
+            return this;
+        }
+
+        /** @param description the column description; returns this builder */
+        public Builder description(String description){
+            column.setDescription(description);
+            return this;
+        }
+
+        /** @return the column populated so far (the builder's own instance) */
+        public StreamColumn build(){
+            return column;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
new file mode 100644
index 0000000..cd5773a
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.coordinator;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This is actually a data source schema: a stream id, its data source, a
+ * description, two boolean flags and the ordered list of columns.
+ *
+ * @since Apr 5, 2016
+ */
+public class StreamDefinition implements Serializable{
+    private static final long serialVersionUID = 2352202882328931825L;
+    private String streamId;
+    private String dataSource;
+    private String description;
+    private boolean validate;
+    private boolean timeseries;
+
+    private List<StreamColumn> columns = new ArrayList<StreamColumn>();
+
+    /** @return a bracketed one-line summary of every field */
+    public String toString(){
+        return String.format("StreamDefinition[streamId=%s, dataSource=%s, description=%s, validate=%s, timeseries=%s, columns=%s]",
+                streamId,
+                dataSource,
+                description,
+                validate,
+                timeseries,
+                columns);
+    }
+
+    /** @return the stream id */
+    public String getStreamId() {
+        return streamId;
+    }
+
+    /** @param streamId the stream id to store */
+    public void setStreamId(String streamId) {
+        this.streamId = streamId;
+    }
+
+    /** @return the description */
+    public String getDescription() {
+        return description;
+    }
+
+    /** @param description the description to store */
+    public void setDescription(String description) {
+        this.description = description;
+    }
+
+    /** @return the {@code validate} flag */
+    public boolean isValidate() {
+        return validate;
+    }
+
+    /** @param validate the {@code validate} flag to store */
+    public void setValidate(boolean validate) {
+        this.validate = validate;
+    }
+
+    /** @return the {@code timeseries} flag */
+    public boolean isTimeseries() {
+        return timeseries;
+    }
+
+    /** @param timeseries the {@code timeseries} flag to store */
+    public void setTimeseries(boolean timeseries) {
+        this.timeseries = timeseries;
+    }
+
+    /** @return the internal column list (not a copy) */
+    public List<StreamColumn> getColumns() {
+        return columns;
+    }
+
+    /** @param columns the column list, kept by reference */
+    public void setColumns(List<StreamColumn> columns) {
+        this.columns = columns;
+    }
+
+    /** @return the data source */
+    public String getDataSource() {
+        return dataSource;
+    }
+
+    /** @param dataSource the data source to store */
+    public void setDataSource(String dataSource) {
+        this.dataSource = dataSource;
+    }
+
+    /**
+     * Finds the position of the first column with the given name.
+     * <p>
+     * A {@code null} column list, {@code null} entries and columns without a
+     * name are skipped rather than raising an exception.
+     *
+     * @param column the column name to look for
+     * @return the zero-based index of the first match, or {@code -1} if none
+     */
+    public int getColumnIndex(String column){
+        List<StreamColumn> cols = this.getColumns();
+        if (cols == null) {
+            return -1;
+        }
+        int i = 0;
+        for (StreamColumn col : cols) {
+            // Unnamed or missing columns can never match; keep counting positions.
+            if (col != null && col.getName() != null && col.getName().equals(column)) {
+                return i;
+            }
+            i++;
+        }
+        return -1;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java
new file mode 100644
index 0000000..7b96024
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.coordinator;
+
+import java.io.Serializable;
+import java.util.*;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+/**
+ * StreamPartition defines how a data stream is partitioned and sorted
+ * streamId is used for distinguishing different streams which are spawned from the same data source
+ * type defines how to partition data among slots within one slotqueue
+ * columns are fields based on which stream is grouped
+ * sortSpec defines how data is sorted
+ */
+public class StreamPartition implements Serializable {
+    private static final long serialVersionUID = -3361648309136926040L;
+
+    private String streamId;
+    private Type type;
+    private List<String> columns = new ArrayList<>();
+    private StreamSortSpec sortSpec;
+
+    public StreamPartition() {
+    }
+
+    public StreamPartition(StreamPartition o) {
+        this.streamId = o.streamId;
+        this.type = o.type;
+        this.columns = new ArrayList<String>(o.columns);
+        this.sortSpec = o.sortSpec == null ? null : new StreamSortSpec(o.sortSpec);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (other == this)
+            return true;
+        if (!(other instanceof StreamPartition)) {
+            return false;
+        }
+        StreamPartition sp = (StreamPartition) other;
+        return Objects.equals(streamId, sp.streamId) && Objects.equals(type, sp.type)
+                && CollectionUtils.isEqualCollection(columns, sp.columns) && Objects.equals(sortSpec, sp.sortSpec);
+    }
+
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder().append(streamId).append(type).append(columns).append(sortSpec).build();
+    }
+
+    public void setType(Type type) {
+        this.type = type;
+    }
+
+    public Type getType(){
+        return this.type;
+    }
+
+    public enum Type{
+        GLOBAL("GLOBAL",0), GROUPBY("GROUPBY",1), SHUFFLE("SHUFFLE",2);
+        private final String name;
+        private final int index;
+        Type(String name, int index){
+            this.name = name;
+            this.index = index;
+        }
+        @Override
+        public String toString() {
+            return this.name;
+        }
+        public static Type locate(String type){
+            Type _type = _NAME_TYPE.get(type.toUpperCase());
+            if(_type == null)
+                throw new IllegalStateException("Illegal type name: "+type);
+            return _type;
+        }
+
+        public static Type locate(int index){
+            Type _type = _INDEX_TYPE.get(index);
+            if(_type == null)
+                throw new IllegalStateException("Illegal type index: "+index);
+            return _type;
+        }
+
+        private static final Map<String,Type> _NAME_TYPE = new HashMap<>();
+        private static final Map<Integer,Type> _INDEX_TYPE = new TreeMap<>();
+        static {
+            _NAME_TYPE.put(GLOBAL.name,GLOBAL);
+            _NAME_TYPE.put(GROUPBY.name,GROUPBY);
+            _NAME_TYPE.put(SHUFFLE.name,SHUFFLE);
+
+            _INDEX_TYPE.put(GLOBAL.index,GLOBAL);
+            _INDEX_TYPE.put(GROUPBY.index,GLOBAL);
+            _INDEX_TYPE.put(SHUFFLE.index,GLOBAL);
+        }
+    }
+
+    public List<String> getColumns() {
+        return columns;
+    }
+
+    public void setColumns(List<String> columns) {
+        this.columns = columns;
+    }
+
+    public String getStreamId() {
+        return streamId;
+    }
+
+    public void setStreamId(String streamId) {
+        this.streamId = streamId;
+    }
+
+    public StreamSortSpec getSortSpec() {
+        return sortSpec;
+    }
+
+    public void setSortSpec(StreamSortSpec sortSpec) {
+        this.sortSpec = sortSpec;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("StreamPartition[streamId=%s,type=%s,columns=[%s],sortSpec=[%s]]",this.getStreamId(),this.getType(), StringUtils.join(this.getColumns(),","), sortSpec);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
new file mode 100644
index 0000000..ee20f81
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.coordinator;
+
+
+import java.io.Serializable;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.eagle.alert.utils.TimePeriodUtils;
+import org.joda.time.Period;
+
+/**
+ * streamId is the key
+ */
+public class StreamSortSpec implements Serializable{
+    private static final long serialVersionUID = 3626506441441584937L;
+    private String windowPeriod="";
+    private int windowMargin = 30 * 1000; // 30 seconds by default
+
+    public StreamSortSpec() {}
+
+    public StreamSortSpec(StreamSortSpec spec) {
+        this.windowPeriod = spec.windowPeriod;
+        this.windowMargin = spec.windowMargin;
+    }
+
+    public String getWindowPeriod() {
+        return windowPeriod;
+    }
+
+    public int getWindowPeriodMillis() {
+        if(windowPeriod!=null) {
+            return TimePeriodUtils.getMillisecondsOfPeriod(Period.parse(windowPeriod));
+        } else return 0;
+    }
+
+    public void setWindowPeriod(String windowPeriod) {
+        this.windowPeriod = windowPeriod;
+    }
+    public void setWindowPeriodMillis(int windowPeriodMillis) {
+        this.windowPeriod = Period.millis(windowPeriodMillis).toString();
+    }
+
+    public void setWindowPeriod2(Period period) {
+        this.windowPeriod = period.toString();
+    }
+
+
+    public int getWindowMargin() {
+        return windowMargin;
+    }
+
+    public void setWindowMargin(int windowMargin) {
+        this.windowMargin = windowMargin;
+    }
+
+    @Override
+    public int hashCode(){
+        return new HashCodeBuilder().
+                append(windowPeriod).
+                append(windowMargin).toHashCode();
+    }
+
+    @Override
+    public boolean equals(Object that){
+        if(this == that)
+            return true;
+        if(!(that instanceof StreamSortSpec)){
+            return false;
+        }
+
+        StreamSortSpec another = (StreamSortSpec)that;
+        return 
+                another.windowPeriod.equals(this.windowPeriod) &&
+                another.windowMargin == this.windowMargin;
+    }
+
+    @Override
+    public String toString(){
+        return String.format("StreamSortSpec[windowPeriod=%s,windowMargin=%d]",
+                this.getWindowPeriod(),
+                this.getWindowMargin());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java
new file mode 100644
index 0000000..6cafb16
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.coordinator;
+
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Bean describing a streaming cluster: name, zone, type, description and a
+ * string-to-string map of deployment settings.
+ *
+ * @since Apr 5, 2016
+ */
+public class StreamingCluster {
+    // NOTE(review): presumably used as keys of the deployments map - confirm against callers.
+    public static final String NIMBUS_HOST = "nimbusHost";
+    public static final String NIMBUS_THRIFT_PORT = "nimbusThriftPort";
+
+    /** Supported streaming cluster types. */
+    public enum StreamingType {
+        STORM
+    }
+
+    @JsonProperty
+    private String name;
+    @JsonProperty
+    private String zone;
+    @JsonProperty
+    private StreamingType type;
+    @JsonProperty
+    private String description;
+    /**
+     * key - nimbus for storm
+     */
+    @JsonProperty
+    private Map<String, String> deployments;
+
+    /** @return the cluster name */
+    public String getName() {
+        return this.name;
+    }
+
+    /** @param name the cluster name */
+    public void setName(final String name) {
+        this.name = name;
+    }
+
+    /** @return the cluster zone */
+    public String getZone() {
+        return this.zone;
+    }
+
+    /** @param zone the cluster zone */
+    public void setZone(final String zone) {
+        this.zone = zone;
+    }
+
+    /** @return the streaming type of this cluster */
+    public StreamingType getType() {
+        return this.type;
+    }
+
+    /** @param type the streaming type of this cluster */
+    public void setType(final StreamingType type) {
+        this.type = type;
+    }
+
+    /** @return the cluster description */
+    public String getDescription() {
+        return this.description;
+    }
+
+    /** @param description the cluster description */
+    public void setDescription(final String description) {
+        this.description = description;
+    }
+
+    /** @return the deployment settings map (the same instance, not a copy) */
+    public Map<String, String> getDeployments() {
+        return this.deployments;
+    }
+
+    /** @param deployments the deployment settings map; stored as-is, not copied */
+    public void setDeployments(final Map<String, String> deployments) {
+        this.deployments = deployments;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
new file mode 100644
index 0000000..f36d3cb
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.model;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.utils.DateTimeUtil;
+
+/**
+ * streamId stands for alert type instead of source event streamId
+ */
+public class AlertStreamEvent extends StreamEvent {
+    private static final long serialVersionUID = 2392131134670106397L;
+
+    // TODO: Keep policy name only instead of policy entity
+    private PolicyDefinition policy;
+    private StreamDefinition schema;
+    private String createdBy;
+    private long createdTime;
+
+    public PolicyDefinition getPolicy() {
+        return policy;
+    }
+
+    public void setPolicy(PolicyDefinition policy) {
+        this.policy = policy;
+    }
+
+
+    public String getPolicyId() {
+        return policy.getName();
+    }
+
+    @Override
+    public String toString() {
+        List<String> dataStrings = new ArrayList<>(this.getData().length);
+        for(Object obj: this.getData()){
+            if(obj!=null) {
+                dataStrings.add(obj.toString());
+            }else{
+                dataStrings.add(null);
+            }
+        }
+        return String.format("AlertStreamEvent[stream=%S,timestamp=%s,data=[%s], policy=%s, createdBy=%s]",
+                this.getStreamId(), DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()), StringUtils.join(dataStrings,","),this.getPolicy().getName(),this.getCreatedBy());
+    }
+
+    public String getCreatedBy() {
+        return createdBy;
+    }
+
+    public void setCreatedBy(String createdBy) {
+        this.createdBy = createdBy;
+    }
+
+    public StreamDefinition getSchema() {
+        return schema;
+    }
+
+    public void setSchema(StreamDefinition schema) {
+        this.schema = schema;
+    }
+
+    public long getCreatedTime() {
+        return createdTime;
+    }
+
+    public void setCreatedTime(long createdTime) {
+        this.createdTime = createdTime;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
new file mode 100644
index 0000000..cfed3e2
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.model;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+
+import backtype.storm.tuple.Tuple;
+
+/**
+ * This is a critical data structure across spout, router bolt and alert bolt.
+ * <ul>
+ *   <li>partition[StreamPartition] defines how one incoming data stream is partitioned, sorted</li>
+ *   <li>partitionKey[long] is java hash value of groupby fields. The groupby fields are defined in StreamPartition</li>
+ *   <li>event[StreamEvent] is actual data</li>
+ * </ul>
+ */
+public class PartitionedEvent implements Serializable{
+    private static final long serialVersionUID = -3840016190614238593L;
+    private StreamPartition partition;
+    private long partitionKey;
+    private StreamEvent event;
+
+    /**
+     * Used for bolt-internal but not inter-bolts,
+     * will not pass across bolts
+     */
+    private transient Tuple anchor;
+
+    /**
+     * Creates an empty event: no partition, no payload, partition key 0.
+     */
+    public PartitionedEvent(){
+        this.event = null;
+        this.partition = null;
+        this.partitionKey = 0L;
+    }
+
+    /**
+     * @param event        the payload
+     * @param partition    the partition the payload belongs to
+     * @param partitionKey the partition key; widened to long when stored
+     */
+    public PartitionedEvent(StreamEvent event, StreamPartition partition, int partitionKey) {
+        this.event = event;
+        this.partition = partition;
+        this.partitionKey = partitionKey;
+    }
+
+    /**
+     * Two events are equal when partition key, payload, partition and anchor are all equal.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        // instanceof is false for null, so no separate null check is needed.
+        if (!(obj instanceof PartitionedEvent)) {
+            return false;
+        }
+        PartitionedEvent another = (PartitionedEvent) obj;
+        return this.partitionKey == another.getPartitionKey()
+                && Objects.equals(this.event, another.getEvent())
+                && Objects.equals(this.partition, another.getPartition())
+                && Objects.equals(this.anchor, another.anchor);
+    }
+
+    /**
+     * Hashes partition key, payload and partition; the transient anchor is left out.
+     */
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder()
+                .append(partitionKey)
+                .append(event)
+                .append(partition)
+                .build();
+    }
+
+    public StreamEvent getEvent() {
+        return event;
+    }
+
+    public void setEvent(StreamEvent event) {
+        this.event = event;
+    }
+
+    public StreamPartition getPartition() {
+        return partition;
+    }
+
+    public void setPartition(StreamPartition partition) {
+        this.partition = partition;
+    }
+
+    public void setPartitionKey(long partitionKey){
+        this.partitionKey = partitionKey;
+    }
+
+    public long getPartitionKey(){
+        return this.partitionKey;
+    }
+
+    @Override
+    public String toString(){
+        // The closing bracket was missing from the original format string.
+        return String.format("PartitionedEvent[partition=%s,event=%s,key=%s]", partition, event, partitionKey);
+    }
+
+    /** @return the payload timestamp, or 0 when there is no payload */
+    public long getTimestamp() {
+        return (event != null) ? event.getTimestamp() : 0L;
+    }
+
+    /** @return the payload stream id, or null when there is no payload */
+    public String getStreamId(){
+        return (event != null) ? event.getStreamId() : null;
+    }
+
+    /** @return the payload data, or null when there is no payload */
+    public Object[] getData(){
+        return event!=null ? event.getData() : null;
+    }
+
+    /** @return true when a partition is set and that partition has a sort spec */
+    public boolean isSortRequired(){
+        return isPartitionRequired() && this.getPartition().getSortSpec()!=null;
+    }
+
+    /** @return true when a partition is set */
+    public boolean isPartitionRequired(){
+        return this.getPartition()!=null;
+    }
+
+    /**
+     * Creates a shallow copy sharing the same payload and partition objects.
+     * The anchor is not carried over to the copy.
+     *
+     * @return the new event
+     */
+    public PartitionedEvent copy() {
+        PartitionedEvent copied = new PartitionedEvent();
+        copied.setEvent(this.getEvent());
+        copied.setPartition(this.partition);
+        copied.setPartitionKey(this.partitionKey);
+        return copied;
+    }
+
+    public Tuple getAnchor() {
+        return anchor;
+    }
+
+    public void setAnchor(Tuple anchor) {
+        this.anchor = anchor;
+    }
+
+    /**
+     * Sets the anchor and returns this same instance for chaining.
+     *
+     * @param tuple the anchor tuple
+     * @return this event
+     */
+    public PartitionedEvent withAnchor(Tuple tuple){
+        this.setAnchor(tuple);
+        return this;
+    }
+}
\ No newline at end of file


Mime
View raw message