Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2EB37200B13 for ; Wed, 1 Jun 2016 07:56:38 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 2D36C160A41; Wed, 1 Jun 2016 05:56:38 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B1F32160A47 for ; Wed, 1 Jun 2016 07:56:35 +0200 (CEST) Received: (qmail 77404 invoked by uid 500); 1 Jun 2016 05:56:34 -0000 Mailing-List: contact commits-help@eagle.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@eagle.incubator.apache.org Delivered-To: mailing list commits@eagle.incubator.apache.org Received: (qmail 77395 invoked by uid 99); 1 Jun 2016 05:56:34 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 01 Jun 2016 05:56:34 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 1FF0518059B for ; Wed, 1 Jun 2016 05:56:34 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.646 X-Spam-Level: X-Spam-Status: No, score=-4.646 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id CfgQLQVthsVT for ; Wed, 1 Jun 2016 05:56:24 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id C993E5FBD2 for ; Wed, 1 Jun 2016 05:56:17 +0000 (UTC) Received: (qmail 76354 invoked by uid 99); 1 Jun 2016 05:56:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 01 Jun 2016 05:56:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6B8EEDFE61; Wed, 1 Jun 2016 05:56:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ralphsu@apache.org To: commits@eagle.incubator.apache.org Date: Wed, 01 Jun 2016 05:56:25 -0000 Message-Id: <6efe716c3c7a43b88a1905f0ddaa15d5@git.apache.org> In-Reply-To: <255f27474f434d698401997b0858d9f6@git.apache.org> References: <255f27474f434d698401997b0858d9f6@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [10/19] incubator-eagle git commit: EAGLE-324: Init branch-v0.5 archived-at: Wed, 01 Jun 2016 05:56:38 -0000 http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java new file mode 100644 index 0000000..5008dbf --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.router.impl; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.eagle.alert.engine.PartitionedEventCollector; +import org.apache.eagle.alert.engine.StreamContext; +import org.apache.eagle.alert.engine.coordinator.StreamPartition; +import org.apache.eagle.alert.engine.coordinator.StreamSortSpec; +import org.apache.eagle.alert.engine.model.PartitionedEvent; +import org.apache.eagle.alert.engine.router.StreamRouter; +import org.apache.eagle.alert.engine.router.StreamSortHandler; +import org.apache.eagle.alert.engine.sorter.StreamTimeClockManager; +import org.apache.eagle.alert.engine.sorter.impl.StreamSortWindowHandlerImpl; +import org.apache.eagle.alert.engine.sorter.impl.StreamTimeClockManagerImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StreamRouterImpl implements StreamRouter { + private static final long serialVersionUID = -4640125063690900014L; + private final static Logger LOG = LoggerFactory.getLogger(StreamRouterImpl.class); + private final String name; + private volatile Map streamSortHandlers; + private PartitionedEventCollector outputCollector; + private StreamTimeClockManager streamTimeClockManager; + private StreamContext context; + + /** + * @param name This name should be formed by topologyId + router id, which is built by topology builder + */ + public StreamRouterImpl(String name){ + this.name = name; + } + + public String getName(){ + return this.name; + } + + @Override + public void close() { + streamSortHandlers.values().forEach(StreamSortHandler::close); + streamTimeClockManager.close(); + } + + public void prepare(StreamContext context, PartitionedEventCollector outputCollector) { + this.streamTimeClockManager = new StreamTimeClockManagerImpl(); + this.streamSortHandlers = new HashMap<>(); + this.outputCollector = outputCollector; + this.context = context; + } + + /** + * TODO: Potential improvement: if StreamSortHandler is expensive, we can use DISRUPTOR to buffer + * + * @param event StreamEvent + */ + public void nextEvent(PartitionedEvent event) { + this.context.counter().scope("receive_count").incr(); + if(!dispatchToSortHandler(event)) { + this.context.counter().scope("direct_count").incr(); + // Pass through directly if no need to sort + outputCollector.emit(event); + } + this.context.counter().scope("sort_count").incr(); + // Update stream clock time if moving forward and trigger all tick listeners + streamTimeClockManager.onTimeUpdate(event.getStreamId(),event.getTimestamp()); + } + + /** + * @param event input event + * @return whether sorted + */ + private boolean dispatchToSortHandler(PartitionedEvent event){ + if(event.getTimestamp() <= 0) return false; + + StreamSortHandler sortHandler = streamSortHandlers.get(event.getPartition()); + if(sortHandler == null){ + if(event.isSortRequired()) { + LOG.warn("Stream sort handler required has not been loaded so emmit directly: {}", event); + this.context.counter().scope("miss_sort_count").incr(); + } + return false; + } else { + sortHandler.nextEvent(event); + return true; + } + } + + @Override + public void onStreamSortSpecChange(Map added, + Map removed, + Map changed) { + synchronized (streamTimeClockManager) { + Map copy = new HashMap<>(this.streamSortHandlers); + // add new StreamSortSpec + if (added != null && added.size() > 0) { + for (Entry spec : added.entrySet()) { + StreamPartition tmp = spec.getKey(); + if (copy.containsKey(tmp)) { + LOG.error("Metadata calculation error: Duplicated StreamSortSpec " + spec); + } else { + StreamSortHandler handler = new StreamSortWindowHandlerImpl(); + handler.prepare(tmp.getStreamId(),spec.getValue(), this.outputCollector); + copy.put(tmp, handler); + streamTimeClockManager.registerListener(streamTimeClockManager.createStreamTimeClock(tmp.getStreamId()), handler); + } + } + } + + // remove StreamSortSpec + if (removed != null && removed.size() > 0) { + for (Entry spec : removed.entrySet()) { + StreamPartition tmp = spec.getKey(); + if (copy.containsKey(tmp)) { + copy.get(tmp).close(); + streamTimeClockManager.removeListener(copy.get(tmp)); + copy.remove(tmp); + } else { + LOG.error("Metadata calculation error: remove nonexisting StreamSortSpec " + spec.getValue()); + } + } + } + + // modify StreamSortSpec + if (changed != null && changed.size() > 0) { + for (Entry spec : changed.entrySet()) { + StreamPartition tmp = spec.getKey(); + if (copy.containsKey(tmp)) { + copy.get(tmp).close(); + streamTimeClockManager.removeListener(copy.get(tmp)); + copy.remove(tmp); + StreamSortHandler handler = new StreamSortWindowHandlerImpl(); + handler.prepare(tmp.getStreamId(), spec.getValue(), this.outputCollector); + copy.put(tmp, handler); + streamTimeClockManager.registerListener(tmp.getStreamId(), handler); + } else { + LOG.error("Metadata calculation error: modify non-existing StreamSortSpec " + spec.getValue()); + } + } + } + + // atomic switch + this.streamSortHandlers = copy; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java new file mode 100644 index 0000000..cc819ba --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.runner; + +import java.util.List; +import java.util.Map; + +import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService; +import org.apache.eagle.alert.utils.AlertConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Fields; + +import com.google.common.base.Preconditions; +import com.typesafe.config.Config; + +@SuppressWarnings({"rawtypes", "serial"}) +public abstract class AbstractStreamBolt extends BaseRichBolt { + private static final Logger LOG = LoggerFactory.getLogger(AbstractStreamBolt.class); + private IMetadataChangeNotifyService changeNotifyService; + private Config config; + private List outputStreamIds; + protected OutputCollector collector; + + public AbstractStreamBolt(IMetadataChangeNotifyService changeNotifyService, Config config){ + this.changeNotifyService = changeNotifyService; + this.config = config; + } + + public void declareOutputStreams(List outputStreamIds){ + this.outputStreamIds = outputStreamIds; + } + + protected List getOutputStreamIds(){ + return this.outputStreamIds; + } + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + Preconditions.checkNotNull(this.changeNotifyService, "IMetadataChangeNotifyService is not set yet"); + this.collector = collector; + internalPrepare(collector,this.changeNotifyService,this.config,context); + } + + /** + * subclass should implement more initialization for example + * 1) register metadata change + * @param collector + * @param metadataManager + * @param config + * @param context + */ + public abstract void internalPrepare( + OutputCollector collector, + IMetadataChangeNotifyService metadataManager, + Config config, TopologyContext context); + + @Override + public void cleanup() { + super.cleanup(); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + if(this.outputStreamIds!=null){ + LOG.info("declare streams: {} ", outputStreamIds); + for(String streamId:this.outputStreamIds){ + declarer.declareStream(streamId,new Fields(AlertConstants.FIELD_0)); + } + } else { + declarer.declare(new Fields(AlertConstants.FIELD_0)); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java new file mode 100755 index 0000000..30ff5f0 --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.runner; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.eagle.alert.coordination.model.AlertBoltSpec; +import org.apache.eagle.alert.coordination.model.WorkSlot; +import org.apache.eagle.alert.engine.AlertStreamCollector; +import org.apache.eagle.alert.engine.StreamContext; +import org.apache.eagle.alert.engine.StreamContextImpl; +import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService; +import org.apache.eagle.alert.engine.coordinator.MetadataType; +import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; +import org.apache.eagle.alert.engine.coordinator.StreamDefinition; +import org.apache.eagle.alert.engine.evaluator.PolicyGroupEvaluator; +import org.apache.eagle.alert.engine.evaluator.impl.AlertBoltOutputCollectorWrapper; +import org.apache.eagle.alert.engine.model.PartitionedEvent; +import org.apache.eagle.alert.engine.router.AlertBoltSpecListener; +import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer; +import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider; +import org.apache.eagle.alert.engine.serialization.Serializers; +import org.apache.eagle.alert.utils.AlertConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import backtype.storm.metric.api.MultiCountMetric; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; + +import com.typesafe.config.Config; + +/** + * Since 5/1/16. + * This is container for hosting all policies belonging to the same monitoredStream + * MonitoredStream refers to tuple of {dataSource, streamId, grouopby} + * The container is also called {@link WorkSlot} + */ +public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListener,SerializationMetadataProvider { + private final static Logger LOG = LoggerFactory.getLogger(AlertBolt.class); + private static final long serialVersionUID = -4132297691448945672L; + private PolicyGroupEvaluator policyGroupEvaluator; + private AlertStreamCollector alertOutputCollector; + private String boltId; + private volatile Object outputLock; + // mapping from policy name to PolicyDefinition + private volatile Map cachedPolicies = new HashMap<>(); // for one streamGroup, there are multiple policies + + private StreamContext streamContext; + private volatile Map sdf; + private PartitionedEventSerializer serializer; + + public AlertBolt(String boltId, PolicyGroupEvaluator policyGroupEvaluator, Config config, IMetadataChangeNotifyService changeNotifyService){ + super(changeNotifyService, config); + this.boltId = boltId; + this.policyGroupEvaluator = policyGroupEvaluator; + } + + PartitionedEvent deserialize(Object object) throws IOException { + // byte[] in higher priority + if(object instanceof byte[]) { + return serializer.deserialize((byte[]) object); + } else { + return (PartitionedEvent) object; + } + } + + @Override + public void execute(Tuple input) { + this.streamContext.counter().scope("execute_count").incr(); + try { + policyGroupEvaluator.nextEvent(deserialize(input.getValueByField(AlertConstants.FIELD_0)).withAnchor(input)); + synchronized (outputLock) { + this.collector.ack(input); + } + this.streamContext.counter().scope("ack_count").incr(); + }catch (Exception ex) { + LOG.error(ex.getMessage(),ex); + synchronized (outputLock) { + this.streamContext.counter().scope("fail_count").incr(); + this.collector.fail(input); + } + } finally { + alertOutputCollector.flush(); + } + } + + @Override + public void internalPrepare(OutputCollector collector, IMetadataChangeNotifyService metadataChangeNotifyService, Config config, TopologyContext context) { + // instantiate output lock object + outputLock = new Object(); + streamContext = new StreamContextImpl(config,context.registerMetric("eagle.evaluator",new MultiCountMetric(),60),context); + serializer = Serializers.newPartitionedEventSerializer(this); + alertOutputCollector = new AlertBoltOutputCollectorWrapper(collector, outputLock,streamContext); + policyGroupEvaluator.init(streamContext, alertOutputCollector); + metadataChangeNotifyService.registerListener(this); + metadataChangeNotifyService.init(config, MetadataType.ALERT_BOLT); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields(AlertConstants.FIELD_0, AlertConstants.FIELD_1)); + } + + @Override + public void cleanup() { + policyGroupEvaluator.close(); + alertOutputCollector.flush(); + alertOutputCollector.close(); + super.cleanup(); + } + + @Override + public void onAlertBoltSpecChange(AlertBoltSpec spec, Map sds) { + List newPolicies = spec.getBoltPoliciesMap().get(boltId); + if(newPolicies == null) { + LOG.info("no policy with AlertBoltSpec {} for this bolt {}", spec, boltId); + return; + } + + Map newPoliciesMap = new HashMap<>(); + newPolicies.forEach(p -> newPoliciesMap.put(p.getName(), p)); + MapComparator comparator = new MapComparator<>(newPoliciesMap, cachedPolicies); + comparator.compare(); + + policyGroupEvaluator.onPolicyChange(comparator.getAdded(), comparator.getRemoved(), comparator.getModified(), sds); + + // switch + cachedPolicies = newPoliciesMap; + sdf = sds; + } + + @Override + public StreamDefinition getStreamDefinition(String streamId) { + return sdf.get(streamId); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java new file mode 100644 index 0000000..0a239e2 --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.runner; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.eagle.alert.coordination.model.PublishSpec; +import org.apache.eagle.alert.engine.StreamContextImpl; +import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService; +import org.apache.eagle.alert.engine.coordinator.MetadataType; +import org.apache.eagle.alert.engine.coordinator.Publishment; +import org.apache.eagle.alert.engine.coordinator.StreamDefinition; +import org.apache.eagle.alert.engine.model.AlertStreamEvent; +import org.apache.eagle.alert.engine.publisher.AlertPublishSpecListener; +import org.apache.eagle.alert.engine.publisher.AlertPublisher; +import org.apache.eagle.alert.utils.AlertConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import backtype.storm.metric.api.MultiCountMetric; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; + +import com.typesafe.config.Config; + +@SuppressWarnings("serial") +public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPublishSpecListener { + private final static Logger LOG = LoggerFactory.getLogger(AlertPublisherBolt.class); + private final AlertPublisher alertPublisher; + private volatile Map cachedPublishments = new HashMap<>(); + private StreamContextImpl streamContext; + + public AlertPublisherBolt(AlertPublisher alertPublisher, Config config, IMetadataChangeNotifyService coordinatorService){ + super(coordinatorService, config); + this.alertPublisher = alertPublisher; + this.alertPublisher.init(config); + } + + @Override + public void internalPrepare(OutputCollector collector, IMetadataChangeNotifyService coordinatorService, Config config, TopologyContext context) { + coordinatorService.registerListener(this); + coordinatorService.init(config, MetadataType.ALERT_PUBLISH_BOLT); + streamContext = new StreamContextImpl(config,context.registerMetric("eagle.publisher",new MultiCountMetric(),60),context); + } + + @Override + public void execute(Tuple input) { + try { + streamContext.counter().scope("receive_count"); + alertPublisher.nextEvent((AlertStreamEvent) input.getValueByField(AlertConstants.FIELD_1)); + this.collector.ack(input); + streamContext.counter().scope("ack_count"); + } catch (Exception ex){ + streamContext.counter().scope("fail_count"); + LOG.error(ex.getMessage(),ex); + collector.reportError(ex); + } + } + + @Override + public void cleanup() { + alertPublisher.close(); + super.cleanup(); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields()); + } + + @Override + public void onAlertPublishSpecChange(PublishSpec pubSpec, Map sds) { + if(pubSpec == null) return; + + List newPublishments = pubSpec.getPublishments(); + if(newPublishments == null) { + LOG.info("no publishments with PublishSpec {} for this topology", pubSpec); + return; + } + + Map newPublishmentsMap = new HashMap<>(); + newPublishments.forEach(p -> newPublishmentsMap.put(p.getName(), p)); + MapComparator comparator = new MapComparator<>(newPublishmentsMap, cachedPublishments); + comparator.compare(); + + List beforeModified = new ArrayList<>(); + comparator.getModified().forEach(p -> beforeModified.add(cachedPublishments.get(p.getName()))); + alertPublisher.onPublishChange(comparator.getAdded(), comparator.getRemoved(), comparator.getModified(), beforeModified); + + // switch + cachedPublishments = newPublishmentsMap; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/MapComparator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/MapComparator.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/MapComparator.java new file mode 100644 index 0000000..04595b1 --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/MapComparator.java @@ -0,0 +1,72 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.eagle.alert.engine.runner; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.collections.CollectionUtils; + +/** + * Since 5/2/16. + */ +public class MapComparator { + private Map map1; + private Map map2; + private List added = new ArrayList<>(); + private List removed = new ArrayList<>(); + private List modified = new ArrayList<>(); + public MapComparator(Map map1, Map map2){ + this.map1 = map1; + this.map2 = map2; + } + + @SuppressWarnings("unchecked") + public void compare(){ + Set keys1 = map1.keySet(); + Set keys2 = map2.keySet(); + Collection addedKeys = CollectionUtils.subtract(keys1, keys2); + Collection removedKeys = CollectionUtils.subtract(keys2, keys1); + Collection modifiedKeys = CollectionUtils.intersection(keys1, keys2); + + addedKeys.forEach(k -> added.add(map1.get(k))); + removedKeys.forEach(k -> removed.add(map2.get(k))); + modifiedKeys.forEach(k -> { + if(!map1.get(k).equals(map2.get(k))){ + modified.add(map1.get(k)); + } + }); + } + + public List getAdded(){ + return added; + } + + public List getRemoved(){ + return removed; + } + + public List getModified(){ + return modified; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java new file mode 100644 index 0000000..1a6267b --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.runner; + +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.apache.eagle.alert.metric.IMetricSystem; +import org.apache.eagle.alert.metric.MetricSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import backtype.storm.metric.api.IMetricsConsumer; +import backtype.storm.task.IErrorReporter; +import backtype.storm.task.TopologyContext; + +import com.codahale.metrics.Gauge; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; + +/** + * Share same metric system + */ +public class StormMetricConsumer implements IMetricsConsumer { + public static final Logger LOG = LoggerFactory.getLogger(StormMetricConsumer.class); + private String topologyName; + private IMetricSystem metricSystem; + private String topologyId; + + @SuppressWarnings({ "serial", "rawtypes" }) + @Override + public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) { + Config config = ConfigFactory.parseString((String) registrationArgument, ConfigParseOptions.defaults()); + topologyName = config.getString("topology.name"); + topologyId = context.getStormId(); + metricSystem = MetricSystem.load(config); + metricSystem.tags(new HashMap(){{ + put("topologyName",topologyName); + put("topologyId",topologyId); + }}); + metricSystem.start(); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public void handleDataPoints(TaskInfo taskInfo, Collection dataPoints) { + synchronized (metricSystem) { + List metricList = new LinkedList<>(); + for(DataPoint dataPoint:dataPoints){ + if(dataPoint.value instanceof Map) { + Map values = (Map) dataPoint.value; + for(Map.Entry entry:values.entrySet()){ + String metricName = buildMetricName(taskInfo, dataPoint.name, entry.getKey()); + metricList.add(metricName); + Gauge gauge = metricSystem.registry().getGauges().get(metricName); + if(gauge == null) { + LOG.info("Register metric {}", metricName); + gauge = new DataPointGauge(entry.getValue()); + metricSystem.registry().register(metricName,gauge); + }else{ + ((DataPointGauge) gauge).setValue(entry.getValue()); + } + } + } else { + String metricName = buildMetricName(taskInfo, dataPoint.name); + metricList.add(metricName); + LOG.info("Register metric {}",metricName); + Gauge gauge = metricSystem.registry().getGauges().get(metricName); + if(gauge == null) { + LOG.info("Register metric {}", metricName); + gauge = new DataPointGauge(dataPoint.value); + metricSystem.registry().register(metricName,gauge); + } else { + ((DataPointGauge) gauge).setValue(dataPoint.value); + } + } + } + metricSystem.registry().removeMatching((name, metric) -> metricList.indexOf(name) < 0); + metricSystem.report(); + metricSystem.registry().getGauges().values().forEach((gauge -> { + if(gauge instanceof DataPointGauge){ + ((DataPointGauge)gauge).reset(); + } + })); + LOG.info("Reported {} metric data points from {} [{}]",dataPoints.size(),taskInfo.srcComponentId,taskInfo.srcTaskId); + } + } + + private class DataPointGauge implements Gauge { + private Object value; + public DataPointGauge(Object initialValue){ + this.value = initialValue; + } + + @Override + public Object getValue() { + return value; + } + + public void setValue(Object value){ + this.value = value; + } + + public void reset(){ + this.value = 0; + } + } + + private String buildMetricName(TaskInfo taskInfo,String ... name ){ + return String.join(".",StringUtils.join(name,".").replace("/","."),taskInfo.srcComponentId,taskInfo.srcTaskId+""); + } + + @Override + public void cleanup() { + metricSystem.stop(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java new file mode 100644 index 0000000..c18a44f --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.runner; + +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.apache.eagle.alert.metric.IMetricSystem; +import org.apache.eagle.alert.metric.MetricSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import backtype.storm.metric.api.IMetricsConsumer; +import backtype.storm.task.IErrorReporter; +import backtype.storm.task.TopologyContext; + +import com.codahale.metrics.Gauge; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; + +/** + * Per MetricSystem instance per task + */ +public class StormMetricTaggedConsumer implements IMetricsConsumer { + public static final Logger LOG = LoggerFactory.getLogger(StormMetricTaggedConsumer.class); + private String topologyName; + private Map metricSystems; + private String stormId; + private Config config; + + @SuppressWarnings("rawtypes") + @Override + public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) { + this.config = ConfigFactory.parseString((String) registrationArgument, ConfigParseOptions.defaults()); + topologyName = config.getString("topology.name"); + stormId = context.getStormId(); + metricSystems = new HashMap<>(); + } + + @SuppressWarnings("serial") + @Override + public void handleDataPoints(TaskInfo taskInfo, Collection dataPoints) { + synchronized (metricSystems) { + String uniqueTaskKey = buildUniqueTaskKey(taskInfo); + MetricSystem metricSystem = metricSystems.get(uniqueTaskKey); + if(metricSystem == null){ + metricSystem = MetricSystem.load(config); + metricSystems.put(uniqueTaskKey,metricSystem); + metricSystem.tags(new HashMap(){{ + put("topology",topologyName); + put("stormId",stormId); + put("component",taskInfo.srcComponentId); + put("task",taskInfo.srcTaskId); + }}); + metricSystem.start(); + LOG.info("Initialized metric reporter for {}",uniqueTaskKey); + } + report(metricSystem,taskInfo,dataPoints); + if(LOG.isDebugEnabled()) { + LOG.debug("Reported {} metric points from {}", dataPoints.size(), uniqueTaskKey); + } + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private void report(MetricSystem metricSystem,TaskInfo taskInfo,Collection dataPoints){ + List metricList = new LinkedList<>(); + for (DataPoint dataPoint : dataPoints) { + if (dataPoint.value instanceof Map) { + Map values = (Map) dataPoint.value; + for (Map.Entry entry : values.entrySet()) { + String metricName = buildSimpleMetricName(taskInfo, dataPoint.name, entry.getKey()); + metricList.add(metricName); + Gauge gauge = metricSystem.registry().getGauges().get(metricName); + if (gauge == null) { + gauge = new DataPointGauge(entry.getValue()); + metricSystem.registry().register(metricName, gauge); + LOG.debug("Register metric {}", metricName); + } else { + ((DataPointGauge) gauge).setValue(entry.getValue()); + } + } + } else { + String metricName = buildSimpleMetricName(taskInfo, dataPoint.name); + metricList.add(metricName); + Gauge gauge = metricSystem.registry().getGauges().get(metricName); + if (gauge == null) { + gauge = new DataPointGauge(dataPoint.value); + metricSystem.registry().register(metricName, gauge); + LOG.debug("Register metric {}", metricName); + } else { + ((DataPointGauge) gauge).setValue(dataPoint.value); + } + } + } + metricSystem.registry().removeMatching((name, metric) -> metricList.indexOf(name) < 0); + metricSystem.report(); + metricSystem.registry().getGauges().values().forEach((gauge -> { + if(gauge instanceof DataPointGauge){ + ((DataPointGauge)gauge).reset(); + } + })); + } + + private static class DataPointGauge implements Gauge { + private Object value; + public DataPointGauge(Object initialValue){ + this.setValue(initialValue); + } + + @Override + public Object getValue() { + return value; + } + + public void setValue(Object value){ + this.value = value; + } + + public void reset(){ + this.value = 0; + } + } + + private static String buildUniqueTaskKey(TaskInfo taskInfo){ + return String.format("%s[%s]",taskInfo.srcComponentId,taskInfo.srcTaskId); + } + + private static String buildSimpleMetricName(TaskInfo taskInfo,String ... name ){ + return String.join(".",StringUtils.join(name,".").replace("/",".")); + } + + @Override + public void cleanup() { + metricSystems.values().forEach(IMetricSystem::stop); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java new file mode 100644 index 0000000..942ef97 --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.runner; + +import backtype.storm.metric.api.MultiCountMetric; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.tuple.Tuple; +import com.typesafe.config.Config; +import org.apache.commons.collections.CollectionUtils; +import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue; +import org.apache.eagle.alert.coordination.model.RouterSpec; +import org.apache.eagle.alert.coordination.model.StreamRouterSpec; +import org.apache.eagle.alert.engine.StreamContext; +import org.apache.eagle.alert.engine.StreamContextImpl; +import org.apache.eagle.alert.engine.coordinator.*; +import org.apache.eagle.alert.engine.model.PartitionedEvent; +import org.apache.eagle.alert.engine.router.StreamRouter; +import org.apache.eagle.alert.engine.router.StreamRouterBoltSpecListener; +import org.apache.eagle.alert.engine.router.impl.StreamRouterBoltOutputCollector; +import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer; +import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider; +import org.apache.eagle.alert.engine.serialization.Serializers; +import org.apache.eagle.alert.utils.AlertConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.*; + +public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouterBoltSpecListener, SerializationMetadataProvider{ + private final static Logger LOG = LoggerFactory.getLogger(StreamRouterBolt.class); + private static final long serialVersionUID = -7611470889316430372L; + private StreamRouter router; + private StreamRouterBoltOutputCollector routeCollector; + // mapping from StreamPartition to StreamSortSpec + private volatile Map cachedSSS = new HashMap<>(); + // mapping from StreamPartition(streamId, groupbyspec) to StreamRouterSpec + private volatile Map cachedSRS = new HashMap<>(); + private volatile Map sdf = new HashMap<>(); + private PartitionedEventSerializer serializer; + + public StreamRouterBolt(StreamRouter router, Config config, IMetadataChangeNotifyService changeNotifyService) { + super(changeNotifyService, config); + this.router = router; + } + + private StreamContext streamContext; + + @Override + public void internalPrepare(OutputCollector collector, IMetadataChangeNotifyService changeNotifyService, Config config, TopologyContext context) { + streamContext = new StreamContextImpl(config,context.registerMetric("eagle.router",new MultiCountMetric(),60),context); + serializer= Serializers.newPartitionedEventSerializer(this); + routeCollector = new StreamRouterBoltOutputCollector(this.router.getName(),collector,this.getOutputStreamIds(),streamContext,serializer); + router.prepare(streamContext, routeCollector); + changeNotifyService.registerListener(this); + changeNotifyService.init(config, MetadataType.STREAM_ROUTER_BOLT); + } + + PartitionedEvent deserialize(Object object) throws IOException { + // byte[] in higher priority + if(object instanceof byte[]) { + return serializer.deserialize((byte[]) object); + } else { + return (PartitionedEvent) object; + } + } + + @Override + public void execute(Tuple input) { + try { + this.streamContext.counter().scope("execute_count").incr(); + this.router.nextEvent(deserialize(input.getValueByField(AlertConstants.FIELD_0)).withAnchor(input)); + } catch (Exception ex) { + this.streamContext.counter().scope("fail_count").incr(); + LOG.error(ex.getMessage(),ex); + this.collector.fail(input); + } + } + + @Override + public void cleanup() { + this.router.close(); + super.cleanup(); + } + + /** + * Compare with metadata snapshot cache to generate diff like added, removed and modified between different versions. + * @param spec + */ + @SuppressWarnings("unchecked") + @Override + public void onStreamRouteBoltSpecChange(RouterSpec spec, Map sds) { + sanityCheck(spec); + + // figure out added, removed, modified StreamSortSpec + Map newSSS = new HashMap<>(); + spec.getRouterSpecs().forEach(t -> { + if (t.getPartition().getSortSpec() != null) { + newSSS.put(t.getPartition(), t.getPartition().getSortSpec()); + } + }); + + Set newStreamIds = newSSS.keySet(); + Set cachedStreamIds = cachedSSS.keySet(); + Collection addedStreamIds = CollectionUtils.subtract(newStreamIds, cachedStreamIds); + Collection removedStreamIds = CollectionUtils.subtract(cachedStreamIds, newStreamIds); + Collection modifiedStreamIds = CollectionUtils.intersection(newStreamIds, cachedStreamIds); + + Map added = new HashMap<>(); + Map removed = new HashMap<>(); + Map modified = new HashMap<>(); + addedStreamIds.forEach(s -> added.put(s, newSSS.get(s))); + removedStreamIds.forEach(s -> removed.put(s, cachedSSS.get(s))); + modifiedStreamIds.forEach(s -> { + if(!newSSS.get(s).equals(cachedSSS.get(s))){ // this means StreamSortSpec is changed for one specific streamId + modified.put(s, newSSS.get(s)); + } + }); + if(LOG.isDebugEnabled()) { + LOG.debug("added StreamSortSpec " + added); + LOG.debug("removed StreamSortSpec " + removed); + LOG.debug("modified StreamSortSpec " + modified); + } + router.onStreamSortSpecChange(added, removed, modified); + // switch cache + cachedSSS = newSSS; + + // figure out added, removed, modified StreamRouterSpec + Map newSRS = new HashMap<>(); + spec.getRouterSpecs().forEach(t -> newSRS.put(t.getPartition(), t)); + + Set newStreamPartitions = newSRS.keySet(); + Set cachedStreamPartitions = cachedSRS.keySet(); + + Collection addedStreamPartitions = CollectionUtils.subtract(newStreamPartitions, cachedStreamPartitions); + Collection removedStreamPartitions = CollectionUtils.subtract(cachedStreamPartitions, newStreamPartitions); + Collection modifiedStreamPartitions = CollectionUtils.intersection(newStreamPartitions, cachedStreamPartitions); + + Collection addedRouterSpecs = new ArrayList<>(); + Collection removedRouterSpecs = new ArrayList<>(); + Collection modifiedRouterSpecs = new ArrayList<>(); + addedStreamPartitions.forEach(s -> addedRouterSpecs.add(newSRS.get(s))); + removedStreamPartitions.forEach(s -> removedRouterSpecs.add(cachedSRS.get(s))); + modifiedStreamPartitions.forEach(s -> { + if(!newSRS.get(s).equals(cachedSRS.get(s))){ // this means StreamRouterSpec is changed for one specific StreamPartition + modifiedRouterSpecs.add(newSRS.get(s)); + } + }); + + if(LOG.isDebugEnabled()) { + LOG.debug("added StreamRouterSpec " + addedRouterSpecs); + LOG.debug("removed StreamRouterSpec " + removedRouterSpecs); + LOG.debug("modified StreamRouterSpec " + modifiedRouterSpecs); + } + + routeCollector.onStreamRouterSpecChange(addedRouterSpecs, removedRouterSpecs, modifiedRouterSpecs, sds); + // switch cache + cachedSRS = newSRS; + sdf = sds; + } + + /** + * in correlation cases, multiple streams will go to the same queue for correlation policy + * @param spec + */ + private void sanityCheck(RouterSpec spec){ + Set totalRequestedSlots = new HashSet<>(); + for(StreamRouterSpec s : spec.getRouterSpecs()){ + for(PolicyWorkerQueue q : s.getTargetQueue()){ + List workers = new ArrayList<>(); + q.getWorkers().forEach(w -> workers.add(w.getBoltId())); + totalRequestedSlots.addAll(workers); + } + } + if(totalRequestedSlots.size() > getOutputStreamIds().size()){ + String error = String.format("Requested slots are not consistent with provided slots, %s, %s", totalRequestedSlots, getOutputStreamIds()); + LOG.error(error); + throw new IllegalStateException(error); + } + } + + @Override + public StreamDefinition getStreamDefinition(String streamId) { + return this.sdf.get(streamId); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java new file mode 100755 index 0000000..cef94c7 --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java @@ -0,0 +1,215 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.eagle.alert.engine.runner; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService; +import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl; +import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl; +import org.apache.eagle.alert.engine.router.impl.StreamRouterImpl; +import org.apache.eagle.alert.engine.spout.CorrelationSpout; +import org.apache.eagle.alert.utils.AlertConstants; +import org.apache.eagle.alert.utils.StreamIdConversion; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import backtype.storm.LocalCluster; +import backtype.storm.StormSubmitter; +import backtype.storm.generated.StormTopology; +import backtype.storm.topology.BoltDeclarer; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.tuple.Fields; +import backtype.storm.utils.Utils; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigRenderOptions; + +/** + * By default + * 1. one spout with multiple tasks + * 2. multiple router bolts with each bolt having exactly one task + * 3. multiple alert bolts with each bolt having exactly one task + * 4. one publish bolt with multiple tasks + */ +public class UnitTopologyRunner { + private static final Logger LOG = LoggerFactory.getLogger(UnitTopologyRunner.class); + public final static String spoutName = "alertEngineSpout"; + private final static String streamRouterBoltNamePrefix = "streamRouterBolt"; + private final static String alertBoltNamePrefix = "alertBolt"; + public final static String alertPublishBoltName = "alertPublishBolt"; + + public final static String TOTAL_WORKER_NUM = "topology.numOfTotalWorkers"; + public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks"; + public final static String ROUTER_TASK_NUM = "topology.numOfRouterBolts"; + public final static String ALERT_TASK_NUM = "topology.numOfAlertBolts"; + public final static String PUBLISH_TASK_NUM = "topology.numOfPublishTasks"; + public final static String LOCAL_MODE = "topology.localMode"; + public final static String MESSAGE_TIMEOUT_SECS = "topology.messageTimeoutSecs"; + public final static int DEFAULT_MESSAGE_TIMEOUT_SECS = 3600; + + private final IMetadataChangeNotifyService metadataChangeNotifyService; + + public UnitTopologyRunner(IMetadataChangeNotifyService metadataChangeNotifyService){ + this.metadataChangeNotifyService = metadataChangeNotifyService; + } + + public StormTopology buildTopology(String topologyId, + int numOfSpoutTasks, + int numOfRouterBolts, + int numOfAlertBolts, + int numOfPublishTasks, + Config config) { + + StreamRouterImpl[] routers = new StreamRouterImpl[numOfRouterBolts]; + StreamRouterBolt[] routerBolts = new StreamRouterBolt[numOfRouterBolts]; + PolicyGroupEvaluatorImpl[] evaluators = new PolicyGroupEvaluatorImpl[numOfAlertBolts]; + AlertBolt[] alertBolts = new AlertBolt[numOfAlertBolts]; + AlertPublisherImpl publisher; + AlertPublisherBolt publisherBolt; + + TopologyBuilder builder = new TopologyBuilder(); + + + // construct Spout object + CorrelationSpout spout = new CorrelationSpout(config, topologyId, getMetadataChangeNotifyService(), numOfRouterBolts, spoutName, streamRouterBoltNamePrefix); + builder.setSpout(spoutName, spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks); + + // construct StreamRouterBolt objects + for(int i=0; i outputStreamIds = new ArrayList<>(numOfAlertBolts); + for(int j=0; j deserialize(byte[] ser) { + try { + if(ser != null ) { + Map map = mapper.readValue(ser, Map.class); + return Arrays.asList(topic, map); + }else{ + if(LOG.isDebugEnabled()) LOG.debug("Content is null, ignore"); + } + } catch (IOException e) { + try { + LOG.error("Failed to deserialize as JSON: {}", new String(ser, "UTF-8"), e); + }catch(Exception ex){ + LOG.error(ex.getMessage(), ex); + } + } + return null; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java new file mode 100644 index 0000000..1182e3f --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java @@ -0,0 +1,74 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.eagle.alert.engine.scheme; + +import java.util.Map; +import java.util.Properties; + +import org.apache.eagle.alert.coordination.model.StreamNameSelector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A strategy to get stream name from message tuple. + * + * Since 5/5/16. + */ +public class JsonStringStreamNameSelector implements StreamNameSelector { + private final static Logger LOG = LoggerFactory.getLogger(JsonStringStreamNameSelector.class); + private final static String USER_PROVIDED_STREAM_NAME_PROPERTY = "userProvidedStreamName"; + private final static String FIELD_NAMES_TO_INFER_STREAM_NAME_PROPERTY = "fieldNamesToInferStreamName"; + private final static String STREAM_NAME_FORMAT = "streamNameFormat"; + + private String userProvidedStreamName; + private String[] fieldNamesToInferStreamName; + private String streamNameFormat; + + public JsonStringStreamNameSelector(Properties prop) { + userProvidedStreamName = prop.getProperty(USER_PROVIDED_STREAM_NAME_PROPERTY); + String fields = prop.getProperty(FIELD_NAMES_TO_INFER_STREAM_NAME_PROPERTY); + if (fields != null) { + fieldNamesToInferStreamName = fields.split(","); + } + streamNameFormat = prop.getProperty(STREAM_NAME_FORMAT); + if (streamNameFormat == null) { + LOG.warn("no stream name format found, this might cause default stream name be used which is dis-encouraged. Possibly this is a mis-configuration."); + } + } + + @Override + public String getStreamName(Map tuple) { + if (userProvidedStreamName != null) { + return userProvidedStreamName; + } else if (fieldNamesToInferStreamName != null && streamNameFormat != null) { + Object[] args = new Object[fieldNamesToInferStreamName.length]; + for (int i = 0; i < fieldNamesToInferStreamName.length; i++) { + Object colValue = tuple.get(fieldNamesToInferStreamName[i]); + args[i] = colValue; + } + return String.format(streamNameFormat, args); + } + if (LOG.isDebugEnabled()) { + LOG.debug("can not find the stream name for data source. Use the default stream, possibly this means mis-configuration of datasource!"); + } + return "defaultStringStream"; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java new file mode 100644 index 0000000..89d2e76 --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java @@ -0,0 +1,67 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.eagle.alert.engine.scheme; + +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import backtype.storm.spout.Scheme; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Values; + +/** + * used for parsing plain string + */ +public class PlainStringScheme implements Scheme { + private static final long serialVersionUID = 5969724968671646310L; + @SuppressWarnings("unused") + private static final Logger LOG = LoggerFactory.getLogger(PlainStringScheme.class); + private String topic; + + public PlainStringScheme(String topic){ + this.topic = topic; + } + + private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8; + public static final String STRING_SCHEME_KEY = "str"; + + public static String deserializeString(byte[] buff) { + return new String(buff, UTF8_CHARSET); + } + + public Fields getOutputFields() { + return new Fields(STRING_SCHEME_KEY); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public List deserialize(byte[] ser) { + Map m = new HashMap<>(); + m.put("value", deserializeString(ser)); + m.put("timestamp", System.currentTimeMillis()); + return new Values(topic, m); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java new file mode 100644 index 0000000..61ec943 --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java @@ -0,0 +1,49 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.eagle.alert.engine.scheme; + +import java.util.Map; +import java.util.Properties; + +import org.apache.eagle.alert.coordination.model.StreamNameSelector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Since 5/3/16. + */ +public class PlainStringStreamNameSelector implements StreamNameSelector { + @SuppressWarnings("unused") + private static final Logger LOG = LoggerFactory.getLogger(PlainStringStreamNameSelector.class); + private final static String USER_PROVIDED_STREAM_NAME_PROPERTY = "userProvidedStreamName"; + private static final String DEFAULT_STRING_STREAM_NAME = "defaultStringStream"; + + private String streamName; + + public PlainStringStreamNameSelector(Properties prop){ + streamName = prop.getProperty(USER_PROVIDED_STREAM_NAME_PROPERTY); + if(streamName == null) + streamName = DEFAULT_STRING_STREAM_NAME; + } + @Override + public String getStreamName(Map tuple) { + return streamName; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java new file mode 100644 index 0000000..5ba1080 --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.serialization; + +import org.apache.eagle.alert.engine.coordinator.StreamPartition; +import org.apache.eagle.alert.engine.model.PartitionedEvent; +import org.apache.eagle.alert.engine.model.StreamEvent; +import org.apache.eagle.alert.engine.serialization.impl.StreamEventSerializer; +import org.apache.eagle.alert.engine.serialization.impl.StreamPartitionDigestSerializer; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * TODO: Seams the complexity dosen't bring enough performance improve + * + * @see PartitionedEvent + */ +@Deprecated +public class PartitionedEventDigestSerializer implements Serializer { + private final Serializer streamEventSerializer; + private final Serializer streamPartitionSerializer; + + public PartitionedEventDigestSerializer(SerializationMetadataProvider serializationMetadataProvider){ + this.streamEventSerializer = new StreamEventSerializer(serializationMetadataProvider); + this.streamPartitionSerializer = StreamPartitionDigestSerializer.INSTANCE; + } + + @Override + public void serialize(PartitionedEvent entity, DataOutput dataOutput) throws IOException { + dataOutput.writeLong(entity.getPartitionKey()); + streamEventSerializer.serialize(entity.getEvent(),dataOutput); + streamPartitionSerializer.serialize(entity.getPartition(),dataOutput); + } + + @Override + public PartitionedEvent deserialize(DataInput dataInput) throws IOException { + PartitionedEvent event = new PartitionedEvent(); + event.setPartitionKey(dataInput.readLong()); + StreamEvent streamEvent = streamEventSerializer.deserialize(dataInput); + event.setEvent(streamEvent); + StreamPartition partition = streamPartitionSerializer.deserialize(dataInput); + partition.setStreamId(streamEvent.getStreamId()); + event.setPartition(partition); + return event; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java new file mode 100644 index 0000000..c518e40 --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.serialization; + +import org.apache.eagle.alert.engine.model.PartitionedEvent; + +import java.io.IOException; + +public interface PartitionedEventSerializer { + /** + * + * @param entity + * @return + * @throws IOException + */ + byte[] serialize(PartitionedEvent entity) throws IOException; + + /** + * + * @param bytes + * @return + * @throws IOException + */ + PartitionedEvent deserialize(byte[] bytes) throws IOException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java new file mode 100644 index 0000000..71b274d --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.serialization; + +import org.apache.eagle.alert.engine.coordinator.StreamDefinition; + +/** + * Integration interface to provide stream definition for serializer + */ +public interface SerializationMetadataProvider { + /** + * @param streamId + * @return StreamDefinition or null if not exist + */ + StreamDefinition getStreamDefinition(String streamId); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java new file mode 100644 index 0000000..c2f87d0 --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.serialization; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public interface Serializer { + void serialize(V value,DataOutput dataOutput) throws IOException; + V deserialize(DataInput dataInput) throws IOException; +} \ No newline at end of file