From commits-return-2837-archive-asf-public=cust-asf.ponee.io@metron.apache.org Wed Apr 18 16:59:39 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 4D7011807CC for ; Wed, 18 Apr 2018 16:59:33 +0200 (CEST) Received: (qmail 93206 invoked by uid 500); 18 Apr 2018 14:59:32 -0000 Mailing-List: contact commits-help@metron.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@metron.apache.org Delivered-To: mailing list commits@metron.apache.org Received: (qmail 92900 invoked by uid 99); 18 Apr 2018 14:59:32 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Apr 2018 14:59:32 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 59E4FF68EB; Wed, 18 Apr 2018 14:59:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: otto@apache.org To: commits@metron.apache.org Date: Wed, 18 Apr 2018 14:59:51 -0000 Message-Id: <7d1fdc9e49684162ba995da695cd3c9b@git.apache.org> In-Reply-To: <01a9dec83c064e4abd2c287d4c4baab7@git.apache.org> References: <01a9dec83c064e4abd2c287d4c4baab7@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [21/52] [abbrv] metron git commit: METRON-590 Enable Use of Event Time in Profiler (nickwallen) closes apache/metron#965 METRON-590 Enable Use of Event Time in Profiler (nickwallen) closes apache/metron#965 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/3083b471 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/3083b471 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/3083b471 Branch: 
refs/heads/feature/METRON-1211-extensions-parsers-gradual Commit: 3083b471fe912bc74d27017834e6c80ff177680e Parents: 46ad9d9 Author: nickwallen Authored: Tue Mar 20 16:00:20 2018 -0400 Committer: nickallen Committed: Tue Mar 20 16:00:20 2018 -0400 ---------------------------------------------------------------------- .../client/stellar/ProfilerFunctions.java | 14 +- .../profiler/DefaultMessageDistributor.java | 207 +++++++- .../metron/profiler/DefaultProfileBuilder.java | 110 ++-- .../metron/profiler/MessageDistributor.java | 48 +- .../apache/metron/profiler/MessageRoute.java | 19 +- .../apache/metron/profiler/MessageRouter.java | 11 +- .../apache/metron/profiler/ProfileBuilder.java | 34 +- .../metron/profiler/ProfileMeasurement.java | 6 +- .../metron/profiler/StandAloneProfiler.java | 100 +++- .../org/apache/metron/profiler/clock/Clock.java | 18 +- .../metron/profiler/clock/ClockFactory.java | 38 ++ .../profiler/clock/DefaultClockFactory.java | 57 ++ .../metron/profiler/clock/EventTimeClock.java | 72 +++ .../metron/profiler/clock/FixedClock.java | 39 +- .../profiler/clock/FixedClockFactory.java | 44 ++ .../apache/metron/profiler/clock/WallClock.java | 17 +- .../profiler/DefaultMessageDistributorTest.java | 171 +++++- .../profiler/DefaultProfileBuilderTest.java | 119 +++-- .../metron/profiler/ProfilePeriodTest.java | 1 - .../metron/profiler/StandAloneProfilerTest.java | 255 +++++++++ .../profiler/clock/DefaultClockFactoryTest.java | 75 +++ .../profiler/clock/EventTimeClockTest.java | 115 +++++ .../metron/profiler/clock/WallClockTest.java | 54 ++ metron-analytics/metron-profiler/README.md | 98 +++- .../src/main/config/profiler.properties | 14 +- .../src/main/flux/profiler/remote.yaml | 42 +- .../profiler/bolt/DestinationHandler.java | 56 -- .../bolt/FixedFrequencyFlushSignal.java | 126 +++++ .../metron/profiler/bolt/FlushSignal.java | 51 ++ .../profiler/bolt/HBaseDestinationHandler.java | 58 --- .../metron/profiler/bolt/HBaseEmitter.java | 63 +++ 
.../profiler/bolt/KafkaDestinationHandler.java | 110 ---- .../metron/profiler/bolt/KafkaEmitter.java | 114 ++++ .../metron/profiler/bolt/ManualFlushSignal.java | 54 ++ .../profiler/bolt/ProfileBuilderBolt.java | 374 +++++++++++--- .../bolt/ProfileMeasurementEmitter.java | 59 +++ .../profiler/bolt/ProfileSplitterBolt.java | 132 ++++- .../zookeeper/event-time-test/profiler.json | 12 + .../bolt/FixedFrequencyFlushSignalTest.java | 71 +++ .../bolt/KafkaDestinationHandlerTest.java | 203 -------- .../metron/profiler/bolt/KafkaEmitterTest.java | 208 ++++++++ .../profiler/bolt/ProfileBuilderBoltTest.java | 516 +++++++++++-------- .../profiler/bolt/ProfileHBaseMapperTest.java | 6 +- .../profiler/bolt/ProfileSplitterBoltTest.java | 288 +++++++++-- .../profiler/integration/MessageBuilder.java | 75 +++ .../integration/ProfilerIntegrationTest.java | 235 ++++++--- .../configuration/metron-profiler-env.xml | 77 ++- .../package/scripts/params/params_linux.py | 7 + .../package/templates/profiler.properties.j2 | 15 +- .../METRON/CURRENT/themes/metron_theme.json | 118 ++++- .../configuration/profiler/ProfileConfig.java | 53 ++ .../configuration/profiler/ProfilerConfig.java | 48 +- .../apache/metron/common/utils/JSONUtils.java | 11 +- .../configurations/ProfilerUpdater.java | 1 + .../profiler/ProfileConfigTest.java | 5 +- .../profiler/ProfilerConfigTest.java | 120 +++++ .../integration/components/KafkaComponent.java | 39 +- 57 files changed, 3987 insertions(+), 1096 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerFunctions.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerFunctions.java 
b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerFunctions.java index 64c1e2e..d6afe1d 100644 --- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerFunctions.java +++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerFunctions.java @@ -101,7 +101,10 @@ public class ProfilerFunctions { throw new IllegalArgumentException("Invalid profiler configuration", e); } - return new StandAloneProfiler(profilerConfig, periodDurationMillis, context); + // the TTL and max routes do not matter here + long profileTimeToLiveMillis = Long.MAX_VALUE; + long maxNumberOfRoutes = Long.MAX_VALUE; + return new StandAloneProfiler(profilerConfig, periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes, context); } } @@ -138,13 +141,8 @@ public class ProfilerFunctions { // user must provide the stand alone profiler StandAloneProfiler profiler = Util.getArg(1, StandAloneProfiler.class, args); - try { - for (JSONObject message : messages) { - profiler.apply(message); - } - - } catch (ExecutionException e) { - throw new IllegalArgumentException(format("Failed to apply message; error=%s", e.getMessage()), e); + for (JSONObject message : messages) { + profiler.apply(message); } return profiler; http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java index 53377a0..ea5126f 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java +++ 
b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java @@ -20,14 +20,20 @@ package org.apache.metron.profiler; +import com.google.common.base.Ticker; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import org.apache.metron.common.configuration.profiler.ProfileConfig; -import org.apache.metron.profiler.clock.WallClock; import org.apache.metron.stellar.dsl.Context; import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.lang.invoke.MethodHandles; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutionException; @@ -36,32 +42,81 @@ import java.util.concurrent.TimeUnit; import static java.lang.String.format; /** - * Distributes a message along a MessageRoute. A MessageRoute will lead to one or - * more ProfileBuilders. + * The default implementation of a {@link MessageDistributor}. + * + *

Two caches are maintained; one for active profiles and another for expired + * profiles. A profile will remain on the active cache as long as it continues + * to receive messages. + * + *

If a profile has not received messages for an extended period of time, it + * is expired and moved to the expired cache. A profile that is expired can no + * longer receive new messages. + * + *

A profile is stored in the expired cache for a fixed period of time so that + * a client can flush the state of expired profiles. If the client does not flush + * the expired profiles using `flushExpired`, the state of these profiles will be + * lost. * - * A ProfileBuilder is responsible for maintaining the state of a single profile, - * for a single entity. There will be one ProfileBuilder for each (profile, entity) pair. - * This class ensures that each ProfileBuilder receives the telemetry messages that - * it needs. */ public class DefaultMessageDistributor implements MessageDistributor { + protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + /** * The duration of each profile period in milliseconds. */ private long periodDurationMillis; /** - * Maintains the state of a profile which is unique to a profile/entity pair. + * A cache of active profiles. + * + * A profile will remain on the active cache as long as it continues to receive + * messages. Once it has not received messages for a period of time, it is + * moved to the expired cache. + */ + private transient Cache activeCache; + + /** + * A cache of expired profiles. + * + * When a profile expires from the active cache, it is moved here for a + * period of time. In the expired cache a profile can no longer receive + * new messages. A profile waits on the expired cache so that the client + * can flush the state of the expired profile. If the client does not flush + * the expired profiles, this state will be lost forever. */ - private transient Cache profileCache; + private transient Cache expiredCache; /** * Create a new message distributor. + * + * @param periodDurationMillis The period duration in milliseconds. + * @param profileTimeToLiveMillis The time-to-live of a profile in milliseconds. + * @param maxNumberOfRoutes The max number of unique routes to maintain. After this is exceeded, lesser + * used routes will be evicted from the internal cache. 
+ */ + public DefaultMessageDistributor( + long periodDurationMillis, + long profileTimeToLiveMillis, + long maxNumberOfRoutes) { + this(periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes, Ticker.systemTicker()); + } + + /** + * Create a new message distributor. + * * @param periodDurationMillis The period duration in milliseconds. - * @param profileTimeToLiveMillis The TTL of a profile in milliseconds. + * @param profileTimeToLiveMillis The time-to-live of a profile in milliseconds. + * @param maxNumberOfRoutes The max number of unique routes to maintain. After this is exceeded, lesser + * used routes will be evicted from the internal cache. + * @param ticker The ticker used to drive time for the caches. Only needs set for testing. */ - public DefaultMessageDistributor(long periodDurationMillis, long profileTimeToLiveMillis) { + public DefaultMessageDistributor( + long periodDurationMillis, + long profileTimeToLiveMillis, + long maxNumberOfRoutes, + Ticker ticker) { + if(profileTimeToLiveMillis < periodDurationMillis) { throw new IllegalStateException(format( "invalid configuration: expect profile TTL (%d) to be greater than period duration (%d)", @@ -69,9 +124,23 @@ public class DefaultMessageDistributor implements MessageDistributor { periodDurationMillis)); } this.periodDurationMillis = periodDurationMillis; - this.profileCache = CacheBuilder + + // build the cache of active profiles + this.activeCache = CacheBuilder .newBuilder() + .maximumSize(maxNumberOfRoutes) .expireAfterAccess(profileTimeToLiveMillis, TimeUnit.MILLISECONDS) + .removalListener(new ActiveCacheRemovalListener()) + .ticker(ticker) + .build(); + + // build the cache of expired profiles + this.expiredCache = CacheBuilder + .newBuilder() + .maximumSize(maxNumberOfRoutes) + .expireAfterWrite(profileTimeToLiveMillis, TimeUnit.MILLISECONDS) + .removalListener(new ExpiredCacheRemovalListener()) + .ticker(ticker) .build(); } @@ -79,57 +148,120 @@ public class DefaultMessageDistributor 
implements MessageDistributor { * Distribute a message along a MessageRoute. * * @param message The message that needs distributed. + * @param timestamp The timestamp of the message. * @param route The message route. * @param context The Stellar execution context. * @throws ExecutionException */ @Override - public void distribute(JSONObject message, MessageRoute route, Context context) throws ExecutionException { - getBuilder(route, context).apply(message); + public void distribute(JSONObject message, long timestamp, MessageRoute route, Context context) { + try { + ProfileBuilder builder = getBuilder(route, context); + builder.apply(message, timestamp); + + } catch(ExecutionException e) { + LOG.error("Unexpected error", e); + throw new RuntimeException(e); + } } /** - * Flushes all profiles. Flushes all ProfileBuilders that this distributor is responsible for. + * Flush all active profiles. + * + *

A profile will remain active as long as it continues to receive messages. If a profile + * does not receive a message for an extended duration, it may be marked as expired. + * + *

Flushes all active {@link ProfileBuilder} objects that this distributor is responsible for. * - * @return The profile measurements; one for each (profile, entity) pair. + * @return The {@link ProfileMeasurement} values; one for each (profile, entity) pair. */ @Override public List flush() { + + // cache maintenance needed here to ensure active profiles will expire + activeCache.cleanUp(); + expiredCache.cleanUp(); + + List measurements = flushCache(activeCache); + return measurements; + } + + /** + * Flush all expired profiles. + * + *

Flushes all expired {@link ProfileBuilder}s that this distributor is responsible for. + * + *

If a profile has not received messages for an extended period of time, it will be marked as + * expired. When a profile is expired, it can no longer receive new messages. Expired profiles + * remain only to give the client a chance to flush them. + * + *

If the client does not flush the expired profiles periodically, any state maintained in the + * profile since the last flush may be lost. + * + * @return The {@link ProfileMeasurement} values; one for each (profile, entity) pair. + */ + @Override + public List flushExpired() { + + // cache maintenance needed here to ensure active profiles will expire + activeCache.cleanUp(); + expiredCache.cleanUp(); + + // flush all expired profiles + List measurements = flushCache(expiredCache); + + // once the expired profiles have been flushed, they are no longer needed + expiredCache.invalidateAll(); + + return measurements; + } + + /** + * Flush all of the profiles maintained in a cache. + * + * @param cache The cache to flush. + * @return The measurements captured when flushing the profiles. + */ + private List flushCache(Cache cache) { + List measurements = new ArrayList<>(); + for(ProfileBuilder profileBuilder: cache.asMap().values()) { - profileCache.asMap().forEach((key, profileBuilder) -> { + // only need to flush, if the profile has been initialized if(profileBuilder.isInitialized()) { + + // flush the profiler and save the measurement, if one exists Optional measurement = profileBuilder.flush(); - measurement.ifPresent(measurements::add); + measurement.ifPresent(m -> measurements.add(m)); } - }); + } - profileCache.cleanUp(); return measurements; } /** * Retrieves the cached ProfileBuilder that is used to build and maintain the Profile. If none exists, * one will be created and returned. + * * @param route The message route. * @param context The Stellar execution context. 
*/ public ProfileBuilder getBuilder(MessageRoute route, Context context) throws ExecutionException { ProfileConfig profile = route.getProfileDefinition(); String entity = route.getEntity(); - return profileCache.get( + return activeCache.get( cacheKey(profile, entity), () -> new DefaultProfileBuilder.Builder() .withDefinition(profile) .withEntity(entity) .withPeriodDurationMillis(periodDurationMillis) .withContext(context) - .withClock(new WallClock()) .build()); } /** - * Builds the key that is used to lookup the ProfileState within the cache. + * Builds the key that is used to lookup the {@link ProfileBuilder} within the cache. + * * @param profile The profile definition. * @param entity The entity. */ @@ -145,4 +277,33 @@ public class DefaultMessageDistributor implements MessageDistributor { public DefaultMessageDistributor withPeriodDuration(int duration, TimeUnit units) { return withPeriodDurationMillis(units.toMillis(duration)); } + + /** + * A listener that is notified when profiles expire from the active cache. + */ + private class ActiveCacheRemovalListener implements RemovalListener { + + @Override + public void onRemoval(RemovalNotification notification) { + + String key = notification.getKey(); + ProfileBuilder expired = notification.getValue(); + + LOG.warn("Profile expired from active cache; key={}", key); + expiredCache.put(key, expired); + } + } + + /** + * A listener that is notified when profiles expire from the active cache. 
+ */ + private class ExpiredCacheRemovalListener implements RemovalListener { + + @Override + public void onRemoval(RemovalNotification notification) { + + String key = notification.getKey(); + LOG.debug("Profile removed from expired cache; key={}", key); + } + } } http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java index 2e34160..4b564c9 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java @@ -20,7 +20,18 @@ package org.apache.metron.profiler; -import static java.lang.String.format; +import org.apache.commons.collections4.ListUtils; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.metron.common.configuration.profiler.ProfileConfig; +import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor; +import org.apache.metron.stellar.common.StellarStatefulExecutor; +import org.apache.metron.stellar.dsl.Context; +import org.apache.metron.stellar.dsl.ParseException; +import org.apache.metron.stellar.dsl.StellarFunctions; +import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Serializable; import java.lang.invoke.MethodHandles; @@ -34,20 +45,8 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import org.apache.commons.collections4.ListUtils; -import 
org.apache.commons.collections4.MapUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.metron.common.configuration.profiler.ProfileConfig; -import org.apache.metron.profiler.clock.Clock; -import org.apache.metron.profiler.clock.WallClock; -import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor; -import org.apache.metron.stellar.common.StellarStatefulExecutor; -import org.apache.metron.stellar.dsl.Context; -import org.apache.metron.stellar.dsl.ParseException; -import org.apache.metron.stellar.dsl.StellarFunctions; -import org.json.simple.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import static java.lang.String.format; /** * Responsible for building and maintaining a Profile. @@ -94,16 +93,15 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable { private long periodDurationMillis; /** - * A clock is used to tell time; imagine that. + * Tracks the latest timestamp for use when flushing the profile. */ - private Clock clock; + private long maxTimestamp; /** - * Use the ProfileBuilder.Builder to create a new ProfileBuilder. + * Private constructor. Use the {@link Builder} to create a new {@link ProfileBuilder). */ private DefaultProfileBuilder(ProfileConfig definition, String entity, - Clock clock, long periodDurationMillis, Context stellarContext) { @@ -111,27 +109,37 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable { this.definition = definition; this.profileName = definition.getProfile(); this.entity = entity; - this.clock = clock; this.periodDurationMillis = periodDurationMillis; this.executor = new DefaultStellarStatefulExecutor(); StellarFunctions.initialize(stellarContext); this.executor.setContext(stellarContext); + this.maxTimestamp = 0; } /** * Apply a message to the profile. + * * @param message The message to apply. + * @param timestamp The timestamp of the message. 
*/ @Override - public void apply(JSONObject message) { + public void apply(JSONObject message, long timestamp) { try { if (!isInitialized()) { + + // execute each 'init' expression assign(definition.getInit(), message, "init"); isInitialized = true; } + // execute each 'update' expression assign(definition.getUpdate(), message, "update"); + // keep track of the 'latest' timestamp seen for use when flushing the profile + if(timestamp > maxTimestamp) { + maxTimestamp = timestamp; + } + } catch(Throwable e) { LOG.error(format("Unable to apply message to profile: %s", e.getMessage()), e); } @@ -140,23 +148,30 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable { /** * Flush the Profile. * - * Completes and emits the ProfileMeasurement. Clears all state in preparation for + *

Completes and emits the {@link ProfileMeasurement}. Clears all state in preparation for * the next window period. * - * @return Returns the completed profile measurement. + * @return Returns the completed {@link ProfileMeasurement}. */ @Override public Optional flush() { - LOG.debug("Flushing profile: profile={}, entity={}", profileName, entity); - Optional result = Optional.empty(); - ProfilePeriod period = new ProfilePeriod(clock.currentTimeMillis(), periodDurationMillis, TimeUnit.MILLISECONDS); + + Optional result; + ProfilePeriod period = new ProfilePeriod(maxTimestamp, periodDurationMillis, TimeUnit.MILLISECONDS); try { - // execute the 'profile' expression(s) - Object profileValue = execute(definition.getResult().getProfileExpressions().getExpression(), "result/profile"); + // execute the 'profile' expression + String profileExpression = definition + .getResult() + .getProfileExpressions() + .getExpression(); + Object profileValue = execute(profileExpression, "result/profile"); // execute the 'triage' expression(s) - Map triageValues = definition.getResult().getTriageExpressions().getExpressions() + Map triageValues = definition + .getResult() + .getTriageExpressions() + .getExpressions() .entrySet() .stream() .collect(Collectors.toMap( @@ -185,10 +200,21 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable { .withDefinition(definition)); } catch(Throwable e) { + // if any of the Stellar expressions fail, a measurement should NOT be returned LOG.error(format("Unable to flush profile: error=%s", e.getMessage()), e); + result = Optional.empty(); } + LOG.debug("Flushed profile: profile={}, entity={}, maxTime={}, period={}, start={}, end={}, duration={}", + profileName, + entity, + maxTimestamp, + period.getPeriod(), + period.getStartTimeMillis(), + period.getEndTimeMillis(), + period.getDurationMillis()); + isInitialized = false; return result; } @@ -214,6 +240,7 @@ public class DefaultProfileBuilder implements ProfileBuilder, 
Serializable { /** * Executes an expression contained within the profile definition. + * * @param expression The expression to execute. * @param transientState Additional transient state provided to the expression. * @param expressionType The type of expression; init, update, result. Provides additional context if expression execution fails. @@ -232,6 +259,7 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable { /** * Executes an expression contained within the profile definition. + * * @param expression The expression to execute. * @param expressionType The type of expression; init, update, result. Provides additional context if expression execution fails. * @return The result of executing the expression. @@ -242,6 +270,7 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable { /** * Executes a set of expressions whose results need to be assigned to a variable. + * * @param expressions Maps the name of a variable to the expression whose result should be assigned to it. * @param transientState Additional transient state provided to the expression. * @param expressionType The type of expression; init, update, result. Provides additional context if expression execution fails. @@ -254,6 +283,7 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable { String expr = entry.getValue(); try { + // assign the result of the expression to the variable executor.assign(var, expr, transientState); @@ -274,6 +304,7 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable { /** * Executes the expressions contained within the profile definition. + * * @param expressions A list of expressions to execute. * @param transientState Additional transient state provided to the expressions. * @param expressionType The type of expression; init, update, result. Provides additional context if expression execution fails. 
@@ -284,6 +315,7 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable { for(String expr: ListUtils.emptyIfNull(expressions)) { try { + // execute an expression Object result = executor.execute(expr, transientState, Object.class); results.add(result); @@ -305,15 +337,19 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable { return results; } + @Override + public String getEntity() { + return entity; + } + /** - * A builder used to construct a new ProfileBuilder. + * A builder should be used to construct a new {@link ProfileBuilder} object. */ public static class Builder { private ProfileConfig definition; private String entity; - private long periodDurationMillis; - private Clock clock = new WallClock(); + private Long periodDurationMillis; private Context context; public Builder withContext(Context context) { @@ -321,11 +357,6 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable { return this; } - public Builder withClock(Clock clock) { - this.clock = clock; - return this; - } - /** * @param definition The profiler definition. 
*/ @@ -370,8 +401,11 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable { if(StringUtils.isEmpty(entity)) { throw new IllegalArgumentException(format("missing entity name; got '%s'", entity)); } + if(periodDurationMillis == null) { + throw new IllegalArgumentException("missing period duration"); + } - return new DefaultProfileBuilder(definition, entity, clock, periodDurationMillis, context); + return new DefaultProfileBuilder(definition, entity, periodDurationMillis, context); } } } http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java index a60446f..ea5be0f 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java @@ -24,33 +24,57 @@ import org.apache.metron.stellar.dsl.Context; import org.json.simple.JSONObject; import java.util.List; -import java.util.concurrent.ExecutionException; /** - * Distributes a message along a MessageRoute. A MessageRoute will lead to one or - * more ProfileBuilders. + * Distributes a telemetry message along a {@link MessageRoute}. A {@link MessageRoute} will lead to a + * {@link ProfileBuilder} that is responsible for building and maintaining a profile. * - * A ProfileBuilder is responsible for maintaining the state of a single profile, - * for a single entity. There will be one ProfileBuilder for each (profile, entity) pair. - * This class ensures that each ProfileBuilder receives the telemetry messages that - * it needs. + *

A {@link ProfileBuilder} is responsible for maintaining the state of a single (profile, entity) + * pairing. There will be one {@link ProfileBuilder} for each (profile, entity) pair. + * + *

A {@link MessageDistributor} ensures that each {@link ProfileBuilder} receives the telemetry + * messages that it needs. + * + * @see MessageRoute + * @see ProfileMeasurement */ public interface MessageDistributor { /** - * Distribute a message along a MessageRoute. + * Distribute a message along a {@link MessageRoute}. * * @param message The message that needs distributed. + * @param timestamp The timestamp of the message. * @param route The message route. * @param context The Stellar execution context. - * @throws ExecutionException */ - void distribute(JSONObject message, MessageRoute route, Context context) throws ExecutionException; + void distribute(JSONObject message, long timestamp, MessageRoute route, Context context); /** - * Flushes all profiles. Flushes all ProfileBuilders that this distributor is responsible for. + * Flush all active profiles. + * + *

A profile will remain active as long as it continues to receive messages. If a profile + * does not receive a message for an extended duration, it may be marked as expired. * - * @return The profile measurements; one for each (profile, entity) pair. + *

Flushes all active {@link ProfileBuilder} objects that this distributor is responsible for. + * + * @return The {@link ProfileMeasurement} values; one for each (profile, entity) pair. */ List flush(); + + /** + * Flush all expired profiles. + * + *

If a profile has not received messages for an extended period of time, it will be marked as + * expired. When a profile is expired, it can no longer receive new messages. Expired profiles + * remain only to give the client a chance to flush them. + * + *

If the client does not flush the expired profiles periodically, any state maintained in the + * profile since the last flush may be lost. + * + *

Flushes all expired {@link ProfileBuilder} objects that this distributor is responsible for. + * + * @return The {@link ProfileMeasurement} values; one for each (profile, entity) pair. + */ + List flushExpired(); } http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java index 1945671..7288f03 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java @@ -23,12 +23,15 @@ package org.apache.metron.profiler; import org.apache.metron.common.configuration.profiler.ProfileConfig; /** - * A MessageRoute defines the profile and entity that a telemetry message needs applied to. This - * allows a message to be routed to the profile and entity that needs it. + * Defines the 'route' a message must take through the Profiler. * - * One telemetry message may need multiple routes. This is the case when a message is needed by - * more than one profile. In this case, there will be multiple MessageRoute objects for a single - * message. + *

A {@link MessageRoute} defines the profile and entity that a telemetry message needs applied to. + * + *

If a message is needed by multiple profiles, then multiple {@link MessageRoute} values + * will exist. If a message is not needed by any profiles, then no {@link MessageRoute} values + * will exist. + * + * @see MessageRouter */ public class MessageRoute { @@ -42,6 +45,12 @@ public class MessageRoute { */ private String entity; + /** + * Create a {@link MessageRoute}. + * + * @param profileDefinition The profile definition. + * @param entity The entity. + */ public MessageRoute(ProfileConfig profileDefinition, String entity) { this.entity = entity; this.profileDefinition = profileDefinition; http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRouter.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRouter.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRouter.java index 99c98a3..4c18062 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRouter.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRouter.java @@ -27,15 +27,18 @@ import org.json.simple.JSONObject; import java.util.List; /** - * Routes incoming telemetry messages. + * Routes incoming telemetry messages through the Profiler. * - * A single telemetry message may need to take multiple routes. This is the case - * when a message is needed by more than one profile. + *

If a message is needed by multiple profiles, then multiple {@link MessageRoute} values + * will be returned. If a message is not needed by any profiles, then no {@link MessageRoute} values + * will be returned. + * + * @see MessageRoute */ public interface MessageRouter { /** - * Route a telemetry message. Finds all routes for a given telemetry message. + * Finds all routes for a telemetry message. * * @param message The telemetry message that needs routed. * @param config The configuration for the Profiler. http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java index c09b0b6..07372d7 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java @@ -28,47 +28,61 @@ import java.util.Optional; /** * Responsible for building and maintaining a Profile. * - * One or more messages are applied to the Profile with `apply` and a profile measurement is - * produced by calling `flush`. + *

Telemetry messages are applied to a profile using {@link ProfileBuilder#apply(JSONObject, long)}. A + * {@link ProfileMeasurement} is generated by calling {@link ProfileBuilder#flush()}. * - * Any one instance is responsible only for building the profile for a specific [profile, entity] - * pairing. There will exist many instances, one for each [profile, entity] pair that exists + * A {@link ProfileBuilder} is responsible only for building the profile for a specific [profile, entity] + * pair. There will exist many instances, one for each [profile, entity] pair that exists * within the incoming telemetry data applied to the profile. */ public interface ProfileBuilder { /** * Apply a message to the profile. + * * @param message The message to apply. + * @param timestamp The timestamp of the message. */ - void apply(JSONObject message); + void apply(JSONObject message, long timestamp); /** * Flush the Profile. * - * Completes and emits the ProfileMeasurement. Clears all state in preparation for + *

Completes the period and returns the {@link ProfileMeasurement}. Clears all state in preparation for * the next window period. * - * @return Returns the completed profile measurement. + * @return Returns the {@link ProfileMeasurement}. */ Optional flush(); /** - * Has the ProfileBuilder been initialized? + * Has the {@link ProfileBuilder} been initialized? + * * @return True, if initialization has occurred. False, otherwise. */ boolean isInitialized(); /** * Returns the definition of the profile being built. - * @return ProfileConfig definition of the profile + * + * @return The profile definition. */ ProfileConfig getDefinition(); /** - * Returns the value of a variable being maintained by the builder. + * Returns the value of a variable within the current profile state. + * * @param variable The variable name. * @return The value of the variable. */ Object valueOf(String variable); + + /** + * Returns the name of the entity. + * + *

Each {@code ProfileBuilder} instance is responsible for one (profile, entity) pair. + * + * @return The entity. + */ + String getEntity(); } http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java index 0e773e9..f6cc286 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java @@ -28,10 +28,10 @@ import java.util.Map; import java.util.concurrent.TimeUnit; /** - * Represents a single data point within a Profile. + * Represents a single data point within a profile. * - * A Profile is effectively a time series. To this end a Profile is composed - * of many ProfileMeasurement values which in aggregate form a time series. + *

A profile contains many individual {@link ProfileMeasurement} values captured over a + * period of time. These values in aggregate form a time series. */ public class ProfileMeasurement { http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java index 6db7079..f79efe6 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java @@ -21,18 +21,29 @@ package org.apache.metron.profiler; import org.apache.metron.common.configuration.profiler.ProfilerConfig; +import org.apache.metron.profiler.clock.Clock; +import org.apache.metron.profiler.clock.ClockFactory; +import org.apache.metron.profiler.clock.DefaultClockFactory; import org.apache.metron.stellar.dsl.Context; import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.lang.invoke.MethodHandles; import java.util.List; +import java.util.Optional; import java.util.concurrent.ExecutionException; /** - * A stand alone version of the Profiler that does not require a - * distributed execution environment like Apache Storm. + * A stand alone version of the Profiler that does not require a distributed + * execution environment like Apache Storm. + * + *

This class is used to create and manage profiles within the REPL environment. */ public class StandAloneProfiler { + protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + /** * The Stellar execution context. */ @@ -54,6 +65,11 @@ public class StandAloneProfiler { private MessageDistributor distributor; /** + * The factory that creates Clock objects. + */ + private ClockFactory clockFactory; + + /** * Counts the number of messages that have been applied. */ private int messageCount; @@ -67,12 +83,26 @@ public class StandAloneProfiler { */ private int routeCount; - public StandAloneProfiler(ProfilerConfig config, long periodDurationMillis, Context context) { + /** + * Create a new Profiler. + * + * @param config The Profiler configuration. + * @param periodDurationMillis The period duration in milliseconds. + * @param profileTimeToLiveMillis The time-to-live of a profile in milliseconds. + * @param maxNumberOfRoutes The max number of unique routes to maintain. After this is exceeded, lesser + * used routes will be evicted from the internal cache. + * @param context The Stellar execution context. + */ + public StandAloneProfiler(ProfilerConfig config, + long periodDurationMillis, + long profileTimeToLiveMillis, + long maxNumberOfRoutes, + Context context) { this.context = context; this.config = config; this.router = new DefaultMessageRouter(context); - // the period TTL does not matter in this context - this.distributor = new DefaultMessageDistributor(periodDurationMillis, Long.MAX_VALUE); + this.distributor = new DefaultMessageDistributor(periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes); + this.clockFactory = new DefaultClockFactory(); this.messageCount = 0; this.routeCount = 0; } @@ -80,26 +110,28 @@ public class StandAloneProfiler { /** * Apply a message to a set of profiles. * @param message The message to apply. 
- * @throws ExecutionException */ - public void apply(JSONObject message) throws ExecutionException { + public void apply(JSONObject message) { - List routes = router.route(message, config, context); - for(MessageRoute route : routes) { - distributor.distribute(message, route, context); - } + // what time is it? + Clock clock = clockFactory.createClock(config); + Optional timestamp = clock.currentTimeMillis(message); - routeCount += routes.size(); - messageCount += 1; - } + // can only route the message, if we have a timestamp + if(timestamp.isPresent()) { - @Override - public String toString() { - return "Profiler{" + - getProfileCount() + " profile(s), " + - getMessageCount() + " messages(s), " + - getRouteCount() + " route(s)" + - '}'; + // route the message to the correct profile builders + List routes = router.route(message, config, context); + for (MessageRoute route : routes) { + distributor.distribute(message, timestamp.get(), route, context); + } + + routeCount += routes.size(); + messageCount += 1; + + } else { + LOG.warn("No timestamp available for the message. The message will be ignored."); + } } /** @@ -110,19 +142,45 @@ public class StandAloneProfiler { return distributor.flush(); } + /** + * Returns the Profiler configuration. + * @return The Profiler configuration. + */ public ProfilerConfig getConfig() { return config; } + /** + * Returns the number of defined profiles. + * @return The number of defined profiles. + */ public int getProfileCount() { return (config == null) ? 0: config.getProfiles().size(); } + /** + * Returns the number of messages that have been applied. + * @return The number of messages that have been applied. + */ public int getMessageCount() { return messageCount; } + /** + * Returns the number of routes. + * @return The number of routes. 
+ * @see MessageRoute + */ public int getRouteCount() { return routeCount; } + + @Override + public String toString() { + return "Profiler{" + + getProfileCount() + " profile(s), " + + getMessageCount() + " messages(s), " + + getRouteCount() + " route(s)" + + '}'; + } } http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/Clock.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/Clock.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/Clock.java index 6730e49..b07c0ed 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/Clock.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/Clock.java @@ -20,16 +20,24 @@ package org.apache.metron.profiler.clock; +import org.json.simple.JSONObject; + +import java.util.Optional; + /** - * A clock can tell time; imagine that. + * A {@link Clock} manages the progression of time in the Profiler. * - * This allows the Profiler to support different treatments of time like wall clock versus event time. + *

The Profiler can operate on either processing time or event time. This + * abstraction deals with the differences between the two. */ public interface Clock { /** - * The current time in epoch milliseconds. + * Returns the current time in epoch milliseconds. + * + * @param message The telemetry message. + * @return An optional value containing the current time in epoch milliseconds, if + * the current time is known. Otherwise, empty. */ - long currentTimeMillis(); - + Optional currentTimeMillis(JSONObject message); } http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/ClockFactory.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/ClockFactory.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/ClockFactory.java new file mode 100644 index 0000000..5435c48 --- /dev/null +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/ClockFactory.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +package org.apache.metron.profiler.clock; + +import org.apache.metron.common.configuration.profiler.ProfilerConfig; + +/** + * A factory for creating {@link Clock} objects. + * + * The type of {@link Clock} needed will depend on the Profiler configuration. + */ +public interface ClockFactory { + + /** + * Creates and returns a {@link Clock}. + * + * @param config The profiler configuration. + * @return A {@link Clock}. + */ + Clock createClock(ProfilerConfig config); +} http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/DefaultClockFactory.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/DefaultClockFactory.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/DefaultClockFactory.java new file mode 100644 index 0000000..d62e62b --- /dev/null +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/DefaultClockFactory.java @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +package org.apache.metron.profiler.clock; + +import org.apache.metron.common.configuration.profiler.ProfilerConfig; + +/** + * Creates a {@link Clock} based on the profiler configuration. + * + *

If the Profiler is configured to use event time, a {@link EventTimeClock} will + * be created. Otherwise, a {@link WallClock} will be created. + * + *

The default implementation of a {@link ClockFactory}. + */ +public class DefaultClockFactory implements ClockFactory { + + /** + * @param config The profiler configuration. + * @return The appropriate Clock based on the profiler configuration. + */ + @Override + public Clock createClock(ProfilerConfig config) { + Clock clock; + + boolean isEventTime = config.getTimestampField().isPresent(); + if(isEventTime) { + + // using event time + String timestampField = config.getTimestampField().get(); + clock = new EventTimeClock(timestampField); + + } else { + + // using processing time + clock = new WallClock(); + } + + return clock; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/EventTimeClock.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/EventTimeClock.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/EventTimeClock.java new file mode 100644 index 0000000..5cd574e --- /dev/null +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/EventTimeClock.java @@ -0,0 +1,72 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.metron.profiler.clock; + +import org.apache.metron.stellar.common.utils.ConversionUtils; +import org.json.simple.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; +import java.util.Optional; + +/** + * A {@link Clock} that advances based on event time. + * + * Event time is advanced by the timestamps contained within telemetry messages, rather + * than the system clock. + */ +public class EventTimeClock implements Clock { + + protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + /** + * The name of the field from which the timestamp will be extracted. + */ + private String timestampField; + + /** + * @param timestampField The name of the field containing a timestamp. 
+ */ + public EventTimeClock(String timestampField) { + this.timestampField = timestampField; + } + + @Override + public Optional currentTimeMillis(JSONObject message) { + + Long result; + if(message != null && message.containsKey(timestampField)) { + + // extract the timestamp and convert to a long + Object timestamp = message.get(timestampField); + result = ConversionUtils.convert(timestamp, Long.class); + + } else { + + // the message does not contain the specified timestamp field + LOG.debug("message does not contain timestamp field '{}': message will be ignored: message='{}'", + timestampField, JSONObject.toJSONString(message)); + result = null; + } + + return Optional.ofNullable(result); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java index c6e93cd..8259ed0 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java @@ -20,21 +20,50 @@ package org.apache.metron.profiler.clock; -import java.io.Serializable; +import org.json.simple.JSONObject; + +import java.util.Optional; /** - * A clock that reports whatever time you tell it to. Most useful for testing. + * A {@link Clock} that always reports the same time. + * + *

This is only useful for testing. */ -public class FixedClock implements Clock, Serializable { +public class FixedClock implements Clock { + /** + * The time in milliseconds since the epoch. + */ private long epochMillis; + /** + * Create a {@link Clock}. The time defaults to the epoch. + */ + public FixedClock() { + this(0); + } + + /** + * Create a {@link Clock}. + * @param epochMillis The time in milliseconds since the epoch. + */ + public FixedClock(long epochMillis) { + this.setTime(epochMillis); + } + + /** + * Set the current time. + * @param epochMillis The time in milliseconds since the epoch. + */ public void setTime(long epochMillis) { this.epochMillis = epochMillis; } + /** + * @return The time in milliseconds since the epoch. + */ @Override - public long currentTimeMillis() { - return this.epochMillis; + public Optional currentTimeMillis(JSONObject message) { + return Optional.of(this.epochMillis); } } http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClockFactory.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClockFactory.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClockFactory.java new file mode 100644 index 0000000..b0248cd --- /dev/null +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClockFactory.java @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.metron.profiler.clock; + +import org.apache.metron.common.configuration.profiler.ProfilerConfig; + +/** + * A {@link ClockFactory} that always returns a {@link FixedClock}. + * + *

A {@link FixedClock} always returns the same time and is only useful for testing. + */ +public class FixedClockFactory implements ClockFactory { + + private long timestamp; + + /** + * @param timestamp The timestamp that all {@link Clock} objects created by this factory will report. + */ + public FixedClockFactory(long timestamp) { + this.timestamp = timestamp; + } + + @Override + public Clock createClock(ProfilerConfig config) { + return new FixedClock(timestamp); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java index 1a20c94..20f62e3 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java @@ -20,15 +20,22 @@ package org.apache.metron.profiler.clock; -import java.io.Serializable; +import org.json.simple.JSONObject; + +import java.util.Optional; /** - * A clock that uses the system clock to provide wall clock time. + * A {@link Clock} that advances based on system time. + * + *

This {@link Clock} is used to advance time when the Profiler is running + * on processing time, rather than event time. */ -public class WallClock implements Clock, Serializable { +public class WallClock implements Clock { @Override - public long currentTimeMillis() { - return System.currentTimeMillis(); + public Optional currentTimeMillis(JSONObject message) { + + // the message does not matter; use system time + return Optional.of(System.currentTimeMillis()); } } http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java index ff4c289..ea9c5c6 100644 --- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java +++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java @@ -20,6 +20,7 @@ package org.apache.metron.profiler; +import com.google.common.base.Ticker; import org.adrianwalker.multilinestring.Multiline; import org.apache.metron.common.configuration.profiler.ProfileConfig; import org.apache.metron.common.utils.JSONUtils; @@ -33,6 +34,9 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.TimeUnit; +import static java.util.concurrent.TimeUnit.HOURS; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; import static org.junit.Assert.assertEquals; public class DefaultMessageDistributorTest { @@ -83,16 +87,22 @@ public class DefaultMessageDistributorTest { private DefaultMessageDistributor distributor; private Context context; 
+ private long periodDurationMillis = MINUTES.toMillis(15); + private long profileTimeToLiveMillis = MINUTES.toMillis(30); + private long maxNumberOfRoutes = Long.MAX_VALUE; @Before public void setup() throws Exception { + context = Context.EMPTY_CONTEXT(); JSONParser parser = new JSONParser(); messageOne = (JSONObject) parser.parse(inputOne); messageTwo = (JSONObject) parser.parse(inputTwo); + distributor = new DefaultMessageDistributor( - TimeUnit.MINUTES.toMillis(15), - TimeUnit.MINUTES.toMillis(30)); + periodDurationMillis, + profileTimeToLiveMillis, + maxNumberOfRoutes); } /** @@ -108,15 +118,18 @@ public class DefaultMessageDistributorTest { */ @Test public void testDistribute() throws Exception { + + // setup + long timestamp = 100; ProfileConfig definition = createDefinition(profileOne); String entity = (String) messageOne.get("ip_src_addr"); MessageRoute route = new MessageRoute(definition, entity); - // distribute one message - distributor.distribute(messageOne, route, context); + // distribute one message and flush + distributor.distribute(messageOne, timestamp, route, context); + List measurements = distributor.flush(); // expect one measurement coming from one profile - List measurements = distributor.flush(); assertEquals(1, measurements.size()); ProfileMeasurement m = measurements.get(0); assertEquals(definition.getProfile(), m.getProfileName()); @@ -126,12 +139,17 @@ public class DefaultMessageDistributorTest { @Test public void testDistributeWithTwoProfiles() throws Exception { - // distribute one message to the first profile + // setup + long timestamp = 100; String entity = (String) messageOne.get("ip_src_addr"); - distributor.distribute(messageOne, new MessageRoute(createDefinition(profileOne), entity), context); + + // distribute one message to the first profile + MessageRoute routeOne = new MessageRoute(createDefinition(profileOne), entity); + distributor.distribute(messageOne, timestamp, routeOne, context); // distribute another message to 
the second profile, but same entity - distributor.distribute(messageOne, new MessageRoute(createDefinition(profileTwo), entity), context); + MessageRoute routeTwo = new MessageRoute(createDefinition(profileTwo), entity); + distributor.distribute(messageOne, timestamp, routeTwo, context); // expect 2 measurements; 1 for each profile List measurements = distributor.flush(); @@ -141,17 +159,150 @@ public class DefaultMessageDistributorTest { @Test public void testDistributeWithTwoEntities() throws Exception { + // setup + long timestamp = 100; + // distribute one message String entityOne = (String) messageOne.get("ip_src_addr"); - distributor.distribute(messageOne, new MessageRoute(createDefinition(profileOne), entityOne), context); + MessageRoute routeOne = new MessageRoute(createDefinition(profileOne), entityOne); + distributor.distribute(messageOne, timestamp, routeOne, context); // distribute another message with a different entity String entityTwo = (String) messageTwo.get("ip_src_addr"); - distributor.distribute(messageTwo, new MessageRoute(createDefinition(profileTwo), entityTwo), context); + MessageRoute routeTwo = new MessageRoute(createDefinition(profileTwo), entityTwo); + distributor.distribute(messageTwo, timestamp, routeTwo, context); // expect 2 measurements; 1 for each entity List measurements = distributor.flush(); assertEquals(2, measurements.size()); } + /** + * A profile should expire after a fixed period of time. This test ensures that + * profiles are not expired before they are supposed to be. 
+ */ + @Test + public void testNotYetTimeToExpireProfiles() throws Exception { + + // the ticker drives time to allow us to test cache expiration + FixedTicker ticker = new FixedTicker(); + + // setup + ProfileConfig definition = createDefinition(profileOne); + String entity = (String) messageOne.get("ip_src_addr"); + MessageRoute route = new MessageRoute(definition, entity); + distributor = new DefaultMessageDistributor( + periodDurationMillis, + profileTimeToLiveMillis, + maxNumberOfRoutes, + ticker); + + // distribute one message + distributor.distribute(messageOne, 1000000, route, context); + + // advance time to just shy of the profile TTL + ticker.advanceTime(profileTimeToLiveMillis - 1000, MILLISECONDS); + + // the profile should NOT have expired yet + assertEquals(0, distributor.flushExpired().size()); + assertEquals(1, distributor.flush().size()); + } + + /** + * A profile should expire after a fixed period of time. + */ + @Test + public void testProfilesShouldExpire() throws Exception { + + // the ticker drives time to allow us to test cache expiration + FixedTicker ticker = new FixedTicker(); + + // setup + ProfileConfig definition = createDefinition(profileOne); + String entity = (String) messageOne.get("ip_src_addr"); + MessageRoute route = new MessageRoute(definition, entity); + distributor = new DefaultMessageDistributor( + periodDurationMillis, + profileTimeToLiveMillis, + maxNumberOfRoutes, + ticker); + + // distribute one message + distributor.distribute(messageOne, 100000, route, context); + + // advance time to just beyond the profile TTL + ticker.advanceTime(profileTimeToLiveMillis + 1000, MILLISECONDS); + + // the profile should have expired by now + assertEquals(1, distributor.flushExpired().size()); + assertEquals(0, distributor.flush().size()); + } + + /** + * An expired profile is only kept around for a fixed period of time. It should be removed, if it + * has been on the expired cache for too long. 
+ */ + @Test + public void testExpiredProfilesShouldBeRemoved() throws Exception { + + // the ticker drives time to allow us to test cache expiration + FixedTicker ticker = new FixedTicker(); + + // setup + ProfileConfig definition = createDefinition(profileOne); + String entity = (String) messageOne.get("ip_src_addr"); + MessageRoute route = new MessageRoute(definition, entity); + distributor = new DefaultMessageDistributor( + periodDurationMillis, + profileTimeToLiveMillis, + maxNumberOfRoutes, + ticker); + + // distribute one message + distributor.distribute(messageOne, 1000000, route, context); + + // advance time a couple of hours + ticker.advanceTime(2, HOURS); + + // the profile should have been expired + assertEquals(0, distributor.flush().size()); + + // advance time a couple of hours + ticker.advanceTime(2, HOURS); + + // the profile should have been removed from the expired cache + assertEquals(0, distributor.flushExpired().size()); + } + + /** + * An implementation of Ticker that can be used to drive time + * when testing the Guava caches. + */ + private class FixedTicker extends Ticker { + + /** + * The time that will be reported. 
+ */ + private long timestampNanos; + + public FixedTicker() { + this.timestampNanos = Ticker.systemTicker().read(); + } + + public FixedTicker startAt(long timestampNanos) { + this.timestampNanos = timestampNanos; + return this; + } + + public FixedTicker advanceTime(long time, TimeUnit units) { + this.timestampNanos += units.toNanos(time); + return this; + } + + @Override + public long read() { + return this.timestampNanos; + } + } + } http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java index d25b7ff..24eb5f8 100644 --- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java +++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java @@ -23,8 +23,6 @@ package org.apache.metron.profiler; import org.adrianwalker.multilinestring.Multiline; import org.apache.metron.common.configuration.profiler.ProfileConfig; import org.apache.metron.common.utils.JSONUtils; -import org.apache.metron.profiler.clock.Clock; -import org.apache.metron.profiler.clock.FixedClock; import org.apache.metron.stellar.dsl.Context; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; @@ -82,7 +80,9 @@ public class DefaultProfileBuilderTest { */ @Test public void testInit() throws Exception { + // setup + long timestamp = 100; definition = JSONUtils.INSTANCE.load(testInitProfile, ProfileConfig.class); builder = new DefaultProfileBuilder.Builder() .withDefinition(definition) @@ -92,7 +92,7 @@ public class DefaultProfileBuilderTest { 
.build(); // execute - builder.apply(message); + builder.apply(message, timestamp); Optional m = builder.flush(); assertTrue(m.isPresent()); @@ -106,7 +106,9 @@ public class DefaultProfileBuilderTest { */ @Test public void testInitWithNoMessage() throws Exception { + // setup + long timestamp = 100; definition = JSONUtils.INSTANCE.load(testInitProfile, ProfileConfig.class); builder = new DefaultProfileBuilder.Builder() .withDefinition(definition) @@ -146,7 +148,9 @@ public class DefaultProfileBuilderTest { */ @Test public void testUpdate() throws Exception { + // setup + long timestamp = 100; definition = JSONUtils.INSTANCE.load(testUpdateProfile, ProfileConfig.class); builder = new DefaultProfileBuilder.Builder() .withDefinition(definition) @@ -158,7 +162,12 @@ public class DefaultProfileBuilderTest { // execute int count = 10; for(int i=0; i m = builder.flush(); assertTrue(m.isPresent()); @@ -183,7 +192,9 @@ public class DefaultProfileBuilderTest { */ @Test public void testResult() throws Exception { + // setup + long timestamp = 100; definition = JSONUtils.INSTANCE.load(testResultProfile, ProfileConfig.class); builder = new DefaultProfileBuilder.Builder() .withDefinition(definition) @@ -193,7 +204,7 @@ public class DefaultProfileBuilderTest { .build(); // execute - builder.apply(message); + builder.apply(message, timestamp); Optional m = builder.flush(); assertTrue(m.isPresent()); @@ -206,40 +217,38 @@ public class DefaultProfileBuilderTest { */ @Test public void testProfilePeriodOnFlush() throws Exception { - // setup - FixedClock clock = new FixedClock(); - clock.setTime(100); + // setup + long timestamp = 100; definition = JSONUtils.INSTANCE.load(testResultProfile, ProfileConfig.class); builder = new DefaultProfileBuilder.Builder() .withDefinition(definition) .withEntity("10.0.0.1") .withPeriodDuration(10, TimeUnit.MINUTES) .withContext(Context.EMPTY_CONTEXT()) - .withClock(clock) .build(); { // apply a message and flush - builder.apply(message); + 
builder.apply(message, timestamp); Optional m = builder.flush(); assertTrue(m.isPresent()); // validate the profile period - ProfilePeriod expected = new ProfilePeriod(clock.currentTimeMillis(), 10, TimeUnit.MINUTES); + ProfilePeriod expected = new ProfilePeriod(timestamp, 10, TimeUnit.MINUTES); assertEquals(expected, m.get().getPeriod()); } { - // advance time by at least one period - 10 minutes - clock.setTime(clock.currentTimeMillis() + TimeUnit.MINUTES.toMillis(10)); + // advance time by at least one period... about 10 minutes + timestamp += TimeUnit.MINUTES.toMillis(10); // apply a message and flush again - builder.apply(message); + builder.apply(message, timestamp); Optional m = builder.flush(); assertTrue(m.isPresent()); // validate the profile period - ProfilePeriod expected = new ProfilePeriod(clock.currentTimeMillis(), 10, TimeUnit.MINUTES); + ProfilePeriod expected = new ProfilePeriod(timestamp, 10, TimeUnit.MINUTES); assertEquals(expected, m.get().getPeriod()); } } @@ -262,7 +271,9 @@ public class DefaultProfileBuilderTest { */ @Test public void testGroupBy() throws Exception { + // setup + long timestamp = 100; definition = JSONUtils.INSTANCE.load(testGroupByProfile, ProfileConfig.class); builder = new DefaultProfileBuilder.Builder() .withDefinition(definition) @@ -272,7 +283,7 @@ public class DefaultProfileBuilderTest { .build(); // execute - builder.apply(message); + builder.apply(message, timestamp); Optional m = builder.flush(); assertTrue(m.isPresent()); @@ -300,23 +311,20 @@ public class DefaultProfileBuilderTest { */ @Test public void testStateAvailableToGroupBy() throws Exception { - FixedClock clock = new FixedClock(); - clock.setTime(1503081070340L); - long periodDurationMillis = TimeUnit.MINUTES.toMillis(10); - ProfilePeriod period = new ProfilePeriod(clock.currentTimeMillis(), 10, TimeUnit.MINUTES); // setup + long timestamp = 1503081070340L; + ProfilePeriod period = new ProfilePeriod(timestamp, 10, TimeUnit.MINUTES); definition = 
JSONUtils.INSTANCE.load(testStateAvailableToGroupBy, ProfileConfig.class); builder = new DefaultProfileBuilder.Builder() .withDefinition(definition) .withEntity("10.0.0.1") .withPeriodDuration(10, TimeUnit.MINUTES) .withContext(Context.EMPTY_CONTEXT()) - .withClock(clock) .build(); // execute - builder.apply(message); + builder.apply(message, timestamp); Optional m = builder.flush(); assertTrue(m.isPresent()); @@ -350,7 +358,9 @@ public class DefaultProfileBuilderTest { @Test public void testFlushDoesNotClearsState() throws Exception { + // setup + long timestamp = 100; definition = JSONUtils.INSTANCE.load(testFlushProfile, ProfileConfig.class); builder = new DefaultProfileBuilder.Builder() .withDefinition(definition) @@ -362,16 +372,24 @@ public class DefaultProfileBuilderTest { // execute - accumulate some state then flush it int count = 10; for(int i=0; i m = builder.flush(); - assertTrue(m.isPresent()); // validate + assertTrue(m.isPresent()); assertEquals(33, m.get().getProfileValue()); } @@ -395,7 +413,9 @@ public class DefaultProfileBuilderTest { @Test public void testFlushDoesNotClearsStateButInitDoes() throws Exception { + // setup + long timestamp = 100; definition = JSONUtils.INSTANCE.load(testFlushProfileWithNaiveInit, ProfileConfig.class); builder = new DefaultProfileBuilder.Builder() .withDefinition(definition) @@ -407,18 +427,27 @@ public class DefaultProfileBuilderTest { // execute - accumulate some state then flush it int count = 10; for(int i=0; i m = builder.flush(); assertTrue(m.isPresent()); // validate assertEquals(3, m.get().getProfileValue()); } + /** * { * "profile": "test", @@ -434,7 +463,9 @@ public class DefaultProfileBuilderTest { */ @Test public void testEntity() throws Exception { + // setup + long timestamp = 100; final String entity = "10.0.0.1"; definition = JSONUtils.INSTANCE.load(testFlushProfile, ProfileConfig.class); builder = new DefaultProfileBuilder.Builder() @@ -445,7 +476,7 @@ public class DefaultProfileBuilderTest { 
.build(); // execute - builder.apply(message); + builder.apply(message, timestamp); Optional m = builder.flush(); assertTrue(m.isPresent()); @@ -473,7 +504,9 @@ public class DefaultProfileBuilderTest { */ @Test public void testResultWithProfileExpression() throws Exception { + // setup + long timestamp = 100; definition = JSONUtils.INSTANCE.load(testResultWithProfileExpression, ProfileConfig.class); builder = new DefaultProfileBuilder.Builder() .withDefinition(definition) @@ -483,7 +516,7 @@ public class DefaultProfileBuilderTest { .build(); // execute - builder.apply(message); + builder.apply(message, timestamp); Optional m = builder.flush(); assertTrue(m.isPresent()); @@ -515,7 +548,9 @@ public class DefaultProfileBuilderTest { */ @Test public void testResultWithTriageExpression() throws Exception { + // setup + long timestamp = 100; definition = JSONUtils.INSTANCE.load(testResultWithTriageExpression, ProfileConfig.class); builder = new DefaultProfileBuilder.Builder() .withDefinition(definition) @@ -525,7 +560,7 @@ public class DefaultProfileBuilderTest { .build(); // execute - builder.apply(message); + builder.apply(message, timestamp); Optional m = builder.flush(); assertTrue(m.isPresent()); @@ -550,7 +585,9 @@ public class DefaultProfileBuilderTest { @Test public void testBadInitExpression() throws Exception { + // setup + long timestamp = 100; definition = JSONUtils.INSTANCE.load(badInitProfile, ProfileConfig.class); builder = new DefaultProfileBuilder.Builder() .withDefinition(definition) @@ -560,7 +597,7 @@ public class DefaultProfileBuilderTest { .build(); // due to the bad expression, there should be no result - builder.apply(message); + builder.apply(message, timestamp); assertFalse(builder.flush().isPresent()); } @@ -579,7 +616,9 @@ public class DefaultProfileBuilderTest { @Test public void testBadResultExpression() throws Exception { + // setup + long timestamp = 100; definition = JSONUtils.INSTANCE.load(badSimpleResultProfile, ProfileConfig.class); 
builder = new DefaultProfileBuilder.Builder() .withDefinition(definition) @@ -589,7 +628,7 @@ public class DefaultProfileBuilderTest { .build(); // due to the bad expression, there should be no result - builder.apply(message); + builder.apply(message, timestamp); assertFalse(builder.flush().isPresent()); } @@ -608,7 +647,9 @@ public class DefaultProfileBuilderTest { @Test public void testBadGroupByExpression() throws Exception { + // setup + long timestamp = 100; definition = JSONUtils.INSTANCE.load(badGroupByProfile, ProfileConfig.class); builder = new DefaultProfileBuilder.Builder() .withDefinition(definition) @@ -618,7 +659,7 @@ public class DefaultProfileBuilderTest { .build(); // due to the bad expression, there should be no result - builder.apply(message); + builder.apply(message, timestamp); assertFalse(builder.flush().isPresent()); } @@ -641,7 +682,9 @@ public class DefaultProfileBuilderTest { @Test public void testBadResultProfileExpression() throws Exception { + // setup + long timestamp = 100; definition = JSONUtils.INSTANCE.load(badResultProfile, ProfileConfig.class); builder = new DefaultProfileBuilder.Builder() .withDefinition(definition) @@ -651,7 +694,7 @@ public class DefaultProfileBuilderTest { .build(); // due to the bad expression, there should be no result - builder.apply(message); + builder.apply(message, timestamp); assertFalse(builder.flush().isPresent()); } @@ -674,7 +717,9 @@ public class DefaultProfileBuilderTest { @Test public void testBadResultTriageExpression() throws Exception { + // setup + long timestamp = 100; definition = JSONUtils.INSTANCE.load(badResultTriage, ProfileConfig.class); builder = new DefaultProfileBuilder.Builder() .withDefinition(definition) @@ -684,7 +729,7 @@ public class DefaultProfileBuilderTest { .build(); // due to the bad expression, there should be no result - builder.apply(message); + builder.apply(message, timestamp); assertFalse(builder.flush().isPresent()); } @@ -707,7 +752,9 @@ public class 
DefaultProfileBuilderTest { */ @Test public void testBadUpdateExpression() throws Exception { + // setup + long timestamp = 100; definition = JSONUtils.INSTANCE.load(badUpdateProfile, ProfileConfig.class); builder = new DefaultProfileBuilder.Builder() .withDefinition(definition) @@ -717,7 +764,7 @@ public class DefaultProfileBuilderTest { .build(); // execute - builder.apply(message); + builder.apply(message, timestamp); // if the update expression fails, the profile should still flush. Optional m = builder.flush(); http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java index 3a51ea4..1a72111 100644 --- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java +++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java @@ -20,7 +20,6 @@ package org.apache.metron.profiler; -import org.apache.metron.profiler.ProfilePeriod; import org.junit.Test; import java.util.concurrent.TimeUnit;