From commits-return-4156-archive-asf-public=cust-asf.ponee.io@metron.apache.org Fri Dec 7 20:25:20 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id D5EE1180647 for ; Fri, 7 Dec 2018 20:25:18 +0100 (CET) Received: (qmail 77866 invoked by uid 500); 7 Dec 2018 19:25:18 -0000 Mailing-List: contact commits-help@metron.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@metron.apache.org Delivered-To: mailing list commits@metron.apache.org Received: (qmail 77857 invoked by uid 99); 7 Dec 2018 19:25:18 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 07 Dec 2018 19:25:18 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DE8CAE11F7; Fri, 7 Dec 2018 19:25:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: nickallen@apache.org To: commits@metron.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: metron git commit: METRON-1810 Storm Profiler Intermittent Test Failure (nickwallen) closes apache/metron#1289 Date: Fri, 7 Dec 2018 19:25:17 +0000 (UTC) Repository: metron Updated Branches: refs/heads/master e81a5c102 -> d749961da METRON-1810 Storm Profiler Intermittent Test Failure (nickwallen) closes apache/metron#1289 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/d749961d Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/d749961d Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/d749961d Branch: refs/heads/master Commit: d749961daaa77624e1cb6878548479d86c8fe668 Parents: e81a5c1 Author: nickwallen Authored: Fri Dec 7 14:24:48 2018 -0500 Committer: nickallen Committed: Fri Dec 7 14:24:48 2018 -0500 ---------------------------------------------------------------------- .../profiler/DefaultMessageDistributor.java | 75 +++++++------- .../profiler/DefaultMessageDistributorTest.java | 12 +-- metron-analytics/metron-profiler-storm/pom.xml | 42 ++++++-- .../integration/ProfilerIntegrationTest.java | 103 ++++++++----------- .../metron/integration/utils/TestUtils.java | 6 +- 5 files changed, 121 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/d749961d/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java index 0e50467..ef2ff2c 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java @@ -21,11 +21,11 @@ package org.apache.metron.profiler; import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.CacheWriter; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.RemovalCause; import com.github.benmanes.caffeine.cache.RemovalListener; import com.github.benmanes.caffeine.cache.Ticker; -import com.google.common.util.concurrent.MoreExecutors; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.metron.common.configuration.profiler.ProfileConfig; import org.apache.metron.stellar.dsl.Context; @@ -41,7 +41,6 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -105,7 +104,7 @@ public class DefaultMessageDistributor implements MessageDistributor, Serializab long periodDurationMillis, long profileTimeToLiveMillis, long maxNumberOfRoutes) { - this(periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes, Ticker.systemTicker(), Optional.empty()); + this(periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes, Ticker.systemTicker()); } /** @@ -116,14 +115,12 @@ public class DefaultMessageDistributor implements MessageDistributor, Serializab * @param maxNumberOfRoutes The max number of unique routes to maintain. After this is exceeded, lesser * used routes will be evicted from the internal cache. * @param ticker The ticker used to drive time for the caches. Only needs set for testing. - * @param cacheMaintenanceExecutor The executor responsible for running cache maintenance tasks. Only needed for testing. */ public DefaultMessageDistributor( long periodDurationMillis, long profileTimeToLiveMillis, long maxNumberOfRoutes, - Ticker ticker, - Optional cacheMaintenanceExecutor) { + Ticker ticker) { if(profileTimeToLiveMillis < periodDurationMillis) { throw new IllegalStateException(format( @@ -138,11 +135,8 @@ public class DefaultMessageDistributor implements MessageDistributor, Serializab .newBuilder() .maximumSize(maxNumberOfRoutes) .expireAfterAccess(profileTimeToLiveMillis, TimeUnit.MILLISECONDS) - .removalListener(new ActiveCacheRemovalListener()) - .ticker(ticker); - if (cacheMaintenanceExecutor.isPresent()) { - activeCacheBuilder.executor(cacheMaintenanceExecutor.get()); - } + .ticker(ticker) + .writer(new ActiveCacheWriter()); if (LOG.isDebugEnabled()) { activeCacheBuilder.recordStats(); } @@ -153,11 +147,8 @@ public class DefaultMessageDistributor implements MessageDistributor, Serializab .newBuilder() .maximumSize(maxNumberOfRoutes) .expireAfterWrite(profileTimeToLiveMillis, TimeUnit.MILLISECONDS) - .removalListener(new ExpiredCacheRemovalListener()) - .ticker(ticker); - if (cacheMaintenanceExecutor.isPresent()) { - expiredCacheBuilder.executor(cacheMaintenanceExecutor.get()); - } + .ticker(ticker) + .writer(new ExpiredCacheWriter()); if (LOG.isDebugEnabled()) { expiredCacheBuilder.recordStats(); } @@ -238,10 +229,12 @@ public class DefaultMessageDistributor implements MessageDistributor, Serializab */ private void cacheMaintenance() { activeCache.cleanUp(); + LOG.debug("Active cache maintenance triggered: cacheStats={}, size={}", + activeCache.stats().toString(), activeCache.estimatedSize()); + expiredCache.cleanUp(); - LOG.debug("Cache maintenance triggered: activeCacheStats={}, expiredCacheStats={}", - activeCache.stats().toString(), - expiredCache.stats().toString()); + LOG.debug("Expired cache maintenance triggered: cacheStats={}, size={}", + expiredCache.stats().toString(), expiredCache.estimatedSize()); } /** @@ -315,41 +308,51 @@ public class DefaultMessageDistributor implements MessageDistributor, Serializab } /** - * A listener that is notified when profiles expire from the active cache. + * Notified synchronously when the active cache is modified. */ - private class ActiveCacheRemovalListener implements RemovalListener, Serializable { + private class ActiveCacheWriter implements CacheWriter, Serializable { + + @Override + public void write(@Nonnull Integer key, @Nonnull ProfileBuilder value) { + // do nothing + } @Override - public void onRemoval(@Nullable Integer key, @Nullable ProfileBuilder expired, @Nonnull RemovalCause cause) { - LOG.warn("Profile expired from active cache; profile={}, entity={}", - expired.getDefinition().getProfile(), - expired.getEntity()); + public void delete(@Nonnull Integer key, @Nullable ProfileBuilder value, @Nonnull RemovalCause cause) { + if(cause.wasEvicted()) { + // add the profile to the expired cache + expiredCache.put(key, value); + LOG.debug("Profile expired from active cache due to inactivity; profile={}, entity={}, cause={}", + value.getDefinition().getProfile(), value.getEntity(), cause); - // add the profile to the expired cache - expiredCache.put(key, expired); + } else { + LOG.error("Profile removed from cache unexpectedly. File a bug report; profile={}, entity={}, cause={}", + value.getDefinition().getProfile(), value.getEntity(), cause); + } } } /** - * A listener that is notified when profiles expire from the active cache. + * Notified synchronously when the expired cache is modified. */ - private class ExpiredCacheRemovalListener implements RemovalListener, Serializable { + private class ExpiredCacheWriter implements CacheWriter, Serializable { + + @Override + public void write(@Nonnull Integer key, @Nonnull ProfileBuilder value) { + // nothing to do + } @Override - public void onRemoval(@Nullable Integer key, @Nullable ProfileBuilder expired, @Nonnull RemovalCause cause) { + public void delete(@Nonnull Integer key, @Nullable ProfileBuilder value, @Nonnull RemovalCause cause) { if(cause.wasEvicted()) { // the expired profile was NOT flushed in time LOG.warn("Expired profile NOT flushed before removal, some state lost; profile={}, entity={}, cause={}", - expired.getDefinition().getProfile(), - expired.getEntity(), - cause); + value.getDefinition().getProfile(), value.getEntity(), cause); } else { // the expired profile was flushed successfully LOG.debug("Expired profile successfully flushed; profile={}, entity={}, cause={}", - expired.getDefinition().getProfile(), - expired.getEntity(), - cause); + value.getDefinition().getProfile(), value.getEntity(), cause); } } } http://git-wip-us.apache.org/repos/asf/metron/blob/d749961d/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java index d1b7598..e04404c 100644 --- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java +++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java @@ -105,8 +105,7 @@ public class DefaultMessageDistributorTest { periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes, - Ticker.systemTicker(), - Optional.of(MoreExecutors.sameThreadExecutor())); + Ticker.systemTicker()); } /** @@ -199,8 +198,7 @@ public class DefaultMessageDistributorTest { periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes, - ticker, - Optional.of(MoreExecutors.sameThreadExecutor())); + ticker); // distribute one message distributor.distribute(route, context); @@ -230,8 +228,7 @@ public class DefaultMessageDistributorTest { periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes, - ticker, - Optional.of(MoreExecutors.sameThreadExecutor())); + ticker); // distribute one message distributor.distribute(route, context); @@ -262,8 +259,7 @@ public class DefaultMessageDistributorTest { periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes, - ticker, - Optional.of(MoreExecutors.sameThreadExecutor())); + ticker); // distribute one message distributor.distribute(route, context); http://git-wip-us.apache.org/repos/asf/metron/blob/d749961d/metron-analytics/metron-profiler-storm/pom.xml ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-storm/pom.xml b/metron-analytics/metron-profiler-storm/pom.xml index cfa30b3..ec2e215 100644 --- a/metron-analytics/metron-profiler-storm/pom.xml +++ b/metron-analytics/metron-profiler-storm/pom.xml @@ -36,6 +36,10 @@ org.slf4j slf4j-log4j12 + + org.apache.httpcomponents + httpclient + @@ -48,8 +52,8 @@ javax.servlet - commons-httpclient - commons-httpclient + org.apache.httpcomponents + httpclient org.slf4j @@ -143,6 +147,26 @@ + org.apache.metron + metron-integration-test + ${project.parent.version} + test + + + servlet-api + javax.servlet + + + org.apache.hadoop + hadoop-common + + + org.hamcrest + hamcrest-core + + + + com.esotericsoftware kryo-shaded ${global_kryo_version} @@ -271,15 +295,15 @@ org.mockito - mockito-all + mockito-core ${global_mockito_version} test - - - com.google.code.tempus-fugit - tempus-fugit - 1.2-20140129.191141-5 - test + + + org.hamcrest + hamcrest-core + + org.apache.metron http://git-wip-us.apache.org/repos/asf/metron/blob/d749961d/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java index f7e75ce..ea4ad4e 100644 --- a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java +++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java @@ -61,9 +61,7 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; -import static com.google.code.tempusfugit.temporal.Duration.seconds; -import static com.google.code.tempusfugit.temporal.Timeout.timeout; -import static com.google.code.tempusfugit.temporal.WaitFor.waitOrTimeout; +import static org.apache.metron.integration.utils.TestUtils.assertEventually; import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY; import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE; import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE_PROVIDER; @@ -77,7 +75,10 @@ import static org.apache.metron.profiler.storm.KafkaEmitter.PERIOD_ID_FIELD; import static org.apache.metron.profiler.storm.KafkaEmitter.PERIOD_START_FIELD; import static org.apache.metron.profiler.storm.KafkaEmitter.PROFILE_FIELD; import static org.apache.metron.profiler.storm.KafkaEmitter.TIMESTAMP_FIELD; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.CoreMatchers.hasItems; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; /** @@ -88,6 +89,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final String TEST_RESOURCES = "../../metron-analytics/metron-profiler-storm/src/test"; private static final String FLUX_PATH = "src/main/flux/profiler/remote.yaml"; + private static final long timeout = TimeUnit.SECONDS.toMillis(90); public static final long startAt = 10; public static final String entity = "10.0.0.1"; @@ -205,32 +207,11 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { Thread.sleep(sleep); kafkaComponent.writeMessages(inputTopic, message3); - // retrieve the profile measurement using PROFILE_GET - String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('15', 'MINUTES'))"; - List measurements = execute(profileGetExpression, List.class); - - // need to keep checking for measurements until the profiler has flushed one out - int attempt = 0; - while(measurements.size() == 0 && attempt++ < 10) { - - // wait for the profiler to flush - sleep = windowDurationMillis; - LOG.debug("Waiting {} millis for profiler to flush", sleep); - Thread.sleep(sleep); - - // do not write additional messages to advance time. this ensures that we are testing the "time to live" - // flush mechanism. the TTL setting defines when the profile will be flushed - - // try again to retrieve the profile measurement - measurements = execute(profileGetExpression, List.class); - } - - // expect to see only 1 measurement, but could be more (one for each period) depending on - // how long we waited for the flush to occur - assertTrue(measurements.size() > 0); - // the profile should have counted 3 messages; the 3 test messages that were sent - assertEquals(3, measurements.get(0).intValue()); + assertEventually(() -> { + List results = execute("PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('15', 'MINUTES'))", List.class); + assertThat(results, hasItem(3)); + }, timeout); } /** @@ -278,25 +259,24 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { // wait until the profile flushes both periods. the first period will flush immediately as subsequent messages // advance time. the next period contains all of the remaining messages, so there are no other messages to // advance time. because of this the next period only flushes after the time-to-live expires - waitOrTimeout(() -> profilerTable.getPutLog().size() >= 6, timeout(seconds(90))); - { - // there are 14 messages in the first period and 12 in the next where ip_src_addr = 192.168.66.1 - List results = execute("PROFILE_GET('count-by-ip', '192.168.66.1', window)", List.class); - assertEquals(14, results.get(0)); - assertEquals(12, results.get(1)); - } - { - // there are 36 messages in the first period and 38 in the next where ip_src_addr = 192.168.138.158 - List results = execute("PROFILE_GET('count-by-ip', '192.168.138.158', window)", List.class); - assertEquals(36, results.get(0)); - assertEquals(38, results.get(1)); - } - { - // in all there are 50 messages in the first period and 50 messages in the next - List results = execute("PROFILE_GET('total-count', 'total', window)", List.class); - assertEquals(50, results.get(0)); - assertEquals(50, results.get(1)); - } + + // there are 14 messages in the first period and 12 in the next where ip_src_addr = 192.168.66.1 + assertEventually(() -> { + List results = execute("PROFILE_GET('count-by-ip', '192.168.66.1', window)", List.class); + assertThat(results, hasItems(14, 12)); + }, timeout); + + // there are 36 messages in the first period and 38 in the next where ip_src_addr = 192.168.138.158 + assertEventually(() -> { + List results = execute("PROFILE_GET('count-by-ip', '192.168.138.158', window)", List.class); + assertThat(results, hasItems(36, 38)); + }, timeout); + + // in all there are 50 (36+14) messages in the first period and 50 (38+12) messages in the next + assertEventually(() -> { + List results = execute("PROFILE_GET('total-count', 'total', window)", List.class); + assertThat(results, hasItems(50, 50)); + }, timeout); } /** @@ -332,18 +312,18 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { List messages = FileUtils.readLines(new File("src/test/resources/telemetry.json")); kafkaComponent.writeMessages(inputTopic, messages); - // wait until the profile is flushed - waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, timeout(seconds(90))); + assertEventually(() -> { + // validate the measurements written by the batch profiler using `PROFILE_GET` + // the 'window' looks up to 5 hours before the max timestamp contained in the test data + assign("maxTimestamp", "1530978728982L"); + assign("window", "PROFILE_WINDOW('from 5 hours ago', maxTimestamp)"); - // validate the measurements written by the batch profiler using `PROFILE_GET` - // the 'window' looks up to 5 hours before the max timestamp contained in the test data - assign("maxTimestamp", "1530978728982L"); - assign("window", "PROFILE_WINDOW('from 5 hours ago', maxTimestamp)"); + // retrieve the stats stored by the profiler + List results = execute("PROFILE_GET('profile-with-stats', 'global', window)", List.class); + assertTrue(results.size() > 0); + assertTrue(results.get(0) instanceof OnlineStatisticsProvider); - // retrieve the stats stored by the profiler - List results = execute("PROFILE_GET('profile-with-stats', 'global', window)", List.class); - assertTrue(results.size() > 0); - assertTrue(results.get(0) instanceof OnlineStatisticsProvider); + }, timeout); } /** @@ -371,6 +351,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { @Multiline private static String profileWithTriageResult; + private List outputMessages; @Test public void testProfileWithTriageResult() throws Exception { uploadConfigToZookeeper(ProfilerConfig.fromJSON(profileWithTriageResult)); @@ -381,10 +362,10 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { kafkaComponent.writeMessages(inputTopic, telemetry); // wait until the triage message is output to kafka - waitOrTimeout(() -> kafkaComponent.readMessages(outputTopic).size() > 0, timeout(seconds(90))); - - List outputMessages = kafkaComponent.readMessages(outputTopic); - assertEquals(1, outputMessages.size()); + assertEventually(() -> { + outputMessages = kafkaComponent.readMessages(outputTopic); + assertEquals(1, outputMessages.size()); + }, timeout); // validate the triage message JSONObject message = (JSONObject) new JSONParser().parse(new String(outputMessages.get(0), "UTF-8")); http://git-wip-us.apache.org/repos/asf/metron/blob/d749961d/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/TestUtils.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/TestUtils.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/TestUtils.java index 0c37a35..026c9fb 100644 --- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/TestUtils.java +++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/TestUtils.java @@ -38,12 +38,12 @@ public class TestUtils { public interface Assertion { void apply() throws Exception; } + public static void assertEventually(Assertion assertion) throws Exception { assertEventually(assertion, MAX_ASSERT_WAIT_MS); } - private static void assertEventually(Assertion assertion - , long msToWait - ) throws Exception { + + public static void assertEventually(Assertion assertion, long msToWait) throws Exception { long delta = msToWait/10; for(int i = 0;i < 10;++i) { try{