From: otto@apache.org
To: commits@metron.apache.org
Date: Wed, 18 Apr 2018 14:59:49 -0000
Subject: [19/52] [abbrv] metron git commit: METRON-590 Enable Use of Event Time in Profiler (nickwallen) closes apache/metron#965

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignalTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignalTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignalTest.java
new file mode 100644
index 0000000..b8949c5
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignalTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.bolt;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the {@code FixedFrequencyFlushSignal} class.
+ */
+public class FixedFrequencyFlushSignalTest {
+
+  @Test
+  public void testSignalFlush() {
+
+    FixedFrequencyFlushSignal signal = new FixedFrequencyFlushSignal(1000);
+
+    // not time to flush yet
+    assertFalse(signal.isTimeToFlush());
+
+    // advance time
+    signal.update(5000);
+
+    // not time to flush yet
+    assertFalse(signal.isTimeToFlush());
+
+    // advance time
+    signal.update(7000);
+
+    // time to flush
+    assertTrue(signal.isTimeToFlush());
+  }
+
+  @Test
+  public void testOutOfOrderTimestamps() {
+    FixedFrequencyFlushSignal signal = new FixedFrequencyFlushSignal(1000);
+
+    // advance time, out-of-order
+    signal.update(5000);
+    signal.update(1000);
+    signal.update(7000);
+    signal.update(3000);
+
+    // need to flush @ 5000 + 1000 = 6000. if anything > 6000 (even out-of-order), then it should signal a flush
+    assertTrue(signal.isTimeToFlush());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testNegativeFrequency() {
+    new FixedFrequencyFlushSignal(-1000);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaDestinationHandlerTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaDestinationHandlerTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaDestinationHandlerTest.java
deleted file mode 100644
index c3f2584..0000000
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaDestinationHandlerTest.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.metron.profiler.bolt;
-
-import com.google.common.collect.ImmutableMap;
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.common.configuration.profiler.ProfileConfig;
-import org.apache.metron.common.utils.JSONUtils;
-import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.metron.statistics.OnlineStatisticsProvider;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.tuple.Values;
-import org.json.simple.JSONObject;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static org.mockito.Mockito.*;
-import static org.junit.Assert.*;
-
-/**
- * Tests the KafkaDestinationHandler.
- */ -public class KafkaDestinationHandlerTest { - - /** - * { - * "profile": "profile-one-destination", - * "foreach": "ip_src_addr", - * "init": { "x": "0" }, - * "update": { "x": "x + 1" }, - * "result": "x" - * } - */ - @Multiline - private String profileDefinition; - - private KafkaDestinationHandler handler; - private ProfileConfig profile; - private OutputCollector collector; - - @Before - public void setup() throws Exception { - handler = new KafkaDestinationHandler(); - profile = createDefinition(profileDefinition); - collector = Mockito.mock(OutputCollector.class); - } - - /** - * The handler must serialize the ProfileMeasurement into a JSONObject. - */ - @Test - public void testSerialization() throws Exception { - - ProfileMeasurement measurement = new ProfileMeasurement() - .withProfileName("profile") - .withEntity("entity") - .withPeriod(20000, 15, TimeUnit.MINUTES) - .withTriageValues(Collections.singletonMap("triage-key", "triage-value")) - .withDefinition(profile); - handler.emit(measurement, collector); - - ArgumentCaptor arg = ArgumentCaptor.forClass(Values.class); - verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture()); - - // expect a JSONObject - Values values = arg.getValue(); - assertTrue(values.get(0) instanceof JSONObject); - - // validate the json - JSONObject actual = (JSONObject) values.get(0); - assertEquals(measurement.getDefinition().getProfile(), actual.get("profile")); - assertEquals(measurement.getEntity(), actual.get("entity")); - assertEquals(measurement.getPeriod().getPeriod(), actual.get("period")); - assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start")); - assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end")); - assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key")); - assertNotNull(actual.get("timestamp")); - assertEquals("profiler", actual.get("source.type")); - } - - /** - * Values destined for Kafka can only be serialized into text, which limits the types of values - * that can result from a triage expression. Only primitive types and Strings are allowed. 
- */ - @Test - public void testInvalidType() throws Exception { - - // create one invalid expression and one valid expression - Map triageValues = ImmutableMap.of( - "invalid", new OnlineStatisticsProvider(), - "valid", 4); - - ProfileMeasurement measurement = new ProfileMeasurement() - .withProfileName("profile") - .withEntity("entity") - .withPeriod(20000, 15, TimeUnit.MINUTES) - .withTriageValues(triageValues) - .withDefinition(profile); - handler.emit(measurement, collector); - - ArgumentCaptor arg = ArgumentCaptor.forClass(Values.class); - verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture()); - Values values = arg.getValue(); - assertTrue(values.get(0) instanceof JSONObject); - - // only the triage expression value itself should have been skipped, all others should be there - JSONObject actual = (JSONObject) values.get(0); - assertEquals(measurement.getDefinition().getProfile(), actual.get("profile")); - assertEquals(measurement.getEntity(), actual.get("entity")); - assertEquals(measurement.getPeriod().getPeriod(), actual.get("period")); - assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start")); - assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end")); - assertNotNull(actual.get("timestamp")); - assertEquals("profiler", actual.get("source.type")); - - // the invalid expression should be skipped due to invalid type - assertFalse(actual.containsKey("invalid")); - - // but the valid expression should still be there - assertEquals(triageValues.get("valid"), actual.get("valid")); - } - - /** - * Values destined for Kafka can only be serialized into text, which limits the types of values - * that can result from a triage expression. Only primitive types and Strings are allowed. - */ - @Test - public void testIntegerIsValidType() throws Exception { - ProfileMeasurement measurement = new ProfileMeasurement() - .withProfileName("profile") - .withEntity("entity") - .withPeriod(20000, 15, TimeUnit.MINUTES) - .withTriageValues(Collections.singletonMap("triage-key", 123)) - .withDefinition(profile); - handler.emit(measurement, collector); - - ArgumentCaptor arg = ArgumentCaptor.forClass(Values.class); - verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture()); - Values values = arg.getValue(); - assertTrue(values.get(0) instanceof JSONObject); - JSONObject actual = (JSONObject) values.get(0); - - // the triage expression is valid - assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key")); - } - - /** - * Values destined for Kafka can only be serialized into text, which limits the types of values - * that can result from a triage expression. Only primitive types and Strings are allowed. 
- */ - @Test - public void testStringIsValidType() throws Exception { - ProfileMeasurement measurement = new ProfileMeasurement() - .withProfileName("profile") - .withEntity("entity") - .withPeriod(20000, 15, TimeUnit.MINUTES) - .withTriageValues(Collections.singletonMap("triage-key", "value")) - .withDefinition(profile); - handler.emit(measurement, collector); - - ArgumentCaptor arg = ArgumentCaptor.forClass(Values.class); - verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture()); - Values values = arg.getValue(); - assertTrue(values.get(0) instanceof JSONObject); - JSONObject actual = (JSONObject) values.get(0); - - // the triage expression is valid - assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key")); - } - - /** - * Creates a profile definition based on a string of JSON. - * @param json The string of JSON. - */ - private ProfileConfig createDefinition(String json) throws IOException { - return JSONUtils.INSTANCE.load(json, ProfileConfig.class); - } -} http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java new file mode 100644 index 0000000..b02e377 --- /dev/null +++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java @@ -0,0 +1,208 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.metron.profiler.bolt; + +import com.google.common.collect.ImmutableMap; +import org.adrianwalker.multilinestring.Multiline; +import org.apache.metron.common.configuration.profiler.ProfileConfig; +import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.profiler.ProfileMeasurement; +import org.apache.metron.statistics.OnlineStatisticsProvider; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.tuple.Values; +import org.json.simple.JSONObject; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Tests the KafkaDestinationHandler. + */ +public class KafkaEmitterTest { + + /** + * { + * "profile": "profile-one-destination", + * "foreach": "ip_src_addr", + * "init": { "x": "0" }, + * "update": { "x": "x + 1" }, + * "result": "x" + * } + */ + @Multiline + private String profileDefinition; + + private KafkaEmitter handler; + private ProfileConfig profile; + private OutputCollector collector; + + @Before + public void setup() throws Exception { + handler = new KafkaEmitter(); + profile = createDefinition(profileDefinition); + collector = Mockito.mock(OutputCollector.class); + } + + /** + * The handler must serialize the ProfileMeasurement into a JSONObject. + */ + @Test + public void testSerialization() throws Exception { + + ProfileMeasurement measurement = new ProfileMeasurement() + .withProfileName("profile") + .withEntity("entity") + .withPeriod(20000, 15, TimeUnit.MINUTES) + .withTriageValues(Collections.singletonMap("triage-key", "triage-value")) + .withDefinition(profile); + handler.emit(measurement, collector); + + ArgumentCaptor arg = ArgumentCaptor.forClass(Values.class); + verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture()); + + // expect a JSONObject + Values values = arg.getValue(); + assertTrue(values.get(0) instanceof JSONObject); + + // validate the json + JSONObject actual = (JSONObject) values.get(0); + assertEquals(measurement.getDefinition().getProfile(), actual.get("profile")); + assertEquals(measurement.getEntity(), actual.get("entity")); + assertEquals(measurement.getPeriod().getPeriod(), actual.get("period")); + assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start")); + assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end")); + assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key")); + assertNotNull(actual.get("timestamp")); + assertEquals("profiler", actual.get("source.type")); + } + + /** + * Values destined for Kafka can only be serialized into text, which limits the types of values + * that can result from a triage expression. Only primitive types and Strings are allowed. 
+ */ + @Test + public void testInvalidType() throws Exception { + + // create one invalid expression and one valid expression + Map triageValues = ImmutableMap.of( + "invalid", new OnlineStatisticsProvider(), + "valid", 4); + + ProfileMeasurement measurement = new ProfileMeasurement() + .withProfileName("profile") + .withEntity("entity") + .withPeriod(20000, 15, TimeUnit.MINUTES) + .withTriageValues(triageValues) + .withDefinition(profile); + handler.emit(measurement, collector); + + ArgumentCaptor arg = ArgumentCaptor.forClass(Values.class); + verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture()); + Values values = arg.getValue(); + assertTrue(values.get(0) instanceof JSONObject); + + // only the triage expression value itself should have been skipped, all others should be there + JSONObject actual = (JSONObject) values.get(0); + assertEquals(measurement.getDefinition().getProfile(), actual.get("profile")); + assertEquals(measurement.getEntity(), actual.get("entity")); + assertEquals(measurement.getPeriod().getPeriod(), actual.get("period")); + assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start")); + assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end")); + assertNotNull(actual.get("timestamp")); + assertEquals("profiler", actual.get("source.type")); + + // the invalid expression should be skipped due to invalid type + assertFalse(actual.containsKey("invalid")); + + // but the valid expression should still be there + assertEquals(triageValues.get("valid"), actual.get("valid")); + } + + /** + * Values destined for Kafka can only be serialized into text, which limits the types of values + * that can result from a triage expression. Only primitive types and Strings are allowed. + */ + @Test + public void testIntegerIsValidType() throws Exception { + ProfileMeasurement measurement = new ProfileMeasurement() + .withProfileName("profile") + .withEntity("entity") + .withPeriod(20000, 15, TimeUnit.MINUTES) + .withTriageValues(Collections.singletonMap("triage-key", 123)) + .withDefinition(profile); + handler.emit(measurement, collector); + + ArgumentCaptor arg = ArgumentCaptor.forClass(Values.class); + verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture()); + Values values = arg.getValue(); + assertTrue(values.get(0) instanceof JSONObject); + JSONObject actual = (JSONObject) values.get(0); + + // the triage expression is valid + assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key")); + } + + /** + * Values destined for Kafka can only be serialized into text, which limits the types of values + * that can result from a triage expression. Only primitive types and Strings are allowed. 
+ */ + @Test + public void testStringIsValidType() throws Exception { + ProfileMeasurement measurement = new ProfileMeasurement() + .withProfileName("profile") + .withEntity("entity") + .withPeriod(20000, 15, TimeUnit.MINUTES) + .withTriageValues(Collections.singletonMap("triage-key", "value")) + .withDefinition(profile); + handler.emit(measurement, collector); + + ArgumentCaptor arg = ArgumentCaptor.forClass(Values.class); + verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture()); + Values values = arg.getValue(); + assertTrue(values.get(0) instanceof JSONObject); + JSONObject actual = (JSONObject) values.get(0); + + // the triage expression is valid + assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key")); + } + + /** + * Creates a profile definition based on a string of JSON. + * @param json The string of JSON. + */ + private ProfileConfig createDefinition(String json) throws IOException { + return JSONUtils.INSTANCE.load(json, ProfileConfig.class); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java index 21d61ab..78e20e0 100644 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java +++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java @@ -20,35 +20,37 @@ package org.apache.metron.profiler.bolt; -import org.adrianwalker.multilinestring.Multiline; -import org.apache.commons.lang3.StringUtils; import org.apache.metron.common.configuration.profiler.ProfileConfig; -import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.common.configuration.profiler.ProfilerConfigurations; +import org.apache.metron.profiler.MessageDistributor; import org.apache.metron.profiler.MessageRoute; -import org.apache.metron.profiler.ProfileBuilder; import org.apache.metron.profiler.ProfileMeasurement; -import org.apache.metron.stellar.dsl.Context; +import org.apache.metron.profiler.integration.MessageBuilder; import org.apache.metron.test.bolt.BaseBoltTest; -import org.apache.storm.Constants; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; +import org.apache.storm.windowing.TupleWindow; import org.json.simple.JSONObject; -import org.json.simple.parser.JSONParser; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; -import static org.apache.metron.stellar.common.utils.ConversionUtils.convert; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; -import static 
org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -59,284 +61,348 @@ import static org.mockito.Mockito.when; */ public class ProfileBuilderBoltTest extends BaseBoltTest { - /** - * { - * "ip_src_addr": "10.0.0.1", - * "value": "22" - * } - */ - @Multiline - private String inputOne; - private JSONObject messageOne; + private JSONObject message1; + private JSONObject message2; + private ProfileConfig profile1; + private ProfileConfig profile2; + private ProfileMeasurementEmitter emitter; + private ManualFlushSignal flushSignal; - /** - * { - * "ip_src_addr": "10.0.0.2", - * "value": "22" - * } - */ - @Multiline - private String inputTwo; - private JSONObject messageTwo; + @Before + public void setup() throws Exception { + + message1 = new MessageBuilder() + .withField("ip_src_addr", "10.0.0.1") + .withField("value", "22") + .build(); + + message2 = new MessageBuilder() + .withField("ip_src_addr", "10.0.0.2") + .withField("value", "22") + .build(); + + profile1 = new ProfileConfig() + .withProfile("profile1") + .withForeach("ip_src_addr") + .withInit("x", "0") + .withUpdate("x", "x + 1") + .withResult("x"); + + profile2 = new ProfileConfig() + .withProfile("profile2") + .withForeach("ip_src_addr") + .withInit(Collections.singletonMap("x", "0")) + .withUpdate(Collections.singletonMap("x", "x + 1")) + .withResult("x"); + + flushSignal = new ManualFlushSignal(); + flushSignal.setFlushNow(false); + } /** - * { - * "profile": "profileOne", - * "foreach": "ip_src_addr", - * "init": { "x": "0" }, - * "update": { "x": "x + 1" }, - * "result": "x" - * } + * The bolt should extract a message and timestamp from a tuple and + * pass that to a {@code MessageDistributor}. */ - @Multiline - private String profileOne; + @Test + public void testExtractMessage() throws Exception { + ProfileBuilderBolt bolt = createBolt(); - /** - * { - * "profile": "profileTwo", - * "foreach": "ip_src_addr", - * "init": { "x": "0" }, - * "update": { "x": "x + 1" }, - * "result": "x" - * } - */ - @Multiline - private String profileTwo; + // create a mock + MessageDistributor distributor = mock(MessageDistributor.class); + bolt.withMessageDistributor(distributor); - public static Tuple mockTickTuple() { - Tuple tuple = mock(Tuple.class); - when(tuple.getSourceComponent()).thenReturn(Constants.SYSTEM_COMPONENT_ID); - when(tuple.getSourceStreamId()).thenReturn(Constants.SYSTEM_TICK_STREAM_ID); - return tuple; - } + // create a tuple + final long timestamp1 = 100000000L; + Tuple tuple1 = createTuple("entity1", message1, profile1, timestamp1); - @Before - public void setup() throws Exception { - JSONParser parser = new JSONParser(); - messageOne = (JSONObject) parser.parse(inputOne); - messageTwo = (JSONObject) parser.parse(inputTwo); + // execute the bolt + TupleWindow tupleWindow = createWindow(tuple1); + bolt.execute(tupleWindow); + + // the message should have been extracted from the tuple and passed to the MessageDistributor + verify(distributor).distribute(eq(message1), eq(timestamp1), any(MessageRoute.class), any()); } + /** - * Creates a profile definition based on a string of JSON. - * @param json The string of JSON. + * If the {@code FlushSignal} tells the bolt to flush, it should flush the {@code MessageDistributor} + * and emit the {@code ProfileMeasurement} values. 
*/ - private ProfileConfig createDefinition(String json) throws IOException { - return JSONUtils.INSTANCE.load(json, ProfileConfig.class); + @Test + public void testEmitWhenFlush() throws Exception { + + ProfileBuilderBolt bolt = createBolt(); + + // create a profile measurement + ProfileMeasurement m = new ProfileMeasurement() + .withEntity("entity1") + .withProfileName("profile1") + .withPeriod(1000, 500, TimeUnit.MILLISECONDS) + .withProfileValue(22); + + // create a mock that returns the profile measurement above + MessageDistributor distributor = mock(MessageDistributor.class); + when(distributor.flush()).thenReturn(Collections.singletonList(m)); + bolt.withMessageDistributor(distributor); + + // signal the bolt to flush + flushSignal.setFlushNow(true); + + // execute the bolt + Tuple tuple1 = createTuple("entity1", message1, profile1, 1000L); + TupleWindow tupleWindow = createWindow(tuple1); + bolt.execute(tupleWindow); + + // a profile measurement should be emitted by the bolt + List measurements = getProfileMeasurements(outputCollector, 1); + assertEquals(1, measurements.size()); + assertEquals(m, measurements.get(0)); } /** - * Create a tuple that will contain the message, the entity name, and profile definition. - * @param entity The entity name - * @param message The telemetry message. - * @param profile The profile definition. + * If the {@code FlushSignal} tells the bolt NOT to flush, nothing should be emitted. */ - private Tuple createTuple(String entity, JSONObject message, ProfileConfig profile) { - Tuple tuple = mock(Tuple.class); - when(tuple.getValueByField(eq("message"))).thenReturn(message); - when(tuple.getValueByField(eq("entity"))).thenReturn(entity); - when(tuple.getValueByField(eq("profile"))).thenReturn(profile); - return tuple; + @Test + public void testDoNotEmitWhenNoFlush() throws Exception { + + ProfileBuilderBolt bolt = createBolt(); + + // create a profile measurement + ProfileMeasurement m = new ProfileMeasurement() + .withEntity("entity1") + .withProfileName("profile1") + .withPeriod(1000, 500, TimeUnit.MILLISECONDS) + .withProfileValue(22); + + // create a mock that returns the profile measurement above + MessageDistributor distributor = mock(MessageDistributor.class); + when(distributor.flush()).thenReturn(Collections.singletonList(m)); + bolt.withMessageDistributor(distributor); + + // no flush signal + flushSignal.setFlushNow(false); + + // execute the bolt + Tuple tuple1 = createTuple("entity1", message1, profile1, 1000L); + TupleWindow tupleWindow = createWindow(tuple1); + bolt.execute(tupleWindow); + + // nothing should have been emitted + getProfileMeasurements(outputCollector, 0); } /** - * Create a ProfileBuilderBolt to test + * A {@link ProfileMeasurement} is built for each profile/entity pair. The measurement should be emitted to each + * destination defined by the profile. By default, a profile uses both Kafka and HBase as destinations. 
*/ - private ProfileBuilderBolt createBolt() throws IOException { + @Test + public void testEmitters() throws Exception { + + // defines the zk configurations accessible from the bolt + ProfilerConfigurations configurations = new ProfilerConfigurations(); + configurations.updateGlobalConfig(Collections.emptyMap()); + + // create the bolt with 3 destinations + ProfileBuilderBolt bolt = (ProfileBuilderBolt) new ProfileBuilderBolt() + .withProfileTimeToLive(30, TimeUnit.MINUTES) + .withPeriodDuration(10, TimeUnit.MINUTES) + .withMaxNumberOfRoutes(Long.MAX_VALUE) + .withZookeeperClient(client) + .withZookeeperCache(cache) + .withEmitter(new TestEmitter("destination1")) + .withEmitter(new TestEmitter("destination2")) + .withEmitter(new TestEmitter("destination3")) + .withProfilerConfigurations(configurations) + .withTumblingWindow(new BaseWindowedBolt.Duration(10, TimeUnit.MINUTES)); + bolt.prepare(new HashMap<>(), topologyContext, outputCollector); - ProfileBuilderBolt bolt = new ProfileBuilderBolt("zookeeperURL"); - bolt.setCuratorFramework(client); - bolt.setZKCache(cache); - bolt.withPeriodDuration(10, TimeUnit.MINUTES); - bolt.withProfileTimeToLive(30, TimeUnit.MINUTES); + // signal the bolt to flush + bolt.withFlushSignal(flushSignal); + flushSignal.setFlushNow(true); - // define the valid destinations for the profiler - bolt.withDestinationHandler(new HBaseDestinationHandler()); - bolt.withDestinationHandler(new KafkaDestinationHandler()); + // execute the bolt + Tuple tuple1 = createTuple("entity", message1, profile1, System.currentTimeMillis()); + TupleWindow window = createWindow(tuple1); + bolt.execute(window); - bolt.prepare(new HashMap<>(), topologyContext, outputCollector); - return bolt; + // validate measurements emitted to each + verify(outputCollector, times(1)).emit(eq("destination1"), any()); + verify(outputCollector, times(1)).emit(eq("destination2"), any()); + verify(outputCollector, times(1)).emit(eq("destination3"), any()); } - /** - * The bolt should create a ProfileBuilder to manage a profile. - */ @Test - public void testCreateProfileBuilder() throws Exception { + public void testFlushExpiredWithTick() throws Exception { ProfileBuilderBolt bolt = createBolt(); - ProfileConfig definition = createDefinition(profileOne); - String entity = (String) messageOne.get("ip_src_addr"); - Tuple tupleOne = createTuple(entity, messageOne, definition); - // execute - send two tuples with different entities - bolt.execute(tupleOne); + // create a mock + MessageDistributor distributor = mock(MessageDistributor.class); + bolt.withMessageDistributor(distributor); + + // tell the bolt to flush on the first window + flushSignal.setFlushNow(true); - // validate - 1 messages applied - MessageRoute route = new MessageRoute(definition, entity); - ProfileBuilder builderOne = bolt.getMessageDistributor().getBuilder(route, Context.EMPTY_CONTEXT()); - assertEquals(1, (int) convert(builderOne.valueOf("x"), Integer.class)); + // execute the bolt; include a tick tuple in the window + Tuple tuple1 = createTuple("entity", message1, profile1, 100000000L); + TupleWindow tupleWindow = createWindow(tuple1, mockTickTuple()); + bolt.execute(tupleWindow); + + // ensure the expired profiles were flushed when the tick tuple was received + verify(distributor).flushExpired(); } - /** - * This test creates two different messages, with different entities that are applied to - * the same profile. The bolt should create separate ProfileBuilder objects to handle each - * profile/entity pair. 
- */ @Test - public void testCreateProfileBuilderForEachEntity() throws Exception { + public void testFlushExpiredWithNoTick() throws Exception { - // setup ProfileBuilderBolt bolt = createBolt(); - ProfileConfig definition = createDefinition(profileOne); - - // apply a message to the profile - String entityOne = (String) messageOne.get("ip_src_addr"); - Tuple tupleOne = createTuple(entityOne, messageOne, definition); - bolt.execute(tupleOne); - bolt.execute(tupleOne); - - // apply a different message (with different entity) to the same profile - String entityTwo = (String) messageTwo.get("ip_src_addr"); - Tuple tupleTwo = createTuple(entityTwo, messageTwo, definition); - bolt.execute(tupleTwo); - - // validate - 2 messages applied - MessageRoute routeOne = new MessageRoute(definition, entityOne); - ProfileBuilder builderOne = bolt.getMessageDistributor().getBuilder(routeOne, Context.EMPTY_CONTEXT()); - assertTrue(builderOne.isInitialized()); - assertEquals(2, (int) convert(builderOne.valueOf("x"), Integer.class)); - - // validate - 1 message applied - MessageRoute routeTwo = new MessageRoute(definition, entityTwo); - ProfileBuilder builderTwo = bolt.getMessageDistributor().getBuilder(routeTwo, Context.EMPTY_CONTEXT()); - assertTrue(builderTwo.isInitialized()); - assertEquals(1, (int) convert(builderTwo.valueOf("x"), Integer.class)); - - assertNotSame(builderOne, builderTwo); + + // create a mock + MessageDistributor distributor = mock(MessageDistributor.class); + bolt.withMessageDistributor(distributor); + + // tell the bolt to flush on the first window + flushSignal.setFlushNow(true); + + // execute the bolt; NO tick tuple + Tuple tuple1 = createTuple("entity", message1, profile1, 100000000L); + TupleWindow tupleWindow = createWindow(tuple1); + bolt.execute(tupleWindow); + + // there was no tick tuple; the expired profiles should NOT have been flushed + verify(distributor, times(0)).flushExpired(); } /** - * The bolt should create separate ProfileBuilder objects to handle each - * profile/entity pair. + * Creates a mock tick tuple to use for testing. + * @return A mock tick tuple. 
*/ - @Test - public void testCreateProfileBuilderForEachProfile() throws Exception { + private Tuple mockTickTuple() { - // setup - apply one message to different profile definitions - ProfileBuilderBolt bolt = createBolt(); - String entity = (String) messageOne.get("ip_src_addr"); - - // apply a message to the first profile - ProfileConfig definitionOne = createDefinition(profileOne); - Tuple tupleOne = createTuple(entity, messageOne, definitionOne); - bolt.execute(tupleOne); - - // apply the same message to the second profile - ProfileConfig definitionTwo = createDefinition(profileTwo); - Tuple tupleTwo = createTuple(entity, messageOne, definitionTwo); - bolt.execute(tupleTwo); - - // validate - 1 message applied - MessageRoute routeOne = new MessageRoute(definitionOne, entity); - ProfileBuilder builderOne = bolt.getMessageDistributor().getBuilder(routeOne, Context.EMPTY_CONTEXT()); - assertTrue(builderOne.isInitialized()); - assertEquals(1, (int) convert(builderOne.valueOf("x"), Integer.class)); - - // validate - 1 message applied - MessageRoute routeTwo = new MessageRoute(definitionTwo, entity); - ProfileBuilder builderTwo = bolt.getMessageDistributor().getBuilder(routeTwo, Context.EMPTY_CONTEXT()); - assertTrue(builderTwo.isInitialized()); - assertEquals(1, (int) convert(builderTwo.valueOf("x"), Integer.class)); - - assertNotSame(builderOne, builderTwo); + Tuple tuple = mock(Tuple.class); + when(tuple.getSourceComponent()).thenReturn("__system"); + when(tuple.getSourceStreamId()).thenReturn("__tick"); + + return tuple; } /** - * A ProfileMeasurement is build for each profile/entity pair. A measurement for each profile/entity - * pair should be emitted. + * Retrieves the ProfileMeasurement(s) (if any) that have been emitted. + * + * @param collector The Storm output collector. + * @param expected The number of measurements expected. + * @return A list of ProfileMeasurement(s). */ - @Test - public void testEmitMeasurements() throws Exception { - - // setup - ProfileBuilderBolt bolt = createBolt(); - final String entity = (String) messageOne.get("ip_src_addr"); + private List getProfileMeasurements(OutputCollector collector, int expected) { - // apply the message to the first profile - ProfileConfig definitionOne = createDefinition(profileOne); - Tuple tupleOne = createTuple(entity, messageOne, definitionOne); - bolt.execute(tupleOne); + // the 'streamId' is defined by the DestinationHandler being used by the bolt + final String streamId = emitter.getStreamId(); - // apply the same message to the second profile - ProfileConfig definitionTwo = createDefinition(profileTwo); - Tuple tupleTwo = createTuple(entity, messageOne, definitionTwo); - bolt.execute(tupleTwo); + // capture the emitted tuple(s) + ArgumentCaptor argCaptor = ArgumentCaptor.forClass(Values.class); + verify(collector, times(expected)) + .emit(eq(streamId), argCaptor.capture()); - // execute - the tick tuple triggers a flush of the profile - bolt.execute(mockTickTuple()); + // return the profile measurements that were emitted + return argCaptor.getAllValues() + .stream() + .map(val -> (ProfileMeasurement) val.get(0)) + .collect(Collectors.toList()); + } - // capture the ProfileMeasurement that should be emitted - ArgumentCaptor arg = ArgumentCaptor.forClass(Values.class); + /** + * Create a tuple that will contain the message, the entity name, and profile definition. + * @param entity The entity name + * @param message The telemetry message. + * @param profile The profile definition. 
+ */ + private Tuple createTuple(String entity, JSONObject message, ProfileConfig profile, long timestamp) { - // validate emitted measurements for hbase - verify(outputCollector, atLeastOnce()).emit(eq("hbase"), arg.capture()); - for (Values value : arg.getAllValues()) { + Tuple tuple = mock(Tuple.class); + when(tuple.getValueByField(eq(ProfileSplitterBolt.MESSAGE_TUPLE_FIELD))).thenReturn(message); + when(tuple.getValueByField(eq(ProfileSplitterBolt.TIMESTAMP_TUPLE_FIELD))).thenReturn(timestamp); + when(tuple.getValueByField(eq(ProfileSplitterBolt.ENTITY_TUPLE_FIELD))).thenReturn(entity); + when(tuple.getValueByField(eq(ProfileSplitterBolt.PROFILE_TUPLE_FIELD))).thenReturn(profile); - ProfileMeasurement measurement = (ProfileMeasurement) value.get(0); - ProfileConfig definition = measurement.getDefinition(); + return tuple; + } - if (StringUtils.equals(definitionTwo.getProfile(), definition.getProfile())) { + /** + * Create a ProfileBuilderBolt to test. + * @return A {@link ProfileBuilderBolt} to test. + */ + private ProfileBuilderBolt createBolt() throws IOException { - // validate measurement emitted for profile two - assertEquals(definitionTwo, definition); - assertEquals(entity, measurement.getEntity()); - assertEquals(definitionTwo.getProfile(), measurement.getProfileName()); - assertEquals(1, (int) convert(measurement.getProfileValue(), Integer.class)); + return createBolt(30, TimeUnit.SECONDS); + } - } else if (StringUtils.equals(definitionOne.getProfile(), definition.getProfile())) { + /** + * Create a ProfileBuilderBolt to test. + * + * @param windowDuration The event window duration. + * @param windowDurationUnits The units of the event window duration. + * @return A {@link ProfileBuilderBolt} to test. + */ + private ProfileBuilderBolt createBolt(int windowDuration, TimeUnit windowDurationUnits) throws IOException { + + // defines the zk configurations accessible from the bolt + ProfilerConfigurations configurations = new ProfilerConfigurations(); + configurations.updateGlobalConfig(Collections.emptyMap()); + + emitter = new HBaseEmitter(); + ProfileBuilderBolt bolt = (ProfileBuilderBolt) new ProfileBuilderBolt() + .withProfileTimeToLive(30, TimeUnit.MINUTES) + .withMaxNumberOfRoutes(Long.MAX_VALUE) + .withZookeeperClient(client) + .withZookeeperCache(cache) + .withEmitter(emitter) + .withProfilerConfigurations(configurations) + .withPeriodDuration(1, TimeUnit.MINUTES) + .withTumblingWindow(new BaseWindowedBolt.Duration(windowDuration, windowDurationUnits)); + bolt.prepare(new HashMap<>(), topologyContext, outputCollector); - // validate measurement emitted for profile one - assertEquals(definitionOne, definition); - assertEquals(entity, measurement.getEntity()); - assertEquals(definitionOne.getProfile(), measurement.getProfileName()); - assertEquals(1, (int) convert(measurement.getProfileValue(), Integer.class)); + // set the flush signal AFTER calling 'prepare' + bolt.withFlushSignal(flushSignal); - } else { - fail(); - } - } + return bolt; } /** - * A ProfileMeasurement is build for each profile/entity pair. The measurement should be emitted to each - * destination defined by the profile. By default, a profile uses both Kafka and HBase as destinations. + * Creates a mock TupleWindow containing multiple tuples. + * @param tuples The tuples to add to the window. */ - @Test - public void testDestinationHandlers() throws Exception { + private TupleWindow createWindow(Tuple... 
tuples) { - // setup - ProfileBuilderBolt bolt = createBolt(); - ProfileConfig definitionOne = createDefinition(profileOne); + TupleWindow window = mock(TupleWindow.class); + when(window.get()).thenReturn(Arrays.asList(tuples)); + return window; + } - // apply the message to the first profile - final String entity = (String) messageOne.get("ip_src_addr"); - Tuple tupleOne = createTuple(entity, messageOne, definitionOne); - bolt.execute(tupleOne); + /** + * An implementation for testing purposes only. + */ + private class TestEmitter implements ProfileMeasurementEmitter { - // trigger a flush of the profile - bolt.execute(mockTickTuple()); + private String streamId; - // capture the values that should be emitted - ArgumentCaptor arg = ArgumentCaptor.forClass(Values.class); + public TestEmitter(String streamId) { + this.streamId = streamId; + } - // validate measurements emitted to HBase - verify(outputCollector, times(1)).emit(eq("hbase"), arg.capture()); - assertTrue(arg.getValue().get(0) instanceof ProfileMeasurement); + @Override + public String getStreamId() { + return streamId; + } - // validate measurements emitted to Kafka - verify(outputCollector, times(1)).emit(eq("kafka"), arg.capture()); - assertTrue(arg.getValue().get(0) instanceof JSONObject); + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declareStream(getStreamId(), new Fields("measurement")); + } + + @Override + public void emit(ProfileMeasurement measurement, OutputCollector collector) { + collector.emit(getStreamId(), new Values(measurement)); + } } } http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java index 17d6827..04c774c 100644 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java +++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java @@ -20,11 +20,11 @@ package org.apache.metron.profiler.bolt; -import org.apache.metron.common.configuration.profiler.ProfileResult; -import org.apache.storm.tuple.Tuple; import org.apache.metron.common.configuration.profiler.ProfileConfig; +import org.apache.metron.common.configuration.profiler.ProfileResult; import org.apache.metron.profiler.ProfileMeasurement; import org.apache.metron.profiler.hbase.RowKeyBuilder; +import org.apache.storm.tuple.Tuple; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -32,10 +32,8 @@ import org.junit.Test; import java.util.Optional; import java.util.concurrent.TimeUnit; -import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java 
b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java index beab8d5..bf81923 100644 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java +++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java @@ -21,7 +21,10 @@ package org.apache.metron.profiler.bolt; import org.adrianwalker.multilinestring.Multiline; -import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor; +import org.apache.metron.common.configuration.profiler.ProfileConfig; +import org.apache.metron.common.configuration.profiler.ProfilerConfig; +import org.apache.metron.profiler.clock.FixedClockFactory; +import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.test.bolt.BaseBoltTest; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; @@ -31,12 +34,15 @@ import org.json.simple.parser.ParseException; import org.junit.Before; import org.junit.Test; -import java.io.IOException; +import java.io.ByteArrayInputStream; +import java.io.InputStream; import java.util.HashMap; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.refEq; -import static org.mockito.Mockito.*; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Tests the ProfileSplitterBolt. @@ -47,7 +53,9 @@ public class ProfileSplitterBoltTest extends BaseBoltTest { * { * "ip_src_addr": "10.0.0.1", * "ip_dst_addr": "10.0.0.20", - * "protocol": "HTTP" + * "protocol": "HTTP", + * "timestamp.custom": 2222222222222, + * "timestamp.string": "3333333333333" * } */ @Multiline @@ -68,7 +76,7 @@ public class ProfileSplitterBoltTest extends BaseBoltTest { * } */ @Multiline - private String onlyIfTrue; + private String profileWithOnlyIfTrue; /** * { @@ -85,7 +93,7 @@ public class ProfileSplitterBoltTest extends BaseBoltTest { * } */ @Multiline - private String onlyIfFalse; + private String profileWithOnlyIfFalse; /** * { @@ -101,7 +109,7 @@ public class ProfileSplitterBoltTest extends BaseBoltTest { * } */ @Multiline - private String onlyIfMissing; + private String profileWithOnlyIfMissing; /** * { @@ -118,9 +126,89 @@ public class ProfileSplitterBoltTest extends BaseBoltTest { * } */ @Multiline - private String onlyIfInvalid; + private String profileWithOnlyIfInvalid; + + /** + * { + * "profiles": [ + * { + * "profile": "test", + * "foreach": "ip_src_addr", + * "init": {}, + * "update": {}, + * "result": "2" + * } + * ], + * "timestampField": "timestamp.custom" + * } + */ + @Multiline + private String profileUsingCustomTimestampField; + + /** + * { + * "profiles": [ + * { + * "profile": "test", + * "foreach": "ip_src_addr", + * "init": {}, + * "update": {}, + * "result": "2" + * } + * ], + * "timestampField": "timestamp.missing" + * } + */ + @Multiline + private String profileUsingMissingTimestampField; + + /** + * { + * "profiles": [ + * { + * "profile": "test", + * "foreach": "ip_src_addr", + * "init": {}, + * "update": {}, + * "result": "2" + * } + * ], + * "timestampField": "timestamp.string" + * } + */ + @Multiline + private String profileUsingStringTimestampField; + + /** + * { + * "profiles": [ + * ] + * } + */ + @Multiline + private String noProfilesDefined; + + /** + * { + * "profiles": [ + * { + * "profile": "profile1", + * "foreach": "'global'", + * "result": "1" + * }, + * { + * "profile": "profile2", + * 
"foreach": "'global'", + * "result": "2" + * } + * ] + * } + */ + @Multiline + private String twoProfilesDefined; private JSONObject message; + private long timestamp = 3333333; @Before public void setup() throws ParseException { @@ -134,17 +222,83 @@ public class ProfileSplitterBoltTest extends BaseBoltTest { } /** - * Create a ProfileSplitterBolt to test + * Ensure that a tuple with the correct fields is emitted to downstream bolts + * when a profile is defined. */ - private ProfileSplitterBolt createBolt(String profilerConfig) throws IOException { + @Test + public void testEmitTupleWithOneProfile() throws Exception { - ProfileSplitterBolt bolt = new ProfileSplitterBolt("zookeeperURL"); - bolt.setCuratorFramework(client); - bolt.setZKCache(cache); - bolt.getConfigurations().updateProfilerConfig(profilerConfig.getBytes("UTF-8")); - bolt.prepare(new HashMap<>(), topologyContext, outputCollector); + // setup the bolt and execute a tuple + ProfilerConfig config = toProfilerConfig(profileWithOnlyIfTrue); + ProfileSplitterBolt bolt = createBolt(config); + bolt.execute(tuple); - return bolt; + // the expected tuple fields + String expectedEntity = "10.0.0.1"; + ProfileConfig expectedConfig = config.getProfiles().get(0); + Values expected = new Values(message, timestamp, expectedEntity, expectedConfig); + + // a tuple should be emitted for the downstream profile builder + verify(outputCollector, times(1)) + .emit(eq(tuple), eq(expected)); + + // the original tuple should be ack'd + verify(outputCollector, times(1)) + .ack(eq(tuple)); + } + + /** + * If there are two profiles that need the same message, then two tuples should + * be emitted. One tuple for each profile. + */ + @Test + public void testEmitTupleWithTwoProfiles() throws Exception { + + // setup the bolt and execute a tuple + ProfilerConfig config = toProfilerConfig(twoProfilesDefined); + ProfileSplitterBolt bolt = createBolt(config); + bolt.execute(tuple); + + // the expected tuple fields + final String expectedEntity = "global"; + { + // a tuple should be emitted for the first profile + ProfileConfig profile1 = config.getProfiles().get(0); + Values expected = new Values(message, timestamp, expectedEntity, profile1); + verify(outputCollector, times(1)) + .emit(eq(tuple), eq(expected)); + } + { + // a tuple should be emitted for the second profile + ProfileConfig profile2 = config.getProfiles().get(1); + Values expected = new Values(message, timestamp, expectedEntity, profile2); + verify(outputCollector, times(1)) + .emit(eq(tuple), eq(expected)); + } + + // the original tuple should be ack'd + verify(outputCollector, times(1)) + .ack(eq(tuple)); + } + + /** + * No tuples should be emitted, if no profiles are defined. 
+ */ + @Test + public void testNoProfilesDefined() throws Exception { + + // setup the bolt and execute a tuple + ProfilerConfig config = toProfilerConfig(noProfilesDefined); + ProfileSplitterBolt bolt = createBolt(config); + bolt.execute(tuple); + + // no tuple should be emitted + verify(outputCollector, times(0)) + .emit(any(Tuple.class), any()); + + // the original tuple should be ack'd + verify(outputCollector, times(1)) + .ack(eq(tuple)); } /** @@ -154,17 +308,17 @@ public class ProfileSplitterBoltTest extends BaseBoltTest { @Test public void testOnlyIfTrue() throws Exception { - // setup - ProfileSplitterBolt bolt = createBolt(onlyIfTrue); - - // execute + ProfilerConfig config = toProfilerConfig(profileWithOnlyIfTrue); + ProfileSplitterBolt bolt = createBolt(config); bolt.execute(tuple); // a tuple should be emitted for the downstream profile builder - verify(outputCollector, times(1)).emit(refEq(tuple), any(Values.class)); + verify(outputCollector, times(1)) + .emit(eq(tuple), any(Values.class)); // the original tuple should be ack'd - verify(outputCollector, times(1)).ack(tuple); + verify(outputCollector, times(1)) + .ack(eq(tuple)); } /** @@ -174,17 +328,17 @@ public class ProfileSplitterBoltTest extends BaseBoltTest { @Test public void testOnlyIfMissing() throws Exception { - // setup - ProfileSplitterBolt bolt = createBolt(onlyIfMissing); - - // execute + ProfilerConfig config = toProfilerConfig(profileWithOnlyIfMissing); + ProfileSplitterBolt bolt = createBolt(config); bolt.execute(tuple); // a tuple should be emitted for the downstream profile builder - verify(outputCollector, times(1)).emit(refEq(tuple), any(Values.class)); + verify(outputCollector, times(1)) + .emit(eq(tuple), any(Values.class)); // the original tuple should be ack'd - verify(outputCollector, times(1)).ack(tuple); + verify(outputCollector, times(1)) + .ack(eq(tuple)); } /** @@ -194,36 +348,45 @@ public class ProfileSplitterBoltTest extends BaseBoltTest { @Test public void testOnlyIfFalse() throws Exception { - // setup - ProfileSplitterBolt bolt = createBolt(onlyIfFalse); - - // execute + ProfilerConfig config = toProfilerConfig(profileWithOnlyIfFalse); + ProfileSplitterBolt bolt = createBolt(config); bolt.execute(tuple); // a tuple should NOT be emitted for the downstream profile builder - verify(outputCollector, times(0)).emit(any(Values.class)); + verify(outputCollector, times(0)) + .emit(any()); // the original tuple should be ack'd - verify(outputCollector, times(1)).ack(tuple); + verify(outputCollector, times(1)) + .ack(eq(tuple)); } /** - * The entity associated with a ProfileMeasurement can be defined using a variable that is resolved - * via Stella. In this case the entity is defined as 'ip_src_addr' which is resolved to - * '10.0.0.1' based on the data contained within the message. + * The entity associated with a profile is defined with a Stellar expression. That expression + * can refer to any field within the message. + * + * In this case the entity is defined as 'ip_src_addr' which is resolved to '10.0.0.1' based on + * the data contained within the message. 
*/ @Test public void testResolveEntityName() throws Exception { - // setup - ProfileSplitterBolt bolt = createBolt(onlyIfTrue); - - // execute + ProfilerConfig config = toProfilerConfig(profileWithOnlyIfTrue); + ProfileSplitterBolt bolt = createBolt(config); bolt.execute(tuple); - // verify - the entity name comes from variable resolution in stella + // expected values String expectedEntity = "10.0.0.1"; - verify(outputCollector, times(1)).emit(any(Tuple.class), refEq(new Values(expectedEntity, onlyIfTrue, message))); + ProfileConfig expectedConfig = config.getProfiles().get(0); + Values expected = new Values(message, timestamp, expectedEntity, expectedConfig); + + // a tuple should be emitted for the downstream profile builder + verify(outputCollector, times(1)) + .emit(eq(tuple), eq(expected)); + + // the original tuple should be ack'd + verify(outputCollector, times(1)) + .ack(eq(tuple)); } /** @@ -232,11 +395,42 @@ public class ProfileSplitterBoltTest extends BaseBoltTest { @Test public void testOnlyIfInvalid() throws Exception { - // setup - ProfileSplitterBolt bolt = createBolt(onlyIfInvalid); + ProfilerConfig config = toProfilerConfig(profileWithOnlyIfInvalid); + ProfileSplitterBolt bolt = createBolt(config); bolt.execute(tuple); // a tuple should NOT be emitted for the downstream profile builder - verify(outputCollector, times(0)).emit(any(Values.class)); + verify(outputCollector, times(0)) + .emit(any(Values.class)); + } + + /** + * Creates a ProfilerConfig based on a string containing JSON. + * + * @param configAsJSON The config as JSON. + * @return The ProfilerConfig. + * @throws Exception + */ + private ProfilerConfig toProfilerConfig(String configAsJSON) throws Exception { + InputStream in = new ByteArrayInputStream(configAsJSON.getBytes("UTF-8")); + return JSONUtils.INSTANCE.load(in, ProfilerConfig.class); } + + /** + * Create a ProfileSplitterBolt to test + */ + private ProfileSplitterBolt createBolt(ProfilerConfig config) throws Exception { + + ProfileSplitterBolt bolt = new ProfileSplitterBolt("zookeeperURL"); + bolt.setCuratorFramework(client); + bolt.setZKCache(cache); + bolt.getConfigurations().updateProfilerConfig(config); + bolt.prepare(new HashMap<>(), topologyContext, outputCollector); + + // set the clock factory AFTER calling prepare to use the fixed clock factory + bolt.setClockFactory(new FixedClockFactory(timestamp)); + + return bolt; + } + } http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/MessageBuilder.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/MessageBuilder.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/MessageBuilder.java new file mode 100644 index 0000000..7e1628b --- /dev/null +++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/MessageBuilder.java @@ -0,0 +1,75 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.metron.profiler.integration; + +import org.json.simple.JSONObject; + +import java.util.HashMap; +import java.util.Map; + +/** + * Enables simple creation of telemetry messages for testing. + */ +public class MessageBuilder { + + private Map fields; + + /** + * Create a new {@link MessageBuilder}. + */ + public MessageBuilder() { + this.fields = new HashMap<>(); + } + + /** + * Adds all of the fields from a message to this message. + * + * @param prototype The other message that is treated as a prototype. + * @return A {@link MessageBuilder} + */ + public MessageBuilder withFields(JSONObject prototype) { + prototype.forEach((key, val) -> this.fields.put(key, val)); + return this; + } + + /** + * Adds a field to the message. + * + * @param key The field name. + * @param value The field value. + * @return A {@link MessageBuilder} + */ + public MessageBuilder withField(String key, Object value) { + this.fields.put(key, value); + return this; + } + + /** + * Build the message. + * + *
This should be called after defining all of the message fields. + * + * @return A {@link MessageBuilder}. + */ + public JSONObject build() { + return new JSONObject(fields); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java index 0d1b465..c48a3e9 100644 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java +++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java @@ -28,15 +28,18 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; import org.apache.metron.common.Constants; import org.apache.metron.common.utils.SerDeUtils; -import org.apache.metron.hbase.mock.MockHTable; import org.apache.metron.hbase.mock.MockHBaseTableProvider; +import org.apache.metron.hbase.mock.MockHTable; import org.apache.metron.integration.BaseIntegrationTest; import org.apache.metron.integration.ComponentRunner; import org.apache.metron.integration.UnableToStartException; import org.apache.metron.integration.components.FluxTopologyComponent; import org.apache.metron.integration.components.KafkaComponent; import org.apache.metron.integration.components.ZKServerComponent; +import org.apache.metron.profiler.ProfileMeasurement; import org.apache.metron.profiler.hbase.ColumnBuilder; +import org.apache.metron.profiler.hbase.RowKeyBuilder; +import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder; import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder; import org.apache.metron.statistics.OnlineStatisticsProvider; import org.junit.After; @@ -49,15 +52,15 @@ import org.junit.Test; import java.io.File; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.Properties; -import java.util.stream.Collectors; -import java.util.stream.Stream; +import java.util.concurrent.TimeUnit; import static com.google.code.tempusfugit.temporal.Duration.seconds; import static com.google.code.tempusfugit.temporal.Timeout.timeout; import static com.google.code.tempusfugit.temporal.WaitFor.waitOrTimeout; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; /** * An integration test of the Profiler topology. 
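The MessageBuilder added above gives integration tests a fluent way to assemble telemetry messages. A minimal usage sketch (assuming it lives in the same package as MessageBuilder; the field names and values are illustrative, echoing the tests in this patch):

    // assumes package org.apache.metron.profiler.integration, where MessageBuilder lives
    import org.json.simple.JSONObject;

    public class MessageBuilderExample {
      public static void main(String[] args) {
        // build a message field-by-field
        JSONObject message = new MessageBuilder()
            .withField("ip_src_addr", "10.0.0.1")
            .withField("protocol", "HTTP")
            .build();

        // derive a second message by copying the first, then adding a field
        JSONObject withTimestamp = new MessageBuilder()
            .withFields(message)
            .withField("timestamp", 1000L)
            .build();

        System.out.println(withTimestamp.toJSONString());
      }
    }

Because withFields(JSONObject) copies every field from a prototype, tests can derive many near-identical messages without repeating each field by hand.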
@@ -105,7 +108,6 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { private static FluxTopologyComponent fluxComponent; private static KafkaComponent kafkaComponent; private static ConfigUploadComponent configUploadComponent; - private static List input; private static ComponentRunner runner; private static MockHTable profilerTable; @@ -114,7 +116,13 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { private static final double epsilon = 0.001; private static final String inputTopic = Constants.INDEXING_TOPIC; private static final String outputTopic = "profiles"; + private static final int saltDivisor = 10; + private static final long windowLagMillis = TimeUnit.SECONDS.toMillis(5); + private static final long windowDurationMillis = TimeUnit.SECONDS.toMillis(5); + private static final long periodDurationMillis = TimeUnit.SECONDS.toMillis(15); + private static final long profileTimeToLiveMillis = TimeUnit.SECONDS.toMillis(20); + private static final long maxRoutesPerBolt = 100000; /** * Tests the first example contained within the README. @@ -122,22 +130,25 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { @Test public void testExample1() throws Exception { - update(TEST_RESOURCES + "/config/zookeeper/readme-example-1"); + uploadConfig(TEST_RESOURCES + "/config/zookeeper/readme-example-1"); // start the topology and write test messages to kafka fluxComponent.submitTopology(); - kafkaComponent.writeMessages(inputTopic, input); + kafkaComponent.writeMessages(inputTopic, message1, message1, message1); + kafkaComponent.writeMessages(inputTopic, message2, message2, message2); + kafkaComponent.writeMessages(inputTopic, message3, message3, message3); // verify - ensure the profile is being persisted waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, - timeout(seconds(90))); + timeout(seconds(180))); // verify - only 10.0.0.2 sends 'HTTP', thus there should be only 1 value - List actuals = read(profilerTable.getPutLog(), columnFamily, columnBuilder.getColumnQualifier("value"), Double.class); + List actuals = read(profilerTable.getPutLog(), columnFamily, + columnBuilder.getColumnQualifier("value"), Double.class); - // verify - there are 5 'HTTP' each with 390 bytes + // verify - there are 3 'HTTP' each with 390 bytes Assert.assertTrue(actuals.stream().anyMatch(val -> - MathUtils.equals(390.0 * 5, val, epsilon) + MathUtils.equals(390.0 * 3, val, epsilon) )); } @@ -147,11 +158,13 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { @Test public void testExample2() throws Exception { - update(TEST_RESOURCES + "/config/zookeeper/readme-example-2"); + uploadConfig(TEST_RESOURCES + "/config/zookeeper/readme-example-2"); // start the topology and write test messages to kafka fluxComponent.submitTopology(); - kafkaComponent.writeMessages(inputTopic, input); + kafkaComponent.writeMessages(inputTopic, message1, message1, message1); + kafkaComponent.writeMessages(inputTopic, message2, message2, message2); + kafkaComponent.writeMessages(inputTopic, message3, message3, message3); // expect 2 values written by the profile; one for 10.0.0.2 and another for 10.0.0.3 final int expected = 2; @@ -161,16 +174,17 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { timeout(seconds(90))); // verify - expect 2 results as 2 hosts involved; 10.0.0.2 sends 'HTTP' and 10.0.0.3 send 'DNS' - List actuals = read(profilerTable.getPutLog(), columnFamily, columnBuilder.getColumnQualifier("value"), Double.class); + List actuals = 
read(profilerTable.getPutLog(), columnFamily, + columnBuilder.getColumnQualifier("value"), Double.class); - // verify - 10.0.0.3 -> 1/6 - Assert.assertTrue( "Could not find a value near 1/6. Actual values read are are: " + Joiner.on(",").join(actuals) - , actuals.stream().anyMatch(val -> MathUtils.equals(val, 1.0/6.0, epsilon) + // verify - 10.0.0.3 -> 1/4 + Assert.assertTrue( "Could not find a value near 1/4. Actual values read are are: " + Joiner.on(",").join(actuals), + actuals.stream().anyMatch(val -> MathUtils.equals(val, 1.0/4.0, epsilon) )); - // verify - 10.0.0.2 -> 6/1 - Assert.assertTrue("Could not find a value near 6. Actual values read are are: " + Joiner.on(",").join(actuals) - ,actuals.stream().anyMatch(val -> MathUtils.equals(val, 6.0/1.0, epsilon) + // verify - 10.0.0.2 -> 4/1 + Assert.assertTrue("Could not find a value near 4. Actual values read are are: " + Joiner.on(",").join(actuals), + actuals.stream().anyMatch(val -> MathUtils.equals(val, 4.0/1.0, epsilon) )); } @@ -180,22 +194,25 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { @Test public void testExample3() throws Exception { - update(TEST_RESOURCES + "/config/zookeeper/readme-example-3"); + uploadConfig(TEST_RESOURCES + "/config/zookeeper/readme-example-3"); // start the topology and write test messages to kafka fluxComponent.submitTopology(); - kafkaComponent.writeMessages(inputTopic, input); + kafkaComponent.writeMessages(inputTopic, message1, message1, message1); + kafkaComponent.writeMessages(inputTopic, message2, message2, message2); + kafkaComponent.writeMessages(inputTopic, message3, message3, message3); // verify - ensure the profile is being persisted waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, timeout(seconds(90))); // verify - only 10.0.0.2 sends 'HTTP', thus there should be only 1 value - List actuals = read(profilerTable.getPutLog(), columnFamily, columnBuilder.getColumnQualifier("value"), Double.class); + List actuals = read(profilerTable.getPutLog(), columnFamily, + columnBuilder.getColumnQualifier("value"), Double.class); // verify - there are 5 'HTTP' messages each with a length of 20, thus the average should be 20 - Assert.assertTrue("Could not find a value near 20. Actual values read are are: " + Joiner.on(",").join(actuals) - , actuals.stream().anyMatch(val -> MathUtils.equals(val, 20.0, epsilon) + Assert.assertTrue("Could not find a value near 20. 
Actual values read are are: " + Joiner.on(",").join(actuals), + actuals.stream().anyMatch(val -> MathUtils.equals(val, 20.0, epsilon) )); } @@ -205,11 +222,13 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { @Test public void testExample4() throws Exception { - update(TEST_RESOURCES + "/config/zookeeper/readme-example-4"); + uploadConfig(TEST_RESOURCES + "/config/zookeeper/readme-example-4"); // start the topology and write test messages to kafka fluxComponent.submitTopology(); - kafkaComponent.writeMessages(inputTopic, input); + kafkaComponent.writeMessages(inputTopic, message1, message1, message1); + kafkaComponent.writeMessages(inputTopic, message2, message2, message2); + kafkaComponent.writeMessages(inputTopic, message3, message3, message3); // verify - ensure the profile is being persisted waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, @@ -220,34 +239,109 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { List actuals = read(profilerTable.getPutLog(), columnFamily, column, OnlineStatisticsProvider.class); // verify - there are 5 'HTTP' messages each with a length of 20, thus the average should be 20 - Assert.assertTrue("Could not find a value near 20. Actual values read are are: " + Joiner.on(",").join(actuals) - , actuals.stream().anyMatch(val -> MathUtils.equals(val.getMean(), 20.0, epsilon) + Assert.assertTrue("Could not find a value near 20. Actual values read are are: " + Joiner.on(",").join(actuals), + actuals.stream().anyMatch(val -> MathUtils.equals(val.getMean(), 20.0, epsilon) )); } @Test public void testPercentiles() throws Exception { - update(TEST_RESOURCES + "/config/zookeeper/percentiles"); + uploadConfig(TEST_RESOURCES + "/config/zookeeper/percentiles"); + + // start the topology and write test messages to kafka + fluxComponent.submitTopology(); + kafkaComponent.writeMessages(inputTopic, message1, message1, message1); + kafkaComponent.writeMessages(inputTopic, message2, message2, message2); + kafkaComponent.writeMessages(inputTopic, message3, message3, message3); + + // verify - ensure the profile is being persisted + waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, + timeout(seconds(90))); + + List actuals = read(profilerTable.getPutLog(), columnFamily, + columnBuilder.getColumnQualifier("value"), Double.class); + // verify - the 70th percentile of x3, 20s = 20.0 + Assert.assertTrue("Could not find a value near 20. Actual values read are are: " + Joiner.on(",").join(actuals), + actuals.stream().anyMatch(val -> MathUtils.equals(val, 20.0, epsilon))); + } + + /** + * The Profiler can optionally perform event time processing. With event time processing, + * the Profiler uses timestamps contained in the source telemetry. + * + *
Defining a 'timestampField' within the Profiler configuration tells the Profiler + * from which field the timestamp should be extracted. + */ + @Test + public void testEventTimeProcessing() throws Exception { + + // constants used for the test + final long startAt = 10; + final String entity = "10.0.0.1"; + final String profileName = "event-time-test"; + + // create some messages that contain a timestamp - a really old timestamp; close to 1970 + String message1 = new MessageBuilder() + .withField("ip_src_addr", entity) + .withField("timestamp", startAt) + .build() + .toJSONString(); + + String message2 = new MessageBuilder() + .withField("ip_src_addr", entity) + .withField("timestamp", startAt + 100) + .build() + .toJSONString(); + + uploadConfig(TEST_RESOURCES + "/config/zookeeper/event-time-test"); // start the topology and write test messages to kafka fluxComponent.submitTopology(); - kafkaComponent.writeMessages(inputTopic, input); + kafkaComponent.writeMessages(inputTopic, message1, message2); // verify - ensure the profile is being persisted waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, timeout(seconds(90))); - List actuals = read(profilerTable.getPutLog(), columnFamily, columnBuilder.getColumnQualifier("value"), Double.class); + List puts = profilerTable.getPutLog(); + assertEquals(1, puts.size()); + + // inspect the row key to ensure the profiler used event time correctly. the timestamp + // embedded in the row key should match those in the source telemetry + byte[] expectedRowKey = generateExpectedRowKey(profileName, entity, startAt); + byte[] actualRowKey = puts.get(0).getRow(); + String msg = String.format("expected '%s', got '%s'", + new String(expectedRowKey, "UTF-8"), + new String(actualRowKey, "UTF-8")); + assertArrayEquals(msg, expectedRowKey, actualRowKey); + } - // verify - the 70th percentile of 5 x 20s = 20.0 - Assert.assertTrue("Could not find a value near 20. Actual values read are are: " + Joiner.on(",").join(actuals) - , actuals.stream().anyMatch(val -> MathUtils.equals(val, 20.0, epsilon))); + /** + * Generates the expected row key. + * + * @param profileName The name of the profile. + * @param entity The entity. + * @param whenMillis A timestamp in epoch milliseconds. + * @return A row key. + */ + private byte[] generateExpectedRowKey(String profileName, String entity, long whenMillis) { + + // only the profile name, entity, and period are used to generate the row key + ProfileMeasurement measurement = new ProfileMeasurement() + .withProfileName(profileName) + .withEntity(entity) + .withPeriod(whenMillis, periodDurationMillis, TimeUnit.MILLISECONDS); + + // build the row key + RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodDurationMillis, TimeUnit.MILLISECONDS); + return rowKeyBuilder.rowKey(measurement); } /** * Reads a value written by the Profiler. + * * @param family The column family. * @param qualifier The column qualifier. * @param clazz The expected type of the value. 
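The javadoc above ties event time processing to a 'timestampField' setting in the profiler configuration. A hedged sketch of such a configuration, loaded the same way this patch's toProfilerConfig helper does (the profile body is illustrative; only the timestampField entry is the point of the example):

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;
    import org.apache.metron.common.configuration.profiler.ProfilerConfig;
    import org.apache.metron.common.utils.JSONUtils;

    public class EventTimeConfigExample {
      public static void main(String[] args) throws Exception {

        // a profiler config that opts into event time processing by naming
        // the telemetry field carrying the timestamp; the profile itself is
        // an illustrative counter, not taken from this patch
        String configAsJSON =
            "{" +
            "   \"timestampField\": \"timestamp\"," +
            "   \"profiles\": [" +
            "      {" +
            "         \"profile\": \"event-time-test\"," +
            "         \"foreach\": \"ip_src_addr\"," +
            "         \"init\":   { \"count\": \"0\" }," +
            "         \"update\": { \"count\": \"count + 1\" }," +
            "         \"result\": \"count\"" +
            "      }" +
            "   ]" +
            "}";

        InputStream in = new ByteArrayInputStream(configAsJSON.getBytes("UTF-8"));
        ProfilerConfig config = JSONUtils.INSTANCE.load(in, ProfilerConfig.class);
        System.out.println(config.getProfiles().size());  // 1 profile defined
      }
    }

With event time enabled this way, the period embedded in each HBase row key is derived from the message's own 'timestamp' field rather than the wall clock, which is what testEventTimeProcessing verifies through generateExpectedRowKey.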
@@ -258,7 +352,8 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { List results = new ArrayList<>(); for(Put put: puts) { - for(Cell cell: put.get(Bytes.toBytes(family), qualifier)) { + List cells = put.get(Bytes.toBytes(family), qualifier); + for(Cell cell : cells) { T value = SerDeUtils.fromBytes(cell.getValue(), clazz); results.add(value); } @@ -271,39 +366,41 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { public static void setupBeforeClass() throws UnableToStartException { columnBuilder = new ValueOnlyColumnBuilder(columnFamily); - List inputNew = Stream.of(message1, message2, message3) - .map(m -> Collections.nCopies(5, m)) - .flatMap(l -> l.stream()) - .collect(Collectors.toList()); - - // create input messages for the profiler to consume - input = Stream.of(message1, message2, message3) - .map(Bytes::toBytes) - .map(m -> Collections.nCopies(5, m)) - .flatMap(l -> l.stream()) - .collect(Collectors.toList()); - // storm topology properties final Properties topologyProperties = new Properties() {{ - setProperty("kafka.start", "UNCOMMITTED_EARLIEST"); + + // storm settings setProperty("profiler.workers", "1"); setProperty("profiler.executors", "0"); + setProperty("storm.auto.credentials", "[]"); + setProperty("topology.auto-credentials", "[]"); + setProperty("topology.message.timeout.secs", "60"); + setProperty("topology.max.spout.pending", "100000"); + + // kafka settings setProperty("profiler.input.topic", inputTopic); setProperty("profiler.output.topic", outputTopic); - setProperty("profiler.period.duration", "20"); - setProperty("profiler.period.duration.units", "SECONDS"); - setProperty("profiler.ttl", "30"); - setProperty("profiler.ttl.units", "MINUTES"); - setProperty("profiler.hbase.salt.divisor", "10"); + setProperty("kafka.start", "UNCOMMITTED_EARLIEST"); + setProperty("kafka.security.protocol", "PLAINTEXT"); + + // hbase settings + setProperty("profiler.hbase.salt.divisor", Integer.toString(saltDivisor)); setProperty("profiler.hbase.table", tableName); setProperty("profiler.hbase.column.family", columnFamily); setProperty("profiler.hbase.batch", "10"); setProperty("profiler.hbase.flush.interval.seconds", "1"); - setProperty("profiler.profile.ttl", "20"); setProperty("hbase.provider.impl", "" + MockHBaseTableProvider.class.getName()); - setProperty("storm.auto.credentials", "[]"); - setProperty("kafka.security.protocol", "PLAINTEXT"); - setProperty("topology.auto-credentials", "[]"); + + // profile settings + setProperty("profiler.period.duration", Long.toString(periodDurationMillis)); + setProperty("profiler.period.duration.units", "MILLISECONDS"); + setProperty("profiler.ttl", Long.toString(profileTimeToLiveMillis)); + setProperty("profiler.ttl.units", "MILLISECONDS"); + setProperty("profiler.window.duration", Long.toString(windowDurationMillis)); + setProperty("profiler.window.duration.units", "MILLISECONDS"); + setProperty("profiler.window.lag", Long.toString(windowLagMillis)); + setProperty("profiler.window.lag.units", "MILLISECONDS"); + setProperty("profiler.max.routes.per.bolt", Long.toString(maxRoutesPerBolt)); }}; // create the mock table @@ -311,7 +408,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { zkComponent = getZKServerComponent(topologyProperties); - // create the input topic + // create the input and output topics kafkaComponent = getKafkaComponent(topologyProperties, Arrays.asList( new KafkaComponent.Topic(inputTopic, 1), new KafkaComponent.Topic(outputTopic, 1))); @@ -340,12 +437,6 @@ public 
class ProfilerIntegrationTest extends BaseIntegrationTest { runner.start(); } - public void update(String path) throws Exception { - configUploadComponent.withGlobalConfiguration(path) - .withProfilerConfiguration(path); - configUploadComponent.update(); - } - @AfterClass public static void tearDownAfterClass() throws Exception { MockHBaseTableProvider.clear(); @@ -368,4 +459,16 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { runner.reset(); } } -} \ No newline at end of file + + /** + * Uploads config values to Zookeeper. + * @param path The path on the local filesystem to the config values. + * @throws Exception + */ + public void uploadConfig(String path) throws Exception { + configUploadComponent + .withGlobalConfiguration(path) + .withProfilerConfiguration(path) + .update(); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-profiler-env.xml ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-profiler-env.xml b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-profiler-env.xml index c7f6ce2..8546b56 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-profiler-env.xml +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-profiler-env.xml @@ -57,9 +57,33 @@ value-list - DAYS + HOURS + MINUTES + + + SECONDS + + + 1 + + + + profiler_window_duration + 30 + The duration of each profile window. This value should be defined along with profiler.window.duration.units + Window Duration + + + profiler_window_units + SECONDS + The units used to specify the profiler.window.duration. This value should be defined along with profiler.window.duration. + Window Units + + value-list + + HOURS @@ -71,7 +95,6 @@ 1 - profiler_ttl @@ -104,8 +127,54 @@ 1 - - + + + profiler_window_lag + 1 + The maximum time lag for timestamps. Timestamps cannot arrive out-of-order by more than this amount. + Window Time Lag + + + profiler_window_lag_units + MINUTES + The units used to specify the Event Time Lag. + Window Lag Units + + value-list + + + HOURS + + + MINUTES + + + SECONDS + + + 1 + + + + profiler_topology_message_timeout_secs + The maximum amount of time a message has to complete before it is considered failed. + Profiler Topology Message Timeout + 900 + + + profiler_topology_max_spout_pending + Profiler Topology Spout Max Pending Tuples + Spout Max Pending Tuples + + + true + + + + profiler_max_routes_per_bolt + 100000 + The max number of routes allowed per bolt. The number of routes increases as the number of profiles and entities increases. + Max Routes Per Bolt profiler_hbase_table
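For context on the new Ambari properties above: they surface the same profiler topology settings that this patch exercises in setupBeforeClass. A hedged sketch of the correspondence (the Storm/profiler property names are taken from the patch; the values and the exact Ambari-to-topology mapping are illustrative):

    import java.util.Properties;

    public class ProfilerEnvExample {
      public static void main(String[] args) {
        Properties topologyProperties = new Properties();

        // profiler_window_duration / profiler_window_units
        topologyProperties.setProperty("profiler.window.duration", "30");
        topologyProperties.setProperty("profiler.window.duration.units", "SECONDS");

        // profiler_window_lag / profiler_window_lag_units
        topologyProperties.setProperty("profiler.window.lag", "1");
        topologyProperties.setProperty("profiler.window.lag.units", "MINUTES");

        // profiler_topology_message_timeout_secs
        topologyProperties.setProperty("topology.message.timeout.secs", "900");

        // profiler_topology_max_spout_pending (blank by default in Ambari,
        // in which case Storm falls back to its own default)
        topologyProperties.setProperty("topology.max.spout.pending", "100000");

        // profiler_max_routes_per_bolt
        topologyProperties.setProperty("profiler.max.routes.per.bolt", "100000");
      }
    }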