From: jinhuwu@apache.org
To: commits@eagle.incubator.apache.org
Reply-To: dev@eagle.incubator.apache.org
Subject: incubator-eagle git commit: [EAGLE-810] create interface StreamOutputCollector to abstract output…
Date: Tue, 6 Dec 2016 02:13:28 +0000 (UTC)

Repository: incubator-eagle
Updated Branches:
  refs/heads/master 0d1dcc408 -> 976edcd86

[EAGLE-810] create interface StreamOutputCollector to abstract output…

- create interface StreamOutputCollector to abstract outputcollector

https://issues.apache.org/jira/browse/EAGLE-810

Author: r7raul1984

Closes #696 from r7raul1984/EAGLE-810.

Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/976edcd8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/976edcd8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/976edcd8 Branch: refs/heads/master Commit: 976edcd868b3b90327efe34ee088db579dfc6e89 Parents: 0d1dcc4 Author: r7raul1984 Authored: Tue Dec 6 10:13:22 2016 +0800 Committer: wujinhu Committed: Tue Dec 6 10:13:22 2016 +0800 ---------------------------------------------------------------------- ...ertBoltOutputCollectorThreadSafeWrapper.java | 6 +- .../impl/AlertBoltOutputCollectorWrapper.java | 6 +- .../engine/router/StreamOutputCollector.java | 33 ++++++++++ .../router/impl/StormOutputCollector.java | 65 ++++++++++++++++++++ .../impl/StreamRouterBoltOutputCollector.java | 41 +++++------- .../eagle/alert/engine/runner/AlertBolt.java | 5 +- .../alert/engine/runner/StreamRouterBolt.java | 3 +- ...oltOutputCollectorThreadSafeWrapperTest.java | 3 +- .../TestStreamRouterBoltOutputCollector.java | 31 ++++++---- 9 files changed, 144 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/976edcd8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java index 33d502e..185853d 100755 --- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java @@ -18,7 +18,7 @@ package org.apache.eagle.alert.engine.evaluator.impl; import org.apache.eagle.alert.engine.AlertStreamCollector; import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import backtype.storm.task.OutputCollector; +import org.apache.eagle.alert.engine.router.StreamOutputCollector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,14 +40,14 @@ import java.util.concurrent.atomic.AtomicLong; * */ public class AlertBoltOutputCollectorThreadSafeWrapper implements AlertStreamCollector { - private final OutputCollector delegate; + private final StreamOutputCollector delegate; private final LinkedBlockingQueue queue; private static final Logger LOG = LoggerFactory.getLogger(AlertBoltOutputCollectorThreadSafeWrapper.class); private final AtomicLong lastFlushTime = new AtomicLong(System.currentTimeMillis()); private final AutoAlertFlusher flusher; private static final int MAX_ALERT_DELAY_SECS = 10; - public AlertBoltOutputCollectorThreadSafeWrapper(OutputCollector outputCollector) { + public AlertBoltOutputCollectorThreadSafeWrapper(StreamOutputCollector outputCollector) { this.delegate = outputCollector; this.queue = new LinkedBlockingQueue<>(); this.flusher = new AutoAlertFlusher(this); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/976edcd8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java ---------------------------------------------------------------------- diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java index af2b9f8..3053e6e 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java @@ -25,22 +25,22 @@ import org.apache.eagle.alert.engine.AlertStreamCollector; import org.apache.eagle.alert.engine.StreamContext; import org.apache.eagle.alert.engine.coordinator.PublishPartition; import org.apache.eagle.alert.engine.model.AlertStreamEvent; +import org.apache.eagle.alert.engine.router.StreamOutputCollector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import backtype.storm.task.OutputCollector; public class AlertBoltOutputCollectorWrapper implements AlertStreamCollector { private static final Logger LOG = LoggerFactory.getLogger(AlertBoltOutputCollectorWrapper.class); - private final OutputCollector delegate; + private final StreamOutputCollector delegate; private final Object outputLock; private final StreamContext streamContext; private volatile Set publishPartitions; - public AlertBoltOutputCollectorWrapper(OutputCollector outputCollector, Object outputLock, + public AlertBoltOutputCollectorWrapper(StreamOutputCollector outputCollector, Object outputLock, StreamContext streamContext) { this.delegate = outputCollector; this.outputLock = outputLock; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/976edcd8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamOutputCollector.java 
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamOutputCollector.java
new file mode 100644
index 0000000..88ffadb
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/StreamOutputCollector.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.router;
+
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+
+import java.util.List;
+
+
+public interface StreamOutputCollector {
+    void emit(String streamId, PartitionedEvent partitionedEvent) throws Exception;
+
+    void emit(List tuple);
+
+    void ack(PartitionedEvent partitionedEvent);
+
+    void fail(PartitionedEvent partitionedEvent);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/976edcd8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StormOutputCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StormOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StormOutputCollector.java
new file mode 100644
index 0000000..7b8f344
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StormOutputCollector.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.router.impl;
+
+import backtype.storm.task.OutputCollector;
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+import org.apache.eagle.alert.engine.router.StreamOutputCollector;
+import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
+
+import java.util.Collections;
+import java.util.List;
+
+public class StormOutputCollector implements StreamOutputCollector {
+
+    private final OutputCollector outputCollector;
+    private final PartitionedEventSerializer serializer;
+
+    public StormOutputCollector(OutputCollector outputCollector, PartitionedEventSerializer serializer) {
+        this.outputCollector = outputCollector;
+        this.serializer = serializer;
+    }
+
+    public StormOutputCollector(OutputCollector outputCollector) {
+        this(outputCollector, null);
+    }
+
+    @Override
+    public void emit(String streamId, PartitionedEvent partitionedEvent) throws Exception {
+        if (this.serializer == null) {
+            outputCollector.emit(streamId, Collections.singletonList(partitionedEvent.getAnchor()), Collections.singletonList(partitionedEvent));
+        } else {
+            outputCollector.emit(streamId, Collections.singletonList(partitionedEvent.getAnchor()), Collections.singletonList(serializer.serialize(partitionedEvent)));
+        }
+    }
+
+    @Override
+    public void emit(List tuple) {
+        outputCollector.emit(tuple);
+    }
+
+    @Override
+    public void ack(PartitionedEvent partitionedEvent) {
+        outputCollector.ack(partitionedEvent.getAnchor());
+    }
+
+    @Override
+    public void fail(PartitionedEvent partitionedEvent) {
+        this.outputCollector.fail(partitionedEvent.getAnchor());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/976edcd8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java
----------------------------------------------------------------------
diff --git
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java index 77e8daa..2eb101a 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java @@ -18,6 +18,7 @@ */ package org.apache.eagle.alert.engine.router.impl; +import com.google.common.collect.Lists; import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue; import org.apache.eagle.alert.coordination.model.StreamRouterSpec; import org.apache.eagle.alert.coordination.model.WorkSlot; @@ -27,14 +28,8 @@ import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.engine.coordinator.StreamPartition; import org.apache.eagle.alert.engine.model.PartitionedEvent; import org.apache.eagle.alert.engine.model.StreamEvent; -import org.apache.eagle.alert.engine.router.StreamRoute; -import org.apache.eagle.alert.engine.router.StreamRoutePartitionFactory; -import org.apache.eagle.alert.engine.router.StreamRoutePartitioner; -import org.apache.eagle.alert.engine.router.StreamRouteSpecListener; -import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer; +import org.apache.eagle.alert.engine.router.*; import org.apache.eagle.alert.utils.StreamIdConversion; -import backtype.storm.task.OutputCollector; -import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,23 +42,21 @@ import java.util.*; */ public class StreamRouterBoltOutputCollector implements PartitionedEventCollector, StreamRouteSpecListener { private static final Logger LOG = 
LoggerFactory.getLogger(StreamRouterBoltOutputCollector.class); - private final OutputCollector outputCollector; + private final StreamOutputCollector outputCollector; private final Object outputLock = new Object(); // private final List outputStreamIds; private final StreamContext streamContext; - private final PartitionedEventSerializer serializer; private volatile Map> routeSpecMap; private volatile Map> routePartitionerMap; private final String sourceId; - public StreamRouterBoltOutputCollector(String sourceId, OutputCollector outputCollector, List outputStreamIds, StreamContext streamContext, PartitionedEventSerializer serializer) { + public StreamRouterBoltOutputCollector(String sourceId, StreamOutputCollector outputCollector, List outputStreamIds, StreamContext streamContext) { this.sourceId = sourceId; this.outputCollector = outputCollector; this.routeSpecMap = new HashMap<>(); this.routePartitionerMap = new HashMap<>(); // this.outputStreamIds = outputStreamIds; this.streamContext = streamContext; - this.serializer = serializer; } public void emit(PartitionedEvent event) { @@ -84,7 +77,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto LOG.error("Partitioner for " + routerSpecs.get(0) + " is null"); synchronized (outputLock) { this.streamContext.counter().incr("fail_count"); - this.outputCollector.fail(event.getAnchor()); + this.outputCollector.fail(event); } return; } @@ -106,11 +99,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto if (LOG.isDebugEnabled()) { LOG.debug("Emitted to stream {} with message {}", targetStreamId, emittedEvent); } - if (this.serializer == null) { - outputCollector.emit(targetStreamId, event.getAnchor(), Collections.singletonList(emittedEvent)); - } else { - outputCollector.emit(targetStreamId, event.getAnchor(), Collections.singletonList(serializer.serialize(emittedEvent))); - } + outputCollector.emit(targetStreamId, event); 
this.streamContext.counter().incr("emit_count"); } catch (RuntimeException ex) { this.streamContext.counter().incr("fail_count"); @@ -119,13 +108,13 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto } } } - outputCollector.ack(event.getAnchor()); + outputCollector.ack(event); } } catch (Exception ex) { LOG.error(ex.getMessage(), ex); synchronized (outputLock) { this.streamContext.counter().incr("fail_count"); - this.outputCollector.fail(event.getAnchor()); + this.outputCollector.fail(event); } } } @@ -141,7 +130,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto // added StreamRouterSpec i.e. there is a new StreamPartition for (StreamRouterSpec spec : added) { if (copyRouteSpecMap.containsKey(spec.getPartition()) - && copyRouteSpecMap.get(spec.getPartition()).contains(spec)) { + && copyRouteSpecMap.get(spec.getPartition()).contains(spec)) { LOG.error("Metadata calculation error: add existing StreamRouterSpec " + spec); } else { inplaceAdd(copyRouteSpecMap, copyRoutePartitionerMap, spec, sds); @@ -151,7 +140,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto // removed StreamRouterSpec i.e. there is a deleted StreamPartition for (StreamRouterSpec spec : removed) { if (!copyRouteSpecMap.containsKey(spec.getPartition()) - || !copyRouteSpecMap.get(spec.getPartition()).contains(spec)) { + || !copyRouteSpecMap.get(spec.getPartition()).contains(spec)) { LOG.error("Metadata calculation error: remove non-existing StreamRouterSpec " + spec); } else { inplaceRemove(copyRouteSpecMap, copyRoutePartitionerMap, spec); @@ -161,7 +150,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto // modified StreamRouterSpec, i.e. 
there is modified StreamPartition, for example WorkSlotQueue assignment is changed for (StreamRouterSpec spec : modified) { if (!copyRouteSpecMap.containsKey(spec.getPartition()) - || copyRouteSpecMap.get(spec.getPartition()).contains(spec)) { + || copyRouteSpecMap.get(spec.getPartition()).contains(spec)) { LOG.error("Metadata calculation error: modify nonexisting StreamRouterSpec " + spec); } else { inplaceRemove(copyRouteSpecMap, copyRoutePartitionerMap, spec); @@ -207,9 +196,9 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto } for (PolicyWorkerQueue pwq : streamRouterSpec.getTargetQueue()) { routePartitioners.add(StreamRoutePartitionFactory.createRoutePartitioner( - Lists.transform(pwq.getWorkers(), WorkSlot::getBoltId), - sds.get(streamRouterSpec.getPartition().getStreamId()), - streamRouterSpec.getPartition())); + Lists.transform(pwq.getWorkers(), WorkSlot::getBoltId), + sds.get(streamRouterSpec.getPartition().getStreamId()), + streamRouterSpec.getPartition())); } return routePartitioners; } @@ -219,7 +208,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto synchronized (outputLock) { this.streamContext.counter().incr("drop_count"); if (event.getAnchor() != null) { - this.outputCollector.ack(event.getAnchor()); + this.outputCollector.ack(event); } else { throw new IllegalStateException(event.toString() + " was not acked as anchor is null"); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/976edcd8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java index c946fee..02bc47e 100755 --- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java @@ -38,6 +38,7 @@ import org.apache.eagle.alert.engine.evaluator.impl.AlertBoltOutputCollectorWrap import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl; import org.apache.eagle.alert.engine.model.PartitionedEvent; import org.apache.eagle.alert.engine.router.AlertBoltSpecListener; +import org.apache.eagle.alert.engine.router.impl.StormOutputCollector; import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider; import org.apache.eagle.alert.engine.utils.SingletonExecutor; import org.apache.eagle.alert.service.IMetadataServiceClient; @@ -102,7 +103,7 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen pe.getEvent().setMetaVersion(specVersion); } else if (streamEventVersion != null && !streamEventVersion.equals(specVersion)) { if (specVersion != null && streamEventVersion != null - && specVersion.contains("spec_version_") && streamEventVersion.contains("spec_version_")) { + && specVersion.contains("spec_version_") && streamEventVersion.contains("spec_version_")) { // check if specVersion is older than stream_event_version // Long timestamp_of_specVersion = Long.valueOf(specVersion.split("spec_version_")[1]); // Long timestamp_of_streamEventVersion = Long.valueOf(stream_event_version.split("spec_version_")[1]); @@ -161,7 +162,7 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen // instantiate output lock object outputLock = new Object(); streamContext = new StreamContextImpl(config, context.registerMetric("eagle.evaluator", new MultiCountMetric(), 60), context); - alertOutputCollector = new AlertBoltOutputCollectorWrapper(collector, outputLock, streamContext); + alertOutputCollector = new 
AlertBoltOutputCollectorWrapper(new StormOutputCollector(collector), outputLock, streamContext); policyGroupEvaluator.init(streamContext, alertOutputCollector); metadataChangeNotifyService.registerListener(this); metadataChangeNotifyService.init(config, MetadataType.ALERT_BOLT); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/976edcd8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java index 7acd7e4..29ee771 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java @@ -36,6 +36,7 @@ import org.apache.eagle.alert.engine.coordinator.StreamPartition; import org.apache.eagle.alert.engine.coordinator.StreamSortSpec; import org.apache.eagle.alert.engine.router.StreamRouter; import org.apache.eagle.alert.engine.router.StreamRouterBoltSpecListener; +import org.apache.eagle.alert.engine.router.impl.StormOutputCollector; import org.apache.eagle.alert.engine.router.impl.StreamRouterBoltOutputCollector; import org.apache.eagle.alert.engine.router.impl.StreamRouterImpl; import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider; @@ -69,7 +70,7 @@ public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouter @Override public void internalPrepare(OutputCollector collector, IMetadataChangeNotifyService changeNotifyService, Config config, TopologyContext context) { streamContext = new 
StreamContextImpl(config, context.registerMetric("eagle.router", new MultiCountMetric(), 60), context); - routeCollector = new StreamRouterBoltOutputCollector(getBoltId(), collector, this.getOutputStreamIds(), streamContext, serializer); + routeCollector = new StreamRouterBoltOutputCollector(getBoltId(), new StormOutputCollector(collector, serializer), this.getOutputStreamIds(), streamContext); router.prepare(streamContext, routeCollector); changeNotifyService.registerListener(this); changeNotifyService.init(config, MetadataType.STREAM_ROUTER_BOLT); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/976edcd8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java index f356931..4552417 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/AlertBoltOutputCollectorThreadSafeWrapperTest.java @@ -20,6 +20,7 @@ import backtype.storm.task.IOutputCollector; import backtype.storm.task.OutputCollector; import org.apache.eagle.alert.engine.evaluator.impl.AlertBoltOutputCollectorThreadSafeWrapper; import org.apache.eagle.alert.engine.model.AlertStreamEvent; +import org.apache.eagle.alert.engine.router.impl.StormOutputCollector; import org.junit.Assert; import org.junit.Test; @@ -32,7 +33,7 @@ public class 
AlertBoltOutputCollectorThreadSafeWrapperTest { @Test public void testThreadSafeAlertBoltOutputCollector() { MockedStormAlertOutputCollector stormOutputCollector = new MockedStormAlertOutputCollector(null); - AlertBoltOutputCollectorThreadSafeWrapper alertBoltOutputCollectorWrapper = new AlertBoltOutputCollectorThreadSafeWrapper(stormOutputCollector); + AlertBoltOutputCollectorThreadSafeWrapper alertBoltOutputCollectorWrapper = new AlertBoltOutputCollectorThreadSafeWrapper(new StormOutputCollector(stormOutputCollector)); alertBoltOutputCollectorWrapper.emit(create("mockAlert_1")); alertBoltOutputCollectorWrapper.emit(create("mockAlert_2")); Assert.assertEquals(0, stormOutputCollector.getCollected().size()); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/976edcd8/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java index fd8ad61..704857d 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java @@ -34,6 +34,7 @@ import org.apache.eagle.alert.engine.coordinator.StreamPartition; import org.apache.eagle.alert.engine.model.PartitionedEvent; import org.apache.eagle.alert.engine.model.StreamEvent; +import org.apache.eagle.alert.engine.router.impl.StormOutputCollector; import 
org.apache.eagle.alert.engine.router.impl.StreamRouterBoltOutputCollector; import org.junit.Assert; import org.junit.Test; @@ -53,7 +54,7 @@ public class TestStreamRouterBoltOutputCollector { StreamPartition partition = new StreamPartition(); partition.setStreamId(streamId); partition.setType(StreamPartition.Type.GROUPBY); - partition.setColumns(new ArrayList(){{ + partition.setColumns(new ArrayList() {{ add("col1"); }}); @@ -63,22 +64,26 @@ public class TestStreamRouterBoltOutputCollector { PolicyWorkerQueue queue1 = new PolicyWorkerQueue(); queue1.setPartition(partition); - queue1.setWorkers(new ArrayList(){ { - add(worker1); - }} ); + queue1.setWorkers(new ArrayList() { + { + add(worker1); + } + }); PolicyWorkerQueue queue2 = new PolicyWorkerQueue(); queue2.setPartition(partition); - queue2.setWorkers(new ArrayList(){ { - add(worker1); - add(worker2); - }} ); + queue2.setWorkers(new ArrayList() { + { + add(worker1); + add(worker2); + } + }); StreamRouterSpec spec1 = new StreamRouterSpec(); spec1.setStreamId(streamId); spec1.setPartition(partition); - spec1.setTargetQueue(new ArrayList(){{ + spec1.setTargetQueue(new ArrayList() {{ add(queue1); }}); @@ -86,7 +91,7 @@ public class TestStreamRouterBoltOutputCollector { spec2.setStreamId(streamId); spec2.setPartition(partition); - spec2.setTargetQueue(new ArrayList(){{ + spec2.setTargetQueue(new ArrayList() {{ add(queue2); }}); @@ -138,7 +143,7 @@ public class TestStreamRouterBoltOutputCollector { // create two events StreamEvent event1 = new StreamEvent(); - Object[] data = new Object[] {"value1"}; + Object[] data = new Object[]{"value1"}; event1.setData(data); event1.setStreamId(streamId); PartitionedEvent pEvent1 = new PartitionedEvent(); @@ -146,7 +151,7 @@ public class TestStreamRouterBoltOutputCollector { pEvent1.setPartition(partition); StreamEvent event2 = new StreamEvent(); - Object[] data2 = new Object[] {"value3"}; + Object[] data2 = new Object[]{"value3"}; event2.setData(data2); 
event2.setStreamId(streamId); PartitionedEvent pEvent2 = new PartitionedEvent(); @@ -156,7 +161,7 @@ public class TestStreamRouterBoltOutputCollector { TopologyContext context = Mockito.mock(TopologyContext.class); when(context.registerMetric(any(String.class), any(MultiCountMetric.class), any(int.class))).thenReturn(new MultiCountMetric()); StreamContext streamContext = new StreamContextImpl(null, context.registerMetric("eagle.router", new MultiCountMetric(), 60), context); - StreamRouterBoltOutputCollector collector = new StreamRouterBoltOutputCollector("test", new OutputCollector(delegate), null, streamContext, null); + StreamRouterBoltOutputCollector collector = new StreamRouterBoltOutputCollector("test", new StormOutputCollector(new OutputCollector(delegate), null), null, streamContext); // add a StreamRouterSpec which has one worker collector.onStreamRouterSpecChange(list1, new ArrayList<>(), new ArrayList<>(), sds);
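
Note on the abstraction introduced by this commit: because the router and alert collectors now depend on StreamOutputCollector rather than on Storm's OutputCollector, the same calling code can in principle be driven by a non-Storm implementation. The sketch below illustrates that idea only. It is not part of the commit: Event, RecordingOutputCollector, Router and CollectorSketch are hypothetical stand-ins, the interface is a simplified copy (the emit(List tuple) overload is omitted), and the emit-then-ack / fail-on-exception flow is a reduced version of what StreamRouterBoltOutputCollector does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Eagle's PartitionedEvent; only the anchor is modelled.
final class Event {
    final Object anchor;
    final String payload;

    Event(Object anchor, String payload) {
        this.anchor = anchor;
        this.payload = payload;
    }

    Object getAnchor() {
        return anchor;
    }
}

// Simplified copy of the interface from the diff (assumption: emit(List tuple) left out).
interface StreamOutputCollector {
    void emit(String streamId, Event event) throws Exception;

    void ack(Event event);

    void fail(Event event);
}

// Hypothetical in-memory implementation that records calls instead of talking to Storm.
final class RecordingOutputCollector implements StreamOutputCollector {
    final List<String> emitted = new ArrayList<>();
    final List<Object> acked = new ArrayList<>();
    final List<Object> failed = new ArrayList<>();

    @Override
    public void emit(String streamId, Event event) throws Exception {
        if (streamId == null) {
            // Simulates a downstream failure so the caller's fail path can be exercised.
            throw new IllegalArgumentException("streamId must not be null");
        }
        emitted.add(streamId + ":" + event.payload);
    }

    @Override
    public void ack(Event event) {
        acked.add(event.getAnchor());
    }

    @Override
    public void fail(Event event) {
        failed.add(event.getAnchor());
    }
}

// Hypothetical caller: it only knows the interface, never the engine behind it.
final class Router {
    private final StreamOutputCollector out;

    Router(StreamOutputCollector out) {
        this.out = out;
    }

    void route(String streamId, Event event) {
        try {
            out.emit(streamId, event);
            out.ack(event);
        } catch (Exception ex) {
            out.fail(event);
        }
    }
}

final class CollectorSketch {
    public static void main(String[] args) {
        RecordingOutputCollector recorder = new RecordingOutputCollector();
        Router router = new Router(recorder);
        router.route("stream-a", new Event("anchor-1", "value1"));
        router.route(null, new Event("anchor-2", "value3"));
        System.out.println("emitted=" + recorder.emitted
            + " acked=" + recorder.acked + " failed=" + recorder.failed);
    }
}
```

A recording implementation of this kind could stand in for the mocked Storm collector in unit tests, although the tests in this commit still wrap a real OutputCollector in StormOutputCollector.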