Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 99615200B99 for ; Wed, 5 Oct 2016 22:50:20 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 965BB160ADE; Wed, 5 Oct 2016 20:50:20 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B5E70160AC9 for ; Wed, 5 Oct 2016 22:50:19 +0200 (CEST) Received: (qmail 13568 invoked by uid 500); 5 Oct 2016 20:50:19 -0000 Mailing-List: contact commits-help@eagle.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@eagle.incubator.apache.org Delivered-To: mailing list commits@eagle.incubator.apache.org Received: (qmail 13559 invoked by uid 99); 5 Oct 2016 20:50:18 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 05 Oct 2016 20:50:18 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 8B3831A5443 for ; Wed, 5 Oct 2016 20:50:18 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id ichUy9556taX for ; Wed, 5 Oct 2016 20:50:16 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id B440B5F2C4 for ; Wed, 5 Oct 2016 20:50:15 +0000 (UTC) Received: (qmail 13522 invoked by uid 99); 5 Oct 2016 20:50:14 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 05 Oct 2016 20:50:14 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C972BDFDEA; Wed, 5 Oct 2016 20:50:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ralphsu@apache.org To: commits@eagle.incubator.apache.org Message-Id: <4bffef19ad59484097177f9ee9c843e1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-eagle git commit: [EAGLE-590]: AlertEngine: the kafka_spout might be dropped by metadata update when system ack a tupe Date: Wed, 5 Oct 2016 20:50:14 +0000 (UTC) archived-at: Wed, 05 Oct 2016 20:50:20 -0000 Repository: incubator-eagle Updated Branches: refs/heads/master f12c82f8a -> f467954ac [EAGLE-590]: AlertEngine: the kafka_spout might be dropped by metadata update when system ack a tupe Author: ralphsu This closes #475 Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/f467954a Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/f467954a Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/f467954a Branch: refs/heads/master Commit: f467954acc427763d5ab900214018a6be4830f02 Parents: f12c82f Author: Ralph, Su Authored: Thu Oct 6 02:13:56 2016 +0800 Committer: Ralph, Su Committed: Thu Oct 6 04:50:31 2016 +0800 ---------------------------------------------------------------------- .../alert/engine/spout/CorrelationSpout.java | 8 +- .../alert/engine/router/TestAlertBolt.java | 151 +++++++++++++++++++ 2 files changed, 157 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f467954a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java index d4266a3..67074ce 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java @@ -183,7 +183,9 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener // decode and get topic KafkaMessageIdWrapper id = (KafkaMessageIdWrapper) msgId; KafkaSpoutWrapper spout = kafkaSpoutList.get(id.topic); - spout.ack(id.id); + if (spout != null) { + spout.ack(id.id); + } } @Override @@ -192,7 +194,9 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener KafkaMessageIdWrapper id = (KafkaMessageIdWrapper) msgId; LOG.error("Failing message {}, with topic {}", msgId, id.topic); KafkaSpoutWrapper spout = kafkaSpoutList.get(id.topic); - spout.fail(id.id); + if (spout != null) { + spout.fail(id.id); + } } @Override http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f467954a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java index 4bec98d..2b9144d 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java @@ -435,5 +435,156 @@ public class TestAlertBolt { Assert.assertTrue(recieved.get()); } + @Test + public void testMultiStreamDefinition() throws Exception { + final AtomicInteger alertCount = new AtomicInteger(); + final Semaphore mutex = new Semaphore(0); + OutputCollector collector = new OutputCollector(new IOutputCollector() { + int count = 0; + + @Override + public List emit(String streamId, Collection anchors, List tuple) { + System.out.println("=====output collector=========="); + alertCount.incrementAndGet(); + mutex.release(); + Assert.assertTrue("symptomaticAlertOutputStream".equals((String) tuple.get(0)) + || "deviceDownAlertStream".equals((String) tuple.get(0))); + AlertStreamEvent event = (AlertStreamEvent) tuple.get(1); + System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s] ", streamId, tuple)); + + System.out.println("**********output collector end***********"); + return null; + } + + @Override + public void emitDirect(int taskId, String streamId, Collection anchors, List tuple) { + } + + @Override + public void ack(Tuple input) { + } + + @Override + public void fail(Tuple input) { + } + + @Override + public void reportError(Throwable error) { + } + }); + + + AlertBolt bolt = createAlertBolt(collector); + + // construct StreamPartition + StreamPartition sp = new StreamPartition(); + sp.setColumns(Collections.singletonList("col1")); + sp.setStreamId("correlatedStream"); + sp.setType(StreamPartition.Type.GROUPBY); + + pushAlertBoltSpec(sp, bolt); + + // now emit + // contruct GeneralTopologyContext + GeneralTopologyContext context = mock(GeneralTopologyContext.class); + int taskId = 1; + when(context.getComponentId(taskId)).thenReturn("comp1"); + when(context.getComponentOutputFields("comp1", "default")).thenReturn(new Fields("f0")); + + long base = System.currentTimeMillis(); + int i = 0; + String linkedSwitch = "lvs-ra-01"; + + // construct event with "value1" + StreamEvent event1 = new StreamEvent(); + event1.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:00") * 1000); + event1.setMetaVersion("version1"); + Object[] data = new Object[] { base , "child-"+ (i++), "", linkedSwitch}; + event1.setData(data); + event1.setStreamId("correlatedStream"); + PartitionedEvent partitionedEvent1 = new PartitionedEvent(event1, sp, 1001); + + // construct another event with "value1" + StreamEvent event2 = new StreamEvent(); + event2.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:05:00") * 1000); + event2.setMetaVersion("version1"); + data = new Object[] { base , "child-"+ (i++), "", linkedSwitch}; + event2.setData(data); + event2.setStreamId("correlatedStream"); + PartitionedEvent partitionedEvent2 = new PartitionedEvent(event2, sp, 1001); + + Tuple input = new TupleImpl(context, Collections.singletonList(partitionedEvent1), taskId, "default"); + Tuple input2 = new TupleImpl(context, Collections.singletonList(partitionedEvent2), taskId, "default"); + bolt.execute(input); + bolt.execute(input2); + Assert.assertTrue("Timeout to acquire mutex in 5s", mutex.tryAcquire(1, 5, TimeUnit.SECONDS)); + Assert.assertEquals(3, alertCount.get()); + bolt.cleanup(); + } + + private void pushAlertBoltSpec(StreamPartition sp, AlertBolt bolt) { + Map sds = new HashMap<>(); + sds.put("correlatedStream", createCorrelateStream("correlatedStream")); + sds.put("symptomaticAlertOutputStream", createCorrelateStream("symptomaticAlertOutputStream")); // output of updated correlatedStream + sds.put("deviceDownAlertStream", createCorrelateStream("deviceDownAlertStream")); + + PolicyDefinition pd = new PolicyDefinition(); + pd.setName("network_symptomatic"); + pd.setInputStreams(Arrays.asList("correlatedStream")); + pd.setOutputStreams(Arrays.asList("deviceDownAlertStream", "symptomaticAlertOutputStream")); + + pd.setPartitionSpec(Arrays.asList(sp)); + + PolicyDefinition.Definition def = new PolicyDefinition.Definition(); + def.setType(PolicyStreamHandlers.SIDDHI_ENGINE); + def.setValue("from correlatedStream#window.externalTime(timestamp, 3 min) select UUID() as docId, linkedSwitch, '' as parentKey, timestamp group by linkedSwitch having count() > 0 insert into deviceDownAlertStream; " + + " from correlatedStream#window.externalTime(timestamp, 3 min) as left join deviceDownAlertStream#window.time(3 min) as right on left.linkedSwitch == right.linkedSwitch" + + " select left.docId, left.timestamp, left.linkedSwitch, right.docId as parentKey insert into symptomaticAlertOutputStream;"); + pd.setDefinition(def); + + + AlertBoltSpec spec = new AlertBoltSpec(); + spec.setVersion("version1"); + spec.setTopologyName("testTopology"); + spec.addBoltPolicy("alertBolt1", pd.getName()); + spec.getBoltPoliciesMap().put("alertBolt1", new ArrayList<>(Arrays.asList(pd))); + + bolt.onAlertBoltSpecChange(spec, sds); + } + + private StreamDefinition createCorrelateStream(String streamId) { + // construct StreamDefinition + StreamDefinition schema = new StreamDefinition(); + schema.setStreamId(streamId); + List columns = new LinkedList<>(); + { + StreamColumn column = new StreamColumn(); + column.setName("timestamp"); + column.setType(StreamColumn.Type.LONG); + columns.add(column); + } + { + StreamColumn column = new StreamColumn(); + column.setName("docId"); + column.setType(StreamColumn.Type.STRING); + columns.add(column); + } + { + StreamColumn column = new StreamColumn(); + column.setName("parentKey"); + column.setType(StreamColumn.Type.STRING); + columns.add(column); + } + { + StreamColumn column = new StreamColumn(); + column.setName("linkedSwitch"); + column.setType(StreamColumn.Type.STRING); + columns.add(column); + } + + schema.setColumns(columns); + return schema; + } + }