metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ceste...@apache.org
Subject metron git commit: METRON-1520: Add caching for stellar field transformations closes apache/incubator-metron#990
Date Wed, 25 Apr 2018 15:48:56 GMT
Repository: metron
Updated Branches:
  refs/heads/master 37e3fd32c -> 1c5435ccb


METRON-1520: Add caching for stellar field transformations closes apache/incubator-metron#990


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/1c5435cc
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/1c5435cc
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/1c5435cc

Branch: refs/heads/master
Commit: 1c5435ccbe96c03e59c6a18d681da43561769dba
Parents: 37e3fd3
Author: cstella <cestella@gmail.com>
Authored: Wed Apr 25 11:48:44 2018 -0400
Committer: cstella <cestella@gmail.com>
Committed: Wed Apr 25 11:48:44 2018 -0400

----------------------------------------------------------------------
 metron-platform/Performance-tuning-guide.md     |  13 ++
 .../configuration/SensorParserConfig.java       |  15 ++
 .../transformation/StellarTransformation.java   |   3 +-
 .../StellarTransformationTest.java              |  30 ++++
 metron-platform/metron-parsers/README.md        |  13 ++
 .../apache/metron/parsers/bolt/ParserBolt.java  |  15 +-
 .../stellar/common/CachingStellarProcessor.java | 144 +++++++++++++++++++
 .../org/apache/metron/stellar/dsl/Context.java  |  43 +++++-
 .../common/CachingStellarProcessorTest.java     | 104 ++++++++++++++
 9 files changed, 371 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/1c5435cc/metron-platform/Performance-tuning-guide.md
----------------------------------------------------------------------
diff --git a/metron-platform/Performance-tuning-guide.md b/metron-platform/Performance-tuning-guide.md
index e2d1ae2..c2d19d6 100644
--- a/metron-platform/Performance-tuning-guide.md
+++ b/metron-platform/Performance-tuning-guide.md
@@ -60,6 +60,19 @@ parallelism will leave you with idle consumers since Kafka limits the max
number
 important because Kafka has certain ordering guarantees for message delivery per partition
that would not be possible if more than
 one consumer in a given consumer group were able to read from that partition.
 
+## Sensor Topology Tuning Suggestions
+
+If you are using stellar field transformations in your sensors, by default, stellar expressions
+are not cached.  Sensors that use stellar field transformations may see a performance
+boost by turning on caching via setting the `cacheConfig`
+[property](metron-parsers#parser_configuration).
+This is beneficial if your transformations:
+
+* Are complex (e.g. `ENRICHMENT_GET` calls or other high latency calls)
+* All yield the same results for the same inputs (caching is either off or applied to all
transformations)
+  * If any of your transformations are non-deterministic, caching should not be used, as it
is likely to return incorrect results.
+
+
 ## Component Tuning Levers
 
 ### High Level Overview

http://git-wip-us.apache.org/repos/asf/metron/blob/1c5435cc/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
index 2d0ccd8..d347481 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
@@ -45,11 +45,26 @@ public class SensorParserConfig implements Serializable {
   private Integer parserNumTasks = 1;
   private Integer errorWriterParallelism = 1;
   private Integer errorWriterNumTasks = 1;
+  private Map<String, Object> cacheConfig = new HashMap<>();
   private Map<String, Object> spoutConfig = new HashMap<>();
   private String securityProtocol = null;
   private Map<String, Object> stormConfig = new HashMap<>();
 
   /**
+   * Cache config for stellar field transformations.
+   * * stellar.cache.maxSize - The maximum number of elements in the cache.
+   * * stellar.cache.maxTimeRetain - The maximum amount of time an element is kept in the
cache (in minutes).
+   * @return
+   */
+  public Map<String, Object> getCacheConfig() {
+    return cacheConfig;
+  }
+
+  public void setCacheConfig(Map<String, Object> cacheConfig) {
+    this.cacheConfig = cacheConfig;
+  }
+
+  /**
    * Return the number of workers for the topology.  This property will be used for the parser
unless overridden on the CLI.
    * @return
    */

http://git-wip-us.apache.org/repos/asf/metron/blob/1c5435cc/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/StellarTransformation.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/StellarTransformation.java
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/StellarTransformation.java
index 2a22e21..bb7501d 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/StellarTransformation.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/StellarTransformation.java
@@ -18,6 +18,7 @@
 
 package org.apache.metron.common.field.transformation;
 
+import org.apache.metron.stellar.common.CachingStellarProcessor;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.MapVariableResolver;
 import org.apache.metron.stellar.dsl.StellarFunctions;
@@ -40,7 +41,7 @@ public class StellarTransformation implements FieldTransformation {
     Set<String> outputs = new HashSet<>(outputField);
     MapVariableResolver resolver = new MapVariableResolver(ret, intermediateVariables, input);
     resolver.add(sensorConfig);
-    StellarProcessor processor = new StellarProcessor();
+    StellarProcessor processor = new CachingStellarProcessor();
     for(Map.Entry<String, Object> kv : fieldMappingConfig.entrySet()) {
       String oField = kv.getKey();
       Object transformObj = kv.getValue();

http://git-wip-us.apache.org/repos/asf/metron/blob/1c5435cc/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/StellarTransformationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/StellarTransformationTest.java
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/StellarTransformationTest.java
index 0a3cbb0..fc91844 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/StellarTransformationTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/StellarTransformationTest.java
@@ -18,19 +18,49 @@
 
 package org.apache.metron.common.field.transformation;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.common.configuration.FieldTransformer;
 import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.stellar.common.CachingStellarProcessor;
 import org.apache.metron.stellar.dsl.Context;
 import org.json.simple.JSONObject;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 
+@RunWith(Parameterized.class)
 public class StellarTransformationTest {
+  Context context;
+  public StellarTransformationTest(Cache<CachingStellarProcessor.Key, Object> cache)
{
+    if(cache == null) {
+      context = Context.EMPTY_CONTEXT();
+    }
+    else {
+      context = new Context.Builder().with(Context.Capabilities.CACHE, () -> cache).build();
+    }
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    return Arrays.asList(
+            new Object[][] {
+                     { CachingStellarProcessor.createCache(ImmutableMap.of(CachingStellarProcessor.MAX_CACHE_SIZE_PARAM,
10)) }
+                   , { CachingStellarProcessor.createCache(ImmutableMap.of(CachingStellarProcessor.MAX_CACHE_SIZE_PARAM,
1)) }
+                   , { CachingStellarProcessor.createCache(ImmutableMap.of(CachingStellarProcessor.MAX_CACHE_SIZE_PARAM,
0)) }
+                   , { null }
+                           }
+                        );
+  }
+
   /**
    {
     "fieldTransformations" : [

http://git-wip-us.apache.org/repos/asf/metron/blob/1c5435cc/metron-platform/metron-parsers/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/README.md b/metron-platform/metron-parsers/README.md
index 6b9d62e..1d2d834 100644
--- a/metron-platform/metron-parsers/README.md
+++ b/metron-platform/metron-parsers/README.md
@@ -174,6 +174,19 @@ then it is assumed to be a regex and will match any topic matching the
pattern (
 * `spoutConfig` : A map representing a custom spout config (this is a map). This can be overridden
on the command line.
 * `securityProtocol` : The security protocol to use for reading from kafka (this is a string).
 This can be overridden on the command line and also specified in the spout config via the
`security.protocol` key.  If both are specified, then they are merged and the CLI will take
precedence.
 * `stormConfig` : The storm config to use (this is a map).  This can be overridden on the
command line.  If both are specified, they are merged with CLI properties taking precedence.
+* `cacheConfig` : Cache config for stellar field transformations.   This configures a least
frequently used cache.  This is a map with the following keys.  If not explicitly configured
(the default), then no cache will be used.
+  * `stellar.cache.maxSize` - The maximum number of elements in the cache. Default is to
not use a cache.
+  * `stellar.cache.maxTimeRetain` - The maximum amount of time an element is kept in the
cache (in minutes). Default is to not use a cache.
+
+  Example of a cache config to contain at most `20000` stellar expressions for at most `20`
minutes:
+```
+{
+  "cacheConfig" : {
+    "stellar.cache.maxSize" : 20000,
+    "stellar.cache.maxTimeRetain" : 20
+  }
+}
+```
 
 The `fieldTransformations` is a complex object which defines a
 transformation which can be done to a message.  This transformation can 

http://git-wip-us.apache.org/repos/asf/metron/blob/1c5435cc/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index e996f14..dd59355 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -31,6 +31,8 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
+
+import com.github.benmanes.caffeine.cache.Cache;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.bolt.ConfiguredParserBolt;
@@ -45,6 +47,7 @@ import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.parsers.filters.Filters;
 import org.apache.metron.parsers.interfaces.MessageFilter;
 import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.stellar.common.CachingStellarProcessor;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.StellarFunctions;
 import org.apache.storm.task.OutputCollector;
@@ -67,6 +70,7 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable
{
   private WriterHandler writer;
   private Context stellarContext;
   private transient MessageGetStrategy messageGetStrategy;
+  private transient Cache<CachingStellarProcessor.Key, Object> cache;
   public ParserBolt( String zookeeperUrl
                    , String sensorType
                    , MessageParser<JSONObject> parser
@@ -94,6 +98,9 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable
{
     super.prepare(stormConf, context, collector);
     messageGetStrategy = MessageGetters.DEFAULT_BYTES_FROM_POSITION.get();
     this.collector = collector;
+    if(getSensorParserConfig() != null) {
+      cache = CachingStellarProcessor.createCache(getSensorParserConfig().getCacheConfig());
+    }
     initializeStellar();
     if(getSensorParserConfig() != null && filter == null) {
       getSensorParserConfig().getParserConfig().putIfAbsent("stellarContext", stellarContext);
@@ -119,11 +126,15 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable
{
   }
 
   protected void initializeStellar() {
-    this.stellarContext = new Context.Builder()
+    Context.Builder builder = new Context.Builder()
                                 .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
                                 .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig())
                                 .with(Context.Capabilities.STELLAR_CONFIG, () -> getConfigurations().getGlobalConfig())
-                                .build();
+                                ;
+    if(cache != null) {
+      builder = builder.with(Context.Capabilities.CACHE, () -> cache);
+    }
+    this.stellarContext = builder.build();
     StellarFunctions.initialize(stellarContext);
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/1c5435cc/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java
new file mode 100644
index 0000000..36e6579
--- /dev/null
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.stellar.common;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.VariableResolver;
+import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The Caching Stellar Processor is a stellar processor that optionally fronts stellar with
an expression-by-expression
+ * LFU cache.
+ */
+public class CachingStellarProcessor extends StellarProcessor {
+  private static ThreadLocal<Map<String, Set<String>> > variableCache =
ThreadLocal.withInitial(() -> new HashMap<>());
+  public static String MAX_CACHE_SIZE_PARAM = "stellar.cache.maxSize";
+  public static String MAX_TIME_RETAIN_PARAM = "stellar.cache.maxTimeRetain";
+
+  public static class Key {
+    private String expression;
+    private Map<String, Object> input;
+
+    public Key(String expression, Map<String, Object> input) {
+      this.expression = expression;
+      this.input = input;
+    }
+
+    public String getExpression() {
+      return expression;
+    }
+
+    public Map<String, Object> getInput() {
+      return input;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      Key key = (Key) o;
+
+      if (getExpression() != null ? !getExpression().equals(key.getExpression()) : key.getExpression()
!= null)
+        return false;
+      return getInput() != null ? getInput().equals(key.getInput()) : key.getInput() == null;
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = getExpression() != null ? getExpression().hashCode() : 0;
+      result = 31 * result + (getInput() != null ? getInput().hashCode() : 0);
+      return result;
+    }
+  }
+
+
+  /**
+   * Parses and evaluates the given Stellar expression, {@code expression}.  Results will
be taken from a cache if possible.
+   *
+   * @param expression             The Stellar expression to parse and evaluate.
+   * @param variableResolver The {@link VariableResolver} to determine values of variables
used in the Stellar expression, {@code expression}.
+   * @param functionResolver The {@link FunctionResolver} to determine values of functions
used in the Stellar expression, {@code expression}.
+   * @param context          The context used during evaluation; supplies the optional CACHE capability.
+   * @return The value of the evaluated Stellar expression, {@code expression}.
+   */
+  @Override
+  public Object parse(String expression, VariableResolver variableResolver, FunctionResolver
functionResolver, Context context) {
+    Optional<Object> cacheOpt = context.getCapability(Context.Capabilities.CACHE, false);
+    if(cacheOpt.isPresent()) {
+      Cache<Key, Object> cache = (Cache<Key, Object>) cacheOpt.get();
+      Key k = toKey(expression, variableResolver);
+      return cache.get(k, x -> parseUncached(x.expression, variableResolver, functionResolver,
context));
+    }
+    else {
+      return parseUncached(expression, variableResolver, functionResolver, context);
+    }
+  }
+
+  protected Object parseUncached(String expression, VariableResolver variableResolver, FunctionResolver
functionResolver, Context context) {
+    return super.parse(expression, variableResolver, functionResolver, context);
+  }
+
+  private Key toKey(String expression, VariableResolver resolver) {
+    Set<String> variablesUsed = variableCache.get().computeIfAbsent(expression, this::variablesUsed);
+    Map<String, Object> input = new HashMap<>();
+    for(String v : variablesUsed) {
+      input.computeIfAbsent(v, resolver::resolve);
+    }
+    return new Key(expression, input);
+  }
+
+  /**
+   * Create a cache given a config.  Note that if the config is null, or if either the maximum
size or the maximum retention time is missing or <= 0, then no cache is created and null is returned.
+   * @param config
+   * @return A cache.
+   */
+  public static Cache<Key, Object> createCache(Map<String, Object> config) {
+    if(config == null) {
+      return null;
+    }
+    Long maxSize = getParam(config, MAX_CACHE_SIZE_PARAM, null, Long.class);
+    Integer maxTimeRetain = getParam(config, MAX_TIME_RETAIN_PARAM, null, Integer.class);
+    if(maxSize == null || maxTimeRetain == null || maxSize <= 0 || maxTimeRetain <=
0) {
+      return null;
+    }
+    return Caffeine.newBuilder()
+                   .maximumSize(maxSize)
+                   .expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES)
+                   .build();
+  }
+
+  private static <T> T getParam(Map<String, Object> config, String key, T defaultVal,
Class<T> clazz) {
+    Object o = config.get(key);
+    if(o == null) {
+      return defaultVal;
+    }
+    T ret = ConversionUtils.convert(o, clazz);
+    return ret == null?defaultVal:ret;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/1c5435cc/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/Context.java
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/Context.java
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/Context.java
index 9568a05..8a477c4 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/Context.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/Context.java
@@ -30,12 +30,43 @@ public class Context implements Serializable {
   
   public enum Capabilities {
       HBASE_PROVIDER
-    , GLOBAL_CONFIG
-    , ZOOKEEPER_CLIENT
-    , SERVICE_DISCOVERER
-    , STELLAR_CONFIG
-    , CONSOLE
-    , SHELL_VARIABLES
+    ,
+    /**
+     * This capability indicates that the global config is available.
+     */
+    GLOBAL_CONFIG
+    ,
+    /**
+     * This capability indicates that a zookeeper client (i.e. a Curator client, specifically)
is available.
+     */
+    ZOOKEEPER_CLIENT
+    ,
+    /**
+     * This capability indicates that a MaaS service discoverer is available.
+     */
+    SERVICE_DISCOVERER
+    ,
+    /**
+     * This capability indicates that a map configuring stellar is available.  Generally
this is done within the global config
+     * inside of storm, but may be sourced elsewhere (e.g. the CLI when running the REPL).
+     */
+    STELLAR_CONFIG
+    ,
+    /**
+     * This capability indicates that the Console object is available.  This is available
when run via the CLI (e.g. from the REPL).
+     */
+    CONSOLE
+    ,
+    /**
+     * This capability indicates that shell variables are available.  This is available when
run via the CLI (e.g. from the REPL).
+     */
+    SHELL_VARIABLES
+    ,
+    /**
+     * This capability indicates that the StellarProcessor should use a Caffeine cache to
cache expression -> results.  If an expression
+     * is in the cache, then the cached result will be returned instead of recomputing.
+     */
+    CACHE
   }
 
   public enum ActivityType {

http://git-wip-us.apache.org/repos/asf/metron/blob/1c5435cc/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/common/CachingStellarProcessorTest.java
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/common/CachingStellarProcessorTest.java
b/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/common/CachingStellarProcessorTest.java
new file mode 100644
index 0000000..94421de
--- /dev/null
+++ b/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/common/CachingStellarProcessorTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.stellar.common;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.google.common.collect.ImmutableMap;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.MapVariableResolver;
+import org.apache.metron.stellar.dsl.StellarFunctions;
+import org.apache.metron.stellar.dsl.VariableResolver;
+import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class CachingStellarProcessorTest {
+
+  private static Map<String, Object> fields = new HashMap<String, Object>() {{
+      put("name", "blah");
+    }};
+
+  @Test
+  public void testNoCaching() throws Exception {
+    //no caching, so every expression is a cache miss.
+    Assert.assertEquals(2, countMisses(2, Context.EMPTY_CONTEXT(), "TO_UPPER(name)"));
+    //Ensure the correct result is returned.
+    Assert.assertEquals("BLAH", evaluateExpression(Context.EMPTY_CONTEXT(), "TO_UPPER(name)"));
+  }
+
+  @Test
+  public void testCaching() throws Exception {
+    Cache<CachingStellarProcessor.Key, Object> cache = CachingStellarProcessor.createCache(
+                                                 ImmutableMap.of(CachingStellarProcessor.MAX_CACHE_SIZE_PARAM,
2
+                                                                ,CachingStellarProcessor.MAX_TIME_RETAIN_PARAM,
10
+                                                                )
+                                                                           );
+    Context context = new Context.Builder()
+                                 .with( Context.Capabilities.CACHE , () -> cache )
+                                 .build();
+    //running the same expression twice should hit the cache on the 2nd time and only yield
one miss
+    Assert.assertEquals(1, countMisses(2, context, "TO_UPPER(name)"));
+
+    //Ensure the correct result is returned.
+    Assert.assertEquals("BLAH", evaluateExpression(context, "TO_UPPER(name)"));
+
+    //running the same expression 20 more times should pull from the cache
+    Assert.assertEquals(0, countMisses(20, context, "TO_UPPER(name)"));
+
+    //Now we are running 4 distinct operations with a cache size of 2.  The cache has 1 element
in it before we start:
+    //  TO_LOWER(name) - miss (brand new), cache is full
+    //  TO_UPPER(name) - hit, cache is full
+    //  TO_UPPER('foo') - miss (brand new), cache is still full, but TO_LOWER is evicted
as the least frequently used
+    //  JOIN... - miss (brand new), cache is still full, but TO_UPPER('foo') is evicted as
the least frequently used
+    //this pattern repeats a 2nd time to add another 3 cache misses, totalling 6.
+    Assert.assertEquals(6, countMisses(2, context, "TO_LOWER(name)", "TO_UPPER(name)", "TO_UPPER('foo')",
"JOIN([name, 'blah'], ',')"));
+  }
+
+  private Object evaluateExpression(Context context, String expression) {
+    StellarProcessor processor = new CachingStellarProcessor();
+    return processor.parse(expression
+                , new MapVariableResolver(fields)
+                , StellarFunctions.FUNCTION_RESOLVER()
+                , context);
+  }
+
+  private int countMisses(int numRepetition, Context context, String... expressions) {
+    AtomicInteger numExpressions = new AtomicInteger(0);
+    StellarProcessor processor = new CachingStellarProcessor() {
+      @Override
+      protected Object parseUncached(String expression, VariableResolver variableResolver,
FunctionResolver functionResolver, Context context) {
+        numExpressions.incrementAndGet();
+        return super.parseUncached(expression, variableResolver, functionResolver, context);
+      }
+    };
+
+    for(int i = 0;i < numRepetition;++i) {
+      for(String expression : expressions) {
+        processor.parse(expression
+                , new MapVariableResolver(fields)
+                , StellarFunctions.FUNCTION_RESOLVER()
+                , context);
+      }
+    }
+    return numExpressions.get();
+  }
+}


Mime
View raw message