Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 733C3200B4B for ; Thu, 21 Jul 2016 08:40:20 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 71ABA160A6D; Thu, 21 Jul 2016 06:40:20 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 0958C160A68 for ; Thu, 21 Jul 2016 08:40:17 +0200 (CEST) Received: (qmail 31058 invoked by uid 500); 21 Jul 2016 06:40:17 -0000 Mailing-List: contact commits-help@eagle.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@eagle.incubator.apache.org Delivered-To: mailing list commits@eagle.incubator.apache.org Received: (qmail 31048 invoked by uid 99); 21 Jul 2016 06:40:17 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 21 Jul 2016 06:40:17 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 41145C3CAD for ; Thu, 21 Jul 2016 06:40:16 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.646 X-Spam-Level: X-Spam-Status: No, score=-4.646 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx2-lw-us.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id UB-6DlsK3pYt for ; Thu, 21 Jul 2016 06:40:02 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by 
mx2-lw-us.apache.org (ASF Mail Server at mx2-lw-us.apache.org) with SMTP id AD6DA5FACF for ; Thu, 21 Jul 2016 06:40:00 +0000 (UTC) Received: (qmail 30844 invoked by uid 99); 21 Jul 2016 06:39:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 21 Jul 2016 06:39:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6BE16E1101; Thu, 21 Jul 2016 06:39:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ralphsu@apache.org To: commits@eagle.incubator.apache.org Date: Thu, 21 Jul 2016 06:40:00 -0000 Message-Id: In-Reply-To: <9f0a57bce3d44aea89bdbec140a7bb61@git.apache.org> References: <9f0a57bce3d44aea89bdbec140a7bb61@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] incubator-eagle git commit: [EAGLE-384] Alert Engine BugFix and Improvements archived-at: Thu, 21 Jul 2016 06:40:20 -0000 [EAGLE-384] Alert Engine BugFix and Improvements 1. Add schedule time in schedule state 2. Add publishment.toString() 3. Add coordinator test for policy update 4. Refine logging 5. Avoid explicit IP expose, use localhost instead 6. Ignore test cases that are empty 7. Add *.orig in Apache RAT exclusion 8. 
Ignore all EmbedHbase cases Author: ralphsu Reviewer: ralphsu Closes #271 Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/f0af3e5d Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/f0af3e5d Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/f0af3e5d Branch: refs/heads/develop Commit: f0af3e5dc7cebdff426d0450bea7256a31ca77ec Parents: 04468a9 Author: Ralph, Su Authored: Thu Jul 21 14:37:43 2016 +0800 Committer: Ralph, Su Committed: Thu Jul 21 14:42:46 2016 +0800 ---------------------------------------------------------------------- .../alert/coordination/model/ScheduleState.java | 10 ++ .../model/Tuple2StreamConverter.java | 8 +- .../alert/engine/coordinator/Publishment.java | 8 + .../eagle/alert/coordinator/Coordinator.java | 31 +++- .../coordinator/impl/GreedyPolicyScheduler.java | 3 +- .../provider/InMemScheduleConext.java | 4 + .../provider/ScheduleContextBuilder.java | 32 +++- .../coordinator/DynamicPolicyLoaderTest.java | 4 +- .../coordinator/ScheduleContextBuilderTest.java | 73 +++++++- .../apache/alert/coordinator/SchedulerTest.java | 175 +++++++++++++++---- .../alert/coordinator/WorkSlotStrategyTest.java | 12 +- .../mock/InMemMetadataServiceClient.java | 22 ++- .../mock/TestTopologyMgmtService.java | 67 ++++--- .../src/test/resources/multi/datasources.json | 19 ++ .../src/test/resources/multi/policies.json | 52 ++++++ .../src/test/resources/multi/publishments.json | 26 +++ .../test/resources/multi/streamdefinitions.json | 138 +++++++++++++++ .../src/test/resources/multi/topologies.json | 31 ++++ .../evaluator/nodata/NoDataPolicyHandler.java | 20 +++ .../publisher/impl/AlertKafkaPublisher.java | 4 +- .../publisher/impl/AlertPublisherImpl.java | 4 + .../publisher/impl/KafkaProducerManager.java | 45 +++-- .../eagle/alert/engine/runner/AlertBolt.java | 2 +- .../src/main/resources/application.conf | 10 +- 
.../eagle/alert/engine/e2e/Integration1.java | 38 ++-- .../eagle/alert/engine/e2e/Integration3.java | 107 ++++++++++++ .../engine/e2e/Integration5AbsenceAlert.java | 5 +- .../eagle/alert/engine/e2e/SampleClient3.java | 96 ++++++++++ .../eagle/alert/engine/topology/TestBolt.java | 11 +- .../resources/absence/application-absence.conf | 14 +- .../src/test/resources/absence/datasources.json | 30 ++-- .../src/test/resources/absence/policies.json | 44 ++--- .../test/resources/absence/publishments.json | 36 ++-- .../resources/absence/streamdefinitions.json | 54 +++--- .../src/test/resources/absence/topologies.json | 58 +++--- .../src/test/resources/e2e/application-e2e.conf | 60 +++++++ .../src/test/resources/e2e/datasources.json | 21 +++ .../src/test/resources/e2e/policies.json | 28 +++ .../src/test/resources/e2e/publishments.json | 17 ++ .../src/test/resources/e2e/sherlock.json | 52 ++++++ .../test/resources/e2e/streamdefinitions.json | 76 ++++++++ .../src/test/resources/e2e/topologies.json | 31 ++++ .../src/test/resources/e2e/ump_demo_schema.json | 170 ++++++++++++++++++ .../src/test/resources/nodata/policies.json | 2 +- .../src/test/resources/nodata/publishments.json | 2 +- .../simple/application-integration.conf | 2 +- .../src/test/resources/simple/publishments.json | 2 +- .../metadata/impl/MongoMetadataDaoImpl.java | 3 +- .../alert/resource/impl/MongoImplTest.java | 26 ++- .../src/test/resources/application-mongo.conf | 2 +- .../app/AlertDropWizardConfiguration.java | 13 ++ .../apache/eagle/service/app/ServiceApp.java | 9 + .../src/main/resources/application.conf | 2 +- .../src/main/resources/log4j.properties | 2 +- .../eagle-alert-parent/eagle-alert/pom.xml | 1 + .../eagle/service/hbase/TestHBaseBase.java | 1 + .../service/generic/TestListQueryResource.java | 1 + .../eagle/storage/hbase/TestHBaseStatement.java | 1 + .../coprocessor/TestGroupAggregateClient.java | 1 + .../TestGroupAggregateTimeSeriesClient.java | 1 + 60 files changed, 1547 insertions(+), 272 
deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/ScheduleState.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/ScheduleState.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/ScheduleState.java index 6036f29..93d038f 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/ScheduleState.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/ScheduleState.java @@ -62,8 +62,10 @@ public class ScheduleState { private String generateTime; private int code = 200; private String message = "OK"; + private int scheduleTimeMillis; public ScheduleState() { + this.generateTime = String.valueOf(new Date().getTime()); } public ScheduleState(String version, @@ -212,4 +214,12 @@ public class ScheduleState { this.streamSnapshots = streamSnapshots; } + public int getScheduleTimeMillis() { + return scheduleTimeMillis; + } + + public void setScheduleTimeMillis(int scheduleTimeMillis) { + this.scheduleTimeMillis = scheduleTimeMillis; + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java index f152b4c..8867fd7 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java @@ -67,8 +67,8 @@ public class Tuple2StreamConverter { Object timeObject = m.get(metadata.getTimestampColumn()); long timestamp = 0L; if (timeObject == null) { - if (LOG.isWarnEnabled()) { - LOG.warn("continue with current timestamp since no timestamp column specified! Metadata : ", metadata); + if (LOG.isDebugEnabled()) { + LOG.debug("continue with current timestamp since no timestamp column specified! Metadata : {} ", metadata); } timestamp = System.currentTimeMillis(); } else if (timeObject instanceof Number) { @@ -77,8 +77,8 @@ public class Tuple2StreamConverter { String timestampFieldValue = (String) m.get(metadata.getTimestampColumn()); String dateFormat = metadata.getTimestampFormat(); if (Strings.isNullOrEmpty(dateFormat)) { - if (LOG.isWarnEnabled()) { - LOG.warn("continue with current timestamp becuase no data format sepcified! Metadata : ", metadata); + if (LOG.isDebugEnabled()) { + LOG.debug("continue with current timestamp becuase no data format sepcified! 
Metadata : {} ", metadata); } timestamp = System.currentTimeMillis(); } else http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java index b921b79..3ba2dcf 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java @@ -101,4 +101,12 @@ public class Publishment { .append(properties).build(); } + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("Publishment[name:").append(name).append(",type:").append(type).append(",policyId:") + .append(policyIds).append(",properties:").append(properties); + return sb.toString(); + } + } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java index 9c97f9f..df4bc34 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java +++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -39,14 +40,17 @@ import org.apache.eagle.alert.service.MetadataServiceClientImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Stopwatch; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; /** - * @since Mar 24, 2016 Coordinator is a standalone java application, which - * listens to policy changes and use schedule algorithm to distribute - * policies 1) reacting to shutdown events 2) start non-daemon thread to - * pull policies and figure out if polices are changed + * TODO: To simply avoid concurrent call of schdule, make the schedule as synchronized. This is not safe when multiple + * instance, consider a distributed lock for prevent multiple schedule happen concurrently. 
+ * + * @since Mar 24, 2016 Coordinator is a standalone java application, which listens to policy changes and use schedule + * algorithm to distribute policies 1) reacting to shutdown events 2) start non-daemon thread to pull policies + * and figure out if polices are changed */ public class Coordinator { @@ -94,6 +98,7 @@ public class Coordinator { } public synchronized ScheduleState schedule(ScheduleOption option) { + Stopwatch watch = Stopwatch.createStarted(); IScheduleContext context = new ScheduleContextBuilder(client).buildContext(); TopologyMgmtService mgmtService = new TopologyMgmtService(); IPolicyScheduler scheduler = PolicySchedulerFactory.createScheduler(); @@ -101,9 +106,18 @@ public class Coordinator { scheduler.init(context, mgmtService); ScheduleState state = scheduler.schedule(option); + long scheduleTime = watch.elapsed(TimeUnit.MILLISECONDS); + state.setScheduleTimeMillis((int) scheduleTime);// hardcode to integer + watch.reset(); + watch.start(); + // persist & notify postSchedule(client, state, producer); + watch.stop(); + long postTime = watch.elapsed(TimeUnit.MILLISECONDS); + LOG.info("Schedule result, schedule time {} ms, post schedule time {} ms !", scheduleTime, postTime); + currentState = state; return state; } @@ -207,7 +221,14 @@ public class Coordinator { // schedule dynamic policy loader long initDelayMillis = config.getLong(DYNAMIC_POLICY_LOADER_INIT_MILLS); long delayMillis = config.getLong(DYNAMIC_POLICY_LOADER_DELAY_MILLS); - ScheduledExecutorService scheduleSrv = Executors.newScheduledThreadPool(2); + ScheduledExecutorService scheduleSrv = Executors.newScheduledThreadPool(2, new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(); + t.setDaemon(true); + return t; + } + }); scheduleSrv.scheduleAtFixedRate(loader, initDelayMillis, delayMillis, TimeUnit.MILLISECONDS); // 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java index a9b6c00..6c98fa6 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java @@ -137,6 +137,7 @@ public class GreedyPolicyScheduler implements IPolicyScheduler { expectParal = policyDefaultParallelism; } // how to handle expand of an policy in a smooth transition manner + // TODO policy fix PolicyAssignment assignment = context.getPolicyAssignments().get(def.getName()); if (assignment != null) { LOG.info("policy {} already allocated", def.getName()); @@ -297,7 +298,7 @@ public class GreedyPolicyScheduler implements IPolicyScheduler { * @return */ private int getQueueSize(int hint) { - return initialQueueSize; + return initialQueueSize * ((hint + initialQueueSize - 1) / initialQueueSize); } private boolean isQueueAvailable(StreamWorkSlotQueue queue, PolicyDefinition def) { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java ---------------------------------------------------------------------- diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java index 84a4061..dea9419 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java @@ -145,4 +145,8 @@ public class InMemScheduleConext implements IScheduleContext { return publishments; } + public void addPublishment(Publishment pub) { + this.publishments.put(pub.getName(), pub); + } + } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java index d4d6c0c..fd09462 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java @@ -92,9 +92,16 @@ public class ScheduleContextBuilder { // TODO: See ScheduleState comments on how to improve the storage ScheduleState state = client.getVersionedSpec(); - assignments = listToMap(state == null ? 
new ArrayList() : cleanupDeprecatedAssignments(state.getAssignments())); - monitoredStreamMap = listToMap(state == null ? new ArrayList() : cleanupDeprecatedStreamsAndAssignment(state.getMonitoredStreams())); + // detect policy update, remove the policy assignments. + // definition change : the assignment would NOT change, the runtime will do reload and check + // stream change : the assignment would NOT change, the runtime will do reload and check + // data source change : the assignment would NOT change, the runtime will do reload and check + // parallelism change : the policies' assignment would be dropped when it's bigger than assign queue, and expect + // to be assigned in scheduler. + assignments = listToMap(state == null ? new ArrayList() : detectAssignmentsChange(state.getAssignments(), state)); + + monitoredStreamMap = listToMap(state == null ? new ArrayList() : detectMonitoredStreams(state.getMonitoredStreams())); // build based on existing data usages = buildTopologyUsage(); @@ -119,7 +126,7 @@ public class ScheduleContextBuilder { * @param monitoredStreams * @return */ - private List cleanupDeprecatedStreamsAndAssignment(List monitoredStreams) { + private List detectMonitoredStreams(List monitoredStreams) { List result = new ArrayList(monitoredStreams); // clear deprecated streams @@ -193,14 +200,31 @@ public class ScheduleContextBuilder { } } - private List cleanupDeprecatedAssignments(List list) { + private List detectAssignmentsChange(List list, ScheduleState state) { + // FIXME: duplciated build map ? 
+ Map queueMap = new HashMap(); + for (MonitoredStream ms : state.getMonitoredStreams()) { + for (StreamWorkSlotQueue q : ms.getQueues()) { + queueMap.put(q.getQueueId(), q); + } + } + List result = new ArrayList(list); Iterator paIt = result.iterator(); while (paIt.hasNext()) { PolicyAssignment assignment = paIt.next(); + if (!policies.containsKey(assignment.getPolicyName())) { LOG.info("Policy assignment {} 's policy not found, this assignment will be removed!", assignment); paIt.remove(); + } else { + StreamWorkSlotQueue queue = queueMap.get(assignment.getQueueId()); + if (queue == null + || policies.get(assignment.getPolicyName()).getParallelismHint() > queue.getQueueSize()) { + // queue not found or policy has hint bigger than queue (possible a poilcy update) + LOG.info("Policy assignment {} 's policy doesnt match queue: {}!", assignment, queue); + paIt.remove(); + } } } return result; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java index dcb031e..fe62b3b 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java @@ -19,11 +19,9 @@ package org.apache.alert.coordinator; -import org.junit.Ignore; - /** * Since 4/28/16. 
*/ -@Ignore +@org.junit.Ignore public class DynamicPolicyLoaderTest { } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java index f2e67de..ed9d7b7 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java @@ -38,6 +38,7 @@ import org.apache.eagle.alert.coordinator.model.AlertBoltUsage; import org.apache.eagle.alert.coordinator.model.TopologyUsage; import org.apache.eagle.alert.coordinator.provider.ScheduleContextBuilder; import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; +import org.apache.eagle.alert.engine.coordinator.PolicyDefinition.Definition; import org.apache.eagle.alert.engine.coordinator.Publishment; import org.apache.eagle.alert.engine.coordinator.StreamColumn; import org.apache.eagle.alert.engine.coordinator.StreamColumn.Type; @@ -91,7 +92,7 @@ public class ScheduleContextBuilderTest { Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1)); StreamWorkSlotQueue queue = SchedulerTest.getQueue(context, assignment1.getQueueId()).getRight(); - client.listPolicies().remove(0); + client.removePolicy(0); context = builder.buildContext(); Assert.assertFalse(context.getPolicyAssignments().containsKey(TEST_POLICY_1)); @@ -102,7 +103,7 @@ public class ScheduleContextBuilderTest { } @Test - public void 
test_changed_policy() { + public void test_changed_policy_partition() { InMemMetadataServiceClient client = getSampleMetadataService(); ScheduleContextBuilder builder = new ScheduleContextBuilder(client); PolicyAssignment assignment1 = client.getVersionedSpec().getAssignments().get(0); @@ -140,6 +141,56 @@ public class ScheduleContextBuilderTest { } @Test + public void test_changed_policy_parallelism() { + InMemMetadataServiceClient client = getSampleMetadataService(); + ScheduleContextBuilder builder = new ScheduleContextBuilder(client); + PolicyAssignment assignment1 = client.getVersionedSpec().getAssignments().get(0); + + IScheduleContext context = builder.buildContext(); + Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1)); + + StreamWorkSlotQueue queue = SchedulerTest.getQueue(context, assignment1.getQueueId()).getRight(); + + PolicyDefinition pd1 = client.listPolicies().get(0); + pd1.setParallelismHint(4); // default queue is 5 , change to smaller, has no effect + + context = builder.buildContext(); + PolicyAssignment assignmentNew = context.getPolicyAssignments().values().iterator().next(); + StreamWorkSlotQueue queueNew = SchedulerTest.getQueue(context, assignmentNew.getQueueId()).getRight(); + Assert.assertNotNull(queueNew); + // just to make sure queueNew is present + Assert.assertEquals(queue.getQueueId(), queueNew.getQueueId()); + + // default queue is 5 , change to bigger 6, policy assignment removed + pd1.setParallelismHint(queue.getQueueSize() + 1); + context = builder.buildContext(); + + Assert.assertFalse(context.getPolicyAssignments().values().iterator().hasNext()); + } + + @Test + public void test_changed_policy_definition() { + InMemMetadataServiceClient client = getSampleMetadataService(); + ScheduleContextBuilder builder = new ScheduleContextBuilder(client); + PolicyAssignment assignment1 = client.getVersionedSpec().getAssignments().get(0); + + IScheduleContext context = builder.buildContext(); + 
Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1)); + + StreamWorkSlotQueue queue = SchedulerTest.getQueue(context, assignment1.getQueueId()).getRight(); + + PolicyDefinition pd1 = client.listPolicies().get(0); + pd1.getDefinition().value = "define.. new..."; + + context = builder.buildContext(); + PolicyAssignment assignmentNew = context.getPolicyAssignments().values().iterator().next(); + StreamWorkSlotQueue queueNew = SchedulerTest.getQueue(context, assignmentNew.getQueueId()).getRight(); + Assert.assertNotNull(queueNew); + // just to make sure queueNew is present + Assert.assertEquals(queue.getQueueId(), queueNew.getQueueId()); + } + + @Test public void test_renamed_topologies() { InMemMetadataServiceClient client = getSampleMetadataService(); ScheduleContextBuilder builder = new ScheduleContextBuilder(client); @@ -167,12 +218,11 @@ public class ScheduleContextBuilderTest { public static InMemMetadataServiceClient getSampleMetadataService() { InMemMetadataServiceClient client = new InMemMetadataServiceClient(); - client.listTopologies().add(createSampleTopology()); - client.listDataSources().add(createKafka2TupleMetadata()); - // client.listSpoutMetadata().add(createS) - client.listPolicies().add(createPolicy()); - client.listPublishment().add(createPublishment()); - client.listStreams().add(createStreamDefinition()); + client.addTopology(createSampleTopology()); + client.addDataSource(createKafka2TupleMetadata()); + client.addPolicy(createPolicy()); + client.addPublishment(createPublishment()); + client.addStreamDefinition(createStreamDefinition()); client.addScheduleState(createScheduleState()); return client; } @@ -195,9 +245,15 @@ public class ScheduleContextBuilderTest { WorkSlot slot0 = new WorkSlot(TOPO1, TOPO1 + "-alert-" + 0); WorkSlot slot1 = new WorkSlot(TOPO1, TOPO1 + "-alert-" + 1); WorkSlot slot2 = new WorkSlot(TOPO1, TOPO1 + "-alert-" + 2); + WorkSlot slot3 = new WorkSlot(TOPO1, TOPO1 + "-alert-" + 3); + WorkSlot slot4 = 
new WorkSlot(TOPO1, TOPO1 + "-alert-" + 4); + WorkSlot slot5 = new WorkSlot(TOPO1, TOPO1 + "-alert-" + 5); slots.add(slot0); slots.add(slot1); slots.add(slot2); + slots.add(slot3); + slots.add(slot4); + slots.add(slot5); StreamWorkSlotQueue q = new StreamWorkSlotQueue(streamGroup, false, new HashMap<>(), slots); ms.addQueues(q); @@ -216,6 +272,7 @@ public class ScheduleContextBuilderTest { def.setInputStreams(Arrays.asList(TEST_STREAM_DEF_1)); def.setOutputStreams(Arrays.asList(OUT_STREAM1)); def.setParallelismHint(5); + def.setDefinition(new Definition()); streamGroup = new StreamGroup(); par = new StreamPartition(); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java index f90decc..53de19a 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java @@ -16,6 +16,7 @@ */ package org.apache.alert.coordinator; +import java.io.File; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; @@ -23,6 +24,7 @@ import java.util.List; import java.util.Map; import org.apache.alert.coordinator.mock.TestTopologyMgmtService; +import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.eagle.alert.coordination.model.AlertBoltSpec; import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; @@ -41,32 +43,42 @@ import 
org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue; import org.apache.eagle.alert.coordination.model.internal.Topology; import org.apache.eagle.alert.coordinator.IScheduleContext; import org.apache.eagle.alert.coordinator.ScheduleOption; -import org.apache.eagle.alert.coordinator.TopologyMgmtService; import org.apache.eagle.alert.coordinator.impl.GreedyPolicyScheduler; +import org.apache.eagle.alert.coordinator.model.AlertBoltUsage; +import org.apache.eagle.alert.coordinator.model.GroupBoltUsage; import org.apache.eagle.alert.coordinator.model.TopologyUsage; import org.apache.eagle.alert.coordinator.provider.InMemScheduleConext; import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; import org.apache.eagle.alert.engine.coordinator.PolicyDefinition.Definition; +import org.apache.eagle.alert.engine.coordinator.Publishment; import org.apache.eagle.alert.engine.coordinator.StreamColumn; import org.apache.eagle.alert.engine.coordinator.StreamColumn.Type; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.engine.coordinator.StreamPartition; import org.apache.eagle.alert.engine.coordinator.StreamSortSpec; -import org.codehaus.jackson.map.ObjectMapper; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.type.CollectionType; +import com.fasterxml.jackson.databind.type.SimpleType; +import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; /** + * * @since Apr 22, 2016 * */ public class SchedulerTest { + private static final String TOPO2 = "topo2"; + private static final String TOPO1 = "topo1"; + private static final int PARALELLISM = 5; private static final String STREAM2 = "stream2"; private static final String JOIN_POLICY_1 = "join-policy-1"; private static 
final String TEST_TOPIC = "test-topic"; @@ -83,12 +95,13 @@ public class SchedulerTest { ConfigFactory.invalidateCaches(); System.setProperty("config.resource", "/application.conf"); } - + @Test public void test01_simple() throws Exception { GreedyPolicyScheduler ps = new GreedyPolicyScheduler(); - IScheduleContext context = createScheduleContext(); - ps.init(context, createMgmtService()); + TestTopologyMgmtService mgmtService = createMgmtService(); + IScheduleContext context = createScheduleContext(mgmtService); + ps.init(context, mgmtService); ps.schedule(new ScheduleOption()); ScheduleState status = ps.getState(); @@ -97,7 +110,7 @@ public class SchedulerTest { LOG.info(mapper.writeValueAsString(spec)); Assert.assertEquals(2, spec.size()); - Assert.assertTrue(spec.containsKey("topo1")); + Assert.assertTrue(spec.containsKey(TOPO1)); assertFirstPolicyScheduled(context, status); } @@ -123,8 +136,8 @@ public class SchedulerTest { Assert.assertEquals(1, streamMeta.groupingStrategies.size()); StreamRepartitionStrategy gs = streamMeta.groupingStrategies.iterator().next(); - Assert.assertEquals(5, gs.numTotalParticipatingRouterBolts); - Assert.assertEquals(5, gs.totalTargetBoltIds.size()); + Assert.assertEquals(TestTopologyMgmtService.ROUTER_BOLT_NUMBER, gs.numTotalParticipatingRouterBolts); + Assert.assertEquals(TestTopologyMgmtService.ROUTER_BOLT_NUMBER, gs.totalTargetBoltIds.size()); Assert.assertEquals(0, gs.startSequence); Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1)); @@ -198,24 +211,23 @@ public class SchedulerTest { } } - private TopologyMgmtService createMgmtService() { - TestTopologyMgmtService.BOLT_NUMBER = 5; - TopologyMgmtService mgmtService = new TestTopologyMgmtService(); + private TestTopologyMgmtService createMgmtService() { + TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(); return mgmtService; } - private InMemScheduleConext createScheduleContext() { + private InMemScheduleConext 
createScheduleContext(TestTopologyMgmtService mgmtService) { InMemScheduleConext context = new InMemScheduleConext(); // topo - Pair pair1 = TestTopologyMgmtService.createEmptyTopology("topo1"); - Pair pair2 = TestTopologyMgmtService.createEmptyTopology("topo2"); + Pair pair1 = mgmtService.createEmptyTopology(TOPO1); + Pair pair2 = mgmtService.createEmptyTopology(TOPO2); context.addTopology(pair1.getLeft()); context.addTopologyUsages(pair1.getRight()); context.addTopology(pair2.getLeft()); context.addTopologyUsages(pair2.getRight()); // policy - createSamplePolicy(context, TEST_POLICY_1, STREAM1); + createSamplePolicy(context, TEST_POLICY_1, STREAM1, PARALELLISM); // data source Kafka2TupleMetadata ds = new Kafka2TupleMetadata(); @@ -269,9 +281,9 @@ public class SchedulerTest { */ @Test public void test_schedule_add2() { - IScheduleContext context = createScheduleContext(); + TestTopologyMgmtService mgmtService = createMgmtService(); + IScheduleContext context = createScheduleContext(mgmtService); GreedyPolicyScheduler ps = new GreedyPolicyScheduler(); - TopologyMgmtService mgmtService = new TopologyMgmtService(); ps.init(context, mgmtService); ScheduleOption option = new ScheduleOption(); @@ -280,7 +292,7 @@ public class SchedulerTest { context = ps.getContext(); // context updated! 
assertFirstPolicyScheduled(context, status); - createSamplePolicy((InMemScheduleConext) context, TEST_POLICY_2, STREAM1); + createSamplePolicy((InMemScheduleConext) context, TEST_POLICY_2, STREAM1, PARALELLISM); ps.init(context, mgmtService); // reinit ps.schedule(option); @@ -290,7 +302,7 @@ public class SchedulerTest { assertSecondPolicyCreated(context, status); // add one policy on different stream of the same topic - createSamplePolicy((InMemScheduleConext) context, TEST_POLICY_3, STREAM2); + createSamplePolicy((InMemScheduleConext) context, TEST_POLICY_3, STREAM2, PARALELLISM); ps.init(context, mgmtService); // re-init ps.schedule(option); @@ -320,8 +332,8 @@ public class SchedulerTest { Assert.assertEquals(1, streamMeta.groupingStrategies.size()); StreamRepartitionStrategy gs = streamMeta.groupingStrategies.iterator().next(); - Assert.assertEquals(5, gs.numTotalParticipatingRouterBolts); - Assert.assertEquals(5, gs.totalTargetBoltIds.size()); + Assert.assertEquals(TestTopologyMgmtService.ROUTER_BOLT_NUMBER, gs.numTotalParticipatingRouterBolts); + Assert.assertEquals(TestTopologyMgmtService.ROUTER_BOLT_NUMBER, gs.totalTargetBoltIds.size()); Assert.assertEquals(0, gs.startSequence); PolicyAssignment pa1 = context.getPolicyAssignments().get(TEST_POLICY_1); @@ -347,8 +359,8 @@ public class SchedulerTest { Assert.assertEquals(1, streamMeta.groupingStrategies.size()); StreamRepartitionStrategy gs = streamMeta.groupingStrategies.iterator().next(); - Assert.assertEquals(5, gs.numTotalParticipatingRouterBolts); - Assert.assertEquals(5, gs.totalTargetBoltIds.size()); + Assert.assertEquals(TestTopologyMgmtService.ROUTER_BOLT_NUMBER, gs.numTotalParticipatingRouterBolts); + Assert.assertEquals(TestTopologyMgmtService.ROUTER_BOLT_NUMBER, gs.totalTargetBoltIds.size()); Assert.assertEquals(0, gs.startSequence); // assert policy assignment for the three policies @@ -362,7 +374,8 @@ public class SchedulerTest { Assert.assertNotEquals(pa1.getQueueId(), pa3.getQueueId()); 
StreamWorkSlotQueue queue1 = getQueue(context, pa1.getQueueId()).getRight(); StreamWorkSlotQueue queue3 = getQueue(context, pa3.getQueueId()).getRight(); - Assert.assertNotEquals(queue1.getWorkingSlots().get(0).getTopologyName(), queue3.getWorkingSlots().get(0).getTopologyName()); + Assert.assertNotEquals(queue1.getWorkingSlots().get(0).getTopologyName(), + queue3.getWorkingSlots().get(0).getTopologyName()); } } // group spec @@ -425,8 +438,8 @@ public class SchedulerTest { Assert.assertEquals(1, streamMeta.groupingStrategies.size()); StreamRepartitionStrategy gs = streamMeta.groupingStrategies.iterator().next(); - Assert.assertEquals(5, gs.numTotalParticipatingRouterBolts); - Assert.assertEquals(5, gs.totalTargetBoltIds.size()); + Assert.assertEquals(TestTopologyMgmtService.ROUTER_BOLT_NUMBER, gs.numTotalParticipatingRouterBolts); + Assert.assertEquals(TestTopologyMgmtService.ROUTER_BOLT_NUMBER, gs.totalTargetBoltIds.size()); Assert.assertEquals(0, gs.startSequence); // assert two policy on the same queue @@ -492,7 +505,8 @@ public class SchedulerTest { LOG.info("alert spec topology name {}", topo1); for (List definitions : alertSpec.getBoltPolicyIdsMap().values()) { Assert.assertEquals(2, definitions.size()); -// List names = Arrays.asList(definitions.stream().map((t) -> t.getName()).toArray(String[]::new)); + // List names = Arrays.asList(definitions.stream().map((t) -> + // t.getName()).toArray(String[]::new)); Assert.assertTrue(definitions.contains(TEST_POLICY_1)); Assert.assertTrue(definitions.contains(TEST_POLICY_2)); } @@ -540,9 +554,9 @@ public class SchedulerTest { Assert.assertTrue(list.contains(gs2)); } - private void createSamplePolicy(InMemScheduleConext context, String policyName, String stream) { + private void createSamplePolicy(InMemScheduleConext context, String policyName, String stream, int hint) { PolicyDefinition pd = new PolicyDefinition(); - pd.setParallelismHint(5); + pd.setParallelismHint(hint); Definition def = new Definition(); 
pd.setDefinition(def); pd.setName(policyName); @@ -566,7 +580,12 @@ public class SchedulerTest { @Test public void test_schedule_updateParitition() { - // TODO + // This case design test is move to outter logic of ScheduleConetxtBuilder + } + + @Test + public void test_schedule_updateDefinition() { + // This case design test is move to outter logic of ScheduleConetxtBuilder } @Test @@ -577,9 +596,9 @@ public class SchedulerTest { @SuppressWarnings("unused") @Test public void test_schedule_multipleStream() throws Exception { - IScheduleContext context = createScheduleContext(); + TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(); + IScheduleContext context = createScheduleContext(mgmtService); GreedyPolicyScheduler ps = new GreedyPolicyScheduler(); - TopologyMgmtService mgmtService = new TopologyMgmtService(); createJoinPolicy((InMemScheduleConext) context, JOIN_POLICY_1, Arrays.asList(STREAM1, STREAM2)); @@ -632,4 +651,96 @@ public class SchedulerTest { } context.addPoilcy(pd); } + + @Test + public void testIrregularPolicyParallelismHint() { + Config config = ConfigFactory.load(); + int defaultParallelism = config.getInt("coordinator.policyDefaultParallelism"); + TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(5, 12); + InMemScheduleConext context = createScheduleContext(mgmtService); + // recreate test poicy + context.getPolicies().clear(); + // make the hint bigger than bolt number + int irregularParallelism = defaultParallelism + 2; + createSamplePolicy(context, "irregularPolicy", STREAM1, irregularParallelism); + GreedyPolicyScheduler ps = new GreedyPolicyScheduler(); + + ps.init(context, mgmtService); + + ScheduleState scheduled = ps.schedule(new ScheduleOption()); + Assert.assertEquals(2, scheduled.getSpoutSpecs().size()); + Assert.assertEquals(2, scheduled.getGroupSpecs().size()); + Assert.assertEquals(2, scheduled.getAlertSpecs().size()); + // assertion + RouterSpec spec = scheduled.getGroupSpecs().get(TOPO1); + 
Assert.assertTrue(spec.getRouterSpecs().size() > 0); // must be allocated + for (StreamRouterSpec routerSpec : spec.getRouterSpecs()) { + Assert.assertEquals(1, routerSpec.getTargetQueue().size()); + // irregularParallelism is prompted to 2 * defaultParallelism = 10 + Assert.assertEquals(10, routerSpec.getTargetQueue().get(0).getWorkers().size()); + } + } + + @Test + public void testDataSources() throws Exception { + InMemScheduleConext context = loadContext("/multi/"); + TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(4, 10); + + GreedyPolicyScheduler ps = new GreedyPolicyScheduler(); + ps.init(context, mgmtService); + + ScheduleState state = ps.schedule(new ScheduleOption()); + Assert.assertNotNull(state); + Assert.assertEquals(2, state.getAssignments().size()); + Assert.assertEquals(1, state.getAlertSpecs().size()); + Assert.assertEquals(10, state.getAlertSpecs().get("alertUnitTopology_1").getBoltPolicyIdsMap().size()); + } + + private InMemScheduleConext loadContext(String base) throws Exception { + InMemScheduleConext context = new InMemScheduleConext(); + + List metadata = loadEntities(base + "datasources.json", Kafka2TupleMetadata.class); + for (Kafka2TupleMetadata k : metadata) { + context.addDataSource(k); + } + + List policies = loadEntities(base + "policies.json", PolicyDefinition.class); + for (PolicyDefinition p : policies) { + context.addPoilcy(p); + } + + List pubs = loadEntities(base + "publishments.json", Publishment.class); + for (Publishment pub : pubs) { + context.addPublishment(pub); + } + + List defs = loadEntities(base + "streamdefinitions.json", StreamDefinition.class); + for (StreamDefinition def : defs) { + context.addSchema(def); + } + + List topos = loadEntities(base + "topologies.json", Topology.class); + for (Topology t : topos) { + context.addTopology(t); + + TopologyUsage u = new TopologyUsage(t.getName()); + for (String gnid : t.getGroupNodeIds()) { + u.getGroupUsages().put(gnid, new GroupBoltUsage(gnid)); + } + 
for (String anid : t.getAlertBoltIds()) { + u.getAlertUsages().put(anid, new AlertBoltUsage(anid)); + } + context.addTopologyUsages(u); + } + + return context; + } + + public static List loadEntities(String path, Class tClz) throws Exception { + System.out.println(FileUtils.readFileToString(new File(SchedulerTest.class.getResource(path).getPath()))); + JavaType type = CollectionType.construct(List.class, SimpleType.construct(tClz)); + List l = mapper.readValue(SchedulerTest.class.getResourceAsStream(path), type); + return l; + } + } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/WorkSlotStrategyTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/WorkSlotStrategyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/WorkSlotStrategyTest.java index e17ebab..52ea022 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/WorkSlotStrategyTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/WorkSlotStrategyTest.java @@ -58,18 +58,18 @@ public class WorkSlotStrategyTest { StreamGroup group = new StreamGroup(); group.addStreamPartition(partition); - TestTopologyMgmtService.BOLT_NUMBER = 3; - TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(); - SameTopologySlotStrategy strategy = new SameTopologySlotStrategy(context, group, mgmtService); { + TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(3, 3, "prefix-time1", true); + SameTopologySlotStrategy strategy = new SameTopologySlotStrategy(context, group, mgmtService); List slots = strategy.reserveWorkSlots(5, false, new HashMap()); 
Assert.assertEquals(0, slots.size()); Assert.assertEquals(1, context.getTopologies().size()); } - TestTopologyMgmtService.BOLT_NUMBER = 5; { + TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(5, 5, "prefix-time2", true); + SameTopologySlotStrategy strategy = new SameTopologySlotStrategy(context, group, mgmtService); List slots = strategy.reserveWorkSlots(5, false, new HashMap()); Assert.assertEquals(5, slots.size()); LOG.info(slots.get(0).getTopologyName()); @@ -110,8 +110,7 @@ public class WorkSlotStrategyTest { MonitoredStream ms1 = new MonitoredStream(sg); - TestTopologyMgmtService.BOLT_NUMBER = 5; - TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(); + TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(5, 5, "prefix-3", true); String topo1 = null; String bolt1 = null; @@ -145,7 +144,6 @@ public class WorkSlotStrategyTest { sg2.addStreamPartition(partition2); MonitoredStream ms2 = new MonitoredStream(sg2); queue = wrb.createQueue(ms2, false, 5, new HashMap()); - TestTopologyMgmtService.BOLT_NUMBER = 5; { Assert.assertEquals(5, queue.getWorkingSlots().size()); Assert.assertEquals(2, context.getTopologies().size()); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java index 6b88f7d..f19533a 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java +++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java @@ -18,6 +18,7 @@ package org.apache.alert.coordinator.mock; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map.Entry; @@ -35,6 +36,9 @@ import org.apache.eagle.alert.engine.coordinator.StreamingCluster; import org.apache.eagle.alert.service.IMetadataServiceClient; /** + * According to metadata servic client semantic, change to the interface returned value should not direclty change the + * states. + * * @since May 5, 2016 * */ @@ -57,37 +61,41 @@ public class InMemMetadataServiceClient implements IMetadataServiceClient { @Override public List listClusters() { - return clusters; + return Collections.unmodifiableList(clusters); } @Override public List listTopologies() { - return topologies; + return Collections.unmodifiableList(topologies); } @Override public List listPolicies() { - return policies; + return Collections.unmodifiableList(policies); + } + + public void removePolicy(int idx) { + policies.remove(idx); } @Override public List listStreams() { - return definitions; + return Collections.unmodifiableList(definitions); } @Override public List listDataSources() { - return datasources; + return Collections.unmodifiableList(datasources); } @Override public List listPublishment() { - return publishmetns; + return Collections.unmodifiableList(publishmetns); } @Override public List listSpoutMetadata() { - return spoutSpecs; + return Collections.unmodifiableList(spoutSpecs); } @Override http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/TestTopologyMgmtService.java ---------------------------------------------------------------------- diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/TestTopologyMgmtService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/TestTopologyMgmtService.java index fe36d18..d897454 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/TestTopologyMgmtService.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/TestTopologyMgmtService.java @@ -16,52 +16,77 @@ */ package org.apache.alert.coordinator.mock; +import java.util.List; + import org.apache.commons.lang3.tuple.Pair; import org.apache.eagle.alert.coordination.model.internal.Topology; import org.apache.eagle.alert.coordinator.TopologyMgmtService; import org.apache.eagle.alert.coordinator.model.AlertBoltUsage; import org.apache.eagle.alert.coordinator.model.GroupBoltUsage; import org.apache.eagle.alert.coordinator.model.TopologyUsage; -import org.junit.Ignore; -import java.util.List; - -@Ignore +@org.junit.Ignore public class TestTopologyMgmtService extends TopologyMgmtService { - public static int BOLT_NUMBER = 5; - + public static final int ROUTER_BOLT_NUMBER = 6; + public static final int BOLT_NUMBER = 7; + + private int boltNumber; + private int routerNumber; private int i = 0; + private String namePrefix = "Topology"; + + // a config used to check if createTopology is enabled. 
FIXME: another class of mgmt service might be better + private boolean enableCreateTopology = false; + + public TestTopologyMgmtService() { + boltNumber = BOLT_NUMBER;// default behaivor + this.routerNumber = ROUTER_BOLT_NUMBER; + } + + public TestTopologyMgmtService(int routerNumber, int boltNumber) { + this.routerNumber = routerNumber; + this.boltNumber = boltNumber; + } + + public TestTopologyMgmtService(int routerNumber, int boltNumber, String prefix, boolean enable) { + this.routerNumber = routerNumber; + this.boltNumber = boltNumber; + this.namePrefix = prefix; + this.enableCreateTopology = enable; + } @Override public TopologyMeta creatTopology() { - TopologyMeta tm = new TopologyMeta(); - tm.topologyId = "Topoloy" + (i++); - tm.clusterId = "default-cluster"; - tm.nimbusHost = "localhost"; - tm.nimbusPort = "3000"; - Pair pair = TestTopologyMgmtService.createEmptyTopology(tm.topologyId); - tm.topology = pair.getLeft(); - tm.usage = pair.getRight(); - - return tm; + if (enableCreateTopology) { + TopologyMeta tm = new TopologyMeta(); + tm.topologyId = namePrefix + (i++); + tm.clusterId = "default-cluster"; + tm.nimbusHost = "localhost"; + tm.nimbusPort = "3000"; + Pair pair = createEmptyTopology(tm.topologyId); + tm.topology = pair.getLeft(); + tm.usage = pair.getRight(); + return tm; + } else { + throw new UnsupportedOperationException("not supported yet!"); + } } @Override public List listTopologies() { - // TODO Auto-generated method stub return super.listTopologies(); } - public static Pair createEmptyTopology(String topoName) { - Topology t = new Topology(topoName, TestTopologyMgmtService.BOLT_NUMBER, TestTopologyMgmtService.BOLT_NUMBER); + public Pair createEmptyTopology(String topoName) { + Topology t = new Topology(topoName, routerNumber, boltNumber); for (int i = 0; i < t.getNumOfGroupBolt(); i++) { t.getGroupNodeIds().add(t.getName() + "-grp-" + i); } for (int i = 0; i < t.getNumOfAlertBolt(); i++) { t.getAlertBoltIds().add(t.getName() + "-alert-" + 
i); } - + TopologyUsage u = new TopologyUsage(topoName); for (String gnid : t.getGroupNodeIds()) { u.getGroupUsages().put(gnid, new GroupBoltUsage(gnid)); @@ -69,7 +94,7 @@ public class TestTopologyMgmtService extends TopologyMgmtService { for (String anid : t.getAlertBoltIds()) { u.getAlertUsages().put(anid, new AlertBoltUsage(anid)); } - + return Pair.of(t, u); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/datasources.json ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/datasources.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/datasources.json new file mode 100644 index 0000000..8678fe6 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/datasources.json @@ -0,0 +1,19 @@ +[ +{ + "name": "network_syslog_datasource", + "type": "KAFKA", + "properties": { + }, + "topic": "logoutput", + "schemeCls": "org.apache.eagle.alert.engine.extension.SherlockEventScheme", + "codec": { + "streamNameSelectorProp": { + "fieldNamesToInferStreamName" : "namespace", + "streamNameFormat":"stream_%s" + }, + "streamNameSelectorCls":"org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector", + "timestampColumn": null, + "timestampFormat":"" + } +} +] http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/policies.json ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/policies.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/policies.json new file mode 100644 index 
0000000..ba7d120 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/policies.json @@ -0,0 +1,52 @@ +[ + { + "name": "syslog_severity_check_stream_demons1", + "description": "syslog demons1", + "inputStreams": [ + "stream_demons1" + ], + "outputStreams": [ + "syslog_severity_check_output" + ], + "definition": { + "type": "siddhi", + "value": "from syslog_stream[dims_severity == \"NOTICE\"] select 'alert' as name, namespace, epochMillis, timestamp, interface,state,version,type,origmsg, dims_severity, dims_facility, dims_hostname, dims_msgid insert into syslog_severity_check_output;" + }, + "partitionSpec": [ + { + "streamId": "stream_demons1", + "type": "GROUPBY", + "columns": [ + "dims_hostname" + ], + "sortSpec": null + } + ], + "parallelismHint": 5 + }, + { + "name": "syslog_severity_check_stream_umpns2", + "description": "syslog umpn2", + "inputStreams": [ + "stream_umpns2" + ], + "outputStreams": [ + "syslog_severity_check_output" + ], + "definition": { + "type": "siddhi", + "value": "from syslog_stream[dims_severity == \"NOTICE\"] select 'alert' as name, namespace, epochMillis, program, message, dims_severity, dims_facility, dims_hostname, dims_msgid insert into syslog_severity_check_output;" + }, + "partitionSpec": [ + { + "streamId": "stream_umpns2", + "type": "GROUPBY", + "columns": [ + "dims_hostname" + ], + "sortSpec": null + } + ], + "parallelismHint": 5 + } +] http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/publishments.json ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/publishments.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/publishments.json new file mode 100644 index 0000000..d306be2 --- /dev/null +++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/publishments.json @@ -0,0 +1,26 @@ +[ +{ + "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher", + "name":"network-syslog-publish", + "policyIds": ["syslog_severity_check_stream_demons1", "syslog_severity_check_stream_umpns2"], + "dedupIntervalMin": "PT0M", + "properties":{ + "kafka_broker":"127.0.0.1:9092", + "topic":"syslog_alerts", + "value_deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer", + "value_serializer": "org.apache.kafka.common.serialization.ByteArraySerializer" + }, + "serializer" : "org.apache.eagle.alert.engine.extension.SherlockAlertSerializer" +}, +{ + "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher", + "name":"network-syslog-publish_rawjson", + "policyIds": ["syslog_severity_check_stream_demons1", "syslog_severity_check_stream_umpns2"], + "dedupIntervalMin": "PT0M", + "properties":{ + "kafka_broker":"127.0.0.1:9092", + "topic":"syslog_alerts_json" + }, + "serializer" : "org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer" +} +] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/streamdefinitions.json ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/streamdefinitions.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/streamdefinitions.json new file mode 100644 index 0000000..cee6f8c --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/streamdefinitions.json @@ -0,0 +1,138 @@ +[ +{ + "streamId": "stream_demons1", + "dataSource" : "network_syslog_datasource", + "description":"the data stream for syslog events", + "validate": false, + 
"timeseries":false, + "columns": [ + { + "name": "dims_facility", + "type" : "STRING", + "defaultValue": "", + "required":true + },{ + "name": "dims_severity", + "type" : "STRING", + "defaultValue": "", + "required": true + }, + { + "name": "dims_hostname", + "type" : "STRING", + "defaultValue": "", + "required": true + }, + { + "name": "dims_msgid", + "type" : "STRING", + "defaultValue": "", + "required": true + },{ + "name": "timestamp", + "type" : "STRING", + "defaultValue": "", + "required":true + },{ + "name": "state", + "type" : "STRING", + "defaultValue": "", + "required": true + },{ + "name": "interface", + "type" : "STRING", + "defaultValue": "", + "required":true + },{ + "name": "version", + "type" : "STRING", + "defaultValue": "", + "required":true + },{ + "name": "type", + "type" : "STRING", + "defaultValue": "", + "required": true + },{ + "name": "origmsg", + "type" : "STRING", + "defaultValue": "", + "required": true + },{ + "name": "name", + "type" : "STRING", + "defaultValue": "", + "required": true + }, + { + "name": "namespace", + "type" : "STRING", + "defaultValue": "", + "required": true + },{ + "name": "epochMillis", + "type" : "LONG", + "defaultValue": 0, + "required": true + } + ] +}, +{ + "streamId": "stream_umpns2", + "dataSource" : "network_syslog_datasource", + "description":"the data stream for syslog events", + "validate": false, + "timeseries":false, + "columns": [ + { + "name": "dims_facility", + "type" : "STRING", + "defaultValue": "", + "required":true + },{ + "name": "dims_severity", + "type" : "STRING", + "defaultValue": "", + "required": true + }, + { + "name": "dims_hostname", + "type" : "STRING", + "defaultValue": "", + "required": true + }, + { + "name": "dims_msgid", + "type" : "STRING", + "defaultValue": "", + "required": true + },{ + "name": "program", + "type" : "STRING", + "defaultValue": "", + "required":true + },{ + "name": "message", + "type" : "STRING", + "defaultValue": "", + "required": true + },{ + "name": "name", 
+ "type" : "STRING", + "defaultValue": "", + "required": true + }, + { + "name": "namespace", + "type" : "STRING", + "defaultValue": "", + "required": true + },{ + "name": "epochMillis", + "type" : "LONG", + "defaultValue": 0, + "required": true + } + ] +} +] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/topologies.json ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/topologies.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/topologies.json new file mode 100644 index 0000000..7b1732f --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/topologies.json @@ -0,0 +1,31 @@ +[ +{ + "name": "alertUnitTopology_1", + "numOfSpout":1, + "numOfGroupBolt": 4, + "numOfAlertBolt": 10, + "spoutId": "alertEngineSpout", + "groupNodeIds" : [ + "streamRouterBolt0", + "streamRouterBolt1", + "streamRouterBolt2", + "streamRouterBolt3" + ], + "alertBoltIds": [ + "alertBolt0", + "alertBolt1", + "alertBolt2", + "alertBolt3", + "alertBolt4", + "alertBolt5", + "alertBolt6", + "alertBolt7", + "alertBolt8", + "alertBolt9" + ], + "pubBoltId" : "alertPublishBolt", + "spoutParallelism": 1, + "groupParallelism": 1, + "alertParallelism": 1 +} +] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java index 6e5beb6..0e9ab6c 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java @@ -72,6 +72,26 @@ import org.slf4j.LoggerFactory; ], "parallelismHint": 2 } + "name": "noDataAlertPolicy", + "description": "noDataAlertPolicy", + "inputStreams": [ + "noDataAlertStream" + ], + "outputStreams": [ + "noDataAlertStream_out" + ], + "definition": { + "type": "nodataalert", + "value": "PT1M,plain,1,host,host1,host2" // or "value": "PT1M,dynamic,1,host" + }, + "partitionSpec": [ + { + "streamId": "noDataAlertStream", + "type": "GROUPBY" + } + ], + "parallelismHint": 2 + } */ public class NoDataPolicyHandler implements PolicyStreamHandler{ private static final Logger LOG = LoggerFactory.getLogger(NoDataPolicyHandler.class); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java index 2566f79..ca46aeb 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java +++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java @@ -79,7 +79,9 @@ public class AlertKafkaPublisher extends AbstractPublishPlugin { future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS); status.successful = true; status.errorMessage = ""; - LOG.info("Successfully send message to Kafka: " + brokerList); + if (LOG.isDebugEnabled()) { + LOG.debug("Successfully send message to Kafka: " + brokerList); + } } catch (InterruptedException | ExecutionException e) { status.successful = false; status.errorMessage = String.format("Failed to send message to %s, due to:%s", brokerList, e); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java index 6baa616..f9306dc 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java @@ -104,6 +104,10 @@ public class AlertPublisherImpl implements AlertPublisher { } for (Publishment publishment : added) { + if (LOG.isDebugEnabled()) { + LOG.debug(publishment.toString()); + } + AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config, conf); if(plugin != null) { publishPluginMapping.put(publishment.getName(), plugin); 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java index 6b7fc61..fb038ba 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java @@ -21,18 +21,30 @@ import java.util.Map; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The producer is thread safe and sharing a single producer instance across threads will generally be faster than * having multiple instances. 
*/ public class KafkaProducerManager { + private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerManager.class); private static final String STRING_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; + private static final String STRING_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; + private static final String VALUE_DESERIALIZER = "value.deserializer"; - private static final String KEY_DESERIALIZER = "key.deserializer"; + private static final String VALUE_DESERIALIZER_UNDERSCORE = "value_deserializer"; + private static final String VALUE_SERIALIZER = "value.serializer"; + private static final String VALUE_SERIALIZER_UNDERSCORE = "value_serializer"; + + private static final String KEY_DESERIALIZER = "key.deserializer"; + private static final String KEY_DESERIALIZER_UNDERSCORE = "key_deserializer"; + private static final String KEY_SERIALIZER = "key.serializer"; + private static final String KEY_SERIALIZER_UNDERSCORE = "key_serializer"; public static final KafkaProducerManager INSTANCE = new KafkaProducerManager(); @@ -41,29 +53,36 @@ public class KafkaProducerManager { configMap.put("bootstrap.servers", brokerList); configMap.put("metadata.broker.list", brokerList); - if (kafkaConfig.containsKey(KEY_SERIALIZER)) { - configMap.put(KEY_SERIALIZER, kafkaConfig.get(KEY_SERIALIZER)); + // key serializer + if (kafkaConfig.containsKey(KEY_SERIALIZER_UNDERSCORE)) { + configMap.put(KEY_SERIALIZER, kafkaConfig.get(KEY_SERIALIZER_UNDERSCORE)); } else { configMap.put(KEY_SERIALIZER, STRING_SERIALIZER); } - if (kafkaConfig.containsKey(VALUE_SERIALIZER)) { - configMap.put(VALUE_SERIALIZER, kafkaConfig.get(VALUE_SERIALIZER)); + if (kafkaConfig.containsKey(KEY_DESERIALIZER_UNDERSCORE)) { + configMap.put(KEY_DESERIALIZER, kafkaConfig.get(KEY_DESERIALIZER_UNDERSCORE)); + } else { + configMap.put(KEY_DESERIALIZER, STRING_DESERIALIZER); + } + + // value serializer + if (kafkaConfig.containsKey(VALUE_SERIALIZER_UNDERSCORE)) 
{ + configMap.put(VALUE_SERIALIZER, kafkaConfig.get(VALUE_SERIALIZER_UNDERSCORE)); } else { configMap.put(VALUE_SERIALIZER, STRING_SERIALIZER); } configMap.put("request.required.acks", "1"); - - if (kafkaConfig.containsKey(KEY_DESERIALIZER)) { - configMap.put(KEY_DESERIALIZER, kafkaConfig.get(KEY_DESERIALIZER)); + + // value deserializer + if (kafkaConfig.containsKey(VALUE_DESERIALIZER_UNDERSCORE)) { + configMap.put(VALUE_DESERIALIZER, kafkaConfig.get(VALUE_DESERIALIZER_UNDERSCORE)); } else { - configMap.put(KEY_DESERIALIZER, STRING_SERIALIZER); + configMap.put(VALUE_DESERIALIZER, STRING_DESERIALIZER); } - if (kafkaConfig.containsKey(VALUE_DESERIALIZER)) { - configMap.put(VALUE_DESERIALIZER, kafkaConfig.get(VALUE_DESERIALIZER)); - } else { - configMap.put(VALUE_DESERIALIZER, STRING_SERIALIZER); + if (LOG.isInfoEnabled()) { + LOG.info(" given kafka config {}, create producer config map {}", kafkaConfig, configMap); } KafkaProducer producer = new KafkaProducer<>(configMap); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java index e53b0ba..140fbff 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java @@ -67,7 +67,7 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen private volatile Map cachedPolicies = new HashMap<>(); // for one streamGroup, there are multiple policies 
private StreamContext streamContext; - private volatile Map sdf; + private volatile Map sdf = new HashMap(); private PartitionedEventSerializer serializer; public AlertBolt(String boltId, PolicyGroupEvaluator policyGroupEvaluator, Config config, IMetadataChangeNotifyService changeNotifyService){ http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf index e3be1b7..094fd92 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf @@ -24,8 +24,8 @@ "localMode" : "true" }, "spout" : { - "kafkaBrokerZkQuorum": "10.254.194.245:2181", - "kafkaBrokerZkBasePath": "/brokers", + "kafkaBrokerZkQuorum": "localhost:2181", + "kafkaBrokerZkBasePath": "/kafka", "stormKafkaUseSameZkQuorumWithKafkaBroker": true, "stormKafkaTransactionZkQuorum": "", "stormKafkaTransactionZkPath": "/consumers", @@ -34,8 +34,8 @@ "stormKafkaFetchSizeBytes": 1048586, }, "zkConfig" : { - "zkQuorum" : "10.254.194.245:2181", - "zkRoot" : "/kafka", + "zkQuorum" : "localhost:2181", + "zkRoot" : "/alert", "zkSessionTimeoutMs" : 10000, "connectionTimeoutMs" : 10000, "zkRetryTimes" : 3, @@ -47,7 +47,7 @@ }, "metadataService": { "context" : "/rest", - "host" : "127.0.0.1", + "host" : "localhost", "port" : 8080 }, "coordinatorService": { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java ---------------------------------------------------------------------- diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java index ac07d19..8b28080 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java @@ -16,17 +16,16 @@ */ package org.apache.eagle.alert.engine.e2e; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; - +import backtype.storm.utils.Utils; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.type.CollectionType; +import com.fasterxml.jackson.databind.type.SimpleType; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; import kafka.admin.AdminUtils; import kafka.utils.ZKStringSerializer$; - import org.I0Itec.zkclient.ZkClient; -import org.I0Itec.zkclient.ZkConnection; import org.apache.eagle.alert.config.ZKConfig; import org.apache.eagle.alert.config.ZKConfigBuilder; import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; @@ -38,22 +37,15 @@ import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.service.IMetadataServiceClient; import org.apache.eagle.alert.service.MetadataServiceClientImpl; import org.apache.eagle.alert.utils.KafkaEmbedded; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Test; +import org.junit.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import backtype.storm.utils.Utils; - -import 
com.fasterxml.jackson.databind.JavaType; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.type.CollectionType; -import com.fasterxml.jackson.databind.type.SimpleType; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; /** @@ -155,9 +147,7 @@ public class Integration1 { ZkClient zkClient = new ZkClient(zkconfig.zkQuorum, 10000, 10000, ZKStringSerializer$.MODULE$); Properties topicConfiguration = new Properties(); - ZkConnection zkConnection = new ZkConnection(zkconfig.zkQuorum); -// ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false); - AdminUtils.createTopic(zkClient, topic, 1, 1, topicConfiguration);// RackAwareMode.Disabled$.MODULE$); + AdminUtils.createTopic(zkClient, topic, 1, 1, topicConfiguration); } public static void proactive_schedule(Config config) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration3.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration3.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration3.java new file mode 100644 index 0000000..cb8cda9 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration3.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.e2e; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.eagle.alert.engine.UnitTopologyMain; +import org.apache.eagle.alert.utils.KafkaEmbedded; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import backtype.storm.utils.Utils; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +/** + * @since May 10, 2016 + * + */ +public class Integration3 { + + private String[] args; + private ExecutorService executors = Executors.newFixedThreadPool(5); + private static KafkaEmbedded kafka; + + @BeforeClass + public static void setup() { + // FIXME : start local kafka + } + + @AfterClass + public static void end() { + if (kafka != null) { + kafka.shutdown(); + } + } + + /** + * Assumption: + *

+ * start metadata service 8080 /rest + * + *

+     * user@kafka-host:~$ $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic syslog_events
+     * 
+ *

+ * + * @throws InterruptedException + */ + @Test + public void testSeverity() throws Exception { + System.setProperty("config.resource", "/e2e/application-e2e.conf"); + ConfigFactory.invalidateCaches(); + Config config = ConfigFactory.load(); + + System.out.println("loading metadatas..."); + Integration1.loadMetadatas("/e2e/", config); + System.out.println("loading metadatas done!"); + + // send sample sherlock data + executors.submit(() -> { + try { + SampleClient3.main(args); + } catch (Exception e) { + e.printStackTrace(); + } + }); + + executors.submit(() -> { + try { + UnitTopologyMain.main(args); + } catch (Exception e) { + e.printStackTrace(); + } + }); + + Utils.sleep(1000 * 5l); + while (true) { + Integration1.proactive_schedule(config); + + Utils.sleep(1000 * 60l * 5); + } + } + + @Test + public void testJson() throws Exception { + Integration1.checkAll("/e2e/"); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java index 52d1e5d..4d039d4 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java @@ -23,7 +23,6 @@ import org.apache.eagle.alert.engine.UnitTopologyMain; import org.apache.eagle.alert.utils.KafkaEmbedded; import org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import 
java.util.concurrent.ExecutorService; @@ -50,7 +49,7 @@ public class Integration5AbsenceAlert { kafka.shutdown(); } } - @Test @Ignore + @Test public void testTriggerAbsenceAlert() throws Exception{ System.setProperty("config.resource", "/absence/application-absence.conf"); ConfigFactory.invalidateCaches(); @@ -91,4 +90,4 @@ public class Integration5AbsenceAlert { Utils.sleep(1000 * 60l * 5); } } -} \ No newline at end of file +}