metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nickal...@apache.org
Subject metron git commit: METRON-1544 Flaky test: org.apache.metron.stellar.common.CachingStellarProcessorTest#testCaching (nickwallen) closes apache/metron#1015
Date Tue, 29 May 2018 18:26:21 GMT
Repository: metron
Updated Branches:
  refs/heads/master 5303c5ead -> c14c78692


METRON-1544 Flaky test: org.apache.metron.stellar.common.CachingStellarProcessorTest#testCaching
(nickwallen) closes apache/metron#1015


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/c14c7869
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/c14c7869
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/c14c7869

Branch: refs/heads/master
Commit: c14c7869228132ad6e6cae8b9914f669ac6b489d
Parents: 5303c5e
Author: nickwallen <nick@nickallen.org>
Authored: Tue May 29 14:25:55 2018 -0400
Committer: nickallen <nickallen@apache.org>
Committed: Tue May 29 14:25:55 2018 -0400

----------------------------------------------------------------------
 .../stellar/common/CachingStellarProcessor.java | 141 +++++++++++---
 .../common/CachingStellarProcessorTest.java     | 195 +++++++++++++------
 2 files changed, 252 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/c14c7869/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java
index 36e6579..19de14e 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java
@@ -19,11 +19,16 @@ package org.apache.metron.stellar.common;
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.metron.stellar.common.utils.ConversionUtils;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.VariableResolver;
 import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver;
+import org.slf4j.LoggerFactory;
 
+import java.lang.invoke.MethodHandles;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -35,12 +40,38 @@ import java.util.concurrent.TimeUnit;
  * LFU cache.
  */
 public class CachingStellarProcessor extends StellarProcessor {
+
+  private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private static ThreadLocal<Map<String, Set<String>> > variableCache =
ThreadLocal.withInitial(() -> new HashMap<>());
+
+  /**
+   * A property that defines the maximum cache size.
+   */
   public static String MAX_CACHE_SIZE_PARAM = "stellar.cache.maxSize";
+
+  /**
+   * A property that defines the max time in minutes that elements are retained in the cache.
+   */
   public static String MAX_TIME_RETAIN_PARAM = "stellar.cache.maxTimeRetain";
 
+  /**
+   * A property that defines if cache usage stats should be recorded.
+   */
+  public static String RECORD_STATS = "stellar.cache.record.stats";
+
+  /**
+   * The cache key is based on the expression and input values.
+   */
   public static class Key {
+
+    /**
+     * The expression to execute.
+     */
     private String expression;
+
+    /**
+     * The variables that serve as input to the expression.
+     */
     private Map<String, Object> input;
 
     public Key(String expression, Map<String, Object> input) {
@@ -58,59 +89,98 @@ public class CachingStellarProcessor extends StellarProcessor {
 
     @Override
     public boolean equals(Object o) {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
-
-      Key key = (Key) o;
+      if (this == o) {
+        return true;
+      }
 
-      if (getExpression() != null ? !getExpression().equals(key.getExpression()) : key.getExpression()
!= null)
+      if (o == null || getClass() != o.getClass()) {
         return false;
-      return getInput() != null ? getInput().equals(key.getInput()) : key.getInput() == null;
+      }
 
+      Key key = (Key) o;
+      return new EqualsBuilder()
+              .append(expression, key.expression)
+              .append(input, key.input)
+              .isEquals();
     }
 
     @Override
     public int hashCode() {
-      int result = getExpression() != null ? getExpression().hashCode() : 0;
-      result = 31 * result + (getInput() != null ? getInput().hashCode() : 0);
-      return result;
+      return new HashCodeBuilder(17, 37)
+              .append(expression)
+              .append(input)
+              .toHashCode();
     }
-  }
 
+    @Override
+    public String toString() {
+      return new ToStringBuilder(this)
+              .append("expression", expression)
+              .append("input", input)
+              .toString();
+    }
+  }
 
   /**
-   * Parses and evaluates the given Stellar expression, {@code expression}.  Results will
be taken from a cache if possible.
+   * Parses and evaluates the given Stellar expression, {@code expression}. Results will
be taken
+   * from a cache if possible.
    *
-   * @param expression             The Stellar expression to parse and evaluate.
-   * @param variableResolver The {@link VariableResolver} to determine values of variables
used in the Stellar expression, {@code expression}.
-   * @param functionResolver The {@link FunctionResolver} to determine values of functions
used in the Stellar expression, {@code expression}.
-   * @param context          The context used during validation.
+   * @param expression The Stellar expression to parse and evaluate.
+   * @param variableResolver The {@link VariableResolver} to determine values of variables
used in
+   *     the Stellar expression, {@code expression}.
+   * @param functionResolver The {@link FunctionResolver} to determine values of functions
used in
+   *     the Stellar expression, {@code expression}.
+   * @param context The context used during validation.
    * @return The value of the evaluated Stellar expression, {@code expression}.
    */
   @Override
-  public Object parse(String expression, VariableResolver variableResolver, FunctionResolver
functionResolver, Context context) {
+  public Object parse(
+      String expression,
+      VariableResolver variableResolver,
+      FunctionResolver functionResolver,
+      Context context) {
+
     Optional<Object> cacheOpt = context.getCapability(Context.Capabilities.CACHE, false);
     if(cacheOpt.isPresent()) {
+
+      // use the cache
       Cache<Key, Object> cache = (Cache<Key, Object>) cacheOpt.get();
       Key k = toKey(expression, variableResolver);
       return cache.get(k, x -> parseUncached(x.expression, variableResolver, functionResolver,
context));
-    }
-    else {
+
+    } else {
+
+      LOG.debug("No cache present.");
       return parseUncached(expression, variableResolver, functionResolver, context);
     }
   }
 
   protected Object parseUncached(String expression, VariableResolver variableResolver, FunctionResolver
functionResolver, Context context) {
+    LOG.debug("Executing Stellar; expression={}", expression);
     return super.parse(expression, variableResolver, functionResolver, context);
   }
 
-  private Key toKey(String expression, VariableResolver resolver) {
+  /**
+   * Create a cache key using the expression and all variables used by that expression.
+   *
+   * @param expression The Stellar expression.
+   * @param resolver The variable resolver.
+   * @return A key with which to do a cache lookup.
+   */
+  protected Key toKey(String expression, VariableResolver resolver) {
+
+    // fetch only the variables used in the expression
     Set<String> variablesUsed = variableCache.get().computeIfAbsent(expression, this::variablesUsed);
+
+    // resolve each of the variables used by the expression
     Map<String, Object> input = new HashMap<>();
     for(String v : variablesUsed) {
       input.computeIfAbsent(v, resolver::resolve);
     }
-    return new Key(expression, input);
+
+    Key cacheKey = new Key(expression, input);
+    LOG.debug("Created cache key; {}", cacheKey);
+    return cacheKey;
   }
 
   /**
@@ -119,18 +189,39 @@ public class CachingStellarProcessor extends StellarProcessor {
    * @return A cache.
    */
   public static Cache<Key, Object> createCache(Map<String, Object> config) {
+
+    // the cache configuration is required
     if(config == null) {
+      LOG.debug("Cannot create cache; missing cache configuration");
       return null;
     }
+
+    // max cache size is required
     Long maxSize = getParam(config, MAX_CACHE_SIZE_PARAM, null, Long.class);
+    if(maxSize == null || maxSize <= 0) {
+      LOG.error("Cannot create cache; missing or invalid configuration; {} = {}", MAX_CACHE_SIZE_PARAM,
maxSize);
+      return null;
+    }
+
+    // max time retain is required
     Integer maxTimeRetain = getParam(config, MAX_TIME_RETAIN_PARAM, null, Integer.class);
-    if(maxSize == null || maxTimeRetain == null || maxSize <= 0 || maxTimeRetain <=
0) {
+    if(maxTimeRetain == null || maxTimeRetain <= 0) {
+      LOG.error("Cannot create cache; missing or invalid configuration; {} = {}", MAX_TIME_RETAIN_PARAM,
maxTimeRetain);
       return null;
     }
-    return Caffeine.newBuilder()
-                   .maximumSize(maxSize)
-                   .expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES)
-                   .build();
+
+    Caffeine<Object, Object> cache = Caffeine
+            .newBuilder()
+            .maximumSize(maxSize)
+            .expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES);
+
+    // record stats is optional
+    Boolean recordStats = getParam(config, RECORD_STATS, false, Boolean.class);
+    if(recordStats) {
+      cache.recordStats();
+    }
+
+    return cache.build();
   }
 
   private static <T> T getParam(Map<String, Object> config, String key, T defaultVal,
Class<T> clazz) {

http://git-wip-us.apache.org/repos/asf/metron/blob/c14c7869/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/common/CachingStellarProcessorTest.java
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/common/CachingStellarProcessorTest.java
b/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/common/CachingStellarProcessorTest.java
index 94421de..1690236 100644
--- a/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/common/CachingStellarProcessorTest.java
+++ b/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/common/CachingStellarProcessorTest.java
@@ -22,83 +22,160 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.MapVariableResolver;
 import org.apache.metron.stellar.dsl.StellarFunctions;
-import org.apache.metron.stellar.dsl.VariableResolver;
-import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver;
-import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.LoggerFactory;
 
+import java.lang.invoke.MethodHandles;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 public class CachingStellarProcessorTest {
 
+  private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   private static Map<String, Object> fields = new HashMap<String, Object>() {{
       put("name", "blah");
     }};
 
+  private CachingStellarProcessor processor;
+  private Cache<CachingStellarProcessor.Key, Object> cache;
+  private Context contextWithCache;
+
+  @Before
+  public void setup() throws Exception {
+
+    // create the cache
+    Map<String, Object> cacheConfig = ImmutableMap.of(
+            CachingStellarProcessor.MAX_CACHE_SIZE_PARAM, 2,
+            CachingStellarProcessor.MAX_TIME_RETAIN_PARAM, 10,
+            CachingStellarProcessor.RECORD_STATS, true
+    );
+    cache = CachingStellarProcessor.createCache(cacheConfig);
+    contextWithCache = new Context.Builder()
+            .with(Context.Capabilities.CACHE, () -> cache)
+            .build();
+
+    // create the object to test
+    processor = new CachingStellarProcessor();
+  }
+
+  /**
+   * Running the same expression multiple times should hit the cache.
+   */
+  @Test
+  public void testWithCache() {
+
+    Object result = execute("TO_UPPER(name)", contextWithCache);
+    assertEquals("BLAH", result);
+    assertEquals(1, cache.stats().requestCount());
+    assertEquals(1, cache.stats().missCount());
+    assertEquals(0, cache.stats().hitCount());
+
+    result = execute("TO_UPPER(name)", contextWithCache);
+    assertEquals("BLAH", result);
+    assertEquals(2, cache.stats().requestCount());
+    assertEquals(1, cache.stats().missCount());
+    assertEquals(1, cache.stats().hitCount());
+
+    result = execute("TO_UPPER(name)", contextWithCache);
+    assertEquals("BLAH", result);
+    assertEquals(3, cache.stats().requestCount());
+    assertEquals(1, cache.stats().missCount());
+    assertEquals(2, cache.stats().hitCount());
+  }
+
+  /**
+   * The processor should work, even if no cache is present in the execution context.
+   */
+  @Test
+  public void testNoCache() throws Exception {
+
+    // the execution context does not contain a cache
+    Context contextNoCache = Context.EMPTY_CONTEXT();
+
+    assertEquals("BLAH", execute("TO_UPPER(name)", contextNoCache));
+    assertEquals("BLAH", execute("TO_UPPER(name)", contextNoCache));
+  }
+
+  @Test
+  public void testInvalidMaxCacheSize() {
+    Map<String, Object> cacheConfig = ImmutableMap.of(
+            CachingStellarProcessor.MAX_CACHE_SIZE_PARAM, -1,
+            CachingStellarProcessor.MAX_TIME_RETAIN_PARAM, 10
+    );
+    cache = CachingStellarProcessor.createCache(cacheConfig);
+    assertNull(cache);
+  }
+
   @Test
-  public void testNoCaching() throws Exception {
-    //no caching, so every expression is a cache miss.
-    Assert.assertEquals(2, countMisses(2, Context.EMPTY_CONTEXT(), "TO_UPPER(name)"));
-    //Ensure the correct result is returned.
-    Assert.assertEquals("BLAH", evaluateExpression(Context.EMPTY_CONTEXT(), "TO_UPPER(name)"));
+  public void testMissingMaxCacheSize() {
+    Map<String, Object> cacheConfig = ImmutableMap.of(
+            CachingStellarProcessor.MAX_TIME_RETAIN_PARAM, 10
+    );
+    cache = CachingStellarProcessor.createCache(cacheConfig);
+    assertNull(cache);
   }
 
   @Test
-  public void testCaching() throws Exception {
-    Cache<CachingStellarProcessor.Key, Object> cache = CachingStellarProcessor.createCache(
-                                                 ImmutableMap.of(CachingStellarProcessor.MAX_CACHE_SIZE_PARAM,
2
-                                                                ,CachingStellarProcessor.MAX_TIME_RETAIN_PARAM,
10
-                                                                )
-                                                                           );
-    Context context = new Context.Builder()
-                                 .with( Context.Capabilities.CACHE , () -> cache )
-                                 .build();
-    //running the same expression twice should hit the cache on the 2nd time and only yield
one miss
-    Assert.assertEquals(1, countMisses(2, context, "TO_UPPER(name)"));
-
-    //Ensure the correct result is returned.
-    Assert.assertEquals("BLAH", evaluateExpression(context, "TO_UPPER(name)"));
-
-    //running the same expression 20 more times should pull from the cache
-    Assert.assertEquals(0, countMisses(20, context, "TO_UPPER(name)"));
-
-    //Now we are running 4 distinct operations with a cache size of 2.  The cache has 1 element
in it before we start:
-    //  TO_LOWER(name) - miss (brand new), cache is full
-    //  TO_UPPER(name) - hit, cache is full
-    //  TO_UPPER('foo') - miss (brand new), cache is still full, but TO_LOWER is evicted
as the least frequently used
-    //  JOIN... - miss (brand new), cache is still full, but TO_UPPER('foo') is evicted as
the least frequently used
-    //this pattern repeats a 2nd time to add another 3 cache misses, totalling 6.
-    Assert.assertEquals(6, countMisses(2, context, "TO_LOWER(name)", "TO_UPPER(name)", "TO_UPPER('foo')",
"JOIN([name, 'blah'], ',')"));
+  public void testInvalidMaxTimeRetain() {
+    Map<String, Object> cacheConfig = ImmutableMap.of(
+            CachingStellarProcessor.MAX_CACHE_SIZE_PARAM, 10,
+            CachingStellarProcessor.MAX_TIME_RETAIN_PARAM, -2
+    );
+    cache = CachingStellarProcessor.createCache(cacheConfig);
+    assertNull(cache);
   }
 
-  private Object evaluateExpression(Context context, String expression) {
-    StellarProcessor processor = new CachingStellarProcessor();
-    return processor.parse(expression
-                , new MapVariableResolver(fields)
-                , StellarFunctions.FUNCTION_RESOLVER()
-                , context);
+  @Test
+  public void testMissingMaxTimeRetain() {
+    Map<String, Object> cacheConfig = ImmutableMap.of(
+            CachingStellarProcessor.MAX_CACHE_SIZE_PARAM, 10
+    );
+    cache = CachingStellarProcessor.createCache(cacheConfig);
+    assertNull(cache);
   }
 
-  private int countMisses(int numRepetition, Context context, String... expressions) {
-    AtomicInteger numExpressions = new AtomicInteger(0);
-    StellarProcessor processor = new CachingStellarProcessor() {
-      @Override
-      protected Object parseUncached(String expression, VariableResolver variableResolver,
FunctionResolver functionResolver, Context context) {
-        numExpressions.incrementAndGet();
-        return super.parseUncached(expression, variableResolver, functionResolver, context);
-      }
-    };
-
-    for(int i = 0;i < numRepetition;++i) {
-      for(String expression : expressions) {
-        processor.parse(expression
-                , new MapVariableResolver(fields)
-                , StellarFunctions.FUNCTION_RESOLVER()
-                , context);
-      }
-    }
-    return numExpressions.get();
+  /**
+   * The cache should continue to hit, even if variables not used in the cached expression
change.
+   */
+  @Test
+  public void testUnrelatedVariableChange() {
+
+    // expect miss
+    Object result = execute("TO_UPPER(name)", contextWithCache);
+    assertEquals("BLAH", result);
+    assertEquals(1, cache.stats().requestCount());
+    assertEquals(1, cache.stats().missCount());
+    assertEquals(0, cache.stats().hitCount());
+
+    // add an irrelevant variable that is not used in the expression
+    fields.put("unrelated_var_1", "true");
+    fields.put("unrelated_var_2", 22);
+
+    // still expect a hit
+    result = execute("TO_UPPER(name)", contextWithCache);
+    assertEquals("BLAH", result);
+    assertEquals(2, cache.stats().requestCount());
+    assertEquals(1, cache.stats().missCount());
+    assertEquals(1, cache.stats().hitCount());
+
+  }
+
+  /**
+   * Execute each expression.
+   * @param expression The expression to execute.
+   */
+  private Object execute(String expression, Context context) {
+
+    Object result = processor.parse(
+            expression,
+            new MapVariableResolver(fields),
+            StellarFunctions.FUNCTION_RESOLVER(),
+            context);
+    return result;
   }
 }


Mime
View raw message