eagle-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ralp...@apache.org
Subject [19/19] incubator-eagle git commit: EAGLE-324: Init branch-v0.5
Date Wed, 01 Jun 2016 05:56:34 GMT
EAGLE-324: Init branch-v0.5

Author: ralphsu


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/75a8265c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/75a8265c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/75a8265c

Branch: refs/heads/branch-0.5
Commit: 75a8265c6ea2b26fd542de7e103a7235feb935c0
Parents: fb00fa3
Author: Ralph, Su <suliangfei@gmail.com>
Authored: Wed Jun 1 13:50:39 2016 +0800
Committer: Ralph, Su <suliangfei@gmail.com>
Committed: Wed Jun 1 13:50:39 2016 +0800

----------------------------------------------------------------------
 eagle-core/eagle-alert/alert/.gitignore         |  80 +++
 eagle-core/eagle-alert/alert/README.md          | 151 +++++
 .../eagle-alert/alert/alert-assembly/pom.xml    |  86 +++
 .../src/assembly/alert-assembly.xml             |  41 ++
 .../eagle-alert/alert/alert-common/.gitignore   |   2 +
 .../eagle-alert/alert/alert-common/pom.xml      | 129 ++++
 .../eagle/alert/config/ConfigBusBase.java       |  40 ++
 .../eagle/alert/config/ConfigBusConsumer.java   |  40 ++
 .../eagle/alert/config/ConfigBusProducer.java   |  37 ++
 .../alert/config/ConfigChangeCallback.java      |   5 +
 .../apache/eagle/alert/config/ConfigValue.java  |  33 +
 .../org/apache/eagle/alert/config/ZKConfig.java |  17 +
 .../eagle/alert/config/ZKConfigBuilder.java     |  38 ++
 .../alert/coordination/model/AlertBoltSpec.java | 114 ++++
 .../coordination/model/Kafka2TupleMetadata.java |  97 +++
 .../coordination/model/PolicyWorkerQueue.java   |  63 ++
 .../alert/coordination/model/PublishSpec.java   |  82 +++
 .../alert/coordination/model/RouterSpec.java    |  76 +++
 .../alert/coordination/model/ScheduleState.java | 215 +++++++
 .../alert/coordination/model/SpoutSpec.java     | 124 ++++
 .../coordination/model/StreamNameSelector.java  |  15 +
 .../model/StreamRepartitionMetadata.java        |  53 ++
 .../model/StreamRepartitionStrategy.java        |  82 +++
 .../coordination/model/StreamRouterSpec.java    |  75 +++
 .../model/Tuple2StreamConverter.java            |  65 ++
 .../model/Tuple2StreamMetadata.java             |  71 +++
 .../model/VersionedPolicyDefinition.java        |  53 ++
 .../model/VersionedStreamDefinition.java        |  53 ++
 .../alert/coordination/model/WorkSlot.java      |  55 ++
 .../model/internal/MonitoredStream.java         |  91 +++
 .../model/internal/PolicyAssignment.java        |  69 ++
 .../model/internal/StreamGroup.java             |  82 +++
 .../model/internal/StreamWorkSlotQueue.java     | 124 ++++
 .../coordination/model/internal/Topology.java   | 170 +++++
 .../eagle/alert/engine/codec/IStreamCodec.java  |  30 +
 .../alert/engine/codec/SherlockEventCodec.java  |  41 ++
 .../alert/engine/codec/SherlockMetricCodec.java |  39 ++
 .../engine/coordinator/PolicyDefinition.java    | 199 ++++++
 .../alert/engine/coordinator/Publishment.java   | 101 +++
 .../engine/coordinator/PublishmentType.java     |  83 +++
 .../alert/engine/coordinator/StreamColumn.java  | 128 ++++
 .../engine/coordinator/StreamDefinition.java    | 105 +++
 .../engine/coordinator/StreamPartition.java     | 143 +++++
 .../engine/coordinator/StreamSortSpec.java      |  98 +++
 .../engine/coordinator/StreamingCluster.java    |  89 +++
 .../alert/engine/model/AlertStreamEvent.java    |  89 +++
 .../alert/engine/model/PartitionedEvent.java    | 149 +++++
 .../eagle/alert/engine/model/StreamEvent.java   | 144 +++++
 .../alert/engine/model/StreamEventBuilder.java  |  72 +++
 .../eagle/alert/metric/IMetricSystem.java       |  64 ++
 .../apache/eagle/alert/metric/MetricSystem.java | 116 ++++
 .../eagle/alert/metric/entity/MetricEvent.java  | 111 ++++
 .../alert/metric/reporter/KafkaReporter.java    | 197 ++++++
 .../eagle/alert/metric/sink/ConsoleSink.java    |  47 ++
 .../alert/metric/sink/ElasticSearchSink.java    |  74 +++
 .../apache/eagle/alert/metric/sink/JmxSink.java |  48 ++
 .../eagle/alert/metric/sink/KafkaSink.java      |  65 ++
 .../eagle/alert/metric/sink/MetricSink.java     |  30 +
 .../alert/metric/sink/MetricSinkRepository.java |  47 ++
 .../eagle/alert/metric/sink/Slf4jSink.java      |  74 +++
 .../alert/metric/source/JVMMetricSource.java    |  41 ++
 .../eagle/alert/metric/source/MetricSource.java |  24 +
 .../metric/source/MetricSourceWrapper.java      |  39 ++
 .../eagle/alert/resource/SimpleCORSFiler.java   |  59 ++
 .../alert/service/IMetadataServiceClient.java   |  69 ++
 .../service/MetadataServiceClientImpl.java      | 235 +++++++
 .../eagle/alert/utils/AlertConstants.java       |  27 +
 .../org/apache/eagle/alert/utils/ByteUtils.java | 172 +++++
 .../apache/eagle/alert/utils/ConfigUtils.java   |  31 +
 .../apache/eagle/alert/utils/DateTimeUtil.java  | 141 ++++
 .../org/apache/eagle/alert/utils/HostUtils.java |  70 ++
 .../org/apache/eagle/alert/utils/JsonUtils.java |  41 ++
 .../eagle/alert/utils/StreamIdConversion.java   |  36 ++
 .../eagle/alert/utils/TimePeriodUtils.java      |  62 ++
 .../src/main/resources/log4j.properties         |  21 +
 .../eagle/alert/config/TestConfigBus.java       |  61 ++
 .../eagle/alert/metric/MetricSystemTest.java    |  78 +++
 .../service/TestMetadataServiceClientImpl.java  |  40 ++
 .../apache/eagle/correlation/meta/JsonTest.java |  30 +
 .../src/test/resources/application.conf         |  17 +
 .../src/test/resources/log4j.properties         |  21 +
 .../alert/alert-coordinator/.gitignore          |   1 +
 .../eagle-alert/alert/alert-coordinator/pom.xml |  96 +++
 .../eagle/alert/coordinator/Coordinator.java    | 212 +++++++
 .../alert/coordinator/CoordinatorConstants.java |  31 +
 .../alert/coordinator/IPolicyScheduler.java     |  34 +
 .../alert/coordinator/IScheduleContext.java     |  54 ++
 .../coordinator/PolicySchedulerFactory.java     |  30 +
 .../eagle/alert/coordinator/ScheduleOption.java |  74 +++
 .../alert/coordinator/TopologyMgmtService.java  |  83 +++
 .../coordinator/impl/GreedyPolicyScheduler.java | 340 ++++++++++
 .../impl/MonitorMetadataGenerator.java          | 287 +++++++++
 .../alert/coordinator/impl/ScheduleResult.java  |  42 ++
 .../eagle/alert/coordinator/impl/WorkItem.java  |  33 +
 .../coordinator/impl/WorkQueueBuilder.java      |  93 +++
 .../impl/strategies/IWorkSlotStrategy.java      |  32 +
 .../strategies/SameTopologySlotStrategy.java    | 164 +++++
 .../alert/coordinator/model/AlertBoltUsage.java |  93 +++
 .../alert/coordinator/model/GroupBoltUsage.java |  67 ++
 .../alert/coordinator/model/TopologyUsage.java  | 102 +++
 .../provider/InMemScheduleConext.java           | 148 +++++
 .../provider/ScheduleContextBuilder.java        | 400 ++++++++++++
 .../resource/CoordinatorResource.java           |  69 ++
 .../trigger/DynamicPolicyLoader.java            |  81 +++
 .../trigger/PolicyChangeListener.java           |  10 +
 .../src/main/resources/application.conf         |  27 +
 .../src/main/resources/log4j.properties         |  21 +
 .../src/main/webapp/WEB-INF/web.xml             |  87 +++
 .../src/main/webapp/index.html                  |  18 +
 .../alert/coordinator/CoordinatorTest.java      | 124 ++++
 .../coordinator/DynamicPolicyLoaderTest.java    |  26 +
 .../MetadataServiceClientImplTest.java          |  57 ++
 .../coordinator/ScheduleContextBuilderTest.java | 281 ++++++++
 .../apache/alert/coordinator/SchedulerTest.java | 635 +++++++++++++++++++
 .../alert/coordinator/WorkSlotStrategyTest.java | 162 +++++
 .../mock/InMemMetadataServiceClient.java        | 147 +++++
 .../mock/TestTopologyMgmtService.java           |  73 +++
 .../src/test/resources/test-application.conf    |  27 +
 .../eagle-alert/alert/alert-devtools/.gitignore |   3 +
 .../alert/alert-devtools/bin/kafka-producer.sh  |  21 +
 .../alert-devtools/bin/kafka-server-start.sh    |  51 ++
 .../alert-devtools/bin/kafka-server-status.sh   |  24 +
 .../alert-devtools/bin/kafka-server-stop.sh     |  23 +
 .../alert/alert-devtools/bin/kafka-topics.sh    |  17 +
 .../alert/alert-devtools/bin/run-class.sh       | 112 ++++
 .../alert-devtools/bin/start-coordinator.sh     |  20 +
 .../alert-devtools/bin/start-integration1.sh    |  16 +
 .../alert-devtools/bin/start-integration2.sh    |  19 +
 .../alert/alert-devtools/bin/start-metadata.sh  |  20 +
 .../alert-devtools/bin/start-sampleclient1.sh   |   6 +
 .../alert-devtools/bin/start-sampleclient2.sh   |   6 +
 .../alert/alert-devtools/bin/start-zk-kafka.sh  |  28 +
 .../alert/alert-devtools/bin/stop-zk-kafka.sh   |  28 +
 .../bin/zookeeper-server-start.sh               |  50 ++
 .../bin/zookeeper-server-status.sh              |  24 +
 .../alert-devtools/bin/zookeeper-server-stop.sh |  24 +
 .../alert-devtools/conf/cli-log4j.properties    |  21 +
 .../alert-devtools/conf/kafka-server.properties | 115 ++++
 .../alert/alert-devtools/conf/log4j.properties  |  21 +
 .../conf/zookeeper-server.properties            |  20 +
 .../eagle-alert/alert/alert-devtools/pom.xml    |  96 +++
 .../eagle/alert/tools/KafkaConsumerOffset.java  |  27 +
 .../alert/tools/KafkaConsumerOffsetFetcher.java |  66 ++
 .../alert/tools/KafkaLatestOffsetFetcher.java   |  97 +++
 .../eagle/contrib/kafka/ProducerTool.scala      | 230 +++++++
 .../eagle/alert/tools/TestKafkaOffset.java      |  69 ++
 .../kafka-offset-test.application.conf          |   9 +
 .../src/test/resources/log4j.properties         |  21 +
 .../eagle-alert/alert/alert-engine/.gitignore   |   1 +
 .../alert-engine/alert-engine-base/pom.xml      |  65 ++
 .../alert/engine/AlertStreamCollector.java      |  27 +
 .../apache/eagle/alert/engine/Collector.java    |  27 +
 .../alert/engine/PartitionedEventCollector.java |  29 +
 .../eagle/alert/engine/StreamContext.java       |  27 +
 .../eagle/alert/engine/StreamContextImpl.java   |  26 +
 .../eagle/alert/engine/UnitTopologyMain.java    |  59 ++
 .../IMetadataChangeNotifyService.java           |  57 ++
 .../alert/engine/coordinator/MetadataType.java  |  30 +
 .../StreamDefinitionNotFoundException.java      |  25 +
 .../AbstractMetadataChangeNotifyService.java    | 113 ++++
 .../impl/ZKMetadataChangeNotifyService.java     | 200 ++++++
 .../engine/evaluator/PolicyChangeListener.java  |  29 +
 .../engine/evaluator/PolicyGroupEvaluator.java  |  45 ++
 .../engine/evaluator/PolicyHandlerContext.java  |  60 ++
 .../engine/evaluator/PolicyStreamHandler.java   |  27 +
 .../engine/evaluator/PolicyStreamHandlers.java  |  33 +
 ...ertBoltOutputCollectorThreadSafeWrapper.java | 118 ++++
 .../impl/AlertBoltOutputCollectorWrapper.java   |  55 ++
 .../impl/PolicyGroupEvaluatorImpl.java          | 172 +++++
 .../evaluator/impl/SiddhiDefinitionAdapter.java | 142 +++++
 .../evaluator/impl/SiddhiPolicyHandler.java     | 148 +++++
 .../engine/publisher/AlertDeduplicator.java     |  31 +
 .../engine/publisher/AlertPublishListener.java  |  28 +
 .../engine/publisher/AlertPublishPlugin.java    |  50 ++
 .../publisher/AlertPublishSpecListener.java     |  26 +
 .../alert/engine/publisher/AlertPublisher.java  |  31 +
 .../eagle/alert/engine/publisher/AlertSink.java |  33 +
 .../engine/publisher/PublishConstants.java      |  48 ++
 .../publisher/email/AlertEmailConstants.java    |  39 ++
 .../publisher/email/AlertEmailContext.java      |  71 +++
 .../publisher/email/AlertEmailGenerator.java    | 143 +++++
 .../email/AlertEmailGeneratorBuilder.java       |  59 ++
 .../publisher/email/AlertEmailSender.java       | 164 +++++
 .../engine/publisher/email/EagleMailClient.java | 240 +++++++
 .../impl/AlertEagleStorePersister.java          |  60 ++
 .../impl/AlertEagleStorePublisher.java          | 113 ++++
 .../publisher/impl/AlertEmailPublisher.java     | 151 +++++
 .../publisher/impl/AlertKafkaPublisher.java     | 136 ++++
 .../impl/AlertPublishPluginsFactory.java        |  48 ++
 .../publisher/impl/AlertPublisherImpl.java      | 154 +++++
 .../publisher/impl/DefaultDeduplicator.java     | 113 ++++
 .../alert/engine/publisher/impl/EventUniq.java  |  57 ++
 .../publisher/impl/KafkaProducerManager.java    |  42 ++
 .../engine/publisher/impl/PublishStatus.java    |  27 +
 .../engine/router/AlertBoltSpecListener.java    |  32 +
 .../alert/engine/router/SpoutSpecListener.java  |  32 +
 .../eagle/alert/engine/router/StreamRoute.java  |  85 +++
 .../router/StreamRoutePartitionFactory.java     |  37 ++
 .../engine/router/StreamRoutePartitioner.java   |  29 +
 .../engine/router/StreamRouteSpecListener.java  |  30 +
 .../eagle/alert/engine/router/StreamRouter.java |  30 +
 .../router/StreamRouterBoltSpecListener.java    |  33 +
 .../alert/engine/router/StreamSortHandler.java  |  42 ++
 .../engine/router/StreamSortSpecListener.java   |  28 +
 .../impl/BasicStreamRoutePartitioner.java       |  77 +++
 .../router/impl/RoutePhysicalGrouping.java      |  76 +++
 .../engine/router/impl/ShuffleGrouping.java     |  74 +++
 .../impl/StreamRouterBoltOutputCollector.java   | 229 +++++++
 .../engine/router/impl/StreamRouterImpl.java    | 163 +++++
 .../alert/engine/runner/AbstractStreamBolt.java |  93 +++
 .../eagle/alert/engine/runner/AlertBolt.java    | 157 +++++
 .../alert/engine/runner/AlertPublisherBolt.java | 113 ++++
 .../alert/engine/runner/MapComparator.java      |  72 +++
 .../engine/runner/StormMetricConsumer.java      | 136 ++++
 .../runner/StormMetricTaggedConsumer.java       | 156 +++++
 .../alert/engine/runner/StreamRouterBolt.java   | 201 ++++++
 .../alert/engine/runner/UnitTopologyRunner.java | 215 +++++++
 .../eagle/alert/engine/scheme/JsonScheme.java   |  71 +++
 .../scheme/JsonStringStreamNameSelector.java    |  74 +++
 .../alert/engine/scheme/PlainStringScheme.java  |  67 ++
 .../scheme/PlainStringStreamNameSelector.java   |  49 ++
 .../PartitionedEventDigestSerializer.java       |  62 ++
 .../PartitionedEventSerializer.java             |  39 ++
 .../SerializationMetadataProvider.java          |  30 +
 .../alert/engine/serialization/Serializer.java  |  26 +
 .../alert/engine/serialization/Serializers.java |  56 ++
 .../serialization/impl/BooleanSerializer.java   |  35 +
 .../serialization/impl/DoubleSerializer.java    |  35 +
 .../serialization/impl/FloatSerializer.java     |  35 +
 .../serialization/impl/IntegerSerializer.java   |  35 +
 .../impl/JavaObjectSerializer.java              |  42 ++
 .../serialization/impl/LongSerializer.java      |  35 +
 .../impl/PartitionedEventSerializerImpl.java    |  98 +++
 .../impl/StreamEventSerializer.java             |  87 +++
 .../impl/StreamPartitionDigestSerializer.java   |  83 +++
 .../impl/StreamPartitionSerializer.java         |  64 ++
 .../serialization/impl/StringSerializer.java    |  35 +
 .../extension/AttributeCollectAggregator.java   | 122 ++++
 .../alert/engine/sorter/BaseStreamWindow.java   | 183 ++++++
 .../alert/engine/sorter/StreamTimeClock.java    |  45 ++
 .../engine/sorter/StreamTimeClockListener.java  |  27 +
 .../engine/sorter/StreamTimeClockManager.java   |  45 ++
 .../engine/sorter/StreamTimeClockTrigger.java   |  61 ++
 .../eagle/alert/engine/sorter/StreamWindow.java | 113 ++++
 .../engine/sorter/StreamWindowManager.java      |  64 ++
 .../engine/sorter/StreamWindowRepository.java   | 257 ++++++++
 .../sorter/impl/CachedEventGroupSerializer.java | 114 ++++
 .../impl/PartitionedEventGroupSerializer.java   | 112 ++++
 .../PartitionedEventTimeOrderingComparator.java |  50 ++
 .../impl/StreamSortWindowHandlerImpl.java       | 113 ++++
 .../sorter/impl/StreamSortedWindowInMapDB.java  | 145 +++++
 .../sorter/impl/StreamSortedWindowOnHeap.java   |  80 +++
 .../impl/StreamTimeClockInLocalMemory.java      |  64 ++
 .../sorter/impl/StreamTimeClockManagerImpl.java | 171 +++++
 .../sorter/impl/StreamWindowManagerImpl.java    | 173 +++++
 .../alert/engine/spout/CorrelationSpout.java    | 380 +++++++++++
 .../alert/engine/spout/CreateTopicUtils.java    |  43 ++
 .../eagle/alert/engine/spout/ISpoutSpecLCM.java |  41 ++
 .../engine/spout/KafkaMessageIdWrapper.java     |  42 ++
 .../eagle/alert/engine/spout/SchemeBuilder.java |  35 +
 .../spout/SpoutOutputCollectorWrapper.java      | 220 +++++++
 .../alert/engine/utils/CompressionUtils.java    |  62 ++
 .../alert/engine/utils/MetadataSerDeser.java    |  93 +++
 .../alert/engine/utils/SerializableUtils.java   | 112 ++++
 .../main/java/storm/kafka/KafkaSpoutMetric.java |  78 +++
 .../java/storm/kafka/KafkaSpoutWrapper.java     | 111 ++++
 .../src/main/resources/ALERT_DEFAULT.vm         | 267 ++++++++
 .../src/main/resources/application.conf         |  72 +++
 .../src/main/resources/eagle.siddhiext          |  19 +
 .../src/main/resources/log4j.properties         |  26 +
 .../alert/engine/e2e/CoordinatorClient.java     |  95 +++
 .../eagle/alert/engine/e2e/Integration1.java    | 236 +++++++
 .../eagle/alert/engine/e2e/Integration2.java    | 142 +++++
 .../eagle/alert/engine/e2e/SampleClient1.java   | 127 ++++
 .../eagle/alert/engine/e2e/SampleClient2.java   | 139 ++++
 ...oltOutputCollectorThreadSafeWrapperTest.java |  74 +++
 .../SiddhiCEPPolicyEventHandlerTest.java        | 161 +++++
 .../integration/MockMetadataServiceClient.java  | 124 ++++
 .../engine/metric/MemoryUsageGaugeSetTest.java  |  47 ++
 .../engine/mock/MockPartitionedCollector.java   |  55 ++
 .../engine/mock/MockSampleMetadataFactory.java  | 266 ++++++++
 .../alert/engine/mock/MockStreamCollector.java  |  51 ++
 .../engine/mock/MockStreamMetadataService.java  |  39 ++
 .../alert/engine/mock/MockStreamReceiver.java   |  79 +++
 .../alert/engine/perf/TestSerDeserPer.java      | 301 +++++++++
 .../alert/engine/router/TestAlertBolt.java      | 177 ++++++
 .../engine/router/TestAlertPublisherBolt.java   | 111 ++++
 .../engine/runner/TestStreamRouterBolt.java     | 259 ++++++++
 .../serialization/JavaSerializationTest.java    | 112 ++++
 .../PartitionedEventSerializerTest.java         | 220 +++++++
 .../AttributeCollectAggregatorTest.java         | 149 +++++
 .../alert/engine/sorter/MapDBTestSuite.java     |  45 ++
 .../engine/sorter/StreamSortHandlerTest.java    | 270 ++++++++
 .../sorter/StreamWindowBenchmarkTest.java       | 145 +++++
 .../engine/sorter/StreamWindowTestSuite.java    | 164 +++++
 .../sorter/TreeMultisetComparatorTest.java      |  94 +++
 .../engine/topology/AlertTopologyTest.java      | 138 ++++
 .../CoordinatorSpoutIntegrationTest.java        | 101 +++
 .../engine/topology/CorrelationSpoutTest.java   | 184 ++++++
 .../engine/topology/FastWordCountTopology.java  | 215 +++++++
 .../MockMetadataChangeNotifyService.java        | 111 ++++
 .../engine/topology/SendData2KafkaTest.java     | 103 +++
 .../eagle/alert/engine/topology/TestBolt.java   |  61 ++
 .../alert/engine/topology/TestByteBuffer.java   |  53 ++
 .../topology/TestMetadataSpecSerDeser.java      | 266 ++++++++
 .../TestStormCustomGroupingRouting.java         | 144 +++++
 .../engine/topology/TestStormParallelism.java   | 161 +++++
 .../topology/TestStormStreamIdRouting.java      | 140 ++++
 .../topology/TestTuple2StreamConverter.java     |  62 ++
 .../engine/topology/TestUnitTopologyMain.java   |  57 ++
 .../engine/utils/CompressionUtilsTest.java      |  43 ++
 .../alert/engine/utils/TimePeriodUtilsTest.java |  91 +++
 .../test/resources/application-integration.conf |  57 ++
 .../test/resources/application-test-backup.conf |  71 +++
 .../src/test/resources/application-test.conf    |  68 ++
 .../correlation/application-integration-2.conf  |  57 ++
 .../test/resources/correlation/datasources.json |  37 ++
 .../test/resources/correlation/policies.json    |  39 ++
 .../resources/correlation/publishments.json     |  17 +
 .../correlation/streamdefinitions.json          |  93 +++
 .../test/resources/correlation/topologies.json  |  31 +
 .../src/test/resources/datasources.json         |  19 +
 .../src/test/resources/log4j.properties         |  23 +
 .../src/test/resources/policies.json            |  54 ++
 .../src/test/resources/publishments.json        |  29 +
 .../src/test/resources/publishments2.json       |  19 +
 .../src/test/resources/sample_perfmon_data.json |   3 +
 .../src/test/resources/streamdefinitions.json   |  44 ++
 .../src/test/resources/testAlertBoltSpec.json   |  92 +++
 .../src/test/resources/testPublishSpec.json     |  32 +
 .../src/test/resources/testPublishSpec2.json    |  32 +
 .../src/test/resources/testSpoutSpec.json       | 139 ++++
 .../resources/testStreamDefinitionsSpec.json    |  47 ++
 .../resources/testStreamRouterBoltSpec.json     | 123 ++++
 .../src/test/resources/topic.json               |   1 +
 .../src/test/resources/topologies.json          |  31 +
 .../eagle-alert/alert/alert-engine/pom.xml      | 110 ++++
 .../alert-metadata-service/.gitignore           |   2 +
 .../alert-metadata-service/pom.xml              | 113 ++++
 .../metadata/resource/MetadataResource.java     | 225 +++++++
 .../topology/resource/TopologyMgmtResource.java |  69 ++
 .../impl/TopologyMgmtResourceHelper.java        |  48 ++
 .../resource/impl/TopologyMgmtResourceImpl.java | 158 +++++
 .../topology/resource/impl/TopologyStatus.java  |  82 +++
 .../src/main/resources/application.conf         |   6 +
 .../src/main/webapp/WEB-INF/web.xml             |  81 +++
 .../src/main/webapp/index.html                  |  18 +
 .../impl/TopologyMgmtResourceImplTest.java      |  55 ++
 .../alert-metadata/.gitignore                   |   2 +
 .../alert-metadata/pom.xml                      |  52 ++
 .../metadata/impl/InMemMetadataDaoImpl.java     | 333 ++++++++++
 .../alert/metadata/impl/MetadataDaoFactory.java |  70 ++
 .../metadata/impl/MongoMetadataDaoImpl.java     | 380 +++++++++++
 .../metadata/impl/jdbc/JdbcMetadataDaoImpl.java | 292 +++++++++
 .../alert/metadata/resource/IMetadataDao.java   |  86 +++
 .../eagle/alert/metadata/resource/Models.java   |  48 ++
 .../eagle/alert/metadata/resource/OpResult.java |  28 +
 .../src/main/resources/application.conf         |   8 +
 .../alert/resource/impl/InMemoryTest.java       |  43 ++
 .../alert/resource/impl/MongoImplTest.java      | 187 ++++++
 .../src/test/resources/application-mongo.conf   |   6 +
 .../src/test/resources/policy-sample.json       |  32 +
 .../alert/alert-metadata-parent/pom.xml         |  29 +
 eagle-core/eagle-alert/alert/pom.xml            | 387 +++++++++++
 364 files changed, 31608 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/.gitignore
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/.gitignore b/eagle-core/eagle-alert/alert/.gitignore
new file mode 100644
index 0000000..12af842
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/.gitignore
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Eagle - Git Ignore Configuration
+#
+# See: https://github.com/github/gitignore/
+
+*.class
+*.out
+
+# Mobile Tools for Java (J2ME)
+.mtj.tmp/
+
+# Package Files #
+*.jar
+*.war
+*.ear
+
+.cache-main
+
+# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
+hs_err_pid*
+
+# Maven
+target/
+target/*
+*/target/*
+**/target/*
+pom.xml.tag
+pom.xml.releaseBackup
+pom.xml.versionsBackup
+pom.xml.next
+release.properties
+dependency-reduced-pom.xml
+buildNumber.properties
+eagle-security/eagle-security-mltraining/src/test/resources/outputDir/
+eagle-security/eagle-security-userprofile/src/test/resources/models/
+
+# IntelliJ IDEA
+.idea/
+.metadata/
+**.iml
+**.eml
+**.patch
+mobile.userlibraries
+
+# Eclipse
+build
+.metadata
+.classpath
+.project
+.settings
+.externalToolBuilders
+classes
+# bin
+
+# Temporary files
+logs/
+*.log*
+# Mac files
+.DS_Store
+
+*.cache-tests
+application-local.conf
+
+*.orig
+**/*.pyc
+*.bak

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/README.md
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/README.md b/eagle-core/eagle-alert/alert/README.md
new file mode 100644
index 0000000..54cd226
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/README.md
@@ -0,0 +1,151 @@
+
+## Prerequisites
+
+* [Apache Maven](https://maven.apache.org/)
+* [Java 8](http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html)
+
+## Documentation
+
+
+## Build
+
+    mvn install
+    
+## Setup
+The alert engine has three dependent modules: Coordinator Service, Metadata Service, and the engine runtime (Storm topologies).
+
+####0. Dependencies
+> The alert engine needs Kafka as its data source and ZooKeeper (ZK) for coordination. Check alert-devtools/bin to start ZK and Kafka through start-zk-kafka.sh.
+
+####1. Start metadata service
+> For local dev, the alert-metadata-service project is packaged as a war and can be run with mvn jetty:run. By default, the metadata service runs on localhost:8080.
+
+> For deployment, after mvn install, a war is available in alert-metadata-service/target
+
+####2. Start coordinator service
+> For local dev, the alert-coordinator project is packaged as a war and can be run with mvn jetty:run. By default, it runs on localhost:9090 and depends on the metadata service. See application.conf for the coordinator configuration.
+
+> For deployment, find war in alert-coordinator/target after mvn install
+
+####3. Start engine runtime.
+> The engine is the set of topologies that run in any Storm (local or remote), configured to connect to ZK and the metadata service. The main class of the alert engine runtime is UnitTopologyMain.java. The started Storm bolt should have the same name as described in alert-metadata. An example configuration is /alert-engine-base/src/main/resources/application.conf 
+
+See below detailed steps.
+
+
+## Run
+* pre-requisites
+  * zookeeper
+  * storm
+  * kafka
+  * tomcat
+  * mongodb
+
+* Run Metadata service
+    1. copy alert-metadata/target/alert-metadata-0.0.1-SNAPSHOT.war into tomcat webapps/alertmetadata.war
+    2. check config under webapps/alertmetadata/WEB-INF/classes/application.conf
+    ```json
+    {
+	"datastore": {
+		"metadataDao": "org.apache.eagle.alert.metadata.impl.MongoMetadataDaoImpl",
+		"connection": "localhost:27017"
+	}
+     }
+    ```
+    
+    3. start tomcat
+    
+* Run Coordinator service
+    1. copy alert-coordinator/target/alert-coordinator-0.0.1-SNAPSHOT.war to tomcat webapps/coordinator.war
+    2. check config under webapps/coordinator/WEB-INF/classes/application.conf
+    ```json
+      {
+	"coordinator" : {
+		"policiesPerBolt" : 5,
+		"boltParallelism" : 5,
+		"policyDefaultParallelism" : 5,
+		"boltLoadUpbound": 0.8,
+		"topologyLoadUpbound" : 0.8,
+		"numOfAlertBoltsPerTopology" : 5,
+		"zkConfig" : {
+			"zkQuorum" : "localhost:2181",
+			"zkRoot" : "/alert",
+			"zkSessionTimeoutMs" : 10000,
+			"connectionTimeoutMs" : 10000,
+			"zkRetryTimes" : 3,
+			"zkRetryInterval" : 3000
+		},
+		"metadataService" : {
+			"host" : "localhost",
+			"port" : 8080,
+			"context" : "/alertmetadata/api"
+		},
+		"metadataDynamicCheck" : {
+			"initDelayMillis" : 1000,
+			"delayMillis" : 30000
+		}
+	}
+   }
+   ```
+    3. start tomcat
+
+* Run UnitTopologyMain
+    1. copy alert-assembly/target/alert-engine-0.0.1-SNAPSHOT-alert-assembly.jar to somewhere close to your storm installation
+    2. check config application.conf
+   ```json
+  {
+  "topology" : {
+    "name" : "alertUnitTopology_1",
+    "numOfTotalWorkers" : 2,
+    "numOfSpoutTasks" : 1,
+    "numOfRouterBolts" : 4,
+    "numOfAlertBolts" : 10,
+    "numOfPublishTasks" : 1,
+    "messageTimeoutSecs": 3600,
+    "localMode" : "true"
+  },
+  "spout" : {
+    "kafkaBrokerZkQuorum": "localhost:2181",
+    "kafkaBrokerZkBasePath": "/brokers",
+    "stormKafkaUseSameZkQuorumWithKafkaBroker": true,
+    "stormKafkaTransactionZkQuorum": "",
+    "stormKafkaTransactionZkPath": "/consumers",
+    "stormKafkaEagleConsumer": "eagle_consumer",
+    "stormKafkaStateUpdateIntervalMs": 2000,
+    "stormKafkaFetchSizeBytes": 1048586,
+  },
+  "zkConfig" : {
+    "zkQuorum" : "localhost:2181",
+    "zkRoot" : "/alert",
+    "zkSessionTimeoutMs" : 10000,
+    "connectionTimeoutMs" : 10000,
+    "zkRetryTimes" : 3,
+    "zkRetryInterval" : 3000
+  },
+  "dynamicConfigSource" : {
+    "initDelayMillis": 3000,
+    "delayMillis" : 10000
+  },
+  "metadataService": {
+    "context" : "/alertmetadata/api",
+    "host" : "localhost",
+    "port" : 8080
+  },
+  "coordinatorService": {
+    "host": "localhost",
+    "port": 8080,
+    "context" : "/coordinator/api"
+  },
+  "metric": {
+    "sink": {
+      "stdout": {}
+    }
+  }
+}
+```
+  Note: please make sure the above configuration is used by Storm instead of the configuration bundled within the fat jar
+  3. start storm
+     storm jar alert-engine-0.0.1-SNAPSHOT-alert-assembly.jar org.apache.eagle.alert.engine.UnitTopologyMain
+
+## Support
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-assembly/pom.xml b/eagle-core/eagle-alert/alert/alert-assembly/pom.xml
new file mode 100644
index 0000000..b48ee5b
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-assembly/pom.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.eagle</groupId>
+        <artifactId>alert-parent</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>alert-assembly</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>alert-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>alert-coordinator</artifactId>
+            <version>${project.version}</version>
+            <type>war</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>alert-metadata-service</artifactId>
+            <version>${project.version}</version>
+            <type>war</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>alert-engine-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <!-- package test cases -->
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>alert-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>alert-coordinator</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>alert-engine-base</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <resources>
+            <resource>
+                <directory>src/resources</directory>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptor>src/assembly/alert-assembly.xml</descriptor>
+                    <finalName>alert-engine-${project.version}</finalName>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                        <configuration>
+                            <tarLongFileMode>posix</tarLongFileMode>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-assembly/src/assembly/alert-assembly.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-assembly/src/assembly/alert-assembly.xml b/eagle-core/eagle-alert/alert/alert-assembly/src/assembly/alert-assembly.xml
new file mode 100644
index 0000000..556b2ad
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-assembly/src/assembly/alert-assembly.xml
@@ -0,0 +1,41 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+    <id>alert-assembly</id>
+    <formats>
+        <format>jar</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <dependencySets>
+        <dependencySet>
+            <outputDirectory>/</outputDirectory>
+            <useProjectArtifact>false</useProjectArtifact>
+            <unpack>true</unpack>
+            <scope>runtime</scope>
+            <unpackOptions>
+            </unpackOptions>
+            <excludes>
+                <!--<exclude>org.apache.storm:storm-core</exclude>
+                <exclude>org.slf4j:slf4j-log4j12</exclude>-->
+                <exclude>org.apache.storm:storm-core</exclude>
+                <exclude>org.slf4j:slf4j-api</exclude>
+                <exclude>org.slf4j:log4j-over-slf4j</exclude>
+                <exclude>org.slf4j:slf4j-log4j12</exclude>
+                <exclude>log4j:log4j</exclude>
+                <exclude>asm:asm</exclude>
+                <exclude>org.apache.log4j.wso2:log4j</exclude>
+                <exclude>log4j:apache-log4j-extras</exclude>
+            </excludes>
+        </dependencySet>
+    </dependencySets>
+    <fileSets>
+        <fileSet>
+            <directory>${project.build.outputDirectory}</directory>
+            <outputDirectory>/</outputDirectory>
+            <excludes>
+            </excludes>
+            <includes>
+            </includes>
+        </fileSet>
+    </fileSets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/.gitignore
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/.gitignore b/eagle-core/eagle-alert/alert/alert-common/.gitignore
new file mode 100644
index 0000000..1dd3331
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/.gitignore
@@ -0,0 +1,2 @@
+/target/
+/target/

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/pom.xml b/eagle-core/eagle-alert/alert/alert-common/pom.xml
new file mode 100644
index 0000000..129cff0
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/pom.xml
@@ -0,0 +1,129 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- ~ Licensed to the Apache Software Foundation (ASF) under one or more 
+	~ contributor license agreements. See the NOTICE file distributed with ~ 
+	this work for additional information regarding copyright ownership. ~ The 
+	ASF licenses this file to You under the Apache License, Version 2.0 ~ (the 
+	"License"); you may not use this file except in compliance with ~ the License. 
+	You may obtain a copy of the License at ~ ~ http://www.apache.org/licenses/LICENSE-2.0 
+	~ ~ Unless required by applicable law or agreed to in writing, software ~ 
+	distributed under the License is distributed on an "AS IS" BASIS, ~ WITHOUT 
+	WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the 
+	License for the specific language governing permissions and ~ limitations 
+	under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.eagle</groupId>
+		<artifactId>alert-parent</artifactId>
+		<version>0.0.1-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>alert-common</artifactId>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>com.typesafe</groupId>
+			<artifactId>config</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>com.fasterxml.jackson.core</groupId>
+			<artifactId>jackson-annotations</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.commons</groupId>
+			<artifactId>commons-lang3</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>commons-collections</groupId>
+			<artifactId>commons-collections</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>commons-lang</groupId>
+			<artifactId>commons-lang</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-api</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-log4j12</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.curator</groupId>
+			<artifactId>curator-framework</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.curator</groupId>
+			<artifactId>curator-recipes</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>com.fasterxml.jackson.core</groupId>
+			<artifactId>jackson-databind</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>com.sun.jersey</groupId>
+			<artifactId>jersey-client</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.codehaus.jackson</groupId>
+			<artifactId>jackson-jaxrs</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>joda-time</groupId>
+			<artifactId>joda-time</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>io.dropwizard.metrics</groupId>
+			<artifactId>metrics-core</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.storm</groupId>
+			<artifactId>storm-core</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.elasticsearch</groupId>
+			<artifactId>metrics-elasticsearch-reporter</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka-clients</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>${kafka.artifact.id}</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>io.dropwizard.metrics</groupId>
+			<artifactId>metrics-jvm</artifactId>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<version>2.6</version>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusBase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusBase.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusBase.java
new file mode 100644
index 0000000..7644d89
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusBase.java
@@ -0,0 +1,40 @@
+package org.apache.eagle.alert.config;
+
+import java.io.Closeable;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+
+
+/**
+ * Abstraction of asynchronous configuration management.
+ * This is used for config change notification between processes; without it, one process has to pull changes triggered by another process
+ *
+ * Config bus is similar to message bus: a config change producer can publish a config change (message) to the config bus,
+ *  while a config change consumer can subscribe to config changes and do business logic in a callback
+ * 1. use zookeeper as media to notify config consumer of config changes
+ * 2. each type of config is represented by topic
+ * 3. each config change can contain the actual value or a reference Id which the consumer uses to retrieve the actual value. This mechanism will reduce zookeeper overhead
+ *
+ */
+public class ConfigBusBase implements Closeable{
+    protected String zkRoot;
+    protected CuratorFramework curator;
+
+    public ConfigBusBase(ZKConfig config) {
+        this.zkRoot = config.zkRoot;
+        curator = CuratorFrameworkFactory.newClient(
+                config.zkQuorum,
+                config.zkSessionTimeoutMs,
+                config.connectionTimeoutMs,
+                new RetryNTimes(config.zkRetryTimes, config.zkRetryInterval)
+        );
+        curator.start();
+    }
+
+    @Override
+    public void close(){
+        curator.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusConsumer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusConsumer.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusConsumer.java
new file mode 100644
index 0000000..11eea1f
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusConsumer.java
@@ -0,0 +1,40 @@
+package org.apache.eagle.alert.config;
+
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.slf4j.Logger;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * 1. When consumer is started, it always gets notified of config
+ * 2. When config is changed, consumer always gets notified of config change
+ *
+ * Reliability issue:
+ * TODO How to ensure config change message is always delivered to consumer
+ */
+public class ConfigBusConsumer extends ConfigBusBase {
+    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(ConfigBusConsumer.class);
+
+    private NodeCache cache;
+    public ConfigBusConsumer(ZKConfig config, String topic, ConfigChangeCallback callback){
+        super(config);
+        String zkPath = zkRoot + "/" + topic;
+        LOG.info("monitor change for zkPath " + zkPath);
+        cache = new NodeCache(curator, zkPath);
+        cache.getListenable().addListener( () ->
+            {
+                // get node value and notify callback
+                byte[] value = curator.getData().forPath(zkPath);
+                ObjectMapper mapper = new ObjectMapper();
+                ConfigValue v = mapper.readValue(value, ConfigValue.class);
+                callback.onNewConfig(v);
+            }
+        );
+        try {
+            cache.start();
+        }catch(Exception ex){
+            LOG.error("error start NodeCache listener", ex);
+            throw new RuntimeException(ex);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusProducer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusProducer.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusProducer.java
new file mode 100644
index 0000000..6dab51a
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigBusProducer.java
@@ -0,0 +1,37 @@
+package org.apache.eagle.alert.config;
+
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.Logger;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class ConfigBusProducer extends ConfigBusBase {
+    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(ConfigBusProducer.class);
+
+    public ConfigBusProducer(ZKConfig config){
+        super(config);
+    }
+
+    /**
+     * @param topic
+     * @param config
+     */
+    public void send(String topic, ConfigValue config){
+        // check if topic exists, create this topic if not existing
+        String zkPath = zkRoot + "/" + topic;
+        try {
+            if (curator.checkExists().forPath(zkPath) == null) {
+                curator.create()
+                        .creatingParentsIfNeeded()
+                        .withMode(CreateMode.PERSISTENT)
+                        .forPath(zkPath);
+            }
+            ObjectMapper mapper = new ObjectMapper();
+            byte[] content = mapper.writeValueAsBytes(config);
+            curator.setData().forPath(zkPath, content);
+        }catch(Exception ex){
+            LOG.error("error creating zkPath " + zkPath, ex);
+            throw new RuntimeException(ex);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigChangeCallback.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigChangeCallback.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigChangeCallback.java
new file mode 100644
index 0000000..60df605
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigChangeCallback.java
@@ -0,0 +1,5 @@
+package org.apache.eagle.alert.config;
+
+public interface ConfigChangeCallback {
+    void onNewConfig(ConfigValue value);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigValue.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigValue.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigValue.java
new file mode 100644
index 0000000..f59c2b9
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ConfigValue.java
@@ -0,0 +1,33 @@
+package org.apache.eagle.alert.config;
+
+
+/**
+ * Config body contains actual data for one topic
+ * this is serialized with json format into zookeeper
+ * value can be versionId which is used for referencing outside data
+ * or value can be actual config value
+ */
+public class ConfigValue {
+    private boolean isValueVersionId;
+    private Object value;
+
+    public boolean isValueVersionId() {
+        return isValueVersionId;
+    }
+
+    public void setValueVersionId(boolean valueVersionId) {
+        isValueVersionId = valueVersionId;
+    }
+
+    public Object getValue() {
+        return value;
+    }
+
+    public void setValue(Object value) {
+        this.value = value;
+    }
+
+    public String toString(){
+        return "isValueVersionId: " + isValueVersionId + ", value: " + value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ZKConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ZKConfig.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ZKConfig.java
new file mode 100644
index 0000000..fe04b4b
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ZKConfig.java
@@ -0,0 +1,17 @@
+package org.apache.eagle.alert.config;
+
+import java.io.Serializable;
+
+/**
+ * Memory representation of key zookeeper configurations
+ */
+public class ZKConfig implements Serializable{
+    private static final long serialVersionUID = -1287231022807492775L;
+
+    public String zkQuorum;
+    public String zkRoot;
+    public int zkSessionTimeoutMs;
+    public int connectionTimeoutMs;
+    public int zkRetryTimes;
+    public int zkRetryInterval;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ZKConfigBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ZKConfigBuilder.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ZKConfigBuilder.java
new file mode 100644
index 0000000..9d77a58
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/config/ZKConfigBuilder.java
@@ -0,0 +1,38 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.config;
+
+import com.typesafe.config.Config;
+
+/**
+ * Since 4/28/16.
+ */
+public class ZKConfigBuilder {
+    public static ZKConfig getZKConfig(Config config){
+        ZKConfig zkConfig = new ZKConfig();
+        zkConfig.zkQuorum = config.getString("zkConfig.zkQuorum");
+        zkConfig.zkRoot = config.getString("zkConfig.zkRoot");
+        zkConfig.zkSessionTimeoutMs = config.getInt("zkConfig.zkSessionTimeoutMs");
+        zkConfig.connectionTimeoutMs = config.getInt("zkConfig.connectionTimeoutMs");
+        zkConfig.zkRetryTimes = config.getInt("zkConfig.zkRetryTimes");
+        zkConfig.zkRetryInterval = config.getInt("zkConfig.zkRetryInterval");
+        return zkConfig;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/AlertBoltSpec.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/AlertBoltSpec.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/AlertBoltSpec.java
new file mode 100644
index 0000000..83d307c
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/AlertBoltSpec.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordination.model;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+/**
+ * The alert specification for topology bolts.
+ * 
+ * @since Apr 29, 2016
+ */
+public class AlertBoltSpec {
+    private String version;
+    private String topologyName;
+
+    // mapping from boltId to list of PolicyDefinitions
+    @JsonIgnore
+    private Map<String, List<PolicyDefinition>> boltPoliciesMap = new HashMap<String, List<PolicyDefinition>>();
+
+    // mapping from boltId to list of PolicyDefinition's Ids
+    private Map<String, List<String>> boltPolicyIdsMap = new HashMap<String, List<String>>();
+
+    public AlertBoltSpec() {
+    }
+
+    public AlertBoltSpec(String topo) {
+        this.topologyName = topo;
+    }
+
+    public String getVersion() {
+        return version;
+    }
+
+    public void setVersion(String version) {
+        this.version = version;
+    }
+
+    public String getTopologyName() {
+        return topologyName;
+    }
+
+    public void setTopologyName(String topologyName) {
+        this.topologyName = topologyName;
+    }
+
+//    public List<PolicyDefinition> getBoltPolicy(String boltId) {
+//        return boltPoliciesMap.get(boltId);
+//    }
+//
+//    public void addBoltPolicy(String boltId, PolicyDefinition pd) {
+//        if (boltPoliciesMap.containsKey(boltId)) {
+//            boltPoliciesMap.get(boltId).add(pd);
+//        } else {
+//            List<PolicyDefinition> list = new ArrayList<PolicyDefinition>();
+//            boltPoliciesMap.put(boltId, list);
+//            list.add(pd);
+//        }
+//    }
+
+    public void addBoltPolicy(String boltId, String policyName) {
+        if (boltPolicyIdsMap.containsKey(boltId)) {
+            boltPolicyIdsMap.get(boltId).add(policyName);
+        } else {
+            List<String> list = new ArrayList<String>();
+            boltPolicyIdsMap.put(boltId, list);
+            list.add(policyName);
+        }
+    }
+
+    @JsonIgnore
+    public Map<String, List<PolicyDefinition>> getBoltPoliciesMap() {
+        return boltPoliciesMap;
+    }
+
+    @JsonIgnore
+    public void setBoltPoliciesMap(Map<String, List<PolicyDefinition>> boltPoliciesMap) {
+        this.boltPoliciesMap = boltPoliciesMap;
+    }
+
+    public Map<String, List<String>> getBoltPolicyIdsMap() {
+        return boltPolicyIdsMap;
+    }
+
+    public void setBoltPolicyIdsMap(Map<String, List<String>> boltPolicyIdsMap) {
+        this.boltPolicyIdsMap = boltPolicyIdsMap;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("version:%s-topo:%s, boltPolicyIdsMap %s", version, topologyName, boltPolicyIdsMap);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadata.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadata.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadata.java
new file mode 100644
index 0000000..6c4f576
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Kafka2TupleMetadata.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordination.model;
+
+import java.util.Map;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import com.google.common.base.Objects;
+
+/**
+ * @since Apr 5, 2016
+ * this metadata model controls how to convert kafka topic into tuple stream
+ */
+public class Kafka2TupleMetadata {
+    private String type;
+    private String name; // data source name
+    private Map<String, String> properties;
+    private String topic;
+    private String schemeCls;
+
+    private Tuple2StreamMetadata codec;
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public void setSchemeCls(String schemeCls) {
+        this.schemeCls = schemeCls;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+    public void setProperties(Map<String, String> properties) {
+        this.properties = properties;
+    }
+
+    public Tuple2StreamMetadata getCodec() {
+        return codec;
+    }
+
+    public void setCodec(Tuple2StreamMetadata codec) {
+        this.codec = codec;
+    }
+
+    public String getTopic() {
+        return this.topic;
+    }
+    public String getSchemeCls() {
+        return this.schemeCls;
+    }
+
+    public int hashCode() {
+        return new HashCodeBuilder().append(name).append(type).build();
+    }
+
+    public boolean equals(Object obj) {
+        if (!(obj instanceof Kafka2TupleMetadata)) {
+            return false;
+        }
+        Kafka2TupleMetadata o = (Kafka2TupleMetadata) obj;
+        return Objects.equal(name, o.name) && Objects.equal(type, o.type);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueue.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueue.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueue.java
new file mode 100644
index 0000000..e5e5618
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/PolicyWorkerQueue.java
@@ -0,0 +1,63 @@
+package org.apache.eagle.alert.coordination.model;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+public class PolicyWorkerQueue {
+
+    private StreamPartition partition;
+    private List<WorkSlot> workers;
+
+    public PolicyWorkerQueue() {
+        workers = new ArrayList<>();
+    }
+
+    public PolicyWorkerQueue(List<WorkSlot> workers) {
+        this.workers = workers;
+    }
+
+    public PolicyWorkerQueue(StreamPartition partition, List<WorkSlot> workers) {
+        this.workers = workers;
+        this.partition = partition;
+    }
+
+    public StreamPartition getPartition() {
+        return partition;
+    }
+
+    public void setPartition(StreamPartition partition) {
+        this.partition = partition;
+    }
+
+    public List<WorkSlot> getWorkers() {
+        return workers;
+    }
+
+    public void setWorkers(List<WorkSlot> workers) {
+        this.workers = workers;
+    }
+
+    public String toString() {
+        return "[" + StringUtils.join(workers, ",") + "]";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/PublishSpec.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/PublishSpec.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/PublishSpec.java
new file mode 100644
index 0000000..06e819a
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/PublishSpec.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordination.model;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+
+/**
+ * 
+ * @since May 1, 2016
+ *
+ */
+public class PublishSpec {
+
+    private String topologyName;
+    // actually only publish spec for one topology
+    private String boltId;
+    private String version;
+
+    private List<Publishment> publishments = new ArrayList<Publishment>();
+
+    public PublishSpec() {
+    }
+
+    public PublishSpec(String topoName, String boltId) {
+        this.topologyName = topoName;
+        this.boltId = boltId;
+    }
+
+    public void addPublishment(Publishment p) {
+        this.publishments.add(p);
+    }
+
+    public String getTopologyName() {
+        return topologyName;
+    }
+
+    public String getBoltId() {
+        return boltId;
+    }
+
+    public List<Publishment> getPublishments() {
+        return publishments;
+    }
+
+    public String getVersion() {
+        return version;
+    }
+
+    public void setVersion(String version) {
+        this.version = version;
+    }
+
+    public void setTopologyName(String topologyName) {
+        this.topologyName = topologyName;
+    }
+
+    public void setBoltId(String boltId) {
+        this.boltId = boltId;
+    }
+
+    public void setPublishments(List<Publishment> publishments) {
+        this.publishments = publishments;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/RouterSpec.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/RouterSpec.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/RouterSpec.java
new file mode 100644
index 0000000..5241920
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/RouterSpec.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordination.model;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.codehaus.jackson.annotate.JsonIgnore;
+
+/**
+ * @since Apr 29, 2016
+ *
+ */
+public class RouterSpec {
+    private String version;
+    private String topologyName;
+
+    private List<StreamRouterSpec> routerSpecs = new ArrayList<>();
+
+    /**
+     * Creates a spec with an empty router-spec list; version and topology
+     * name stay {@code null} until set.
+     */
+    public RouterSpec() {
+    }
+
+    /**
+     * Creates a spec with an empty router-spec list for the given topology.
+     *
+     * @param topoName value stored as the topology name
+     */
+    public RouterSpec(String topoName) {
+        this.topologyName = topoName;
+    }
+
+    public String getVersion() {
+        return this.version;
+    }
+
+    public void setVersion(String version) {
+        this.version = version;
+    }
+
+    public String getTopologyName() {
+        return this.topologyName;
+    }
+
+    public void setTopologyName(String topologyName) {
+        this.topologyName = topologyName;
+    }
+
+    /**
+     * Appends one stream router spec to the current list.
+     *
+     * @param routerSpec the spec to append
+     */
+    @JsonIgnore
+    public void addRouterSpec(StreamRouterSpec routerSpec) {
+        this.routerSpecs.add(routerSpec);
+    }
+
+    /**
+     * @return the internal router-spec list itself (not a copy)
+     */
+    public List<StreamRouterSpec> getRouterSpecs() {
+        return this.routerSpecs;
+    }
+
+    /**
+     * Replaces the internal router-spec list with the given list reference.
+     *
+     * @param routerSpecs the list to use from now on
+     */
+    public void setRouterSpecs(List<StreamRouterSpec> routerSpecs) {
+        this.routerSpecs = routerSpecs;
+    }
+
+    /**
+     * Renders version, topology name and the router-spec list.
+     * NOTE(review): the third value is printed under the label "boltSpec"
+     * although it is the router-spec list -- confirm whether the label is intended.
+     */
+    @Override
+    public String toString() {
+        final String pattern = "version:%s-topo:%s, boltSpec:%s";
+        return String.format(pattern, this.version, this.topologyName, this.routerSpecs);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/ScheduleState.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/ScheduleState.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/ScheduleState.java
new file mode 100644
index 0000000..6036f29
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/ScheduleState.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordination.model;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+
+/**
+ * Global view of the schedule status. <br/>
+ * <br/>
+ * TODO/FIXME: Persistence simply serializes the whole ScheduleState to JSON. One
+ * concern is that this string might become too big to store. <br/>
+ * <br/>
+ * The proposed solution is for the metadata resource to store the
+ * specs/monitoredStreams/policy assignments in separate tables/collections,
+ * each tagged with the version.
+ * 
+ * 
+ * @since Apr 26, 2016
+ *
+ */
+public class ScheduleState {
+
+    // ScheduleSpec
+    private Map<String, SpoutSpec> spoutSpecs = new HashMap<String, SpoutSpec>();
+    private Map<String, AlertBoltSpec> alertSpecs = new HashMap<String, AlertBoltSpec>();
+    private Map<String, RouterSpec> groupSpecs = new HashMap<String, RouterSpec>();
+    private Map<String, PublishSpec> publishSpecs = new HashMap<String, PublishSpec>();
+
+    // ScheduleSnapshot
+    private List<VersionedPolicyDefinition> policySnapshots = new ArrayList<VersionedPolicyDefinition>();
+    private List<VersionedStreamDefinition> streamSnapshots = new ArrayList<VersionedStreamDefinition>();
+
+    // ScheduleResult
+    private List<MonitoredStream> monitoredStreams = new ArrayList<MonitoredStream>();
+    private List<PolicyAssignment> assignments = new ArrayList<PolicyAssignment>();
+
+    private String version;
+    // FIXME : should be date, can not make it simple in mongo..
+    private String generateTime;
+    private int code = 200;
+    private String message = "OK";
+
+    public ScheduleState() {
+    }
+
+    public ScheduleState(String version, 
+            Map<String, SpoutSpec> topoSpoutSpecsMap,
+            Map<String, RouterSpec> groupSpecsMap, 
+            Map<String, AlertBoltSpec> alertSpecsMap,
+            Map<String, PublishSpec> pubMap, 
+            Collection<PolicyAssignment> assignments,
+            Collection<MonitoredStream> monitoredStreams, 
+            Collection<PolicyDefinition> definitions,
+            Collection<StreamDefinition> streams) {
+        this.spoutSpecs = topoSpoutSpecsMap;
+        this.groupSpecs = groupSpecsMap;
+        this.alertSpecs = alertSpecsMap;
+        this.publishSpecs = pubMap;
+        this.version = version;
+        this.generateTime = String.valueOf(new Date().getTime());
+        this.assignments = new ArrayList<PolicyAssignment>(assignments);
+        this.monitoredStreams = new ArrayList<MonitoredStream>(monitoredStreams);
+        this.policySnapshots = new ArrayList<VersionedPolicyDefinition>();
+        this.streamSnapshots = new ArrayList<VersionedStreamDefinition>();
+
+        for (SpoutSpec ss : this.spoutSpecs.values()) {
+            ss.setVersion(version);
+        }
+
+        for (RouterSpec ss : this.groupSpecs.values()) {
+            ss.setVersion(version);
+        }
+
+        for (AlertBoltSpec ss : this.alertSpecs.values()) {
+            ss.setVersion(version);
+        }
+
+        for (PublishSpec ps : this.publishSpecs.values()) {
+            ps.setVersion(version);
+        }
+
+        for (MonitoredStream ms : this.monitoredStreams) {
+            ms.setVersion(version);
+        }
+        for (PolicyAssignment ps : this.assignments) {
+            ps.setVersion(version);
+        }
+        for (PolicyDefinition def : definitions) {
+            this.policySnapshots.add(new VersionedPolicyDefinition(version, def));
+        }
+        for (StreamDefinition sd :streams) {
+            this.streamSnapshots.add(new VersionedStreamDefinition(version, sd));
+        }
+    }
+
+    public Map<String, SpoutSpec> getSpoutSpecs() {
+        return spoutSpecs;
+    }
+
+    public void setSpoutSpecs(Map<String, SpoutSpec> spoutSpecs) {
+        this.spoutSpecs = spoutSpecs;
+    }
+
+    public Map<String, AlertBoltSpec> getAlertSpecs() {
+        return alertSpecs;
+    }
+
+    public void setAlertSpecs(Map<String, AlertBoltSpec> alertSpecs) {
+        this.alertSpecs = alertSpecs;
+    }
+
+    public Map<String, RouterSpec> getGroupSpecs() {
+        return groupSpecs;
+    }
+
+    public void setGroupSpecs(Map<String, RouterSpec> groupSpecs) {
+        this.groupSpecs = groupSpecs;
+    }
+
+    public Map<String, PublishSpec> getPublishSpecs() {
+        return publishSpecs;
+    }
+
+    public void setPublishSpecs(Map<String, PublishSpec> publishSpecs) {
+        this.publishSpecs = publishSpecs;
+    }
+
+    public String getVersion() {
+        return version;
+    }
+
+    public void setVersion(String version) {
+        this.version = version;
+    }
+
+    public int getCode() {
+        return code;
+    }
+
+    public void setCode(int code) {
+        this.code = code;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    public String getGenerateTime() {
+        return generateTime;
+    }
+
+    public void setGenerateTime(String generateTime) {
+        this.generateTime = generateTime;
+    }
+
+    public List<MonitoredStream> getMonitoredStreams() {
+        return monitoredStreams;
+    }
+
+    public List<PolicyAssignment> getAssignments() {
+        return assignments;
+    }
+
+    public List<VersionedPolicyDefinition> getPolicySnapshots() {
+        return policySnapshots;
+    }
+
+    public void setPolicySnapshots(List<VersionedPolicyDefinition> policySnapshots) {
+        this.policySnapshots = policySnapshots;
+    }
+
+    public void setMonitoredStreams(List<MonitoredStream> monitoredStreams) {
+        this.monitoredStreams = monitoredStreams;
+    }
+
+    public void setAssignments(List<PolicyAssignment> assignments) {
+        this.assignments = assignments;
+    }
+
+    public List<VersionedStreamDefinition> getStreamSnapshots() {
+        return streamSnapshots;
+    }
+
+    public void setStreamSnapshots(List<VersionedStreamDefinition> streamSnapshots) {
+        this.streamSnapshots = streamSnapshots;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/SpoutSpec.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/SpoutSpec.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/SpoutSpec.java
new file mode 100644
index 0000000..a197858
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/SpoutSpec.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordination.model;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+
+/**
+ * SpoutSpec metadata controls the 3 phases of data transformation for one specific topic: <br/>
+ * phase 1: kafka topic to tuple, controlled by Kafka2TupleMetadata, i.e. the Scheme <br/>
+ * phase 2: tuple to stream, controlled by Tuple2StreamMetadata, i.e. stream name selector etc. <br/>
+ * phase 3: stream repartition, controlled by StreamRepartitionMetadata, i.e. the group-by spec
+ * @since Apr 18, 2016
+ *
+ */
+public class SpoutSpec {
+    private String version;
+
+    private String topologyId;
+
+    // topicName -> kafka2TupleMetadata
+    private Map<String, Kafka2TupleMetadata> kafka2TupleMetadataMap = new HashMap<String, Kafka2TupleMetadata>();
+    // topicName -> Tuple2StreamMetadata
+    private Map<String, Tuple2StreamMetadata> tuple2StreamMetadataMap = new HashMap<String, Tuple2StreamMetadata>();
+    // topicName -> list of StreamRepartitionMetadata; a list because one topic (data source) may spawn multiple streams.
+    private Map<String, List<StreamRepartitionMetadata>> streamRepartitionMetadataMap = new HashMap<String, List<StreamRepartitionMetadata>>();
+
+    /**
+     * Creates a spec with empty metadata maps and no topology id or version.
+     */
+    public SpoutSpec(){}
+
+    /**
+     * Creates a spec for one topology. The three maps are stored by reference,
+     * not copied.
+     *
+     * @param topologyId                   id of the topology this spec belongs to
+     * @param streamRepartitionMetadataMap topicName -> repartition metadata list
+     * @param tuple2StreamMetadataMap      topicName -> tuple-to-stream metadata
+     * @param kafka2TupleMetadataMap       topicName -> kafka-to-tuple metadata
+     */
+    public SpoutSpec(
+            String topologyId,
+            Map<String, List<StreamRepartitionMetadata>>  streamRepartitionMetadataMap,
+            Map<String, Tuple2StreamMetadata> tuple2StreamMetadataMap,
+            Map<String, Kafka2TupleMetadata>  kafka2TupleMetadataMap) {
+        this.topologyId = topologyId;
+        this.streamRepartitionMetadataMap = streamRepartitionMetadataMap;
+        this.tuple2StreamMetadataMap = tuple2StreamMetadataMap;
+        this.kafka2TupleMetadataMap = kafka2TupleMetadataMap;
+    }
+
+    public String getTopologyId() {
+        return topologyId;
+    }
+
+    public Map<String, List<StreamRepartitionMetadata>> getStreamRepartitionMetadataMap() {
+        return streamRepartitionMetadataMap;
+    }
+
+    public Map<String, Tuple2StreamMetadata> getTuple2StreamMetadataMap(){
+        return this.tuple2StreamMetadataMap;
+    }
+
+    public Map<String, Kafka2TupleMetadata> getKafka2TupleMetadataMap() {
+        return kafka2TupleMetadataMap;
+    }
+
+    /**
+     * Finds the repartition metadata of a stream, comparing stream ids
+     * case-insensitively across all topics.
+     * <p>
+     * Missing data (a null map, null list values, null list elements or
+     * metadata without a stream id) is skipped instead of raising a
+     * {@link NullPointerException}.
+     *
+     * @param streamName the stream id to look for; {@code null} never matches
+     * @return the first matching metadata, or {@code null} when there is none
+     */
+    @org.codehaus.jackson.annotate.JsonIgnore
+    public StreamRepartitionMetadata getStream(String streamName) {
+        if (streamName == null || this.streamRepartitionMetadataMap == null) {
+            return null;
+        }
+        for (List<StreamRepartitionMetadata> meta : this.streamRepartitionMetadataMap.values()) {
+            if (meta == null) {
+                continue;
+            }
+            // Call equalsIgnoreCase on streamName (known non-null): the stream id of a
+            // metadata object built through its no-arg constructor is still null.
+            Optional<StreamRepartitionMetadata> m = meta.stream()
+                    .filter((t) -> t != null && streamName.equalsIgnoreCase(t.getStreamId()))
+                    .findFirst();
+            if (m.isPresent()) {
+                return m.get();
+            }
+        }
+        return null;
+    }
+
+    public void setTopologyId(String topologyId) {
+        this.topologyId = topologyId;
+    }
+
+    public void setKafka2TupleMetadataMap(Map<String, Kafka2TupleMetadata> kafka2TupleMetadataMap) {
+        this.kafka2TupleMetadataMap = kafka2TupleMetadataMap;
+    }
+
+    public void setTuple2StreamMetadataMap(Map<String, Tuple2StreamMetadata> tuple2StreamMetadataMap) {
+        this.tuple2StreamMetadataMap = tuple2StreamMetadataMap;
+    }
+
+    public void setStreamRepartitionMetadataMap(Map<String, List<StreamRepartitionMetadata>> streamRepartitionMetadataMap) {
+        this.streamRepartitionMetadataMap = streamRepartitionMetadataMap;
+    }
+
+    public String getVersion() {
+        return version;
+    }
+
+    public void setVersion(String version) {
+        this.version = version;
+    }
+
+    /**
+     * Renders only the version and the topology id; the metadata maps are not included.
+     */
+    @Override
+    public String toString() {
+        return String.format("version:%s-topo:%s ", version, this.topologyId);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamNameSelector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamNameSelector.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamNameSelector.java
new file mode 100644
index 0000000..d421716
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamNameSelector.java
@@ -0,0 +1,15 @@
+package org.apache.eagle.alert.coordination.model;
+
+import java.util.Map;
+
+/**
+ * This metadata controls how to figure out stream name from incoming tuple
+ */
+public interface StreamNameSelector {
+    /**
+     * Figures out the stream name for one incoming tuple.
+     *
+     * @param tuple the incoming tuple, as a field name to field value mapping
+     * @return the stream name selected for the tuple
+     */
+    String getStreamName(Map<String, Object> tuple);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRepartitionMetadata.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRepartitionMetadata.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRepartitionMetadata.java
new file mode 100644
index 0000000..5f8f689
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRepartitionMetadata.java
@@ -0,0 +1,53 @@
+package org.apache.eagle.alert.coordination.model;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.codehaus.jackson.annotate.JsonIgnore;
+
+/**
+ * @since Apr 25, 2016
+ * This meta-data controls how tuple streamId is repartitioned
+ */
+public class StreamRepartitionMetadata {
+    private String topicName;
+    private String streamId;
+    /**
+     * A stream may have several different grouping strategies, for example
+     * group-by on some fields or even shuffling.
+     */
+    public List<StreamRepartitionStrategy> groupingStrategies = new ArrayList<>();
+
+    /**
+     * Creates metadata with no topic name or stream id and an empty strategy list.
+     */
+    public StreamRepartitionMetadata(){}
+
+    /**
+     * Creates metadata for one stream of one topic with an empty strategy list.
+     *
+     * @param topicName value stored as the topic name
+     * @param stream    value stored as the stream id
+     */
+    public StreamRepartitionMetadata(String topicName, String stream) {
+        this.streamId = stream;
+        this.topicName = topicName;
+    }
+
+    public String getStreamId() {
+        return this.streamId;
+    }
+
+    public void setStreamId(String streamId) {
+        this.streamId = streamId;
+    }
+
+    public String getTopicName() {
+        return this.topicName;
+    }
+
+    public void setTopicName(String topicName) {
+        this.topicName = topicName;
+    }
+
+    /**
+     * @return the internal strategy list itself (not a copy)
+     */
+    public List<StreamRepartitionStrategy> getGroupingStrategies() {
+        return this.groupingStrategies;
+    }
+
+    /**
+     * Appends one grouping strategy to the current strategy list.
+     *
+     * @param gs the strategy to append
+     */
+    @JsonIgnore
+    public void addGroupStrategy(StreamRepartitionStrategy gs) {
+        groupingStrategies.add(gs);
+    }
+
+    /**
+     * Replaces the strategy list with the given list reference.
+     *
+     * @param groupingStrategies the list to use from now on
+     */
+    public void setGroupingStrategies(List<StreamRepartitionStrategy> groupingStrategies) {
+        this.groupingStrategies = groupingStrategies;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategy.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategy.java b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategy.java
new file mode 100644
index 0000000..203114e
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/StreamRepartitionStrategy.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordination.model;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+
+public class StreamRepartitionStrategy {
+    public StreamPartition partition ;
+
+    public int numTotalParticipatingRouterBolts = 0;      // how many group-by bolts participate policy evaluation
+    public int startSequence = 0;            // what is the sequence for the first bolt in this topology among all bolts
+    public List<String> totalTargetBoltIds = new ArrayList<String>();
+
+    /**
+     * Hashes the partition and the target bolt ids. The bolt id hashes are
+     * summed, so the result does not depend on their order, matching the
+     * order-insensitive comparison in {@link #equals(Object)}.
+     * <p>
+     * A null partition, a null bolt id list and null bolt ids contribute
+     * nothing instead of raising a {@link NullPointerException}; a freshly
+     * constructed instance has a null partition.
+     *
+     * @return the hash code
+     */
+    @Override
+    public int hashCode() {
+        int hashcode = 1 * 31;
+        if (partition != null) {
+            hashcode += partition.hashCode();
+        }
+        if (totalTargetBoltIds != null) {
+            for (String str : totalTargetBoltIds) {
+                if (str != null) {
+                    hashcode += str.hashCode();
+                }
+            }
+        }
+        return hashcode;
+    }
+
+    /**
+     * Two strategies are equal when their partitions are equal and their
+     * target bolt id lists hold the same elements with the same cardinalities,
+     * in any order. {@code numTotalParticipatingRouterBolts} and
+     * {@code startSequence} are not compared.
+     * <p>
+     * Null partitions are equal only to each other, and so are null bolt id lists.
+     *
+     * @param obj the object to compare with
+     * @return {@code true} when {@code obj} is an equal strategy
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof StreamRepartitionStrategy)) {
+            return false;
+        }
+        StreamRepartitionStrategy o = (StreamRepartitionStrategy) obj;
+        boolean samePartition = (partition == null) ? (o.partition == null) : partition.equals(o.partition);
+        if (!samePartition) {
+            return false;
+        }
+        // isEqualCollection cannot take a null collection, so settle the null cases first.
+        if (totalTargetBoltIds == null || o.totalTargetBoltIds == null) {
+            return totalTargetBoltIds == o.totalTargetBoltIds;
+        }
+        return CollectionUtils.isEqualCollection(totalTargetBoltIds, o.totalTargetBoltIds);
+    }
+
+    public StreamPartition getPartition() {
+        return partition;
+    }
+
+    public void setPartition(StreamPartition partition) {
+        this.partition = partition;
+    }
+
+    public int getNumTotalParticipatingRouterBolts() {
+        return numTotalParticipatingRouterBolts;
+    }
+
+    public void setNumTotalParticipatingRouterBolts(int numTotalParticipatingRouterBolts) {
+        this.numTotalParticipatingRouterBolts = numTotalParticipatingRouterBolts;
+    }
+
+    public int getStartSequence() {
+        return startSequence;
+    }
+
+    public void setStartSequence(int startSequence) {
+        this.startSequence = startSequence;
+    }
+
+    public List<String> getTotalTargetBoltIds() {
+        return totalTargetBoltIds;
+    }
+
+    public void setTotalTargetBoltIds(List<String> totalTargetBoltIds) {
+        this.totalTargetBoltIds = totalTargetBoltIds;
+    }
+
+}
\ No newline at end of file



Mime
View raw message