Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E395A200B7C for ; Thu, 8 Sep 2016 09:14:39 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id E1FD8160ABD; Thu, 8 Sep 2016 07:14:39 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 93DE7160AA5 for ; Thu, 8 Sep 2016 09:14:37 +0200 (CEST) Received: (qmail 88664 invoked by uid 500); 8 Sep 2016 07:14:36 -0000 Mailing-List: contact commits-help@eagle.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@eagle.incubator.apache.org Delivered-To: mailing list commits@eagle.incubator.apache.org Received: (qmail 88651 invoked by uid 99); 8 Sep 2016 07:14:36 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 08 Sep 2016 07:14:36 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 046D6C034D for ; Thu, 8 Sep 2016 07:14:36 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.646 X-Spam-Level: X-Spam-Status: No, score=-4.646 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id YprCewIsdf5u for ; Thu, 8 Sep 2016 07:14:19 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 88CAF60E31 for ; Thu, 8 Sep 2016 07:14:04 +0000 (UTC) Received: (qmail 86810 invoked by uid 99); 8 Sep 2016 07:14:03 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 08 Sep 2016 07:14:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3510AE09AC; Thu, 8 Sep 2016 07:14:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hao@apache.org To: commits@eagle.incubator.apache.org Date: Thu, 08 Sep 2016 07:14:18 -0000 Message-Id: <40a77e163df346d6aef5508248fb68ad@git.apache.org> In-Reply-To: <4ff9bd06edd34702805de83004f2559e@git.apache.org> References: <4ff9bd06edd34702805de83004f2559e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [17/18] incubator-eagle git commit: [EAGLE-530] Fix checkstyle problems on eagle-alert module and enable failOnViolation archived-at: Thu, 08 Sep 2016 07:14:40 -0000 http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java index 8867fd7..4627bef 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java @@ -16,49 +16,46 @@ */ package org.apache.eagle.alert.coordination.model; +import com.google.common.base.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Properties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Strings; - /** * Convert incoming tuple to stream * incoming tuple consists of 2 fields, topic and map of key/value - * output stream consists of 3 fields, stream name, timestamp, and map of key/value + * output stream consists of 3 fields, stream name, timestamp, and map of key/value. */ public class Tuple2StreamConverter { private static final Logger LOG = LoggerFactory.getLogger(Tuple2StreamConverter.class); private Tuple2StreamMetadata metadata; private StreamNameSelector cachedSelector; - public Tuple2StreamConverter(Tuple2StreamMetadata metadata){ + + public Tuple2StreamConverter(Tuple2StreamMetadata metadata) { this.metadata = metadata; try { - cachedSelector = (StreamNameSelector)Class.forName(metadata.getStreamNameSelectorCls()). - getConstructor(Properties.class). - newInstance(metadata.getStreamNameSelectorProp()); - }catch(Exception ex){ + cachedSelector = (StreamNameSelector) Class.forName(metadata.getStreamNameSelectorCls()) + .getConstructor(Properties.class) + .newInstance(metadata.getStreamNameSelectorProp()); + } catch (Exception ex) { LOG.error("error initializing StreamNameSelector object", ex); throw new IllegalStateException(ex); } } /** - * Assume tuple is composed of topic + map of key/value - * @param tuple - * @return + * Assume tuple is composed of topic + map of key/value. */ - @SuppressWarnings({ "unchecked" }) - public List convert(List tuple){ - Map m = (Map)tuple.get(1); + @SuppressWarnings( {"unchecked"}) + public List convert(List tuple) { + Map m = (Map) tuple.get(1); String streamName = cachedSelector.getStreamName(m); - if(!metadata.getActiveStreamNames().contains(streamName)) { - if(LOG.isDebugEnabled()) { + if (!metadata.getActiveStreamNames().contains(streamName)) { + if (LOG.isDebugEnabled()) { LOG.debug("streamName {} is not within activeStreamNames {}", streamName, metadata.getActiveStreamNames()); } return null; @@ -81,14 +78,15 @@ public class Tuple2StreamConverter { LOG.debug("continue with current timestamp becuase no data format sepcified! Metadata : {} ", metadata); } timestamp = System.currentTimeMillis(); - } else - - try { - SimpleDateFormat sdf = new SimpleDateFormat(metadata.getTimestampFormat()); - timestamp = sdf.parse(timestampFieldValue).getTime(); - } catch (Exception ex) { - LOG.error("continue with current timestamp because error happens while parsing timestamp column " + metadata.getTimestampColumn() + " with format " + metadata.getTimestampFormat()); - timestamp = System.currentTimeMillis(); + } else { + try { + SimpleDateFormat sdf = new SimpleDateFormat(metadata.getTimestampFormat()); + timestamp = sdf.parse(timestampFieldValue).getTime(); + } catch (Exception ex) { + LOG.error("continue with current timestamp because error happens while parsing timestamp column " + + metadata.getTimestampColumn() + " with format " + metadata.getTimestampFormat()); + timestamp = System.currentTimeMillis(); + } } } return Arrays.asList(tuple.get(0), streamName, timestamp, tuple.get(1)); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadata.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadata.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadata.java index bde4fe3..788547d 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadata.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamMetadata.java @@ -21,21 +21,23 @@ import java.util.Properties; import java.util.Set; /** - * @Since 4/25/16. This metadata controls how tuple is transformed to stream for - * example raw data consists of {"metric" : "cpuUsage", "host" : - * "xyz.com", "timestamp" : 1346846400, "value" : "0.9"} field "metric" - * is used for creating stream name, here "cpuUsage" is stream name + * This metadata controls how tuple is transformed to stream for + * example raw data consists of {"metric" : "cpuUsage", "host" : + * "xyz.com", "timestamp" : 1346846400, "value" : "0.9"} field "metric" + * is used for creating stream name, here "cpuUsage" is stream name * - * metric could be "cpuUsage", "diskUsage", "memUsage" etc, so - * activeStreamNames are subset of all metric names + *

metric could be "cpuUsage", "diskUsage", "memUsage" etc, so + * activeStreamNames are subset of all metric names

* - * All other messages which are not one of activeStreamNames will be - * filtered out + *

All other messages which are not one of activeStreamNames will be + * filtered out.

+ * + * @since 4/25/16 */ public class Tuple2StreamMetadata { /** * only messages belonging to activeStreamNames will be kept while - * transforming tuple into stream + * transforming tuple into stream. */ private Set activeStreamNames = new HashSet(); // the specific stream name selector http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedPolicyDefinition.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedPolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedPolicyDefinition.java index f4b8ccb..bbd4178 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedPolicyDefinition.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedPolicyDefinition.java @@ -18,10 +18,6 @@ package org.apache.eagle.alert.coordination.model; import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -/** - * @since May 25, 2016 - * - */ public class VersionedPolicyDefinition { private String version; private PolicyDefinition definition; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedStreamDefinition.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedStreamDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedStreamDefinition.java index 2770aa1..c9f830b 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedStreamDefinition.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/VersionedStreamDefinition.java @@ -18,10 +18,6 @@ package org.apache.eagle.alert.coordination.model; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -/** - * @since May 25, 2016 - * - */ public class VersionedStreamDefinition { private String version; private StreamDefinition definition; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/WorkSlot.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/WorkSlot.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/WorkSlot.java index 3f6f36d..9353dbd 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/WorkSlot.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/WorkSlot.java @@ -63,8 +63,8 @@ public class WorkSlot { return false; } WorkSlot workSlot = (WorkSlot) other; - return Objects.equals(topologyName, workSlot.topologyName) && - Objects.equals(boltId, workSlot.boltId); + return Objects.equals(topologyName, workSlot.topologyName) + && Objects.equals(boltId, workSlot.boltId); } @Override http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStream.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStream.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStream.java index beda896..e72836e 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStream.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/MonitoredStream.java @@ -16,22 +16,19 @@ */ package org.apache.eagle.alert.coordination.model.internal; +import org.apache.commons.lang3.builder.HashCodeBuilder; import java.util.ArrayList; import java.util.List; import java.util.Objects; -import org.apache.commons.lang3.builder.HashCodeBuilder; - /** * A monitored stream is the unique data set in the system. - * - * It's a combination of stream and the specific grp-by on it. - * - * For correlation stream, it means multiple stream for a given monitored stream. - * - * - * @since Apr 27, 2016 * + *

It's a combination of stream and the specific grp-by on it. + * + *

For correlation stream, it means multiple stream for a given monitored stream. + * + * @since Apr 27, 2016 */ public class MonitoredStream { @@ -40,7 +37,7 @@ public class MonitoredStream { // the stream group that this monitored stream stands for private StreamGroup streamGroup = new StreamGroup(); private List queues = new ArrayList(); - + public MonitoredStream() { } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignment.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignment.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignment.java index 3e956ca..7747d58 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignment.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/PolicyAssignment.java @@ -17,10 +17,9 @@ package org.apache.eagle.alert.coordination.model.internal; /** - * monitor metadata - * - * @since Apr 27, 2016 + * monitor metadata. * + * @since Apr 27, 2016 */ public class PolicyAssignment { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/ScheduleStateBase.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/ScheduleStateBase.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/ScheduleStateBase.java index a1efbf9..2462119 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/ScheduleStateBase.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/ScheduleStateBase.java @@ -17,12 +17,9 @@ package org.apache.eagle.alert.coordination.model.internal; /** - * - * This is the Base part of ScheduleState, only contains version/generateTime/code/message/scheduleTimeMillis - * + * This is the Base part of ScheduleState, only contains version/generateTime/code/message/scheduleTimeMillis. * * @since Aug 10, 2016 - * */ public class ScheduleStateBase { private String version; @@ -81,6 +78,4 @@ public class ScheduleStateBase { } - - } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamGroup.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamGroup.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamGroup.java index d87d62b..7941b85 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamGroup.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamGroup.java @@ -16,26 +16,23 @@ */ package org.apache.eagle.alert.coordination.model.internal; -import java.util.ArrayList; -import java.util.List; -import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.eagle.alert.engine.coordinator.StreamPartition; import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.common.base.Objects; +import org.apache.commons.lang3.builder.HashCodeBuilder; + +import java.util.ArrayList; +import java.util.List; -/** - * @since May 6, 2016 - * - */ public class StreamGroup { private List streamPartitions = new ArrayList(); - + public StreamGroup() { } - + public List getStreamPartitions() { return streamPartitions; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueue.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueue.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueue.java index f4f6142..86b150c 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueue.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/StreamWorkSlotQueue.java @@ -16,22 +16,20 @@ */ package org.apache.eagle.alert.coordination.model.internal; +import org.apache.eagle.alert.coordination.model.WorkSlot; + +import com.fasterxml.jackson.annotation.JsonIgnore; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import org.apache.eagle.alert.coordination.model.WorkSlot; - -import com.fasterxml.jackson.annotation.JsonIgnore; - /** * A work queue for given one monitored stream. - * - * Analog to storm's "tasks for given bolt". - * - * @since Apr 27, 2016 * + *

Analog to storm's "tasks for given bolt". + * + * @since Apr 27, 2016 */ public class StreamWorkSlotQueue { private String queueId; @@ -40,15 +38,15 @@ public class StreamWorkSlotQueue { private boolean dedicated; // some dedicated option, like dedicated userId/tenantId/policyId. private Map dedicateOption; - + private int numberOfGroupBolts; - private Map topoGroupStartIndex = new HashMap(); + private Map topoGroupStartIndex = new HashMap(); public StreamWorkSlotQueue() { } - + public StreamWorkSlotQueue(StreamGroup par, boolean isDedicated, Map options, - List slots) { + List slots) { this.queueId = par.getStreamId() + System.currentTimeMillis();// simply generate a queue this.dedicated = isDedicated; dedicateOption = new HashMap(); @@ -81,11 +79,11 @@ public class StreamWorkSlotQueue { return workingSlots.size(); } -// @org.codehaus.jackson.annotate.JsonIgnore -// @JsonIgnore -// public void placePolicy(PolicyDefinition pd) { -// policies.add(pd.getName()); -// } + // @org.codehaus.jackson.annotate.JsonIgnore + // @JsonIgnore + // public void placePolicy(PolicyDefinition pd) { + // policies.add(pd.getName()); + // } public int getNumberOfGroupBolts() { return numberOfGroupBolts; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/Topology.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/Topology.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/Topology.java index 189e2a5..c41c867 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/Topology.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/Topology.java @@ -20,10 +20,12 @@ import java.util.HashSet; import java.util.Set; /** - * @since Mar 24, 2016 Logically one unit topology consists of S spouts, G - * groupby bolts, A alertBolts normally S=1 Physically each spout is - * composed of s spout nodes, each groupby bolt is composed of g groupby - * nodes, and each alert bolt is composed of a alert nodes + * Logically one unit topology consists of S spouts, G + * groupby bolts, A alertBolts normally S=1 Physically each spout is + * composed of s spout nodes, each groupby bolt is composed of g groupby + * nodes, and each alert bolt is composed of a alert nodes. + * + * @since Mar 24, 2016 */ public class Topology { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/IEventSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/IEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/IEventSerializer.java index dd26d20..ac375e1 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/IEventSerializer.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/IEventSerializer.java @@ -19,8 +19,7 @@ package org.apache.eagle.alert.engine.codec; import org.apache.eagle.alert.engine.model.AlertStreamEvent; /** - * @since Jun 3, 2016 - * + * @since Jun 3, 2016. */ public interface IEventSerializer { Object serialize(AlertStreamEvent event); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java index c54955f..680b21a 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java @@ -25,11 +25,10 @@ import java.io.Serializable; import java.util.*; /** - * @since Apr 5, 2016 - * + * @since Apr 5, 2016. */ @JsonIgnoreProperties(ignoreUnknown = true) -public class PolicyDefinition implements Serializable{ +public class PolicyDefinition implements Serializable { private static final long serialVersionUID = 377581499339572414L; // unique identifier private String name; @@ -116,48 +115,50 @@ public class PolicyDefinition implements Serializable{ } public PolicyStatus getPolicyStatus() { - return policyStatus; - } + return policyStatus; + } - public void setPolicyStatus(PolicyStatus policyStatus) { - this.policyStatus = policyStatus; - } + public void setPolicyStatus(PolicyStatus policyStatus) { + this.policyStatus = policyStatus; + } - @Override + @Override public int hashCode() { - return new HashCodeBuilder(). - append(name). - append(inputStreams). - append(outputStreams). - append(definition). - append(partitionSpec). -// append(parallelismHint). - build(); + return new HashCodeBuilder() + .append(name) + .append(inputStreams) + .append(outputStreams) + .append(definition) + .append(partitionSpec) + // .append(parallelismHint) + .build(); } @Override - public boolean equals(Object that){ - if(that == this) + public boolean equals(Object that) { + if (that == this) { return true; - if(! (that instanceof PolicyDefinition)) + } + if (!(that instanceof PolicyDefinition)) { return false; - PolicyDefinition another = (PolicyDefinition)that; - if(Objects.equals(another.name, this.name) && - Objects.equals(another.description, this.description) && - CollectionUtils.isEqualCollection(another.inputStreams, this.inputStreams) && - CollectionUtils.isEqualCollection(another.outputStreams, this.outputStreams) && - another.definition.equals(this.definition) && - Objects.equals(this.definition, another.definition) && - CollectionUtils.isEqualCollection(another.partitionSpec, this.partitionSpec) -// && another.parallelismHint == this.parallelismHint - ) { + } + PolicyDefinition another = (PolicyDefinition) that; + if (Objects.equals(another.name, this.name) + && Objects.equals(another.description, this.description) + && CollectionUtils.isEqualCollection(another.inputStreams, this.inputStreams) + && CollectionUtils.isEqualCollection(another.outputStreams, this.outputStreams) + && another.definition.equals(this.definition) + && Objects.equals(this.definition, another.definition) + && CollectionUtils.isEqualCollection(another.partitionSpec, this.partitionSpec) + // && another.parallelismHint == this.parallelismHint + ) { return true; } return false; } @JsonIgnoreProperties(ignoreUnknown = true) - public static class Definition implements Serializable{ + public static class Definition implements Serializable { private static final long serialVersionUID = -622366527887848346L; public String type; @@ -168,7 +169,7 @@ public class PolicyDefinition implements Serializable{ private List inputStreams = new ArrayList(); private List outputStreams = new ArrayList(); - public Definition(String type,String value){ + public Definition(String type, String value) { this.type = type; this.value = value; } @@ -184,17 +185,20 @@ public class PolicyDefinition implements Serializable{ } @Override - public boolean equals(Object that){ - if(that == this) + public boolean equals(Object that) { + if (that == this) { return true; - if(!(that instanceof Definition)) + } + if (!(that instanceof Definition)) { return false; - Definition another = (Definition)that; - if(another.type.equals(this.type) - && another.value.equals(this.value) - && ListUtils.isEqualList(another.inputStreams, this.inputStreams) - && ListUtils.isEqualList(another.outputStreams, this.outputStreams)) + } + Definition another = (Definition) that; + if (another.type.equals(this.type) + && another.value.equals(this.value) + && ListUtils.isEqualList(another.inputStreams, this.inputStreams) + && ListUtils.isEqualList(another.outputStreams, this.outputStreams)) { return true; + } return false; } @@ -248,16 +252,16 @@ public class PolicyDefinition implements Serializable{ @Override public String toString() { - return String.format("{type=\"%s\",value=\"%s\", inputStreams=\"%s\", outputStreams=\"%s\" }",type,value, inputStreams, outputStreams); + return String.format("{type=\"%s\",value=\"%s\", inputStreams=\"%s\", outputStreams=\"%s\" }", type, value, inputStreams, outputStreams); } } - + public static enum PolicyStatus { - ENABLED, DISABLED + ENABLED, DISABLED } @Override public String toString() { - return String.format("{name=\"%s\",definition=%s}",this.getName(),this.getDefinition()==null?"null": this.getDefinition().toString()); + return String.format("{name=\"%s\",definition=%s}", this.getName(), this.getDefinition() == null ? "null" : this.getDefinition().toString()); } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java index e3b4e33..0bada4e 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java @@ -24,8 +24,7 @@ import java.util.Map; import java.util.Objects; /** - * @since Apr 11, 2016 - * + * @since Apr 11, 2016. */ @JsonIgnoreProperties(ignoreUnknown = true) public class Publishment { @@ -100,8 +99,8 @@ public class Publishment { if (obj instanceof Publishment) { Publishment p = (Publishment) obj; return (Objects.equals(name, p.getName()) && Objects.equals(type, p.getType()) - && Objects.equals(dedupIntervalMin, p.getDedupIntervalMin()) - && Objects.equals(policyIds, p.getPolicyIds()) && properties.equals(p.getProperties())); + && Objects.equals(dedupIntervalMin, p.getDedupIntervalMin()) + && Objects.equals(policyIds, p.getPolicyIds()) && properties.equals(p.getProperties())); } return false; } @@ -109,14 +108,14 @@ public class Publishment { @Override public int hashCode() { return new HashCodeBuilder().append(name).append(type).append(dedupIntervalMin).append(policyIds) - .append(properties).build(); + .append(properties).build(); } @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append("Publishment[name:").append(name).append(",type:").append(type).append(",policyId:") - .append(policyIds).append(",properties:").append(properties); + .append(policyIds).append(",properties:").append(properties); return sb.toString(); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java index daecab4..5329dfa 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java @@ -18,11 +18,11 @@ package org.apache.eagle.alert.engine.coordinator; -import java.util.Objects; - import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import org.apache.commons.lang3.builder.HashCodeBuilder; +import java.util.Objects; + @JsonIgnoreProperties(ignoreUnknown = true) public class PublishmentType { @@ -39,17 +39,19 @@ public class PublishmentType { this.type = type; } - public String getClassName(){ + public String getClassName() { return className; } - public void setClassName(String className){ + + public void setClassName(String className) { this.className = className; } - public String getDescription(){ + public String getDescription() { return description; } - public void setDescription(String description){ + + public void setDescription(String description) { this.description = description; } @@ -65,10 +67,10 @@ public class PublishmentType { public boolean equals(Object obj) { if (obj instanceof PublishmentType) { PublishmentType p = (PublishmentType) obj; - return (Objects.equals(className, p.getClassName()) && - Objects.equals(type, p.type) && - Objects.equals(description, p.getDescription()) && - Objects.equals(fields, p.getFields())); + return (Objects.equals(className, p.getClassName()) + && Objects.equals(type, p.type) + && Objects.equals(description, p.getDescription()) + && Objects.equals(fields, p.getFields())); } return false; } @@ -76,10 +78,10 @@ public class PublishmentType { @Override public int hashCode() { return new HashCodeBuilder() - .append(className) - .append(type) - .append(description) - .append(fields) - .build(); + .append(className) + .append(type) + .append(description) + .append(fields) + .build(); } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java index 4483fe4..2be4936 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java @@ -16,16 +16,14 @@ */ package org.apache.eagle.alert.engine.coordinator; +import com.fasterxml.jackson.databind.ObjectMapper; + import java.io.IOException; import java.io.Serializable; import java.util.HashMap; - import javax.xml.bind.annotation.adapters.XmlAdapter; import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; -import com.fasterxml.jackson.databind.ObjectMapper; - - public class StreamColumn implements Serializable { private static final long serialVersionUID = -5457861313624389106L; private String name; @@ -36,19 +34,19 @@ public class StreamColumn implements Serializable { private String nodataExpression; public String toString() { - return String.format("StreamColumn=name[%s], type=[%s], defaultValue=[%s], required=[%s], nodataExpression=[%s]", - name, type, defaultValue, required, nodataExpression); + return String.format("StreamColumn=name[%s], type=[%s], defaultValue=[%s], required=[%s], nodataExpression=[%s]", + name, type, defaultValue, required, nodataExpression); } public String getNodataExpression() { - return nodataExpression; - } + return nodataExpression; + } - public void setNodataExpression(String nodataExpression) { - this.nodataExpression = nodataExpression; - } + public void setNodataExpression(String nodataExpression) { + this.nodataExpression = nodataExpression; + } - public String getName() { + public String getName() { return name; } @@ -71,7 +69,7 @@ public class StreamColumn implements Serializable { } private void ensureDefaultValueType() { - if(this.getDefaultValue()!=null && (this.getDefaultValue() instanceof String) && this.getType() != Type.STRING){ + if (this.getDefaultValue() != null && (this.getDefaultValue() instanceof String) && this.getType() != Type.STRING) { switch (this.getType()) { case INT: this.setDefaultValue(Integer.valueOf((String) this.getDefaultValue())); @@ -90,11 +88,13 @@ public class StreamColumn implements Serializable { break; case OBJECT: try { - this.setDefaultValue(new ObjectMapper().readValue((String) this.getDefaultValue(),HashMap.class)); + this.setDefaultValue(new ObjectMapper().readValue((String) this.getDefaultValue(), HashMap.class)); } catch (IOException e) { throw new IllegalArgumentException(e); } break; + default: + throw new IllegalArgumentException("Illegal type: " + this.getType()); } } } @@ -145,7 +145,7 @@ public class StreamColumn implements Serializable { } } - public static class StreamColumnTypeAdapter extends XmlAdapter{ + public static class StreamColumnTypeAdapter extends XmlAdapter { @Override public Type unmarshal(String v) throws Exception { @@ -158,7 +158,7 @@ public class StreamColumn implements Serializable { } } - public static class DefaultValueAdapter extends XmlAdapter{ + public static class DefaultValueAdapter extends XmlAdapter { @Override public Object unmarshal(String v) throws Exception { return v; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java index beb8491..9130951 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java @@ -16,18 +16,18 @@ */ package org.apache.eagle.alert.engine.coordinator; -import javax.xml.bind.annotation.*; import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlElementWrapper; /** * This is actually a data source schema. - * - * @since Apr 5, 2016 * + * @since Apr 5, 2016 */ -public class StreamDefinition implements Serializable{ +public class StreamDefinition implements Serializable { private static final long serialVersionUID = 2352202882328931825L; private String streamId; private String dataSource; @@ -37,14 +37,14 @@ public class StreamDefinition implements Serializable{ private List columns = new ArrayList<>(); - public String toString(){ + public String toString() { return String.format("StreamDefinition[streamId=%s, dataSource=%s, description=%s, validate=%s, timeseries=%s, columns=%s", - streamId, - dataSource, - description, - validate, - timeseries, - columns); + streamId, + dataSource, + description, + validate, + timeseries, + columns); } public String getStreamId() { @@ -79,7 +79,7 @@ public class StreamDefinition implements Serializable{ this.timeseries = timeseries; } - @XmlElementWrapper(name="columns") + @XmlElementWrapper(name = "columns") @XmlElement(name = "column") public List getColumns() { return columns; @@ -97,10 +97,12 @@ public class StreamDefinition implements Serializable{ this.dataSource = dataSource; } - public int getColumnIndex(String column){ - int i=0; - for(StreamColumn col:this.getColumns()){ - if(col.getName().equals(column)) return i; + public int getColumnIndex(String column) { + int i = 0; + for (StreamColumn col : this.getColumns()) { + if (col.getName().equals(column)) { + return i; + } i++; } return -1; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java index 47e15c0..0987463 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java @@ -16,20 +16,19 @@ */ package org.apache.eagle.alert.engine.coordinator; -import java.io.Serializable; -import java.util.*; - import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.builder.HashCodeBuilder; +import java.io.Serializable; +import java.util.*; /** * StreamPartition defines how a data stream is partitioned and sorted * streamId is used for distinguishing different streams which are spawned from the same data source * type defines how to partition data among slots within one slotqueue * columns are fields based on which stream is grouped - * sortSpec defines how data is sorted + * sortSpec defines how data is sorted. */ @JsonIgnoreProperties(ignoreUnknown = true) public class StreamPartition implements Serializable { @@ -52,14 +51,15 @@ public class StreamPartition implements Serializable { @Override public boolean equals(Object other) { - if (other == this) + if (other == this) { return true; + } if (!(other instanceof StreamPartition)) { return false; } StreamPartition sp = (StreamPartition) other; return Objects.equals(streamId, sp.streamId) && Objects.equals(type, sp.type) - && CollectionUtils.isEqualCollection(columns, sp.columns) && Objects.equals(sortSpec, sp.sortSpec); + && CollectionUtils.isEqualCollection(columns, sp.columns) && Objects.equals(sortSpec, sp.sortSpec); } @Override @@ -71,46 +71,52 @@ public class StreamPartition implements Serializable { this.type = type; } - public Type getType(){ + public Type getType() { return this.type; } - public enum Type{ - GLOBAL("GLOBAL",0), GROUPBY("GROUPBY",1), SHUFFLE("SHUFFLE",2); + public enum Type { + GLOBAL("GLOBAL", 0), GROUPBY("GROUPBY", 1), SHUFFLE("SHUFFLE", 2); private final String name; private final int index; - Type(String name, int index){ + + Type(String name, int index) { this.name = name; this.index = index; } + @Override public String toString() { return this.name; } - public static Type locate(String type){ + + public static Type locate(String type) { Type _type = _NAME_TYPE.get(type.toUpperCase()); - if(_type == null) - throw new IllegalStateException("Illegal type name: "+type); + if (_type == null) { + throw new IllegalStateException("Illegal type name: " + type); + } return _type; } - public static Type locate(int index){ + public static Type locate(int index) { Type _type = _INDEX_TYPE.get(index); - if(_type == null) - throw new IllegalStateException("Illegal type index: "+index); + if (_type == null) { + throw new IllegalStateException("Illegal type index: " + index); + } return _type; } - private static final Map _NAME_TYPE = new HashMap<>(); - private static final Map _INDEX_TYPE = new TreeMap<>(); + private static final Map _NAME_TYPE = new HashMap<>(); + private static final Map _INDEX_TYPE = new TreeMap<>(); + static { - _NAME_TYPE.put(GLOBAL.name,GLOBAL); - _NAME_TYPE.put(GROUPBY.name,GROUPBY); - _NAME_TYPE.put(SHUFFLE.name,SHUFFLE); + _NAME_TYPE.put(GLOBAL.name, GLOBAL); + _NAME_TYPE.put(GROUPBY.name, GROUPBY); + _NAME_TYPE.put(SHUFFLE.name, SHUFFLE); - _INDEX_TYPE.put(GLOBAL.index,GLOBAL); - _INDEX_TYPE.put(GROUPBY.index,GLOBAL); - _INDEX_TYPE.put(SHUFFLE.index,GLOBAL); + _INDEX_TYPE.put(GLOBAL.index, GLOBAL); + _INDEX_TYPE.put(GROUPBY.index, GLOBAL); + _INDEX_TYPE.put(SHUFFLE.index, GLOBAL); } } @@ -140,6 +146,6 @@ public class StreamPartition implements Serializable { @Override public String toString() { - return String.format("StreamPartition[streamId=%s,type=%s,columns=[%s],sortSpec=[%s]]",this.getStreamId(),this.getType(), StringUtils.join(this.getColumns(),","), sortSpec); + return String.format("StreamPartition[streamId=%s,type=%s,columns=[%s],sortSpec=[%s]]", this.getStreamId(), this.getType(), StringUtils.join(this.getColumns(), ","), sortSpec); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java index 962a8ee..65b9151 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java @@ -16,24 +16,24 @@ */ package org.apache.eagle.alert.engine.coordinator; - +import org.apache.eagle.alert.utils.TimePeriodUtils; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import org.apache.commons.lang.builder.HashCodeBuilder; -import org.apache.eagle.alert.utils.TimePeriodUtils; import org.joda.time.Period; import java.io.Serializable; /** - * streamId is the key + * streamId is the key. */ @JsonIgnoreProperties(ignoreUnknown = true) -public class StreamSortSpec implements Serializable{ +public class StreamSortSpec implements Serializable { private static final long serialVersionUID = 3626506441441584937L; - private String windowPeriod=""; + private String windowPeriod = ""; private int windowMargin = 30 * 1000; // 30 seconds by default - public StreamSortSpec() {} + public StreamSortSpec() { + } public StreamSortSpec(StreamSortSpec spec) { this.windowPeriod = spec.windowPeriod; @@ -45,14 +45,17 @@ public class StreamSortSpec implements Serializable{ } public int getWindowPeriodMillis() { - if(windowPeriod!=null) { + if (windowPeriod != null) { return TimePeriodUtils.getMillisecondsOfPeriod(Period.parse(windowPeriod)); - } else return 0; + } else { + return 0; + } } public void setWindowPeriod(String windowPeriod) { this.windowPeriod = windowPeriod; } + public void setWindowPeriodMillis(int windowPeriodMillis) { this.windowPeriod = Period.millis(windowPeriodMillis).toString(); } @@ -71,30 +74,32 @@ public class StreamSortSpec implements Serializable{ } @Override - public int hashCode(){ - return new HashCodeBuilder(). - append(windowPeriod). - append(windowMargin).toHashCode(); + public int hashCode() { + return new HashCodeBuilder() + .append(windowPeriod) + .append(windowMargin) + .toHashCode(); } @Override - public boolean equals(Object that){ - if(this == that) + public boolean equals(Object that) { + if (this == that) { return true; - if(!(that instanceof StreamSortSpec)){ + } + if (!(that instanceof StreamSortSpec)) { return false; } - StreamSortSpec another = (StreamSortSpec)that; - return - another.windowPeriod.equals(this.windowPeriod) && - another.windowMargin == this.windowMargin; + StreamSortSpec another = (StreamSortSpec) that; + return + another.windowPeriod.equals(this.windowPeriod) + && another.windowMargin == this.windowMargin; } @Override - public String toString(){ + public String toString() { return String.format("StreamSortSpec[windowPeriod=%s,windowMargin=%d]", - this.getWindowPeriod(), - this.getWindowMargin()); + this.getWindowPeriod(), + this.getWindowMargin()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java index 6cafb16..1e40309 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java @@ -16,13 +16,11 @@ */ package org.apache.eagle.alert.engine.coordinator; -import java.util.Map; - import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Map; /** - * @since Apr 5, 2016 - * + * @since Apr 5, 2016. */ public class StreamingCluster { public static enum StreamingType { @@ -38,7 +36,7 @@ public class StreamingCluster { @JsonProperty private String description; /** - * key - nimbus for storm + * key - nimbus for storm. */ @JsonProperty private Map deployments; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java index 13881a1..a503dcf 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java @@ -16,15 +16,14 @@ */ package org.apache.eagle.alert.engine.model; -import org.apache.commons.lang3.StringUtils; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.utils.DateTimeUtil; - +import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; import java.util.List; /** - * streamId stands for alert type instead of source event streamId + * streamId stands for alert type instead of source event streamId. */ public class AlertStreamEvent extends StreamEvent { private static final long serialVersionUID = 2392131134670106397L; @@ -45,15 +44,16 @@ public class AlertStreamEvent extends StreamEvent { @Override public String toString() { List dataStrings = new ArrayList<>(this.getData().length); - for(Object obj: this.getData()){ - if(obj!=null) { + for (Object obj : this.getData()) { + if (obj != null) { dataStrings.add(obj.toString()); - }else{ + } else { dataStrings.add(null); } } return String.format("AlertStreamEvent[stream=%S,timestamp=%s,data=[%s], policyId=%s, createdBy=%s, metaVersion=%s]", - this.getStreamId(), DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()), StringUtils.join(dataStrings,","),this.getPolicyId(),this.getCreatedBy(),this.getMetaVersion()); + this.getStreamId(), DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()), + StringUtils.join(dataStrings, ","), this.getPolicyId(), this.getCreatedBy(), this.getMetaVersion()); } public String getCreatedBy() { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java index cfed3e2..51e4532 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java @@ -16,21 +16,19 @@ */ package org.apache.eagle.alert.engine.model; -import java.io.Serializable; -import java.util.Objects; - -import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.eagle.alert.engine.coordinator.StreamPartition; - import backtype.storm.tuple.Tuple; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import java.io.Serializable; +import java.util.Objects; /** * This is a critical data structure across spout, router bolt and alert bolt * partition[StreamPartition] defines how one incoming data stream is partitioned, sorted * partitionKey[long] is java hash value of groupby fields. The groupby fields are defined in StreamPartition - * event[StreamEvent] is actual data + * event[StreamEvent] is actual data. */ -public class PartitionedEvent implements Serializable{ +public class PartitionedEvent implements Serializable { private static final long serialVersionUID = -3840016190614238593L; private StreamPartition partition; private long partitionKey; @@ -38,11 +36,11 @@ public class PartitionedEvent implements Serializable{ /** * Used for bolt-internal but not inter-bolts, - * will not pass across bolts + * will not pass across bolts. */ private transient Tuple anchor; - public PartitionedEvent(){ + public PartitionedEvent() { this.event = null; this.partition = null; this.partitionKey = 0L; @@ -56,14 +54,18 @@ public class PartitionedEvent implements Serializable{ @Override public boolean equals(Object obj) { - if(obj == this) return true; - if(obj == null) return false; - if(obj instanceof PartitionedEvent){ + if (obj == this) { + return true; + } + if (obj == null) { + return false; + } + if (obj instanceof PartitionedEvent) { PartitionedEvent another = (PartitionedEvent) obj; return !(this.partitionKey != another.getPartitionKey() - || !Objects.equals(this.event, another.getEvent()) - || !Objects.equals(this.partition, another.getPartition()) - || !Objects.equals(this.anchor, another.anchor)); + || !Objects.equals(this.event, another.getEvent()) + || !Objects.equals(this.partition, another.getPartition()) + || !Objects.equals(this.anchor, another.anchor)); } else { return false; } @@ -72,10 +74,10 @@ public class PartitionedEvent implements Serializable{ @Override public int hashCode() { return new HashCodeBuilder() - .append(partitionKey) - .append(event) - .append(partition) - .build(); + .append(partitionKey) + .append(event) + .append(partition) + .build(); } public StreamEvent getEvent() { @@ -94,36 +96,36 @@ public class PartitionedEvent implements Serializable{ this.partition = partition; } - public void setPartitionKey(long partitionKey){ + public void setPartitionKey(long partitionKey) { this.partitionKey = partitionKey; } - public long getPartitionKey(){ + public long getPartitionKey() { return this.partitionKey; } - public String toString(){ - return String.format("PartitionedEvent[partition=%s,event=%s,key=%s", partition, event,partitionKey); + public String toString() { + return String.format("PartitionedEvent[partition=%s,event=%s,key=%s", partition, event, partitionKey); } public long getTimestamp() { return (event != null) ? event.getTimestamp() : 0L; } - public String getStreamId(){ + public String getStreamId() { return (event != null) ? event.getStreamId() : null; } - public Object[] getData(){ - return event!=null ? event.getData() : null; + public Object[] getData() { + return event != null ? event.getData() : null; } - public boolean isSortRequired(){ - return isPartitionRequired() && this.getPartition().getSortSpec()!=null; + public boolean isSortRequired() { + return isPartitionRequired() && this.getPartition().getSortSpec() != null; } - public boolean isPartitionRequired(){ - return this.getPartition()!=null; + public boolean isPartitionRequired() { + return this.getPartition() != null; } public PartitionedEvent copy() { @@ -142,7 +144,7 @@ public class PartitionedEvent implements Serializable{ this.anchor = anchor; } - public PartitionedEvent withAnchor(Tuple tuple){ + public PartitionedEvent withAnchor(Tuple tuple) { this.setAnchor(tuple); return this; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java index 5f59b1e..d91b001 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java @@ -16,11 +16,10 @@ */ package org.apache.eagle.alert.engine.model; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.utils.DateTimeUtil; - +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.builder.HashCodeBuilder; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; @@ -28,8 +27,7 @@ import java.util.List; import java.util.Objects; /** - * @since Apr 5, 2016 - * + * @since Apr 5, 2016. */ public class StreamEvent implements Serializable { private static final long serialVersionUID = 2765116509856609763L; @@ -39,15 +37,16 @@ public class StreamEvent implements Serializable { private long timestamp; private String metaVersion; - public StreamEvent(){} + public StreamEvent() { + } - public StreamEvent(String streamId,long timestamp,Object[] data){ + public StreamEvent(String streamId, long timestamp, Object[] data) { this.setStreamId(streamId); this.setTimestamp(timestamp); this.setData(data); } - public StreamEvent(String streamId,long timestamp,Object[] data,String metaVersion){ + public StreamEvent(String streamId, long timestamp, Object[] data, String metaVersion) { this.setStreamId(streamId); this.setTimestamp(timestamp); this.setData(data); @@ -62,9 +61,6 @@ public class StreamEvent implements Serializable { this.streamId = streamId; } - public Object[] getData() { - return data; - } public void setData(Object[] data) { this.data = data; @@ -93,10 +89,12 @@ public class StreamEvent implements Serializable { @Override public boolean equals(Object obj) { - if(obj == this) return true; - if(obj instanceof StreamEvent){ + if (obj == this) { + return true; + } + if (obj instanceof StreamEvent) { StreamEvent another = (StreamEvent) obj; - return Objects.equals(this.streamId,another.streamId) && this.timestamp == another.timestamp && Arrays.deepEquals(this.data,another.data); + return Objects.equals(this.streamId, another.streamId) && this.timestamp == another.timestamp && Arrays.deepEquals(this.data, another.data); } return false; } @@ -104,7 +102,7 @@ public class StreamEvent implements Serializable { @Override public String toString() { List dataStrings = new ArrayList<>(); - if(this.getData() != null) { + if (this.getData() != null) { for (Object obj : this.getData()) { if (obj != null) { dataStrings.add(obj.toString()); @@ -113,17 +111,21 @@ public class StreamEvent implements Serializable { } } } - return String.format("StreamEvent[stream=%S,timestamp=%s,data=[%s],metaVersion=%s]",this.getStreamId(), DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()), StringUtils.join(dataStrings,","), this.getMetaVersion()); + return String.format("StreamEvent[stream=%S,timestamp=%s,data=[%s],metaVersion=%s]", + this.getStreamId(), + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()), + StringUtils.join(dataStrings, ","), + this.getMetaVersion()); } - public static StreamEventBuilder Builder(){ + public static StreamEventBuilder builder() { return new StreamEventBuilder(); } /** - * @return cloned new event object + * @return cloned new event object. */ - public StreamEvent copy(){ + public StreamEvent copy() { StreamEvent newEvent = new StreamEvent(); newEvent.setTimestamp(this.getTimestamp()); newEvent.setData(this.getData()); @@ -132,19 +134,18 @@ public class StreamEvent implements Serializable { return newEvent; } - public void copyFrom(StreamEvent event){ + public void copyFrom(StreamEvent event) { this.setTimestamp(event.getTimestamp()); this.setData(event.getData()); this.setStreamId(event.getStreamId()); this.setMetaVersion(event.getMetaVersion()); } - /** - * @param column - * @param streamDefinition - * @return - */ - public Object[] getData(StreamDefinition streamDefinition,List column) { + public Object[] getData() { + return data; + } + + public Object[] getData(StreamDefinition streamDefinition, List column) { ArrayList result = new ArrayList<>(column.size()); for (String colName : column) { result.add(this.getData()[streamDefinition.getColumnIndex(colName)]); @@ -152,7 +153,7 @@ public class StreamEvent implements Serializable { return result.toArray(); } - public Object[] getData(StreamDefinition streamDefinition,String ... column) { + public Object[] getData(StreamDefinition streamDefinition, String... column) { ArrayList result = new ArrayList<>(column.length); for (String colName : column) { result.add(this.getData()[streamDefinition.getColumnIndex(colName)]); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java index 1036ba2..53101ef 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java @@ -25,62 +25,65 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -public class StreamEventBuilder{ - private final static Logger LOG = LoggerFactory.getLogger(StreamEventBuilder.class); +public class StreamEventBuilder { + private static final Logger LOG = LoggerFactory.getLogger(StreamEventBuilder.class); private StreamEvent instance; private StreamDefinition streamDefinition; - public StreamEventBuilder(){ + + public StreamEventBuilder() { instance = new StreamEvent(); } - public StreamEventBuilder schema(StreamDefinition streamDefinition){ + public StreamEventBuilder schema(StreamDefinition streamDefinition) { this.streamDefinition = streamDefinition; - if(instance.getStreamId() == null) instance.setStreamId(streamDefinition.getStreamId()); + if (instance.getStreamId() == null) { + instance.setStreamId(streamDefinition.getStreamId()); + } return this; } - public StreamEventBuilder streamId(String streamId){ + public StreamEventBuilder streamId(String streamId) { instance.setStreamId(streamId); return this; } - public StreamEventBuilder attributes(Map data, StreamDefinition streamDefinition){ + public StreamEventBuilder attributes(Map data, StreamDefinition streamDefinition) { this.schema(streamDefinition); List columnList = streamDefinition.getColumns(); - if(columnList!=null && columnList.size() > 0){ + if (columnList != null && columnList.size() > 0) { List values = new ArrayList<>(columnList.size()); for (StreamColumn column : columnList) { - values.add(data.getOrDefault(column.getName(),column.getDefaultValue())); + values.add(data.getOrDefault(column.getName(), column.getDefaultValue())); } instance.setData(values.toArray()); - } else if(LOG.isDebugEnabled()){ - LOG.warn("All data [{}] are ignored as no columns defined in schema {}",data,streamDefinition); + } else if (LOG.isDebugEnabled()) { + LOG.warn("All data [{}] are ignored as no columns defined in schema {}", data, streamDefinition); } return this; } - public StreamEventBuilder attributes(Map data){ - return attributes(data,this.streamDefinition); + public StreamEventBuilder attributes(Map data) { + return attributes(data, this.streamDefinition); } - public StreamEventBuilder attributes(Object ... data){ + public StreamEventBuilder attributes(Object... data) { instance.setData(data); return this; } - public StreamEventBuilder timestamep(long timestamp){ + public StreamEventBuilder timestamep(long timestamp) { instance.setTimestamp(timestamp); return this; } - public StreamEventBuilder metaVersion(String metaVersion){ + public StreamEventBuilder metaVersion(String metaVersion) { instance.setMetaVersion(metaVersion); return this; } - public StreamEvent build(){ - if(instance.getStreamId() == null){ + public StreamEvent build() { + if (instance.getStreamId() == null) { throw new IllegalArgumentException("streamId is null of event: " + instance); } return instance; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/IMetricSystem.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/IMetricSystem.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/IMetricSystem.java index 06a99f4..461a23c 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/IMetricSystem.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/IMetricSystem.java @@ -1,14 +1,4 @@ -package org.apache.eagle.alert.metric; - -import java.util.Map; - -import org.apache.eagle.alert.metric.sink.MetricSink; -import org.apache.eagle.alert.metric.source.MetricSource; - -import com.codahale.metrics.MetricRegistry; -import com.typesafe.config.Config; - -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -24,41 +14,48 @@ import com.typesafe.config.Config; * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.eagle.alert.metric; + +import org.apache.eagle.alert.metric.sink.MetricSink; +import org.apache.eagle.alert.metric.source.MetricSource; + +import com.codahale.metrics.MetricRegistry; +import com.typesafe.config.Config; +import java.util.Map; + public interface IMetricSystem { /** - * Initialize + * Initialize. */ void start(); /** - * Schedule reporter + * Schedule reporter. */ void schedule(); /** - * Close and stop all resources and services + * Close and stop all resources and services. */ void stop(); /** - * Manual report metric + * Manual report metric. */ void report(); /** - * - * @param sink metric sink + * @param sink metric sink. */ - void register(MetricSink sink,Config config); + void register(MetricSink sink, Config config); /** - * - * @param source metric source + * @param source metric source. */ void register(MetricSource source); - void tags(Map metricTags); + void tags(Map metricTags); MetricRegistry registry(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java index b91c606..555c4ec 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -16,74 +16,74 @@ */ package org.apache.eagle.alert.metric; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; - import org.apache.eagle.alert.metric.sink.MetricSink; import org.apache.eagle.alert.metric.sink.MetricSinkRepository; import org.apache.eagle.alert.metric.source.MetricSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.codahale.metrics.MetricRegistry; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; public class MetricSystem implements IMetricSystem { private final Config config; - private Map sinks = new HashMap<>(); -// private Map sources = new HashMap<>(); + private Map sinks = new HashMap<>(); + // private Map sources = new HashMap<>(); private MetricRegistry registry = new MetricRegistry(); private boolean running; private boolean initialized; - private final static Logger LOG = LoggerFactory.getLogger(MetricSystem.class); + private static final Logger LOG = LoggerFactory.getLogger(MetricSystem.class); private final Map metricTags = new HashMap<>(); - public MetricSystem(Config config){ + public MetricSystem(Config config) { this.config = config; } - public static MetricSystem load(Config config){ + public static MetricSystem load(Config config) { MetricSystem instance = new MetricSystem(config); instance.loadFromConfig(); return instance; } @Override - public void tags(Map metricTags){ + public void tags(Map metricTags) { this.metricTags.putAll(metricTags); } @Override public void start() { - if(initialized) + if (initialized) { throw new IllegalStateException("Attempting to initialize a MetricsSystem that is already intialized"); - sinks.forEach((sink,conf) -> sink.prepare(conf.withValue("tags",ConfigFactory.parseMap(metricTags).root()),registry)); + } + sinks.forEach((sink, conf) -> sink.prepare(conf.withValue("tags", ConfigFactory.parseMap(metricTags).root()), registry)); initialized = true; } @Override public void schedule() { - if(running){ - throw new IllegalStateException("Attempting to start a MetricsSystem that is already running"); + if (running) { + throw new IllegalStateException("Attempting to start a MetricsSystem that is already running"); } - sinks.keySet().forEach((sink)->sink.start(5, TimeUnit.SECONDS)); + sinks.keySet().forEach((sink) -> sink.start(5, TimeUnit.SECONDS)); running = true; } - public void loadFromConfig(){ + public void loadFromConfig() { loadSinksFromConfig(); } - private void loadSinksFromConfig(){ + private void loadSinksFromConfig() { Config sinkCls = config.hasPath("metric.sink") ? config.getConfig("metric.sink") : null; - if(sinkCls == null){ + if (sinkCls == null) { // do nothing - }else{ - for(String sinkType:sinkCls.root().unwrapped().keySet()){ - register(MetricSinkRepository.createSink(sinkType),config.getConfig("metric.sink."+sinkType)); + } else { + for (String sinkType : sinkCls.root().unwrapped().keySet()) { + register(MetricSinkRepository.createSink(sinkType), config.getConfig("metric.sink." + sinkType)); } } } @@ -99,9 +99,9 @@ public class MetricSystem implements IMetricSystem { } @Override - public void register(MetricSink sink,Config config) { - LOG.debug("Register {}",sink); - sinks.put(sink,config); + public void register(MetricSink sink, Config config) { + LOG.debug("Register {}", sink); + sinks.put(sink, config); } @Override http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a0fc8708/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/entity/MetricEvent.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/entity/MetricEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/entity/MetricEvent.java index b5e6c63..f1262c7 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/entity/MetricEvent.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/metric/entity/MetricEvent.java @@ -16,55 +16,54 @@ */ package org.apache.eagle.alert.metric.entity; -import java.util.Map; -import java.util.TreeMap; - import org.apache.eagle.alert.utils.DateTimeUtil; - import com.codahale.metrics.Counter; import com.codahale.metrics.Gauge; import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; import com.codahale.metrics.Snapshot; import com.codahale.metrics.Timer; +import java.util.Map; +import java.util.TreeMap; -public class MetricEvent extends TreeMap{ +public class MetricEvent extends TreeMap { private static final long serialVersionUID = 6862373651636342744L; - public static Builder of(String name){ + public static Builder of(String name) { return new Builder(name); } /** - * TODO: Refactor according to ConsoleReporter + * TODO: Refactor according to ConsoleReporter. */ - public static class Builder{ + public static class Builder { private final String name; private MetricEvent instance; - public Builder(String name){ + + public Builder(String name) { this.instance = new MetricEvent(); this.name = name; } - public Builder from(Counter value) { -// this.instance.put("type","counter"); - this.instance.put("count",value.getCount()); - return this; - } - - public MetricEvent build(){ - this.instance.put("name",name); - if(!this.instance.containsKey("timestamp")){ + public MetricEvent build() { + this.instance.put("name", name); + if (!this.instance.containsKey("timestamp")) { this.instance.put("timestamp", DateTimeUtil.getCurrentTimestamp()); } return this.instance; } - @SuppressWarnings({ "rawtypes", "unchecked" }) + public Builder from(Counter value) { + // this.instance.put("type","counter"); + this.instance.put("count", value.getCount()); + return this; + } + + @SuppressWarnings( {"rawtypes", "unchecked"}) public Builder from(Gauge gauge) { Object value = gauge.getValue(); - if( value instanceof Map){ + if (value instanceof Map) { Map map = (Map) value; this.instance.putAll(map); } else { @@ -74,7 +73,7 @@ public class MetricEvent extends TreeMap{ } public Builder from(Histogram value) { - this.instance.put("count",value.getCount()); + this.instance.put("count", value.getCount()); Snapshot snapshot = value.getSnapshot(); this.instance.put("min", snapshot.getMin()); this.instance.put("max", snapshot.getMax()); @@ -90,21 +89,21 @@ public class MetricEvent extends TreeMap{ } public Builder from(Meter value) { - this.instance.put("value",value.getCount()); - this.instance.put("15MinRate",value.getFifteenMinuteRate()); - this.instance.put("5MinRate",value.getFiveMinuteRate()); - this.instance.put("mean",value.getMeanRate()); - this.instance.put("1MinRate",value.getOneMinuteRate()); + this.instance.put("value", value.getCount()); + this.instance.put("15MinRate", value.getFifteenMinuteRate()); + this.instance.put("5MinRate", value.getFiveMinuteRate()); + this.instance.put("mean", value.getMeanRate()); + this.instance.put("1MinRate", value.getOneMinuteRate()); return this; } public Builder from(Timer value) { -// this.instance.put("type","timer"); - this.instance.put("value",value.getCount()); - this.instance.put("15MinRate",value.getFifteenMinuteRate()); - this.instance.put("5MinRate",value.getFiveMinuteRate()); - this.instance.put("mean",value.getMeanRate()); - this.instance.put("1MinRate",value.getOneMinuteRate()); + // this.instance.put("type","timer"); + this.instance.put("value", value.getCount()); + this.instance.put("15MinRate", value.getFifteenMinuteRate()); + this.instance.put("5MinRate", value.getFiveMinuteRate()); + this.instance.put("mean", value.getMeanRate()); + this.instance.put("1MinRate", value.getOneMinuteRate()); return this; } }