From: hao@apache.org
To: commits@eagle.apache.org
Subject: [13/84] [partial] eagle git commit: Clean repo for eagle site
Date: Mon, 03 Apr 2017 11:54:21 -0000
Message-Id: <2c7b54ec09934b30ab2190c432e3140e@git.apache.org>

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowBenchmarkTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowBenchmarkTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowBenchmarkTest.java deleted file mode 100644 index d6d4f22..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowBenchmarkTest.java +++ /dev/null @@ -1,152 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *
- * http://www.apache.org/licenses/LICENSE-2.0 - *
- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.sorter; - -import com.codahale.metrics.ConsoleReporter; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.ScheduledReporter; -import com.codahale.metrics.jvm.GarbageCollectorMetricSet; -import com.codahale.metrics.jvm.MemoryUsageGaugeSet; -import org.apache.commons.lang.time.StopWatch; -import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory; -import org.apache.eagle.alert.engine.model.PartitionedEvent; -import org.apache.eagle.common.DateTimeUtil; - -import org.junit.After; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; -import java.util.TreeMap; -import java.util.concurrent.TimeUnit; - -@Ignore("Ignore automatic heavy benchmark test") -public class StreamWindowBenchmarkTest { - private final static Logger LOGGER = LoggerFactory.getLogger(StreamWindowBenchmarkTest.class); - - public void sendDESCOrderedEventsToWindow(StreamWindow window, StreamWindowRepository.StorageType storageType, int num) { - LOGGER.info("Sending {} events to {} ({})", num, window.getClass().getSimpleName(), storageType); - StopWatch stopWatch = new StopWatch(); - stopWatch.start(); - int i = 0; - while (i < num) { - PartitionedEvent event = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream_1", (window.startTime() + i)); - window.add(event); - i++; - } - stopWatch.stop(); - performanceReport.put(num + "\tInsertTime\t" + storageType, stopWatch.getTime()); - LOGGER.info("Inserted {} events in {} ms", num, stopWatch.getTime()); - stopWatch.reset(); - stopWatch.start(); - window.flush(); - stopWatch.stop(); - performanceReport.put(num + "\tReadTime\t" + storageType, stopWatch.getTime()); - } - - private ScheduledReporter metricReporter; - private Map performanceReport; - - @Before - public void setUp() { - final MetricRegistry metrics = new MetricRegistry(); - metrics.registerAll(new MemoryUsageGaugeSet()); - metrics.registerAll(new GarbageCollectorMetricSet()); - metricReporter = ConsoleReporter.forRegistry(metrics) - .filter((name, metric) -> name.matches("(.*heap|total).(usage|used)")) -// .withLoggingLevel(Slf4jReporter.LoggingLevel.DEBUG) - .convertRatesTo(TimeUnit.SECONDS) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .build(); - metricReporter.start(60, TimeUnit.SECONDS); - performanceReport = new TreeMap<>(); - } - - @After - public void after() { - StringBuilder sb = new StringBuilder(); - for (Map.Entry entry : performanceReport.entrySet()) { - sb.append(String.format("%-40s\t%s\n", entry.getKey(), entry.getValue())); - } - LOGGER.info("\n===== Benchmark Result Report =====\n\n{}", sb.toString()); - } - - private final long start = DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"); - private final long stop = DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-05 00:00:00,000"); - private final long margin = (stop - start) / 3; - - private void benchmarkTest(StreamWindow window, StreamWindowRepository.StorageType storageType) { - StopWatch stopWatch = new StopWatch(); - stopWatch.start(); - LOGGER.info("\n===== Benchmark Test for {} 
({}) =====", window.getClass().getSimpleName(), storageType); - metricReporter.report(); - sendDESCOrderedEventsToWindow(window, storageType, 1000); - metricReporter.report(); - sendDESCOrderedEventsToWindow(window, storageType, 10000); - metricReporter.report(); - sendDESCOrderedEventsToWindow(window, storageType, 100000); - metricReporter.report(); - sendDESCOrderedEventsToWindow(window, storageType, 1000000); - metricReporter.report(); - stopWatch.stop(); - LOGGER.info("\n===== Finished in total {} ms =====\n", stopWatch.getTime()); - } - - @Test - @Ignore - public void testStreamWindowBenchmarkMain() { - testStreamSortedWindowOnHeap(); - testStreamSortedWindowInSerializedMemory(); - testStreamSortedWindowOffHeap(); - testStreamSortedWindowFile(); - } - - @Test - @Ignore - public void testStreamSortedWindowOnHeap() { - StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start, stop, margin, StreamWindowRepository.StorageType.ONHEAP); - benchmarkTest(window, StreamWindowRepository.StorageType.ONHEAP); - window.close(); - } - - @Test - @Ignore - public void testStreamSortedWindowInSerializedMemory() { - StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start, stop, margin, StreamWindowRepository.StorageType.MEMORY); - benchmarkTest(window, StreamWindowRepository.StorageType.MEMORY); - window.close(); - } - - @Test - @Ignore - public void testStreamSortedWindowOffHeap() { - StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start, stop, margin, StreamWindowRepository.StorageType.DIRECT_MEMORY); - benchmarkTest(window, StreamWindowRepository.StorageType.DIRECT_MEMORY); - window.close(); - } - - @Test - @Ignore - public void testStreamSortedWindowFile() { - StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start, stop, margin, StreamWindowRepository.StorageType.FILE_RAF); - benchmarkTest(window, StreamWindowRepository.StorageType.FILE_RAF); - window.close(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowTestSuite.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowTestSuite.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowTestSuite.java deleted file mode 100644 index 57e577c..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamWindowTestSuite.java +++ /dev/null @@ -1,164 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *
- * http://www.apache.org/licenses/LICENSE-2.0 - *
- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.sorter; - -import com.google.common.collect.Ordering; -import org.apache.eagle.alert.engine.mock.MockPartitionedCollector; -import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory; -import org.apache.eagle.alert.engine.model.PartitionedEvent; -import org.apache.eagle.alert.engine.sorter.impl.PartitionedEventTimeOrderingComparator; -import org.apache.eagle.alert.engine.sorter.impl.StreamTimeClockInLocalMemory; -import org.apache.eagle.common.DateTimeUtil; - -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - -@SuppressWarnings("unused") -public class StreamWindowTestSuite { - private final static Logger LOGGER = LoggerFactory.getLogger(StreamWindowTestSuite.class); - - private final long start = DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"); - private final long stop = DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:00,000"); - private final long margin = (stop - start) / 3; - - @Test - public void testStreamSortedWindowOnHeap() { - StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start, stop, margin, StreamWindowRepository.StorageType.ONHEAP); - streamSortedWindowMustTest(window); - } - - @Test - public void testStreamSortedWindowInSerializedMemory() { - StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start, stop, margin, StreamWindowRepository.StorageType.MEMORY); - streamSortedWindowMustTest(window); - } - - @Test - public void testStreamSortedWindowOffHeap() { - StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start, stop, margin, StreamWindowRepository.StorageType.DIRECT_MEMORY); - streamSortedWindowMustTest(window); - } - - @Test - public void testStreamSortedWindowFile() { - StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(start, stop, margin, StreamWindowRepository.StorageType.FILE_RAF); - streamSortedWindowMustTest(window); - } - - @SuppressWarnings( {"unchecked", "rawtypes"}) - private void streamSortedWindowMustTest(StreamWindow window) { - MockPartitionedCollector collector = new MockPartitionedCollector(); - window.register(collector); - - StreamTimeClock clock = new StreamTimeClockInLocalMemory("sampleStream_1"); - clock.moveForward(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:30,000")); - - // Current time is: "2016-05-04 00:00:30" - window.onTick(clock, System.currentTimeMillis()); - - Assert.assertTrue(window.alive()); - Assert.assertFalse(window.expired()); - - // Accepted - Assert.assertTrue(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"))); - Assert.assertTrue(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:01,000"))); - Assert.assertTrue(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:30,000"))); - Assert.assertTrue(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:40,000"))); - 
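- // Boundary note: per the assertions in this block, accept() admits event times in
- // [startTime, endTime) — i.e. [00:00:00, 00:01:00) for this window — so the 00:00:50
- // event just below still passes, while 00:01:00 and later are rejected.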
Assert.assertTrue(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:50,000"))); - - - // Rejected - Assert.assertFalse(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-03 23:59:59,000"))); - Assert.assertFalse(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:00,000"))); - Assert.assertFalse(window.accept(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:01,000"))); - - // Accepted - Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000")))); - - Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:01,000")))); - Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:50,000")))); - Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:40,000")))); - Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:30,000")))); - - // Should accept Duplicated - Assert.assertTrue("Should support duplicated timestamp", window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000")))); - - Assert.assertEquals(6, window.size()); - - // Rejected - Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-03 23:59:59,000")))); - Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:00,000")))); - Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:01,000")))); - - Assert.assertEquals(6, window.size()); - - // Now is: "2016-05-04 00:00:55" - clock.moveForward(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:55,000")); - window.onTick(clock, System.currentTimeMillis()); - Assert.assertTrue(window.alive()); - Assert.assertFalse(window.expired()); - Assert.assertTrue(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:35,000")))); - Assert.assertEquals(7, window.size()); - - // Flush when stream time delay too much after system time but window will still be alive - window.onTick(clock, System.currentTimeMillis() + 1 + stop - start + margin); - Assert.assertTrue(window.alive()); - Assert.assertFalse(window.expired()); - Assert.assertEquals(0, window.size()); - Assert.assertEquals(7, collector.size()); - - Assert.assertFalse("Because window has flushed but not expired, window should reject future events < last flush stream time", - window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:54,000")))); - Assert.assertTrue("Because window has flushed 
but not expired, window should still accept future events >= last flush stream time", - window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:56,000")))); - Assert.assertEquals(1, window.size()); - Assert.assertEquals(7, collector.size()); - - // Now is: "2016-05-04 00:01:10", not expire, - clock.moveForward(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:10,000")); - window.onTick(clock, System.currentTimeMillis() + 2 * (1 + stop - start + margin)); - Assert.assertEquals(8, collector.size()); - - // Now is: "2016-05-04 00:01:20", expire - clock.moveForward(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:01:20,000")); - window.onTick(clock, System.currentTimeMillis()); - Assert.assertFalse(window.alive()); - Assert.assertTrue(window.expired()); - Assert.assertFalse(window.add(MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:35,000")))); - Assert.assertEquals(0, window.size()); - - Assert.assertEquals(8, collector.size()); - - Ordering ordering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE); - Assert.assertTrue(ordering.isOrdered(collector.get())); - - List list = collector.get(); - Assert.assertEquals(8, list.size()); - Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"), list.get(0).getTimestamp()); - Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:00,000"), list.get(1).getTimestamp()); - Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:01,000"), list.get(2).getTimestamp()); - Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:30,000"), list.get(3).getTimestamp()); - Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:35,000"), list.get(4).getTimestamp()); - Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:40,000"), list.get(5).getTimestamp()); - Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:50,000"), list.get(6).getTimestamp()); - Assert.assertEquals(DateTimeUtil.humanDateToMillisecondsWithoutException("2016-05-04 00:00:56,000"), list.get(7).getTimestamp()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/TreeMultisetComparatorTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/TreeMultisetComparatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/TreeMultisetComparatorTest.java deleted file mode 100644 index c59f0de..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/TreeMultisetComparatorTest.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter;
-
-import com.google.common.collect.TreeMultiset;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.sorter.impl.PartitionedEventTimeOrderingComparator;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Iterator;
-
-/**
- * Since 5/10/16.
- */
-public class TreeMultisetComparatorTest {
-    /**
-     * If two different events carry the same timestamp and the comparator returns 0 whenever
-     * timestamps are equal, then on insertion into a TreeMultiset the second event is collapsed
-     * into an extra occurrence of the first, and its payload is silently lost.
-     */
-    @Test
-    public void testComparator() {
-        TreeMultiset<PartitionedEvent> set = TreeMultiset.create(PartitionedEventTimeOrderingComparator.INSTANCE);
-
-        // construct PartitionedEvent1
-        PartitionedEvent event1 = new PartitionedEvent();
-        StreamPartition sp = new StreamPartition();
-        sp.setColumns(Arrays.asList("host"));
-        sp.setSortSpec(null);
-        sp.setStreamId("testStreamId");
-        sp.setType(StreamPartition.Type.GROUPBY);
-        event1.setPartition(sp);
-        event1.setPartitionKey(1000);
-        StreamEvent e1 = new StreamEvent();
-        e1.setData(new Object[] {18.4});
-        e1.setStreamId("testStreamId");
-        e1.setTimestamp(1462909984000L);
-        event1.setEvent(e1);
-
-        // construct PartitionedEvent2 with the same timestamp but a different value
-        PartitionedEvent event2 = new PartitionedEvent();
-        event2.setPartition(sp);
-        event2.setPartitionKey(1000);
-        StreamEvent e2 = new StreamEvent();
-        e2.setData(new Object[] {16.3});
-        e2.setStreamId("testStreamId");
-        e2.setTimestamp(1462909984000L);
-        event2.setEvent(e2);
-
-        // construct PartitionedEvent3 with a later timestamp and a different value
-        PartitionedEvent event3 = new PartitionedEvent();
-        event3.setPartition(sp);
-        event3.setPartitionKey(1000);
-        StreamEvent e3 = new StreamEvent();
-        e3.setData(new Object[] {14.3});
-        e3.setStreamId("testStreamId");
-        e3.setTimestamp(1462909984001L);
-        event3.setEvent(e3);
-
-        // construct PartitionedEvent4 identical to event3
-        PartitionedEvent event4 = new PartitionedEvent();
-        event4.setPartition(sp);
-        event4.setPartitionKey(1000);
-        StreamEvent e4 = new StreamEvent();
-        e4.setData(new Object[] {14.3});
-        e4.setStreamId("testStreamId");
-        e4.setTimestamp(1462909984001L);
-        event4.setEvent(e4);
-
-        Assert.assertNotEquals(event2, event3);
-        Assert.assertEquals(event3, event4);
-
-        // check content in set
-        set.add(event1);
-        set.add(event2);
-        set.add(event3);
-        set.add(event4);
-        Assert.assertEquals(4, set.size());
-        set.forEach(System.out::println);
-
-        Assert.assertEquals(-1, PartitionedEventTimeOrderingComparator.INSTANCE.compare(event1, event2));
-        Assert.assertEquals(-1, PartitionedEventTimeOrderingComparator.INSTANCE.compare(event1, event3));
-        Assert.assertEquals(-1, PartitionedEventTimeOrderingComparator.INSTANCE.compare(event2, event3));
-        Assert.assertEquals(0,
PartitionedEventTimeOrderingComparator.INSTANCE.compare(event3, event4)); - - Iterator it = set.iterator(); - Assert.assertEquals(16.3, it.next().getData()[0]); - Assert.assertEquals(18.4, it.next().getData()[0]); - Assert.assertEquals(14.3, it.next().getData()[0]); - Assert.assertEquals(14.3, it.next().getData()[0]); - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java deleted file mode 100644 index d4fe01a..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java +++ /dev/null @@ -1,163 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *
- * http://www.apache.org/licenses/LICENSE-2.0 - *
- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.statecheck; - -import backtype.storm.task.GeneralTopologyContext; -import backtype.storm.task.IOutputCollector; -import backtype.storm.task.OutputCollector; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.TupleImpl; -import org.apache.eagle.alert.coordination.model.AlertBoltSpec; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.PublishPartition; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.model.PartitionedEvent; -import org.apache.eagle.alert.engine.model.StreamEvent; -import org.apache.eagle.alert.engine.router.TestAlertBolt; -import org.apache.eagle.alert.engine.runner.AlertBolt; -import org.apache.eagle.alert.utils.AlertConstants; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.core.type.TypeReference; -import org.jetbrains.annotations.NotNull; -import org.junit.Assert; -import org.junit.Test; -import org.mockito.Mockito; - -import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; - -/** - * Created on 8/4/16. - */ -public class TestStateCheckPolicy { - - @Test - public void testStateCheck() throws Exception { - PolicyGroupEvaluatorImpl impl = new PolicyGroupEvaluatorImpl("test-statecheck-poicyevaluator"); - AtomicBoolean verified = new AtomicBoolean(false); - OutputCollector collector = new OutputCollector(new IOutputCollector() { - @Override - public List emit(String streamId, Collection anchors, List tuple) { - verified.set(true); - Assert.assertEquals("perfmon_latency_check_output2", ((PublishPartition) tuple.get(0)).getStreamId()); - AlertStreamEvent event = (AlertStreamEvent) tuple.get(1); - System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s] ", ((PublishPartition) tuple.get(0)).getStreamId(), tuple)); - return null; - } - - @Override - public void emitDirect(int taskId, String streamId, Collection anchors, List tuple) { - } - - @Override - public void ack(Tuple input) { - } - - @Override - public void fail(Tuple input) { - } - - @Override - public void reportError(Throwable error) { - } - }); - - AlertBolt alertBolt = TestAlertBolt.createAlertBolt(collector); - AlertBoltSpec spec = createAlertSpec(); - Map definitionMap = createStreamMap(); - - - List policies = mapper.readValue(TestStateCheckPolicy.class.getResourceAsStream("/statecheck/policies.json"), - new TypeReference>() { - }); - List streams = mapper.readValue(TestStateCheckPolicy.class.getResourceAsStream("/statecheck/streamdefinitions.json"), - new TypeReference>() { - }); - spec.addPublishPartition("perfmon_latency_check_output2", policies.get(0).getName(), "testPublishBolt", null); - - alertBolt.onAlertBoltSpecChange(spec, definitionMap); - - // send data now - sendData(alertBolt, definitionMap, spec.getBoltPoliciesMap().values().iterator().next().get(0)); - - 
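- // The policy evaluator emits alerts asynchronously; the OutputCollector stub above flips
- // the verified flag when the alert arrives, so allow some time before asserting.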
Thread.sleep(3000); - Assert.assertTrue(verified.get()); - } - - private void sendData(AlertBolt alertBolt, Map definitionMap, PolicyDefinition policyDefinition) { - StreamDefinition definition = definitionMap.get("perfmon_latency_stream"); - long base = System.currentTimeMillis(); - for (int i = 0; i < 2; i++) { - long time = base + i * 1000; - - Map mapdata = new HashMap<>(); - mapdata.put("host", "host-1"); - mapdata.put("timestamp", time); - mapdata.put("metric", "perfmon_latency"); - mapdata.put("pool", "raptor"); - mapdata.put("value", 1000.0 + i * 1000.0); - mapdata.put("colo", "phx"); - - StreamEvent event = StreamEvent.builder().timestamep(time).attributes(mapdata, definition).build(); - PartitionedEvent pEvent = new PartitionedEvent(event, policyDefinition.getPartitionSpec().get(0), 1); - - GeneralTopologyContext mock = Mockito.mock(GeneralTopologyContext.class); - - Mockito.when(mock.getComponentId(1)).thenReturn("taskId"); - Mockito.when(mock.getComponentOutputFields("taskId", "test-stream-id")).thenReturn(new Fields(AlertConstants.FIELD_0)); - - TupleImpl ti = new TupleImpl(mock, Collections.singletonList(pEvent), 1, "test-stream-id"); - alertBolt.execute(ti); - } - } - - @NotNull - private Map createStreamMap() throws Exception { - List streams = mapper.readValue(TestStateCheckPolicy.class.getResourceAsStream("/statecheck/streamdefinitions.json"), - new TypeReference>() { - }); - return streams.stream().collect(Collectors.toMap(StreamDefinition::getStreamId, item -> item)); - } - - private static ObjectMapper mapper = new ObjectMapper(); - - static { - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - } - - private AlertBoltSpec createAlertSpec() throws Exception { - AlertBoltSpec spec = new AlertBoltSpec(); - - spec.setVersion("version1"); - spec.setTopologyName("testTopology"); - - List policies = mapper.readValue(TestStateCheckPolicy.class.getResourceAsStream("/statecheck/policies.json"), - new TypeReference>() { - }); - Assert.assertTrue(policies.size() > 0); - spec.addBoltPolicy("alertBolt1", policies.get(0).getName()); - spec.getBoltPoliciesMap().put("alertBolt1", new ArrayList<>(policies)); - - return spec; - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java deleted file mode 100644 index 7212785..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. 
You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.topology; - -import backtype.storm.LocalCluster; -import backtype.storm.topology.BoltDeclarer; -import backtype.storm.topology.SpoutDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Fields; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import org.apache.eagle.alert.engine.spout.CorrelationSpout; -import org.apache.eagle.alert.engine.spout.CreateTopicUtils; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.log4j.Level; -import org.apache.log4j.LogManager; -import org.junit.Ignore; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Serializable; -import java.util.HashMap; -import java.util.Properties; - -@SuppressWarnings( {"serial", "unused"}) -public class AlertTopologyTest implements Serializable { - private static final Logger LOG = LoggerFactory.getLogger(AlertTopologyTest.class); - char[] alphabets = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z'}; - - @Ignore - @Test - public void testMultipleTopics() throws Exception { - final String topoId = "myTopology"; - int numGroupbyBolts = 2; - int numTotalGroupbyBolts = 3; - System.setProperty("eagle.correlation.numGroupbyBolts", String.valueOf(numGroupbyBolts)); - System.setProperty("eagle.correlation.topologyName", topoId); - System.setProperty("eagle.correlation.mode", "local"); - System.setProperty("eagle.correlation.zkHosts", "localhost:2181"); - final String topicName1 = "testTopic3"; - final String topicName2 = "testTopic4"; - // ensure topic ready - LogManager.getLogger(CorrelationSpout.class).setLevel(Level.DEBUG); - Config config = ConfigFactory.load(); - - CreateTopicUtils.ensureTopicReady(System.getProperty("eagle.correlation.zkHosts"), topicName1); - CreateTopicUtils.ensureTopicReady(System.getProperty("eagle.correlation.zkHosts"), topicName2); - - TopologyBuilder topoBuilder = new TopologyBuilder(); - - int numBolts = config.getInt("eagle.correlation.numGroupbyBolts"); - CorrelationSpout spout = new CorrelationSpout(config, topoId, null, numBolts); - String spoutId = "correlation-spout"; - SpoutDeclarer declarer = topoBuilder.setSpout(spoutId, spout); - for (int i = 0; i < numBolts; i++) { - TestBolt bolt = new TestBolt(); - BoltDeclarer boltDecl = topoBuilder.setBolt("engineBolt" + i, bolt); - boltDecl.fieldsGrouping(spoutId, "stream_" + i, new Fields()); - } - - String topoName = config.getString("eagle.correlation.topologyName"); - LOG.info("start topology in local mode"); - LocalCluster cluster = new LocalCluster(); - cluster.submitTopology(topoName, new HashMap<>(), topoBuilder.createTopology()); - - while (true) { - try { - Thread.sleep(1000); - } catch (Exception e) { - e.printStackTrace(); - } - } - } - - @SuppressWarnings( {"rawtypes", "unchecked"}) - @Ignore - @Test - public void generateRandomStringsToKafka() { - String topic = "testTopic3"; - int max = 
1000; - Properties configMap = new Properties(); - configMap.put("bootstrap.servers", "localhost:6667"); - configMap.put("metadata.broker.list", "localhost:6667"); - configMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - configMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - configMap.put("request.required.acks", "1"); - configMap.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - configMap.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - KafkaProducer producer = new KafkaProducer<>(configMap); - - int i = 0; - while (i++ < max) { - String randomString = generateRandomString(); - System.out.println("sending string : " + randomString); - ProducerRecord record = new ProducerRecord(topic, randomString); - producer.send(record); - if (i % 10 == 0) { - try { - Thread.sleep(10); - } catch (Exception ex) { - } - } - } - producer.close(); - } - - private String generateRandomString() { - long count = Math.round(Math.random() * 10); - if (count == 0) { - count = 1; - } - StringBuilder sb = new StringBuilder(); - while (count-- > 0) { - int index = (int) (Math.floor(Math.random() * 26)); - sb.append(alphabets[index]); - } - return sb.toString(); - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CoordinatorSpoutIntegrationTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CoordinatorSpoutIntegrationTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CoordinatorSpoutIntegrationTest.java deleted file mode 100644 index d011770..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CoordinatorSpoutIntegrationTest.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. 
- * - */ - -package org.apache.eagle.alert.engine.topology; - -import backtype.storm.LocalCluster; -import backtype.storm.generated.StormTopology; -import backtype.storm.topology.BoltDeclarer; -import backtype.storm.topology.SpoutDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Fields; -import backtype.storm.utils.Utils; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import org.apache.eagle.alert.engine.spout.CorrelationSpout; -import org.apache.eagle.alert.engine.spout.CreateTopicUtils; -import org.apache.eagle.alert.utils.AlertConstants; -import org.apache.eagle.alert.utils.StreamIdConversion; -import org.apache.log4j.Level; -import org.apache.log4j.LogManager; -import org.junit.Ignore; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Serializable; -import java.util.HashMap; - -/** - * Since 4/28/16. - */ -@SuppressWarnings( {"serial", "unused", "rawtypes"}) -public class CoordinatorSpoutIntegrationTest implements Serializable { - private static final Logger LOG = LoggerFactory.getLogger(CoordinatorSpoutIntegrationTest.class); - - @Ignore // this test need zookeeper - @Test - public void testConfigNotify() throws Exception { - final String topoId = "myTopology"; - int numGroupbyBolts = 2; - int numTotalGroupbyBolts = 3; - System.setProperty("correlation.serviceHost", "sandbox.hortonworks.com"); - System.setProperty("correlation.servicePort", "58080"); - System.setProperty("withMetadataChangeNotifyService.zkConfig.zkQuorum", "sandbox.hortonworks.com:2181"); - System.setProperty("correlation.numGroupbyBolts", String.valueOf(numGroupbyBolts)); - System.setProperty("correlation.topologyName", topoId); - System.setProperty("correlation.mode", "local"); - System.setProperty("correlation.zkHosts", "sandbox.hortonworks.com:2181"); - final String topicName1 = "testTopic3"; - final String topicName2 = "testTopic4"; - // ensure topic ready - LogManager.getLogger(CorrelationSpout.class).setLevel(Level.DEBUG); - Config config = ConfigFactory.load(); - - CreateTopicUtils.ensureTopicReady(System.getProperty("withMetadataChangeNotifyService.zkConfig.zkQuorum"), topicName1); - CreateTopicUtils.ensureTopicReady(System.getProperty("withMetadataChangeNotifyService.zkConfig.zkQuorum"), topicName2); - - TopologyBuilder topoBuilder = new TopologyBuilder(); - - int numBolts = config.getInt("correlation.numGroupbyBolts"); - String spoutId = "correlation-spout"; - CorrelationSpout spout = new CorrelationSpout(config, topoId, - new MockMetadataChangeNotifyService(topoId, spoutId), numBolts); - SpoutDeclarer declarer = topoBuilder.setSpout(spoutId, spout); - declarer.setNumTasks(2); - for (int i = 0; i < numBolts; i++) { - TestBolt bolt = new TestBolt(); - BoltDeclarer boltDecl = topoBuilder.setBolt("engineBolt" + i, bolt); - boltDecl.fieldsGrouping(spoutId, - StreamIdConversion.generateStreamIdBetween(AlertConstants.DEFAULT_SPOUT_NAME, AlertConstants.DEFAULT_ROUTERBOLT_NAME + i), new Fields()); - } - - String topoName = config.getString("correlation.topologyName"); - LOG.info("start topology in local mode"); - LocalCluster cluster = new LocalCluster(); - StormTopology topology = topoBuilder.createTopology(); - cluster.submitTopology(topoName, new HashMap(), topology); - - - Utils.sleep(Long.MAX_VALUE); - } - - -} 
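The CorrelationSpoutTest in the next hunk verifies spout metadata reloading by subclassing CorrelationSpout and overriding its protected createKafkaSpout factory method, recording the call in an AtomicBoolean instead of building a real Kafka consumer. A minimal, self-contained sketch of that test-seam pattern follows; the Widget, createWorker, and FactorySeamTest names are hypothetical illustrations, not Eagle APIs:

import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Assert;
import org.junit.Test;

public class FactorySeamTest {

    // Hypothetical component: production code obtains a heavyweight worker through
    // an overridable protected factory method (the "test seam").
    static class Widget {
        public void reload() {
            Runnable worker = createWorker();
            if (worker != null) {
                worker.run();
            }
        }

        protected Runnable createWorker() {
            throw new IllegalStateException("real worker not available in tests");
        }
    }

    @Test
    public void testReloadGoesThroughFactory() {
        AtomicBoolean created = new AtomicBoolean(false);
        Widget widget = new Widget() {
            @Override
            protected Runnable createWorker() {
                created.set(true); // record the call instead of building a real worker
                return null;       // CorrelationSpoutTest returns null the same way
            }
        };
        widget.reload();
        Assert.assertTrue(created.get());
    }
}

Overriding the factory keeps the test hermetic: reload-style logic is exercised without Kafka, Zookeeper, or a live Storm cluster, which is the same reason the heavier integration tests above remain @Ignore'd.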
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java deleted file mode 100644 index 5a86cd2..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java +++ /dev/null @@ -1,231 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.topology; - -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; -import org.apache.eagle.alert.coordination.model.SpoutSpec; -import org.apache.eagle.alert.coordination.model.StreamRepartitionMetadata; -import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.spout.CorrelationSpout; -import org.junit.Assert; -import org.junit.Ignore; -import org.junit.Test; -import org.slf4j.Logger; -import storm.kafka.KafkaSpoutWrapper; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - -@Ignore -public class CorrelationSpoutTest { - @SuppressWarnings("unused") - private static final String TEST_SPOUT_ID = "spout-id-1"; - private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(CorrelationSpoutTest.class); - private String dataSourceName = "ds-name"; - - @SuppressWarnings( {"serial", "rawtypes"}) - @Test - public void testMetadataInjestion_emptyMetadata() throws Exception { - String topoId = "testMetadataInjection"; - Config config = ConfigFactory.load(); - AtomicBoolean validated = new AtomicBoolean(false); - CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1) { - @Override - protected KafkaSpoutWrapper createKafkaSpout(Config config, Map conf, TopologyContext context, - SpoutOutputCollector collector, String topic, String schemeClsName, SpoutSpec streamMetadatas, Map sds) - throws Exception { - validated.set(true); - return null; - } - }; - Kafka2TupleMetadata ds = new Kafka2TupleMetadata(); - ds.setName("ds-name"); - ds.setType("KAFKA"); - 
ds.setProperties(new HashMap()); - ds.setTopic("name-of-topic1"); - ds.setSchemeCls("PlainStringScheme"); - ds.setCodec(new Tuple2StreamMetadata()); - Map dsMap = new HashMap(); - dsMap.put(ds.getName(), ds); - - StreamRepartitionMetadata m1 = new StreamRepartitionMetadata(ds.getName(), "s1"); - - Map> dataSources = new HashMap>(); - dataSources.put(ds.getName(), Arrays.asList(m1)); - - SpoutSpec newMetadata = new SpoutSpec(topoId, dataSources, null, dsMap); - - spout.onReload(newMetadata, null); - Assert.assertTrue(validated.get()); - } - - @SuppressWarnings( {"serial", "rawtypes"}) - @Test - public void testMetadataInjestion_oneNewTopic2Streams() throws Exception { - String topoId = "testMetadataInjection"; - final String topicName = "testTopic"; - - Config config = ConfigFactory.load(); - final AtomicBoolean verified = new AtomicBoolean(false); - CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1) { - @Override - protected KafkaSpoutWrapper createKafkaSpout(Config config, - Map conf, - TopologyContext context, - SpoutOutputCollector collector, - String topic, - String topic2SchemeClsName, - SpoutSpec streamMetadatas, - Map sds) { - Assert.assertEquals(1, streamMetadatas.getStreamRepartitionMetadataMap().size()); - Assert.assertTrue(streamMetadatas.getStream("s1") != null); - Assert.assertTrue(streamMetadatas.getStream("s2") != null); - Assert.assertEquals(topicName, streamMetadatas.getStream("s1").getTopicName()); - Assert.assertEquals(topicName, streamMetadatas.getStream("s2").getTopicName()); - LOG.info("successfully verified new topic and streams"); - verified.set(true); - return null; - } - }; - - Map dsMap = createDatasource(topicName, dataSourceName); - - StreamRepartitionMetadata m1 = new StreamRepartitionMetadata(dataSourceName, "s1"); - StreamRepartitionMetadata m2 = new StreamRepartitionMetadata(dataSourceName, "s2"); - Map> dataSources = new HashMap>(); - dataSources.put(dataSourceName, Arrays.asList(m1, m2)); - - SpoutSpec newMetadata = new SpoutSpec(topoId, dataSources, null, dsMap); - spout.onReload(newMetadata, null); - Assert.assertTrue(verified.get()); - } - - private Map createDatasource(final String topicName, final String dataSourceName) { - Kafka2TupleMetadata ds = new Kafka2TupleMetadata(); - - ds.setName(dataSourceName); - ds.setType("KAFKA"); - ds.setProperties(new HashMap()); - ds.setTopic(topicName); - ds.setSchemeCls("PlainStringScheme"); - ds.setCodec(new Tuple2StreamMetadata()); - Map dsMap = new HashMap(); - dsMap.put(ds.getName(), ds); - return dsMap; - } - - @SuppressWarnings( {"serial", "rawtypes"}) - @Test - public void testMetadataInjestion_deleteOneTopic() throws Exception { - String topoId = "testMetadataInjection"; - final String topicName = "testTopic"; - Config config = ConfigFactory.load(); - final AtomicBoolean verified = new AtomicBoolean(false); - CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1) { - @Override - protected KafkaSpoutWrapper createKafkaSpout(Config config, - Map conf, TopologyContext context, SpoutOutputCollector collector, final String topic, - String schemeClsName, SpoutSpec streamMetadatas, - Map sds) { - return new KafkaSpoutWrapper(null, null); - } - - @Override - protected void removeKafkaSpout(KafkaSpoutWrapper wrapper) { - LOG.info("successfully verified removed topic and streams"); - verified.set(true); - } - }; - - Map dsMap = createDatasource(topicName, dataSourceName); - - StreamRepartitionMetadata m1 = new StreamRepartitionMetadata(dataSourceName, "s1"); - - Map> 
streamMetadatas = new HashMap>(); - streamMetadatas.put(dataSourceName, Arrays.asList(m1)); - - SpoutSpec cachedMetadata = new SpoutSpec(topoId, streamMetadatas, null, dsMap); - // add new topic - spout.onReload(cachedMetadata, null); - // delete new topic - try { - spout.onReload(new SpoutSpec(topoId, new HashMap>(), new HashMap<>(), new HashMap()), - null); - } catch (Exception ex) { - LOG.error("error reloading spout metadata", ex); - throw ex; - } - Assert.assertTrue(verified.get()); - } - -// @Ignore -// @SuppressWarnings("rawtypes") -// @Test -// public void testSpout() { -// String topoId = "testMetadataInjection"; -// final AtomicBoolean verified = new AtomicBoolean(false); -// Config config = ConfigFactory.load(); -// CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1); -// -// TopologyBuilder builder = new TopologyBuilder(); -// // only one spout -// builder.setSpout("cc-spout", spout); -// builder.setBolt("recv-bolt", new RecvBolt()).globalGrouping("cc-spout"); -// -// StormTopology topology = builder.createTopology(); -// LocalCluster cluster = new LocalCluster(); -// cluster.submitTopology(topoId, new HashMap(), topology); -// -// while (true) { -// try { -// Thread.sleep(1000); -// } catch (Exception e) { -// e.printStackTrace(); -// } -// } -// } -// -// @SuppressWarnings("serial") -// private static class RecvBolt extends BaseRichBolt { -// -// @Override -// public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { -// } -// -// @Override -// public void execute(Tuple input) { -// -// } -// -// @Override -// public void declareOutputFields(OutputFieldsDeclarer declarer) { -// } -// -// } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/FastWordCountTopology.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/FastWordCountTopology.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/FastWordCountTopology.java deleted file mode 100644 index 5aff5c8..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/FastWordCountTopology.java +++ /dev/null @@ -1,209 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *
- * http://www.apache.org/licenses/LICENSE-2.0 - *
- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.topology; - -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.generated.*; -import backtype.storm.metric.LoggingMetricsConsumer; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.BasicOutputCollector; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.topology.base.BaseBasicBolt; -import backtype.storm.topology.base.BaseRichSpout; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; -import backtype.storm.utils.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.ThreadLocalRandom; - -/** - * WordCount but the spout does not stop, and the bolts are implemented in - * java. This can show how fast the word count can run. - */ -@SuppressWarnings("serial") -public class FastWordCountTopology { - public static class FastRandomSentenceSpout extends BaseRichSpout { - private final static Logger LOG = LoggerFactory.getLogger(FastRandomSentenceSpout.class); - SpoutOutputCollector _collector; - Random _rand; - private static final String[] CHOICES = { - "marry had a little lamb whos fleese was white as snow", - "and every where that marry went the lamb was sure to go", - "one two three four five six seven eight nine ten", - "this is a test of the emergency broadcast system this is only a test", - "peter piper picked a peck of pickeled peppers" - }; - - @SuppressWarnings("rawtypes") - @Override - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - _collector = collector; - _rand = ThreadLocalRandom.current(); - } - - @Override - public void nextTuple() { - String sentence = CHOICES[_rand.nextInt(CHOICES.length)]; - _collector.emit(new Values(sentence), sentence); - LOG.debug("Emit tuple: {}, id:{}", new Values(sentence), sentence); - } - - @Override - public void ack(Object id) { - //Ignored - } - - @Override - public void fail(Object id) { - _collector.emit(new Values(id), id); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("sentence")); - } - } - - public static class SplitSentence extends BaseBasicBolt { - @Override - public void execute(Tuple tuple, BasicOutputCollector collector) { - String sentence = tuple.getString(0); - for (String word : sentence.split("\\s+")) { - collector.emit(new Values(word, 1)); - } - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word", "count")); - } - } - - - public static class WordCount extends BaseBasicBolt { - Map counts = new HashMap(); - - @Override - public void execute(Tuple tuple, BasicOutputCollector collector) { - String word = tuple.getString(0); - Integer count = counts.get(word); - if (count == null) { - count = 0; - } - count++; - counts.put(word, count); - collector.emit(new Values(word, count)); - } - - @Override - public void 
declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word", "count")); - } - } - - public static void printMetrics(Nimbus.Client client, String name) throws Exception { - ClusterSummary summary = client.getClusterInfo(); - String id = null; - for (TopologySummary ts : summary.get_topologies()) { - if (name.equals(ts.get_name())) { - id = ts.get_id(); - } - } - if (id == null) { - throw new Exception("Could not find a topology named " + name); - } - TopologyInfo info = client.getTopologyInfo(id); - int uptime = info.get_uptime_secs(); - long acked = 0; - long failed = 0; - double weightedAvgTotal = 0.0; - for (ExecutorSummary exec : info.get_executors()) { - if ("spout".equals(exec.get_component_id())) { - SpoutStats stats = exec.get_stats().get_specific().get_spout(); - Map failedMap = stats.get_failed().get(":all-time"); - Map ackedMap = stats.get_acked().get(":all-time"); - Map avgLatMap = stats.get_complete_ms_avg().get(":all-time"); - for (String key : ackedMap.keySet()) { - if (failedMap != null) { - Long tmp = failedMap.get(key); - if (tmp != null) { - failed += tmp; - } - } - long ackVal = ackedMap.get(key); - double latVal = avgLatMap.get(key) * ackVal; - acked += ackVal; - weightedAvgTotal += latVal; - } - } - } - double avgLatency = weightedAvgTotal / acked; - System.out.println("uptime: " + uptime + " acked: " + acked + " avgLatency: " + avgLatency + " acked/sec: " + (((double) acked) / uptime + " failed: " + failed)); - } - - public static void kill(Nimbus.Client client, String name) throws Exception { - KillOptions opts = new KillOptions(); - opts.set_wait_secs(0); - client.killTopologyWithOpts(name, opts); - } - - public static void main(String[] args) throws Exception { - - TopologyBuilder builder = new TopologyBuilder(); - - builder.setSpout("spout", new FastRandomSentenceSpout(), 4); - - builder.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout"); - builder.setBolt("count", new WordCount(), 4).fieldsGrouping("split", new Fields("word")); - - Config conf = new Config(); - conf.registerMetricsConsumer(LoggingMetricsConsumer.class); - - String name = "wc-test"; - if (args != null && args.length > 0) { - name = args[0]; - } - -// conf.setNumWorkers(1); - - LocalCluster cluster = new LocalCluster(); - cluster.submitTopology(name, conf, builder.createTopology()); - - Utils.sleep(Long.MAX_VALUE); -// -// Map clusterConf = Utils.readStormConfig(); -// clusterConf.putAll(Utils.readCommandLineOpts()); -// Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient(); -// -// //Sleep for 5 mins -// for (int i = 0; i < 10; i++) { -// Thread.sleep(30 * 1000); -// printMetrics(client, name); -// } -// kill(client, name); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/MockMetadataChangeNotifyService.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/MockMetadataChangeNotifyService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/MockMetadataChangeNotifyService.java deleted file mode 100644 index b7d32ab..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/MockMetadataChangeNotifyService.java +++ 
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/MockMetadataChangeNotifyService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/MockMetadataChangeNotifyService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/MockMetadataChangeNotifyService.java
deleted file mode 100644
index b7d32ab..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/MockMetadataChangeNotifyService.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.topology;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
-import org.apache.eagle.alert.coordination.model.PublishSpec;
-import org.apache.eagle.alert.coordination.model.RouterSpec;
-import org.apache.eagle.alert.coordination.model.SpoutSpec;
-import org.apache.eagle.alert.engine.coordinator.MetadataType;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.impl.AbstractMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.utils.MetadataSerDeser;
-import com.fasterxml.jackson.core.type.TypeReference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Since 5/4/16.
- */
-@SuppressWarnings( {"serial"})
-public class MockMetadataChangeNotifyService extends AbstractMetadataChangeNotifyService implements Runnable {
-    private final static Logger LOG = LoggerFactory.getLogger(MockMetadataChangeNotifyService.class);
-    @SuppressWarnings("unused")
-    private static final String[] topics = new String[] {"testTopic3", "testTopic4", "testTopic5"};
-    @SuppressWarnings("unused")
-    private String topologyName;
-    @SuppressWarnings("unused")
-    private String spoutId;
-    private Map<String, StreamDefinition> sds;
-
-    public MockMetadataChangeNotifyService(String topologyName, String spoutId) {
-        this.topologyName = topologyName;
-        this.spoutId = spoutId;
-    }
-
-    @Override
-    public void init(Config config, MetadataType type) {
-        super.init(config, type);
-        this.sds = defineStreamDefinitions();
-        new Thread(this).start();
-    }
-
-    @Override
-    public void run() {
-        switch (type) {
-            case SPOUT:
-                notifySpout(Arrays.asList("testTopic3", "testTopic4"), Arrays.asList("testTopic5"));
-                break;
-            case STREAM_ROUTER_BOLT:
-                populateRouterMetadata();
-                break;
-            case ALERT_BOLT:
-                populateAlertBoltSpec();
-                break;
-            case ALERT_PUBLISH_BOLT:
-                notifyAlertPublishBolt();
-                break;
-            default:
-                LOG.error("Unexpected metadata type: {}", type);
-        }
-    }
-
-    private Map<String, StreamDefinition> defineStreamDefinitions() {
-        Map<String, StreamDefinition> sds = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testStreamDefinitionsSpec.json"),
-            new TypeReference<Map<String, StreamDefinition>>() {
-            });
-        return sds;
-    }
-
-    private void notifySpout(List<String> plainStringTopics, List<String> jsonStringTopics) {
-        SpoutSpec newSpec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testSpoutSpec.json"), SpoutSpec.class);
-        notifySpout(newSpec, sds);
-    }
-
-    private void populateRouterMetadata() {
-        RouterSpec boltSpec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testStreamRouterBoltSpec.json"), RouterSpec.class);
-        notifyStreamRouterBolt(boltSpec, sds);
-    }
-
-    private void populateAlertBoltSpec() {
-        AlertBoltSpec spec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testAlertBoltSpec.json"), AlertBoltSpec.class);
-        notifyAlertBolt(spec, sds);
-    }
-
-    private void notifyAlertPublishBolt() {
-        PublishSpec spec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec.json"), PublishSpec.class);
-        notifyAlertPublishBolt(spec, sds);
-    }
-}
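The TypeReference pattern in defineStreamDefinitions() above is how the generic Map type survives erasure; the TypeReference import in the deleted file comes from Jackson, and MetadataSerDeser appears to wrap a Jackson ObjectMapper (an assumption, but consistent with the imports). A standalone sketch of the same pattern, with a made-up JSON payload in place of an Eagle spec file:

    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.Map;

    public class TypeRefDemo {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            // Hypothetical payload; the real tests load /testStreamDefinitionsSpec.json.
            String json = "{\"sampleStream_1\": {\"streamId\": \"sampleStream_1\"}}";
            // The anonymous TypeReference subclass carries the full generic type,
            // so Jackson can bind into Map<String, Map<String, String>> instead of
            // a raw Map of LinkedHashMaps.
            Map<String, Map<String, String>> defs =
                mapper.readValue(json, new TypeReference<Map<String, Map<String, String>>>() {});
            System.out.println(defs.keySet());
        }
    }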
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/SendData2KafkaTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/SendData2KafkaTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/SendData2KafkaTest.java
deleted file mode 100644
index 0ade06a..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/SendData2KafkaTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.topology;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.io.FileWriter;
-import java.io.PrintWriter;
-import java.io.Serializable;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * Created on 3/12/16.
- */
-@SuppressWarnings( {"serial", "unchecked", "rawtypes", "resource"})
-public class SendData2KafkaTest implements Serializable {
-    /**
-     * {"timestamp": 10000, "metric": "esErrorLogEvent", "instanceUuid": "vm-InstanceId1", "host":"test-host1", "type":"nova", "stack":"NullPointException-..........."}
-     * {"timestamp": 10000, "metric": "instanceFailureLogEvent", "instanceUuid": "vm-InstanceId1", "message":"instance boot failure for user liasu!"}
-     */
-    @Test
-    @Ignore
-    public void testOutput() throws Exception {
-        String s1 = "{\"timestamp\": %d, \"metric\": \"esErrorLogEvent\", \"instanceUuid\": \"vm-InstanceId%d\", \"host\":\"test-host1\", \"type\":\"nova\", \"stack\":\"NullPointException-...........\"}";
-        String s2 = "{\"timestamp\": %d, \"metric\": \"instanceFailureLogEvent\", \"instanceUuid\": \"vm-InstanceId%d\", \"message\":\"instance boot failure for user liasu!\"}";
-
-        PrintWriter espw = new PrintWriter(new FileWriter("src/test/resources/es.log"));
-        PrintWriter ifpw = new PrintWriter(new FileWriter("src/test/resources/if.log"));
-
-        long base = System.currentTimeMillis();
-        long timestamp = 10000;
-
-        for (int i = 0; i < 10; i++) {
-
-            timestamp = base + i * 10000;
-            for (int j = 0; j < 10; j++) {
-                timestamp = timestamp + j * 1000;
-
-                espw.println(String.format(s1, timestamp, i));
-            }
-
-            ifpw.println(String.format(s2, timestamp, i));
-        }
-
-        espw.flush();
-        ifpw.flush();
-    }
-
-    @Test
-    @Ignore
-    public void sendKafka() throws Exception {
-        List<String> eslogs = Files.readAllLines(Paths.get(SendData2KafkaTest.class.getResource("/es.log").toURI()), Charset.defaultCharset());
-        List<String> iflogs = Files.readAllLines(Paths.get(SendData2KafkaTest.class.getResource("/if.log").toURI()), Charset.defaultCharset());
-
-        Properties props = new Properties();
-        props.put("bootstrap.servers", "localhost:9092");
-        props.put("value.serializer", StringSerializer.class.getCanonicalName());
-        props.put("key.serializer", StringSerializer.class.getCanonicalName());
-
-        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
-        while (true) {
-            for (String s : eslogs) {
-                ProducerRecord<String, String> record = new ProducerRecord<>("nn_jmx_metric_sandbox", s);
-                producer.send(record);
-            }
-
-            for (String s : iflogs) {
-                ProducerRecord<String, String> record = new ProducerRecord<>("nn_jmx_metric_sandbox", s);
-                producer.send(record);
-            }
-
-            Thread.sleep(5000);
-        }
-
-    }
-
-}
\ No newline at end of file
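For readers reusing the producer setup from sendKafka() above: the same configuration can be written with the typed ProducerConfig constants instead of bare string keys, and the producer closed deterministically with try-with-resources. A minimal sketch (same localhost broker assumption and topic name as the deleted test; the payload is illustrative):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.util.Properties;

    public class ProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // try-with-resources flushes and closes the producer on exit,
            // which the infinite loop in the deleted test never did.
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("nn_jmx_metric_sandbox", "{\"metric\":\"demo\"}"));
            }
        }
    }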
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java
deleted file mode 100644
index dc9c9b3..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.topology;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Tuple;
-import org.junit.Ignore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-@Ignore
-@SuppressWarnings( {"rawtypes", "serial"})
-public class TestBolt extends BaseRichBolt {
-    private static final Logger LOG = LoggerFactory.getLogger(TestBolt.class);
-    private OutputCollector collector;
-    private long count;
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        this.collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        LOG.info("data is coming: " + input);
-        count++;
-        if (count % 10 == 0) {
-            LOG.info("count = " + count);
-        }
-        collector.ack(input);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
-    }
-}
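The collector.ack(input) call in TestBolt above is load-bearing: with a BaseRichBolt, every tuple must be acked (or failed) explicitly, or Storm replays it after topology.message.timeout.secs. A hypothetical variant sketching the full contract, in the same backtype.storm namespace the deleted tests use:

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;
    import java.util.Map;

    @SuppressWarnings({"rawtypes", "serial"})
    public class AckingBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            try {
                // ... process the tuple ...
                collector.ack(input);  // success: lets the spout's ack count advance
            } catch (Exception e) {
                collector.fail(input); // failure: trigger replay now instead of waiting for timeout
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // terminal bolt: declares no output streams
        }
    }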
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestByteBuffer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestByteBuffer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestByteBuffer.java
deleted file mode 100644
index 29986f1..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestByteBuffer.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.topology;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-
-/**
- * Storm 1.0.0 uses ByteBuffer, so we need to test it.
- */
-public class TestByteBuffer {
-    @Test
-    public void testBB() {
-        ByteBuffer bb = ByteBuffer.allocate(100);
-        bb.put((byte) 12);
-        Assert.assertTrue(bb.hasArray());
-        bb.rewind();
-        Assert.assertEquals(12, bb.get());
-    }
-
-    @Test
-    public void testMultipleStrings() throws Exception {
-        ByteBuffer bb = ByteBuffer.allocate(100);
-        bb.put("abc".getBytes("UTF-8"));
-        bb.put("xyz".getBytes("UTF-8"));
-        bb.put("1234".getBytes("UTF-8"));
-        bb.rewind();
-        if (bb.hasArray()) {
-            int base = bb.arrayOffset();
-            String ret = new String(bb.array(), base + bb.position(), bb.remaining());
-            System.out.println("string is: " + ret);
-        }
-    }
-}
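One subtlety in testMultipleStrings() above: rewind() resets the position but leaves the limit at capacity, so remaining() is 100 and the decoded string carries 90 trailing zero bytes after "abcxyz1234". flip() instead sets the limit to the write position, bounding the readable region to what was actually written. A small sketch of the corrected read-back (illustrative values only):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class ByteBufferFlipDemo {
        public static void main(String[] args) {
            ByteBuffer bb = ByteBuffer.allocate(100);
            bb.put("abcxyz1234".getBytes(StandardCharsets.UTF_8));
            // flip(): limit = position, position = 0 -> only written bytes are readable.
            // rewind() would keep limit at capacity (100) and include the zero padding.
            bb.flip();
            if (bb.hasArray()) {
                String ret = new String(bb.array(), bb.arrayOffset() + bb.position(),
                        bb.remaining(), StandardCharsets.UTF_8);
                System.out.println(ret); // prints exactly: abcxyz1234
            }
        }
    }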