Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 8C5FF200B13 for ; Wed, 1 Jun 2016 07:56:29 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 8B008160A41; Wed, 1 Jun 2016 05:56:29 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 160A4160A49 for ; Wed, 1 Jun 2016 07:56:26 +0200 (CEST) Received: (qmail 76662 invoked by uid 500); 1 Jun 2016 05:56:26 -0000 Mailing-List: contact commits-help@eagle.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@eagle.incubator.apache.org Delivered-To: mailing list commits@eagle.incubator.apache.org Received: (qmail 76653 invoked by uid 99); 1 Jun 2016 05:56:26 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 01 Jun 2016 05:56:26 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 954A3C24CD for ; Wed, 1 Jun 2016 05:56:25 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.646 X-Spam-Level: X-Spam-Status: No, score=-4.646 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx2-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id wD8ZHVgwEqoT for ; Wed, 1 Jun 2016 05:56:20 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by 
mx2-lw-eu.apache.org (ASF Mail Server at mx2-lw-eu.apache.org) with SMTP id 897B95FAF4 for ; Wed, 1 Jun 2016 05:56:17 +0000 (UTC) Received: (qmail 76371 invoked by uid 99); 1 Jun 2016 05:56:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 01 Jun 2016 05:56:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9D67FDFE61; Wed, 1 Jun 2016 05:56:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ralphsu@apache.org To: commits@eagle.incubator.apache.org Date: Wed, 01 Jun 2016 05:56:34 -0000 Message-Id: In-Reply-To: <255f27474f434d698401997b0858d9f6@git.apache.org> References: <255f27474f434d698401997b0858d9f6@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [19/19] incubator-eagle git commit: EAGLE-324: Init branch-v0.5 archived-at: Wed, 01 Jun 2016 05:56:29 -0000 EAGLE-324: Init branch-v0.5 Author: ralphsu Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/75a8265c Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/75a8265c Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/75a8265c Branch: refs/heads/branch-0.5 Commit: 75a8265c6ea2b26fd542de7e103a7235feb935c0 Parents: fb00fa3 Author: Ralph, Su Authored: Wed Jun 1 13:50:39 2016 +0800 Committer: Ralph, Su Committed: Wed Jun 1 13:50:39 2016 +0800 ---------------------------------------------------------------------- eagle-core/eagle-alert/alert/.gitignore | 80 +++ eagle-core/eagle-alert/alert/README.md | 151 +++++ .../eagle-alert/alert/alert-assembly/pom.xml | 86 +++ .../src/assembly/alert-assembly.xml | 41 ++ .../eagle-alert/alert/alert-common/.gitignore | 2 + .../eagle-alert/alert/alert-common/pom.xml | 129 ++++ .../eagle/alert/config/ConfigBusBase.java | 40 ++ 
.../eagle/alert/config/ConfigBusConsumer.java | 40 ++ .../eagle/alert/config/ConfigBusProducer.java | 37 ++ .../alert/config/ConfigChangeCallback.java | 5 + .../apache/eagle/alert/config/ConfigValue.java | 33 + .../org/apache/eagle/alert/config/ZKConfig.java | 17 + .../eagle/alert/config/ZKConfigBuilder.java | 38 ++ .../alert/coordination/model/AlertBoltSpec.java | 114 ++++ .../coordination/model/Kafka2TupleMetadata.java | 97 +++ .../coordination/model/PolicyWorkerQueue.java | 63 ++ .../alert/coordination/model/PublishSpec.java | 82 +++ .../alert/coordination/model/RouterSpec.java | 76 +++ .../alert/coordination/model/ScheduleState.java | 215 +++++++ .../alert/coordination/model/SpoutSpec.java | 124 ++++ .../coordination/model/StreamNameSelector.java | 15 + .../model/StreamRepartitionMetadata.java | 53 ++ .../model/StreamRepartitionStrategy.java | 82 +++ .../coordination/model/StreamRouterSpec.java | 75 +++ .../model/Tuple2StreamConverter.java | 65 ++ .../model/Tuple2StreamMetadata.java | 71 +++ .../model/VersionedPolicyDefinition.java | 53 ++ .../model/VersionedStreamDefinition.java | 53 ++ .../alert/coordination/model/WorkSlot.java | 55 ++ .../model/internal/MonitoredStream.java | 91 +++ .../model/internal/PolicyAssignment.java | 69 ++ .../model/internal/StreamGroup.java | 82 +++ .../model/internal/StreamWorkSlotQueue.java | 124 ++++ .../coordination/model/internal/Topology.java | 170 +++++ .../eagle/alert/engine/codec/IStreamCodec.java | 30 + .../alert/engine/codec/SherlockEventCodec.java | 41 ++ .../alert/engine/codec/SherlockMetricCodec.java | 39 ++ .../engine/coordinator/PolicyDefinition.java | 199 ++++++ .../alert/engine/coordinator/Publishment.java | 101 +++ .../engine/coordinator/PublishmentType.java | 83 +++ .../alert/engine/coordinator/StreamColumn.java | 128 ++++ .../engine/coordinator/StreamDefinition.java | 105 +++ .../engine/coordinator/StreamPartition.java | 143 +++++ .../engine/coordinator/StreamSortSpec.java | 98 +++ 
.../engine/coordinator/StreamingCluster.java | 89 +++ .../alert/engine/model/AlertStreamEvent.java | 89 +++ .../alert/engine/model/PartitionedEvent.java | 149 +++++ .../eagle/alert/engine/model/StreamEvent.java | 144 +++++ .../alert/engine/model/StreamEventBuilder.java | 72 +++ .../eagle/alert/metric/IMetricSystem.java | 64 ++ .../apache/eagle/alert/metric/MetricSystem.java | 116 ++++ .../eagle/alert/metric/entity/MetricEvent.java | 111 ++++ .../alert/metric/reporter/KafkaReporter.java | 197 ++++++ .../eagle/alert/metric/sink/ConsoleSink.java | 47 ++ .../alert/metric/sink/ElasticSearchSink.java | 74 +++ .../apache/eagle/alert/metric/sink/JmxSink.java | 48 ++ .../eagle/alert/metric/sink/KafkaSink.java | 65 ++ .../eagle/alert/metric/sink/MetricSink.java | 30 + .../alert/metric/sink/MetricSinkRepository.java | 47 ++ .../eagle/alert/metric/sink/Slf4jSink.java | 74 +++ .../alert/metric/source/JVMMetricSource.java | 41 ++ .../eagle/alert/metric/source/MetricSource.java | 24 + .../metric/source/MetricSourceWrapper.java | 39 ++ .../eagle/alert/resource/SimpleCORSFiler.java | 59 ++ .../alert/service/IMetadataServiceClient.java | 69 ++ .../service/MetadataServiceClientImpl.java | 235 +++++++ .../eagle/alert/utils/AlertConstants.java | 27 + .../org/apache/eagle/alert/utils/ByteUtils.java | 172 +++++ .../apache/eagle/alert/utils/ConfigUtils.java | 31 + .../apache/eagle/alert/utils/DateTimeUtil.java | 141 ++++ .../org/apache/eagle/alert/utils/HostUtils.java | 70 ++ .../org/apache/eagle/alert/utils/JsonUtils.java | 41 ++ .../eagle/alert/utils/StreamIdConversion.java | 36 ++ .../eagle/alert/utils/TimePeriodUtils.java | 62 ++ .../src/main/resources/log4j.properties | 21 + .../eagle/alert/config/TestConfigBus.java | 61 ++ .../eagle/alert/metric/MetricSystemTest.java | 78 +++ .../service/TestMetadataServiceClientImpl.java | 40 ++ .../apache/eagle/correlation/meta/JsonTest.java | 30 + .../src/test/resources/application.conf | 17 + .../src/test/resources/log4j.properties | 21 + 
.../alert/alert-coordinator/.gitignore | 1 + .../eagle-alert/alert/alert-coordinator/pom.xml | 96 +++ .../eagle/alert/coordinator/Coordinator.java | 212 +++++++ .../alert/coordinator/CoordinatorConstants.java | 31 + .../alert/coordinator/IPolicyScheduler.java | 34 + .../alert/coordinator/IScheduleContext.java | 54 ++ .../coordinator/PolicySchedulerFactory.java | 30 + .../eagle/alert/coordinator/ScheduleOption.java | 74 +++ .../alert/coordinator/TopologyMgmtService.java | 83 +++ .../coordinator/impl/GreedyPolicyScheduler.java | 340 ++++++++++ .../impl/MonitorMetadataGenerator.java | 287 +++++++++ .../alert/coordinator/impl/ScheduleResult.java | 42 ++ .../eagle/alert/coordinator/impl/WorkItem.java | 33 + .../coordinator/impl/WorkQueueBuilder.java | 93 +++ .../impl/strategies/IWorkSlotStrategy.java | 32 + .../strategies/SameTopologySlotStrategy.java | 164 +++++ .../alert/coordinator/model/AlertBoltUsage.java | 93 +++ .../alert/coordinator/model/GroupBoltUsage.java | 67 ++ .../alert/coordinator/model/TopologyUsage.java | 102 +++ .../provider/InMemScheduleConext.java | 148 +++++ .../provider/ScheduleContextBuilder.java | 400 ++++++++++++ .../resource/CoordinatorResource.java | 69 ++ .../trigger/DynamicPolicyLoader.java | 81 +++ .../trigger/PolicyChangeListener.java | 10 + .../src/main/resources/application.conf | 27 + .../src/main/resources/log4j.properties | 21 + .../src/main/webapp/WEB-INF/web.xml | 87 +++ .../src/main/webapp/index.html | 18 + .../alert/coordinator/CoordinatorTest.java | 124 ++++ .../coordinator/DynamicPolicyLoaderTest.java | 26 + .../MetadataServiceClientImplTest.java | 57 ++ .../coordinator/ScheduleContextBuilderTest.java | 281 ++++++++ .../apache/alert/coordinator/SchedulerTest.java | 635 +++++++++++++++++++ .../alert/coordinator/WorkSlotStrategyTest.java | 162 +++++ .../mock/InMemMetadataServiceClient.java | 147 +++++ .../mock/TestTopologyMgmtService.java | 73 +++ .../src/test/resources/test-application.conf | 27 + 
.../eagle-alert/alert/alert-devtools/.gitignore | 3 + .../alert/alert-devtools/bin/kafka-producer.sh | 21 + .../alert-devtools/bin/kafka-server-start.sh | 51 ++ .../alert-devtools/bin/kafka-server-status.sh | 24 + .../alert-devtools/bin/kafka-server-stop.sh | 23 + .../alert/alert-devtools/bin/kafka-topics.sh | 17 + .../alert/alert-devtools/bin/run-class.sh | 112 ++++ .../alert-devtools/bin/start-coordinator.sh | 20 + .../alert-devtools/bin/start-integration1.sh | 16 + .../alert-devtools/bin/start-integration2.sh | 19 + .../alert/alert-devtools/bin/start-metadata.sh | 20 + .../alert-devtools/bin/start-sampleclient1.sh | 6 + .../alert-devtools/bin/start-sampleclient2.sh | 6 + .../alert/alert-devtools/bin/start-zk-kafka.sh | 28 + .../alert/alert-devtools/bin/stop-zk-kafka.sh | 28 + .../bin/zookeeper-server-start.sh | 50 ++ .../bin/zookeeper-server-status.sh | 24 + .../alert-devtools/bin/zookeeper-server-stop.sh | 24 + .../alert-devtools/conf/cli-log4j.properties | 21 + .../alert-devtools/conf/kafka-server.properties | 115 ++++ .../alert/alert-devtools/conf/log4j.properties | 21 + .../conf/zookeeper-server.properties | 20 + .../eagle-alert/alert/alert-devtools/pom.xml | 96 +++ .../eagle/alert/tools/KafkaConsumerOffset.java | 27 + .../alert/tools/KafkaConsumerOffsetFetcher.java | 66 ++ .../alert/tools/KafkaLatestOffsetFetcher.java | 97 +++ .../eagle/contrib/kafka/ProducerTool.scala | 230 +++++++ .../eagle/alert/tools/TestKafkaOffset.java | 69 ++ .../kafka-offset-test.application.conf | 9 + .../src/test/resources/log4j.properties | 21 + .../eagle-alert/alert/alert-engine/.gitignore | 1 + .../alert-engine/alert-engine-base/pom.xml | 65 ++ .../alert/engine/AlertStreamCollector.java | 27 + .../apache/eagle/alert/engine/Collector.java | 27 + .../alert/engine/PartitionedEventCollector.java | 29 + .../eagle/alert/engine/StreamContext.java | 27 + .../eagle/alert/engine/StreamContextImpl.java | 26 + .../eagle/alert/engine/UnitTopologyMain.java | 59 ++ 
.../IMetadataChangeNotifyService.java | 57 ++ .../alert/engine/coordinator/MetadataType.java | 30 + .../StreamDefinitionNotFoundException.java | 25 + .../AbstractMetadataChangeNotifyService.java | 113 ++++ .../impl/ZKMetadataChangeNotifyService.java | 200 ++++++ .../engine/evaluator/PolicyChangeListener.java | 29 + .../engine/evaluator/PolicyGroupEvaluator.java | 45 ++ .../engine/evaluator/PolicyHandlerContext.java | 60 ++ .../engine/evaluator/PolicyStreamHandler.java | 27 + .../engine/evaluator/PolicyStreamHandlers.java | 33 + ...ertBoltOutputCollectorThreadSafeWrapper.java | 118 ++++ .../impl/AlertBoltOutputCollectorWrapper.java | 55 ++ .../impl/PolicyGroupEvaluatorImpl.java | 172 +++++ .../evaluator/impl/SiddhiDefinitionAdapter.java | 142 +++++ .../evaluator/impl/SiddhiPolicyHandler.java | 148 +++++ .../engine/publisher/AlertDeduplicator.java | 31 + .../engine/publisher/AlertPublishListener.java | 28 + .../engine/publisher/AlertPublishPlugin.java | 50 ++ .../publisher/AlertPublishSpecListener.java | 26 + .../alert/engine/publisher/AlertPublisher.java | 31 + .../eagle/alert/engine/publisher/AlertSink.java | 33 + .../engine/publisher/PublishConstants.java | 48 ++ .../publisher/email/AlertEmailConstants.java | 39 ++ .../publisher/email/AlertEmailContext.java | 71 +++ .../publisher/email/AlertEmailGenerator.java | 143 +++++ .../email/AlertEmailGeneratorBuilder.java | 59 ++ .../publisher/email/AlertEmailSender.java | 164 +++++ .../engine/publisher/email/EagleMailClient.java | 240 +++++++ .../impl/AlertEagleStorePersister.java | 60 ++ .../impl/AlertEagleStorePublisher.java | 113 ++++ .../publisher/impl/AlertEmailPublisher.java | 151 +++++ .../publisher/impl/AlertKafkaPublisher.java | 136 ++++ .../impl/AlertPublishPluginsFactory.java | 48 ++ .../publisher/impl/AlertPublisherImpl.java | 154 +++++ .../publisher/impl/DefaultDeduplicator.java | 113 ++++ .../alert/engine/publisher/impl/EventUniq.java | 57 ++ .../publisher/impl/KafkaProducerManager.java | 42 ++ 
.../engine/publisher/impl/PublishStatus.java | 27 + .../engine/router/AlertBoltSpecListener.java | 32 + .../alert/engine/router/SpoutSpecListener.java | 32 + .../eagle/alert/engine/router/StreamRoute.java | 85 +++ .../router/StreamRoutePartitionFactory.java | 37 ++ .../engine/router/StreamRoutePartitioner.java | 29 + .../engine/router/StreamRouteSpecListener.java | 30 + .../eagle/alert/engine/router/StreamRouter.java | 30 + .../router/StreamRouterBoltSpecListener.java | 33 + .../alert/engine/router/StreamSortHandler.java | 42 ++ .../engine/router/StreamSortSpecListener.java | 28 + .../impl/BasicStreamRoutePartitioner.java | 77 +++ .../router/impl/RoutePhysicalGrouping.java | 76 +++ .../engine/router/impl/ShuffleGrouping.java | 74 +++ .../impl/StreamRouterBoltOutputCollector.java | 229 +++++++ .../engine/router/impl/StreamRouterImpl.java | 163 +++++ .../alert/engine/runner/AbstractStreamBolt.java | 93 +++ .../eagle/alert/engine/runner/AlertBolt.java | 157 +++++ .../alert/engine/runner/AlertPublisherBolt.java | 113 ++++ .../alert/engine/runner/MapComparator.java | 72 +++ .../engine/runner/StormMetricConsumer.java | 136 ++++ .../runner/StormMetricTaggedConsumer.java | 156 +++++ .../alert/engine/runner/StreamRouterBolt.java | 201 ++++++ .../alert/engine/runner/UnitTopologyRunner.java | 215 +++++++ .../eagle/alert/engine/scheme/JsonScheme.java | 71 +++ .../scheme/JsonStringStreamNameSelector.java | 74 +++ .../alert/engine/scheme/PlainStringScheme.java | 67 ++ .../scheme/PlainStringStreamNameSelector.java | 49 ++ .../PartitionedEventDigestSerializer.java | 62 ++ .../PartitionedEventSerializer.java | 39 ++ .../SerializationMetadataProvider.java | 30 + .../alert/engine/serialization/Serializer.java | 26 + .../alert/engine/serialization/Serializers.java | 56 ++ .../serialization/impl/BooleanSerializer.java | 35 + .../serialization/impl/DoubleSerializer.java | 35 + .../serialization/impl/FloatSerializer.java | 35 + .../serialization/impl/IntegerSerializer.java | 35 + 
.../impl/JavaObjectSerializer.java | 42 ++ .../serialization/impl/LongSerializer.java | 35 + .../impl/PartitionedEventSerializerImpl.java | 98 +++ .../impl/StreamEventSerializer.java | 87 +++ .../impl/StreamPartitionDigestSerializer.java | 83 +++ .../impl/StreamPartitionSerializer.java | 64 ++ .../serialization/impl/StringSerializer.java | 35 + .../extension/AttributeCollectAggregator.java | 122 ++++ .../alert/engine/sorter/BaseStreamWindow.java | 183 ++++++ .../alert/engine/sorter/StreamTimeClock.java | 45 ++ .../engine/sorter/StreamTimeClockListener.java | 27 + .../engine/sorter/StreamTimeClockManager.java | 45 ++ .../engine/sorter/StreamTimeClockTrigger.java | 61 ++ .../eagle/alert/engine/sorter/StreamWindow.java | 113 ++++ .../engine/sorter/StreamWindowManager.java | 64 ++ .../engine/sorter/StreamWindowRepository.java | 257 ++++++++ .../sorter/impl/CachedEventGroupSerializer.java | 114 ++++ .../impl/PartitionedEventGroupSerializer.java | 112 ++++ .../PartitionedEventTimeOrderingComparator.java | 50 ++ .../impl/StreamSortWindowHandlerImpl.java | 113 ++++ .../sorter/impl/StreamSortedWindowInMapDB.java | 145 +++++ .../sorter/impl/StreamSortedWindowOnHeap.java | 80 +++ .../impl/StreamTimeClockInLocalMemory.java | 64 ++ .../sorter/impl/StreamTimeClockManagerImpl.java | 171 +++++ .../sorter/impl/StreamWindowManagerImpl.java | 173 +++++ .../alert/engine/spout/CorrelationSpout.java | 380 +++++++++++ .../alert/engine/spout/CreateTopicUtils.java | 43 ++ .../eagle/alert/engine/spout/ISpoutSpecLCM.java | 41 ++ .../engine/spout/KafkaMessageIdWrapper.java | 42 ++ .../eagle/alert/engine/spout/SchemeBuilder.java | 35 + .../spout/SpoutOutputCollectorWrapper.java | 220 +++++++ .../alert/engine/utils/CompressionUtils.java | 62 ++ .../alert/engine/utils/MetadataSerDeser.java | 93 +++ .../alert/engine/utils/SerializableUtils.java | 112 ++++ .../main/java/storm/kafka/KafkaSpoutMetric.java | 78 +++ .../java/storm/kafka/KafkaSpoutWrapper.java | 111 ++++ 
.../src/main/resources/ALERT_DEFAULT.vm | 267 ++++++++ .../src/main/resources/application.conf | 72 +++ .../src/main/resources/eagle.siddhiext | 19 + .../src/main/resources/log4j.properties | 26 + .../alert/engine/e2e/CoordinatorClient.java | 95 +++ .../eagle/alert/engine/e2e/Integration1.java | 236 +++++++ .../eagle/alert/engine/e2e/Integration2.java | 142 +++++ .../eagle/alert/engine/e2e/SampleClient1.java | 127 ++++ .../eagle/alert/engine/e2e/SampleClient2.java | 139 ++++ ...oltOutputCollectorThreadSafeWrapperTest.java | 74 +++ .../SiddhiCEPPolicyEventHandlerTest.java | 161 +++++ .../integration/MockMetadataServiceClient.java | 124 ++++ .../engine/metric/MemoryUsageGaugeSetTest.java | 47 ++ .../engine/mock/MockPartitionedCollector.java | 55 ++ .../engine/mock/MockSampleMetadataFactory.java | 266 ++++++++ .../alert/engine/mock/MockStreamCollector.java | 51 ++ .../engine/mock/MockStreamMetadataService.java | 39 ++ .../alert/engine/mock/MockStreamReceiver.java | 79 +++ .../alert/engine/perf/TestSerDeserPer.java | 301 +++++++++ .../alert/engine/router/TestAlertBolt.java | 177 ++++++ .../engine/router/TestAlertPublisherBolt.java | 111 ++++ .../engine/runner/TestStreamRouterBolt.java | 259 ++++++++ .../serialization/JavaSerializationTest.java | 112 ++++ .../PartitionedEventSerializerTest.java | 220 +++++++ .../AttributeCollectAggregatorTest.java | 149 +++++ .../alert/engine/sorter/MapDBTestSuite.java | 45 ++ .../engine/sorter/StreamSortHandlerTest.java | 270 ++++++++ .../sorter/StreamWindowBenchmarkTest.java | 145 +++++ .../engine/sorter/StreamWindowTestSuite.java | 164 +++++ .../sorter/TreeMultisetComparatorTest.java | 94 +++ .../engine/topology/AlertTopologyTest.java | 138 ++++ .../CoordinatorSpoutIntegrationTest.java | 101 +++ .../engine/topology/CorrelationSpoutTest.java | 184 ++++++ .../engine/topology/FastWordCountTopology.java | 215 +++++++ .../MockMetadataChangeNotifyService.java | 111 ++++ .../engine/topology/SendData2KafkaTest.java | 103 +++ 
.../eagle/alert/engine/topology/TestBolt.java | 61 ++ .../alert/engine/topology/TestByteBuffer.java | 53 ++ .../topology/TestMetadataSpecSerDeser.java | 266 ++++++++ .../TestStormCustomGroupingRouting.java | 144 +++++ .../engine/topology/TestStormParallelism.java | 161 +++++ .../topology/TestStormStreamIdRouting.java | 140 ++++ .../topology/TestTuple2StreamConverter.java | 62 ++ .../engine/topology/TestUnitTopologyMain.java | 57 ++ .../engine/utils/CompressionUtilsTest.java | 43 ++ .../alert/engine/utils/TimePeriodUtilsTest.java | 91 +++ .../test/resources/application-integration.conf | 57 ++ .../test/resources/application-test-backup.conf | 71 +++ .../src/test/resources/application-test.conf | 68 ++ .../correlation/application-integration-2.conf | 57 ++ .../test/resources/correlation/datasources.json | 37 ++ .../test/resources/correlation/policies.json | 39 ++ .../resources/correlation/publishments.json | 17 + .../correlation/streamdefinitions.json | 93 +++ .../test/resources/correlation/topologies.json | 31 + .../src/test/resources/datasources.json | 19 + .../src/test/resources/log4j.properties | 23 + .../src/test/resources/policies.json | 54 ++ .../src/test/resources/publishments.json | 29 + .../src/test/resources/publishments2.json | 19 + .../src/test/resources/sample_perfmon_data.json | 3 + .../src/test/resources/streamdefinitions.json | 44 ++ .../src/test/resources/testAlertBoltSpec.json | 92 +++ .../src/test/resources/testPublishSpec.json | 32 + .../src/test/resources/testPublishSpec2.json | 32 + .../src/test/resources/testSpoutSpec.json | 139 ++++ .../resources/testStreamDefinitionsSpec.json | 47 ++ .../resources/testStreamRouterBoltSpec.json | 123 ++++ .../src/test/resources/topic.json | 1 + .../src/test/resources/topologies.json | 31 + .../eagle-alert/alert/alert-engine/pom.xml | 110 ++++ .../alert-metadata-service/.gitignore | 2 + .../alert-metadata-service/pom.xml | 113 ++++ .../metadata/resource/MetadataResource.java | 225 +++++++ 
.../topology/resource/TopologyMgmtResource.java | 69 ++ .../impl/TopologyMgmtResourceHelper.java | 48 ++ .../resource/impl/TopologyMgmtResourceImpl.java | 158 +++++ .../topology/resource/impl/TopologyStatus.java | 82 +++ .../src/main/resources/application.conf | 6 + .../src/main/webapp/WEB-INF/web.xml | 81 +++ .../src/main/webapp/index.html | 18 + .../impl/TopologyMgmtResourceImplTest.java | 55 ++ .../alert-metadata/.gitignore | 2 + .../alert-metadata/pom.xml | 52 ++ .../metadata/impl/InMemMetadataDaoImpl.java | 333 ++++++++++ .../alert/metadata/impl/MetadataDaoFactory.java | 70 ++ .../metadata/impl/MongoMetadataDaoImpl.java | 380 +++++++++++ .../metadata/impl/jdbc/JdbcMetadataDaoImpl.java | 292 +++++++++ .../alert/metadata/resource/IMetadataDao.java | 86 +++ .../eagle/alert/metadata/resource/Models.java | 48 ++ .../eagle/alert/metadata/resource/OpResult.java | 28 + .../src/main/resources/application.conf | 8 + .../alert/resource/impl/InMemoryTest.java | 43 ++ .../alert/resource/impl/MongoImplTest.java | 187 ++++++ .../src/test/resources/application-mongo.conf | 6 + .../src/test/resources/policy-sample.json | 32 + .../alert/alert-metadata-parent/pom.xml | 29 + eagle-core/eagle-alert/alert/pom.xml | 387 +++++++++++ 364 files changed, 31608 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/.gitignore ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/.gitignore b/eagle-core/eagle-alert/alert/.gitignore new file mode 100644 index 0000000..12af842 --- /dev/null +++ b/eagle-core/eagle-alert/alert/.gitignore @@ -0,0 +1,80 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Eagle - Git Ignore Configuration +# +# See: https://github.com/github/gitignore/ + +*.class +*.out + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.jar +*.war +*.ear + +.cache-main + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* + +# Maven +target/ +target/* +*/target/* +**/target/* +pom.xml.tag +pom.xml.releaseBackup +pom.xml.versionsBackup +pom.xml.next +release.properties +dependency-reduced-pom.xml +buildNumber.properties +eagle-security/eagle-security-mltraining/src/test/resources/outputDir/ +eagle-security/eagle-security-userprofile/src/test/resources/models/ + +# IntelliJ IDEA +.idea/ +.metadata/ +**.iml +**.eml +**.patch +mobile.userlibraries + +# Eclipse +build +.metadata +.classpath +.project +.settings +.externalToolBuilders +classes +# bin + +# Temporary files +logs/ +*.log* +# Mac files +.DS_Store + +*.cache-tests +application-local.conf + +*.orig +**/*.pyc +*.bak http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/README.md ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/README.md b/eagle-core/eagle-alert/alert/README.md new file mode 100644 index 0000000..54cd226 --- /dev/null +++ b/eagle-core/eagle-alert/alert/README.md @@ -0,0 +1,151 @@ + +## Prerequisites + +* [Apache 
Maven](https://maven.apache.org/) +* [Java 8](http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html) + +## Documentation + + +## Build + + mvn install + +## Setup +The alert engine has three dependent modules: Coordinator Service, Metadata Service, and engine runtime (storm topologies). + +####0. Dependencies +> The alert engine needs kafka as its data source and ZK for coordination. Check alert-devtools/bin to start zk and kafka through start-zk-kafka.sh. + +####1. Start metadata service +> For local dev, project alert-metadata-service is packaged as a war and supports mvn jetty:run to run it. By default, metadata runs on localhost:8080 + +> For deployment, after mvn install, a war is available in alert-metadata-service/target + +####2. Start coordinator service +> For local dev, project alert-coordinator is packaged as a war and supports mvn jetty:run to run it. By default, it runs on localhost:9090 and depends on metadata. See application.conf for coordinator. + +> For deployment, find war in alert-coordinator/target after mvn install + +####3. Start engine runtime. +> The engine is the set of topologies that run in any storm (local or remote) with configuration to connect to the ZK and metadata service. The alert engine runtime main is in UnitTopologyMain.java. The started storm bolt should have the same name described in alert-metadata. Example of the configuration is /alert-engine-base/src/main/resources/application.conf + +See the detailed steps below. + + +## Run +* pre-requisites + * zookeeper + * storm + * kafka + * tomcat + * mongodb + +* Run Metadata service + 1. copy alert-metadata/target/alert-metadata-0.0.1-SNAPSHOT.war into tomcat webapps/alertmetadata.war + 2. check config under webapps/alertmetadata/WEB-INF/classes/application.conf + ```json + { + "datastore": { + "metadataDao": "org.apache.eagle.alert.metadata.impl.MongoMetadataDaoImpl", + "connection": "localhost:27017" + } + } + ``` + + 3. 
start tomcat + +* Run Coordinator service + 1. copy alert-coordinator/target/alert-coordinator-0.0.1-SNAPSHOT.war to tomcat webapps/coordinator.war + 2. check config under webapps/coordinator/WEB-INF/classes/application.conf + ```json + { + "coordinator" : { + "policiesPerBolt" : 5, + "boltParallelism" : 5, + "policyDefaultParallelism" : 5, + "boltLoadUpbound": 0.8, + "topologyLoadUpbound" : 0.8, + "numOfAlertBoltsPerTopology" : 5, + "zkConfig" : { + "zkQuorum" : "localhost:2181", + "zkRoot" : "/alert", + "zkSessionTimeoutMs" : 10000, + "connectionTimeoutMs" : 10000, + "zkRetryTimes" : 3, + "zkRetryInterval" : 3000 + }, + "metadataService" : { + "host" : "localhost", + "port" : 8080, + "context" : "/alertmetadata/api" + }, + "metadataDynamicCheck" : { + "initDelayMillis" : 1000, + "delayMillis" : 30000 + } + } + } + ``` + 3. start tomcat + +* Run UnitTopologyMain + 1. copy alert-assembly/target/alert-engine-0.0.1-SNAPSHOT-alert-assembly.jar to somewhere close to your storm installation + 2. 
check config application.conf + ```json + { + "topology" : { + "name" : "alertUnitTopology_1", + "numOfTotalWorkers" : 2, + "numOfSpoutTasks" : 1, + "numOfRouterBolts" : 4, + "numOfAlertBolts" : 10, + "numOfPublishTasks" : 1, + "messageTimeoutSecs": 3600, + "localMode" : "true" + }, + "spout" : { + "kafkaBrokerZkQuorum": "localhost:2181", + "kafkaBrokerZkBasePath": "/brokers", + "stormKafkaUseSameZkQuorumWithKafkaBroker": true, + "stormKafkaTransactionZkQuorum": "", + "stormKafkaTransactionZkPath": "/consumers", + "stormKafkaEagleConsumer": "eagle_consumer", + "stormKafkaStateUpdateIntervalMs": 2000, + "stormKafkaFetchSizeBytes": 1048586, + }, + "zkConfig" : { + "zkQuorum" : "localhost:2181", + "zkRoot" : "/alert", + "zkSessionTimeoutMs" : 10000, + "connectionTimeoutMs" : 10000, + "zkRetryTimes" : 3, + "zkRetryInterval" : 3000 + }, + "dynamicConfigSource" : { + "initDelayMillis": 3000, + "delayMillis" : 10000 + }, + "metadataService": { + "context" : "/alertmetadata/api", + "host" : "localhost", + "port" : 8080 + }, + "coordinatorService": { + "host": "localhost", + "port": 8080, + "context" : "/coordinator/api" + } + "metric": { + "sink": { + "stdout": {} + } + } +} +``` + Note: please make sure the above configuration is used by storm instead of the configuration within fat jar + 3. 
start storm + storm jar alert-engine-0.0.1-SNAPSHOT-alert-assembly.jar org.apache.eagle.alert.engine.UnitTopologyMain + +## Support + http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-assembly/pom.xml b/eagle-core/eagle-alert/alert/alert-assembly/pom.xml new file mode 100644 index 0000000..b48ee5b --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-assembly/pom.xml @@ -0,0 +1,86 @@ + + + 4.0.0 + + + org.apache.eagle + alert-parent + 0.0.1-SNAPSHOT + + + alert-assembly + jar + + + + org.apache.eagle + alert-common + ${project.version} + + + org.apache.eagle + alert-coordinator + ${project.version} + war + + + org.apache.eagle + alert-metadata-service + ${project.version} + war + + + org.apache.eagle + alert-engine-base + ${project.version} + + + + + org.apache.eagle + alert-common + ${project.version} + test-jar + + + org.apache.eagle + alert-coordinator + ${project.version} + test-jar + + + org.apache.eagle + alert-engine-base + ${project.version} + test-jar + + + + + + + src/resources + + + + + maven-assembly-plugin + + src/assembly/alert-assembly.xml + alert-engine-${project.version} + + + + package + + single + + + posix + + + + + + + http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-assembly/src/assembly/alert-assembly.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-assembly/src/assembly/alert-assembly.xml b/eagle-core/eagle-alert/alert/alert-assembly/src/assembly/alert-assembly.xml new file mode 100644 index 0000000..556b2ad --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-assembly/src/assembly/alert-assembly.xml @@ -0,0 +1,41 @@ + + alert-assembly + + jar + + false + + + / + false + true + runtime + + + + + org.apache.storm:storm-core + 
org.slf4j:slf4j-api + org.slf4j:log4j-over-slf4j + org.slf4j:slf4j-log4j12 + log4j:log4j + asm:asm + org.apache.log4j.wso2:log4j + log4j:apache-log4j-extras + + + + + + ${project.build.outputDirectory} + / + + + + + + + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/.gitignore ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/.gitignore b/eagle-core/eagle-alert/alert/alert-common/.gitignore new file mode 100644 index 0000000..1dd3331 --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-common/.gitignore @@ -0,0 +1,2 @@ +/target/ +/target/ http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/pom.xml b/eagle-core/eagle-alert/alert/alert-common/pom.xml new file mode 100644 index 0000000..129cff0 --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-common/pom.xml @@ -0,0 +1,129 @@ + + + + 4.0.0 + + + org.apache.eagle + alert-parent + 0.0.1-SNAPSHOT + + + alert-common + jar + + + + com.typesafe + config + + + com.fasterxml.jackson.core + jackson-annotations + + + org.apache.commons + commons-lang3 + + + commons-collections + commons-collections + + + commons-lang + commons-lang + + + junit + junit + test + + + com.google.guava + guava + + + org.slf4j + slf4j-api + + + org.slf4j + slf4j-log4j12 + + + org.apache.curator + curator-framework + + + org.apache.curator + curator-recipes + + + com.fasterxml.jackson.core + jackson-databind + + + com.sun.jersey + jersey-client + + + org.codehaus.jackson + jackson-jaxrs + + + joda-time + joda-time + + + io.dropwizard.metrics + metrics-core + + + org.apache.storm + storm-core + + + org.elasticsearch + metrics-elasticsearch-reporter + + + org.apache.kafka + kafka-clients + + 
+ org.apache.kafka + ${kafka.artifact.id} + + + io.dropwizard.metrics + metrics-jvm + + + + + + + org.apache.maven.plugins + maven-jar-plugin + 2.6 + + + + test-jar + + + + + + + http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusBase.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusBase.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusBase.java new file mode 100644 index 0000000..7644d89 --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusBase.java @@ -0,0 +1,40 @@ +package org.apache.eagle.alert.config; + +import java.io.Closeable; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryNTimes; + + +/** + * Abstraction of asynchronized configuration management + * This is used for config change notification between processes, without this one process has to pull changes triggered by another process + * + * Config bus is similar to message bus, config change producer can publish config change(message) to config bus, + * while config change consumer can subscribe config change and do business logic in callback + * 1. use zookeeper as media to notify config consumer of config changes + * 2. each type of config is represented by topic + * 3. each config change can contain actual value or contain reference Id which consumer uses to retrieve actual value. 
This mechanism will reduce zookeeper overhed + * + */ +public class ConfigBusBase implements Closeable{ + protected String zkRoot; + protected CuratorFramework curator; + + public ConfigBusBase(ZKConfig config) { + this.zkRoot = config.zkRoot; + curator = CuratorFrameworkFactory.newClient( + config.zkQuorum, + config.zkSessionTimeoutMs, + config.connectionTimeoutMs, + new RetryNTimes(config.zkRetryTimes, config.zkRetryInterval) + ); + curator.start(); + } + + @Override + public void close(){ + curator.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusConsumer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusConsumer.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusConsumer.java new file mode 100644 index 0000000..11eea1f --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusConsumer.java @@ -0,0 +1,40 @@ +package org.apache.eagle.alert.config; + +import org.apache.curator.framework.recipes.cache.NodeCache; +import org.slf4j.Logger; + +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * 1. When consumer is started, it always get notified of config + * 2. 
When config is changed, consumer always get notified of config change + * + * Reliability issue: + * TODO How to ensure config change message is always delivered to consumer + */ +public class ConfigBusConsumer extends ConfigBusBase { + private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(ConfigBusConsumer.class); + + private NodeCache cache; + public ConfigBusConsumer(ZKConfig config, String topic, ConfigChangeCallback callback){ + super(config); + String zkPath = zkRoot + "/" + topic; + LOG.info("monitor change for zkPath " + zkPath); + cache = new NodeCache(curator, zkPath); + cache.getListenable().addListener( () -> + { + // get node value and notify callback + byte[] value = curator.getData().forPath(zkPath); + ObjectMapper mapper = new ObjectMapper(); + ConfigValue v = mapper.readValue(value, ConfigValue.class); + callback.onNewConfig(v); + } + ); + try { + cache.start(); + }catch(Exception ex){ + LOG.error("error start NodeCache listener", ex); + throw new RuntimeException(ex); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusProducer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusProducer.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusProducer.java new file mode 100644 index 0000000..6dab51a --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusProducer.java @@ -0,0 +1,37 @@ +package org.apache.eagle.alert.config; + +import org.apache.zookeeper.CreateMode; +import org.slf4j.Logger; + +import com.fasterxml.jackson.databind.ObjectMapper; + +public class ConfigBusProducer extends ConfigBusBase { + private static final Logger LOG = 
org.slf4j.LoggerFactory.getLogger(ConfigBusProducer.class); + + public ConfigBusProducer(ZKConfig config){ + super(config); + } + + /** + * @param topic + * @param config + */ + public void send(String topic, ConfigValue config){ + // check if topic exists, create this topic if not existing + String zkPath = zkRoot + "/" + topic; + try { + if (curator.checkExists().forPath(zkPath) == null) { + curator.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(zkPath); + } + ObjectMapper mapper = new ObjectMapper(); + byte[] content = mapper.writeValueAsBytes(config); + curator.setData().forPath(zkPath, content); + }catch(Exception ex){ + LOG.error("error creating zkPath " + zkPath, ex); + throw new RuntimeException(ex); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigChangeCallback.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigChangeCallback.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigChangeCallback.java new file mode 100644 index 0000000..60df605 --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigChangeCallback.java @@ -0,0 +1,5 @@ +package org.apache.eagle.alert.config; + +public interface ConfigChangeCallback { + void onNewConfig(ConfigValue value); +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigValue.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigValue.java 
b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigValue.java new file mode 100644 index 0000000..f59c2b9 --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigValue.java @@ -0,0 +1,33 @@ +package org.apache.eagle.alert.config; + + +/** + * Config body contains actual data for one topic + * this is serialized with json format into zookeeper + * value can be versionId which is used for referencing outside data + * or value can be actual config value + */ +public class ConfigValue { + private boolean isValueVersionId; + private Object value; + + public boolean isValueVersionId() { + return isValueVersionId; + } + + public void setValueVersionId(boolean valueVersionId) { + isValueVersionId = valueVersionId; + } + + public Object getValue() { + return value; + } + + public void setValue(Object value) { + this.value = value; + } + + public String toString(){ + return "isValueVersionId: " + isValueVersionId + ", value: " + value; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ZKConfig.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ZKConfig.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ZKConfig.java new file mode 100644 index 0000000..fe04b4b --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ZKConfig.java @@ -0,0 +1,17 @@ +package org.apache.eagle.alert.config; + +import java.io.Serializable; + +/** + * Memory representation of key zookeeper configurations + */ +public class ZKConfig implements Serializable{ + private static final long serialVersionUID = -1287231022807492775L; + + public String zkQuorum; + public String zkRoot; + 
public int zkSessionTimeoutMs; + public int connectionTimeoutMs; + public int zkRetryTimes; + public int zkRetryInterval; +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ZKConfigBuilder.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ZKConfigBuilder.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ZKConfigBuilder.java new file mode 100644 index 0000000..9d77a58 --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ZKConfigBuilder.java @@ -0,0 +1,38 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.eagle.alert.config; + +import com.typesafe.config.Config; + +/** + * Since 4/28/16. 
+ */ +public class ZKConfigBuilder { + public static ZKConfig getZKConfig(Config config){ + ZKConfig zkConfig = new ZKConfig(); + zkConfig.zkQuorum = config.getString("zkConfig.zkQuorum"); + zkConfig.zkRoot = config.getString("zkConfig.zkRoot"); + zkConfig.zkSessionTimeoutMs = config.getInt("zkConfig.zkSessionTimeoutMs"); + zkConfig.connectionTimeoutMs = config.getInt("zkConfig.connectionTimeoutMs"); + zkConfig.zkRetryTimes = config.getInt("zkConfig.zkRetryTimes"); + zkConfig.zkRetryInterval = config.getInt("zkConfig.zkRetryInterval"); + return zkConfig; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/AlertBoltSpec.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/AlertBoltSpec.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/AlertBoltSpec.java new file mode 100644 index 0000000..83d307c --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/AlertBoltSpec.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.coordination.model; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; + +import com.fasterxml.jackson.annotation.JsonIgnore; + +/** + * The alert specification for topology bolts. + * + * @since Apr 29, 2016 + */ +public class AlertBoltSpec { + private String version; + private String topologyName; + + // mapping from boltId to list of PolicyDefinitions + @JsonIgnore + private Map> boltPoliciesMap = new HashMap>(); + + // mapping from boltId to list of PolicyDefinition's Ids + private Map> boltPolicyIdsMap = new HashMap>(); + + public AlertBoltSpec() { + } + + public AlertBoltSpec(String topo) { + this.topologyName = topo; + } + + public String getVersion() { + return version; + } + + public void setVersion(String version) { + this.version = version; + } + + public String getTopologyName() { + return topologyName; + } + + public void setTopologyName(String topologyName) { + this.topologyName = topologyName; + } + +// public List getBoltPolicy(String boltId) { +// return boltPoliciesMap.get(boltId); +// } +// +// public void addBoltPolicy(String boltId, PolicyDefinition pd) { +// if (boltPoliciesMap.containsKey(boltId)) { +// boltPoliciesMap.get(boltId).add(pd); +// } else { +// List list = new ArrayList(); +// boltPoliciesMap.put(boltId, list); +// list.add(pd); +// } +// } + + public void addBoltPolicy(String boltId, String policyName) { + if (boltPolicyIdsMap.containsKey(boltId)) { + boltPolicyIdsMap.get(boltId).add(policyName); + } else { + List list = new ArrayList(); + boltPolicyIdsMap.put(boltId, list); + list.add(policyName); + } + } + + @JsonIgnore + public Map> getBoltPoliciesMap() { + return boltPoliciesMap; + } + + @JsonIgnore + public void setBoltPoliciesMap(Map> boltPoliciesMap) { + 
this.boltPoliciesMap = boltPoliciesMap; + } + + public Map> getBoltPolicyIdsMap() { + return boltPolicyIdsMap; + } + + public void setBoltPolicyIdsMap(Map> boltPolicyIdsMap) { + this.boltPolicyIdsMap = boltPolicyIdsMap; + } + + @Override + public String toString() { + return String.format("version:%s-topo:%s, boltPolicyIdsMap %s", version, topologyName, boltPolicyIdsMap); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadata.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadata.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadata.java new file mode 100644 index 0000000..6c4f576 --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadata.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.eagle.alert.coordination.model; + +import java.util.Map; + +import org.apache.commons.lang3.builder.HashCodeBuilder; + +import com.google.common.base.Objects; + +/** + * @since Apr 5, 2016 + * this metadata model controls how to convert kafka topic into tuple stream + */ +public class Kafka2TupleMetadata { + private String type; + private String name; // data source name + private Map properties; + private String topic; + private String schemeCls; + + private Tuple2StreamMetadata codec; + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getName() { + return name; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public void setSchemeCls(String schemeCls) { + this.schemeCls = schemeCls; + } + + public void setName(String name) { + this.name = name; + } + + public Map getProperties() { + return properties; + } + + public void setProperties(Map properties) { + this.properties = properties; + } + + public Tuple2StreamMetadata getCodec() { + return codec; + } + + public void setCodec(Tuple2StreamMetadata codec) { + this.codec = codec; + } + + public String getTopic() { + return this.topic; + } + public String getSchemeCls() { + return this.schemeCls; + } + + public int hashCode() { + return new HashCodeBuilder().append(name).append(type).build(); + } + + public boolean equals(Object obj) { + if (!(obj instanceof Kafka2TupleMetadata)) { + return false; + } + Kafka2TupleMetadata o = (Kafka2TupleMetadata) obj; + return Objects.equal(name, o.name) && Objects.equal(type, o.type); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueue.java ---------------------------------------------------------------------- diff --git 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueue.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueue.java new file mode 100644 index 0000000..e5e5618 --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueue.java @@ -0,0 +1,63 @@ +package org.apache.eagle.alert.coordination.model; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang3.StringUtils; +import org.apache.eagle.alert.engine.coordinator.StreamPartition; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +public class PolicyWorkerQueue { + + private StreamPartition partition; + private List workers; + + public PolicyWorkerQueue() { + workers = new ArrayList<>(); + } + + public PolicyWorkerQueue(List workers) { + this.workers = workers; + } + + public PolicyWorkerQueue(StreamPartition partition, List workers) { + this.workers = workers; + this.partition = partition; + } + + public StreamPartition getPartition() { + return partition; + } + + public void setPartition(StreamPartition partition) { + this.partition = partition; + } + + public List getWorkers() { + return workers; + } + + public void setWorkers(List workers) { + this.workers = workers; + } + + public String toString() { + return "[" + StringUtils.join(workers, ",") + "]"; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/PublishSpec.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/PublishSpec.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/PublishSpec.java new file mode 100644 index 0000000..06e819a --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/PublishSpec.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.coordination.model; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.eagle.alert.engine.coordinator.Publishment; + +/** + * + * @since May 1, 2016 + * + */ +public class PublishSpec { + + private String topologyName; + // actually only publish spec for one topology + private String boltId; + private String version; + + private List publishments = new ArrayList(); + + public PublishSpec() { + } + + public PublishSpec(String topoName, String boltId) { + this.topologyName = topoName; + this.boltId = boltId; + } + + public void addPublishment(Publishment p) { + this.publishments.add(p); + } + + public String getTopologyName() { + return topologyName; + } + + public String getBoltId() { + return boltId; + } + + public List getPublishments() { + return publishments; + } + + public String getVersion() { + return version; + } + + public void setVersion(String version) { + this.version = version; + } + + public void setTopologyName(String topologyName) { + this.topologyName = topologyName; + } + + public void setBoltId(String boltId) { + this.boltId = boltId; + } + + public void setPublishments(List publishments) { + this.publishments = publishments; + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/RouterSpec.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/RouterSpec.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/RouterSpec.java new file mode 100644 index 0000000..5241920 --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/RouterSpec.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.eagle.alert.coordination.model; + +import java.util.ArrayList; +import java.util.List; + +import org.codehaus.jackson.annotate.JsonIgnore; + +/** + * @since Apr 29, 2016 + * + */ +public class RouterSpec { + private String version; + private String topologyName; + + private List routerSpecs; + + public RouterSpec() { + routerSpecs = new ArrayList(); + } + + public RouterSpec(String topoName) { + this(); + this.topologyName = topoName; + } + + public String getVersion() { + return version; + } + + public void setVersion(String version) { + this.version = version; + } + + public String getTopologyName() { + return topologyName; + } + + public void setTopologyName(String topologyName) { + this.topologyName = topologyName; + } + + @JsonIgnore + public void addRouterSpec(StreamRouterSpec routerSpec) { + routerSpecs.add(routerSpec); + } + + public List getRouterSpecs() { + return routerSpecs; + } + + public void setRouterSpecs(List routerSpecs) { + this.routerSpecs = routerSpecs; + } + + @Override + public String toString() { + return String.format("version:%s-topo:%s, boltSpec:%s", version, topologyName, routerSpecs); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/ScheduleState.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/ScheduleState.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/ScheduleState.java new file mode 100644 index 0000000..6036f29 --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/ScheduleState.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.coordination.model; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.eagle.alert.coordination.model.internal.MonitoredStream; +import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; +import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; +import org.apache.eagle.alert.engine.coordinator.StreamDefinition; + +/** + * A global wise of schedule status
+ *
+ * TODO/FIXME: The persistence simply deserial ScheduleState to Json. One + * concern is that this string might become too big for store.
+ *
+ * The solution is in metadata resource, have specs/monitoredStreams/policy + * assignments stored in different table/collections with tage version. + * + * + * @since Apr 26, 2016 + * + */ +public class ScheduleState { + + // ScheduleSpec + private Map spoutSpecs = new HashMap(); + private Map alertSpecs = new HashMap(); + private Map groupSpecs = new HashMap(); + private Map publishSpecs = new HashMap(); + + // ScheduleSnapshot + private List policySnapshots = new ArrayList(); + private List streamSnapshots = new ArrayList(); + + // ScheduleResult + private List monitoredStreams = new ArrayList(); + private List assignments = new ArrayList(); + + private String version; + // FIXME : should be date, can not make it simple in mongo.. + private String generateTime; + private int code = 200; + private String message = "OK"; + + public ScheduleState() { + } + + public ScheduleState(String version, + Map topoSpoutSpecsMap, + Map groupSpecsMap, + Map alertSpecsMap, + Map pubMap, + Collection assignments, + Collection monitoredStreams, + Collection definitions, + Collection streams) { + this.spoutSpecs = topoSpoutSpecsMap; + this.groupSpecs = groupSpecsMap; + this.alertSpecs = alertSpecsMap; + this.publishSpecs = pubMap; + this.version = version; + this.generateTime = String.valueOf(new Date().getTime()); + this.assignments = new ArrayList(assignments); + this.monitoredStreams = new ArrayList(monitoredStreams); + this.policySnapshots = new ArrayList(); + this.streamSnapshots = new ArrayList(); + + for (SpoutSpec ss : this.spoutSpecs.values()) { + ss.setVersion(version); + } + + for (RouterSpec ss : this.groupSpecs.values()) { + ss.setVersion(version); + } + + for (AlertBoltSpec ss : this.alertSpecs.values()) { + ss.setVersion(version); + } + + for (PublishSpec ps : this.publishSpecs.values()) { + ps.setVersion(version); + } + + for (MonitoredStream ms : this.monitoredStreams) { + ms.setVersion(version); + } + for (PolicyAssignment ps : this.assignments) { + 
ps.setVersion(version); + } + for (PolicyDefinition def : definitions) { + this.policySnapshots.add(new VersionedPolicyDefinition(version, def)); + } + for (StreamDefinition sd :streams) { + this.streamSnapshots.add(new VersionedStreamDefinition(version, sd)); + } + } + + public Map getSpoutSpecs() { + return spoutSpecs; + } + + public void setSpoutSpecs(Map spoutSpecs) { + this.spoutSpecs = spoutSpecs; + } + + public Map getAlertSpecs() { + return alertSpecs; + } + + public void setAlertSpecs(Map alertSpecs) { + this.alertSpecs = alertSpecs; + } + + public Map getGroupSpecs() { + return groupSpecs; + } + + public void setGroupSpecs(Map groupSpecs) { + this.groupSpecs = groupSpecs; + } + + public Map getPublishSpecs() { + return publishSpecs; + } + + public void setPublishSpecs(Map publishSpecs) { + this.publishSpecs = publishSpecs; + } + + public String getVersion() { + return version; + } + + public void setVersion(String version) { + this.version = version; + } + + public int getCode() { + return code; + } + + public void setCode(int code) { + this.code = code; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public String getGenerateTime() { + return generateTime; + } + + public void setGenerateTime(String generateTime) { + this.generateTime = generateTime; + } + + public List getMonitoredStreams() { + return monitoredStreams; + } + + public List getAssignments() { + return assignments; + } + + public List getPolicySnapshots() { + return policySnapshots; + } + + public void setPolicySnapshots(List policySnapshots) { + this.policySnapshots = policySnapshots; + } + + public void setMonitoredStreams(List monitoredStreams) { + this.monitoredStreams = monitoredStreams; + } + + public void setAssignments(List assignments) { + this.assignments = assignments; + } + + public List getStreamSnapshots() { + return streamSnapshots; + } + + public void setStreamSnapshots(List 
streamSnapshots) { + this.streamSnapshots = streamSnapshots; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/SpoutSpec.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/SpoutSpec.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/SpoutSpec.java new file mode 100644 index 0000000..a197858 --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/SpoutSpec.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.coordination.model; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + + +/** + * SpoutSpec metadata control 3 phases for data transformation for one specific topic + * phase 1: kafka topic to tuple, controlled by Kafka2TupleMetadata, i.e. 
Scheme + * phase 2: tuple to stream, controlled by Tuple2StreamMetadata, i.e. stream name selector etc. + * phase 3: stream repartition, controlled by StreamRepartitionMetadata, i.e. groupby spec + * @since Apr 18, 2016 + * + */ +public class SpoutSpec { + private String version; + +// private String spoutId; + private String topologyId; + + // topicName -> kafka2TupleMetadata + private Map kafka2TupleMetadataMap = new HashMap(); + // topicName -> Tuple2StreamMetadata + private Map tuple2StreamMetadataMap = new HashMap(); + // topicName -> list of StreamRepartitionMetadata, here it is list because one topic(data source) may spawn multiple streams. + private Map> streamRepartitionMetadataMap = new HashMap>(); + + public SpoutSpec(){} + + public SpoutSpec( + String topologyId, +// String spoutId, + Map> streamRepartitionMetadataMap, + Map tuple2StreamMetadataMap, + Map kafka2TupleMetadataMap) { + this.topologyId = topologyId; +// this.spoutId = spoutId; + this.streamRepartitionMetadataMap = streamRepartitionMetadataMap; + this.tuple2StreamMetadataMap = tuple2StreamMetadataMap; + this.kafka2TupleMetadataMap = kafka2TupleMetadataMap; + } + +// public String getSpoutId() { +// return spoutId; +// } +// public void setSpoutId(String spoutId) { +// this.spoutId = spoutId; +// } + + public String getTopologyId() { + return topologyId; + } + + public Map> getStreamRepartitionMetadataMap() { + return streamRepartitionMetadataMap; + } + + public Map getTuple2StreamMetadataMap(){ + return this.tuple2StreamMetadataMap; + } + + public Map getKafka2TupleMetadataMap() { + return kafka2TupleMetadataMap; + } + + @org.codehaus.jackson.annotate.JsonIgnore + public StreamRepartitionMetadata getStream(String streamName) { + for (List meta : this.streamRepartitionMetadataMap.values()) { + Optional m = meta.stream().filter((t) -> t.getStreamId().equalsIgnoreCase(streamName)).findFirst(); + if (m.isPresent()) { + return m.get(); + } + } + return null; + } + + public void 
setTopologyId(String topologyId) { + this.topologyId = topologyId; + } + + public void setKafka2TupleMetadataMap(Map kafka2TupleMetadataMap) { + this.kafka2TupleMetadataMap = kafka2TupleMetadataMap; + } + + public void setTuple2StreamMetadataMap(Map tuple2StreamMetadataMap) { + this.tuple2StreamMetadataMap = tuple2StreamMetadataMap; + } + + public void setStreamRepartitionMetadataMap(Map> streamRepartitionMetadataMap) { + this.streamRepartitionMetadataMap = streamRepartitionMetadataMap; + } + + public String getVersion() { + return version; + } + + public void setVersion(String version) { + this.version = version; + } + + @Override + public String toString() { + return String.format("version:%s-topo:%s ", version, this.topologyId); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamNameSelector.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamNameSelector.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamNameSelector.java new file mode 100644 index 0000000..d421716 --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamNameSelector.java @@ -0,0 +1,15 @@ +package org.apache.eagle.alert.coordination.model; + +import java.util.Map; + +/** + * This metadata controls how to figure out stream name from incoming tuple + */ +public interface StreamNameSelector { + /** + * field name to value mapping + * @param tuple + * @return + */ + String getStreamName(Map tuple); +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRepartitionMetadata.java 
---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRepartitionMetadata.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRepartitionMetadata.java new file mode 100644 index 0000000..5f8f689 --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRepartitionMetadata.java @@ -0,0 +1,53 @@ +package org.apache.eagle.alert.coordination.model; + +import java.util.ArrayList; +import java.util.List; + +import org.codehaus.jackson.annotate.JsonIgnore; + +/** + * @since Apr 25, 2016 + * This meta-data controls how tuple streamId is repartitioned + */ +public class StreamRepartitionMetadata { + private String topicName; + private String streamId; + /** + * each stream may have multiple different grouping strategies,for example groupby some fields or even shuffling + */ + public List groupingStrategies = new ArrayList(); + + public StreamRepartitionMetadata(){} + + public StreamRepartitionMetadata(String topicName, String stream) { + this.topicName = topicName; + this.streamId = stream; + } + + public String getStreamId() { + return streamId; + } + public void setStreamId(String streamId) { + this.streamId = streamId; + } + + public String getTopicName() { + return topicName; + } + public void setTopicName(String topicName) { + this.topicName = topicName; + } + + public List getGroupingStrategies() { + return groupingStrategies; + } + + @JsonIgnore + public void addGroupStrategy(StreamRepartitionStrategy gs) { + this.groupingStrategies.add(gs); + } + + public void setGroupingStrategies(List groupingStrategies) { + this.groupingStrategies = groupingStrategies; + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategy.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategy.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategy.java new file mode 100644 index 0000000..203114e --- /dev/null +++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategy.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.eagle.alert.coordination.model; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.eagle.alert.engine.coordinator.StreamPartition; + +public class StreamRepartitionStrategy { + public StreamPartition partition ; + + public int numTotalParticipatingRouterBolts = 0; // how many group-by bolts participate policy evaluation + public int startSequence = 0; // what is the sequence for the first bolt in this topology among all bolts + public List totalTargetBoltIds = new ArrayList(); + + public int hashCode() { + int hashcode = 1 * 31; + hashcode += partition.hashCode(); + for (String str : totalTargetBoltIds) { + hashcode += str.hashCode(); + } + return hashcode; + } + + public boolean equals(Object obj) { + if (!(obj instanceof StreamRepartitionStrategy)) { + return false; + } + StreamRepartitionStrategy o = (StreamRepartitionStrategy) obj; + return partition.equals(o.partition) + && CollectionUtils.isEqualCollection(totalTargetBoltIds, o.totalTargetBoltIds); + } + + public StreamPartition getPartition() { + return partition; + } + + public void setPartition(StreamPartition partition) { + this.partition = partition; + } + + public int getNumTotalParticipatingRouterBolts() { + return numTotalParticipatingRouterBolts; + } + + public void setNumTotalParticipatingRouterBolts(int numTotalParticipatingRouterBolts) { + this.numTotalParticipatingRouterBolts = numTotalParticipatingRouterBolts; + } + + public int getStartSequence() { + return startSequence; + } + + public void setStartSequence(int startSequence) { + this.startSequence = startSequence; + } + + public List getTotalTargetBoltIds() { + return totalTargetBoltIds; + } + + public void setTotalTargetBoltIds(List totalTargetBoltIds) { + this.totalTargetBoltIds = totalTargetBoltIds; + } + +} \ No newline at end of file