metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ceste...@apache.org
Subject incubator-metron git commit: METRON-706: Add Stellar transformations and filters to enrichment and threat intel loaders (mmiklavc via cestella) closes apache/incubator-metron#445
Date Thu, 09 Feb 2017 13:58:30 GMT
Repository: incubator-metron
Updated Branches:
  refs/heads/master 0ae9c4b48 -> c5bbf5ace


METRON-706: Add Stellar transformations and filters to enrichment and threat intel loaders (mmiklavc via cestella) closes apache/incubator-metron#445


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/c5bbf5ac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/c5bbf5ac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/c5bbf5ac

Branch: refs/heads/master
Commit: c5bbf5acef05de16a18db9397745a6795427f6b8
Parents: 0ae9c4b
Author: mmiklavc <michael.miklavcic@gmail.com>
Authored: Thu Feb 9 08:58:08 2017 -0500
Committer: cstella <cestella@gmail.com>
Committed: Thu Feb 9 08:58:08 2017 -0500

----------------------------------------------------------------------
 .../stellar/network/NetworkFunctionsTest.java   |   1 +
 .../common/utils/ConversionUtilsTest.java       |   2 +
 .../metron-data-management/README.md            |  81 ++++++-
 .../dataloads/extractor/ExtractorDecorator.java |  42 ++++
 .../dataloads/extractor/ExtractorHandler.java   | 102 ++++----
 .../metron/dataloads/extractor/Extractors.java  |   5 +-
 .../TransformFilterExtractorDecorator.java      | 235 +++++++++++++++++++
 .../dataloads/extractor/csv/CSVExtractor.java   |  16 +-
 .../metron/dataloads/hbase/mr/PrunerMapper.java |  10 +
 .../SimpleEnrichmentFlatFileLoader.java         |  12 +-
 .../extractor/ExtractorDecoratorTest.java       |  59 +++++
 .../TransformFilterExtractorDecoratorTest.java  | 189 +++++++++++++++
 .../extractor/csv/CSVExtractorTest.java         |   6 +-
 ...EnrichmentFlatFileLoaderIntegrationTest.java | 161 ++++++++++++-
 .../enrichment/converter/EnrichmentKey.java     |  10 +
 .../metron/enrichment/lookup/LookupKey.java     |   6 +-
 .../metron/enrichment/lookup/LookupValue.java   |   2 +-
 17 files changed, 865 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c5bbf5ac/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/network/NetworkFunctionsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/network/NetworkFunctionsTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/network/NetworkFunctionsTest.java
index 783658c..d43d6fd 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/network/NetworkFunctionsTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/network/NetworkFunctionsTest.java
@@ -82,6 +82,7 @@ public class NetworkFunctionsTest {
 
   @Test
   public void removeTldTest() {
+    runWithArguments("DOMAIN_REMOVE_TLD", "google.com", "google");
     runWithArguments("DOMAIN_REMOVE_TLD", "www.google.co.uk", "www.google");
     runWithArguments("DOMAIN_REMOVE_TLD", "www.google.com", "www.google");
     runWithArguments("DOMAIN_REMOVE_TLD", "com", "");

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c5bbf5ac/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ConversionUtilsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ConversionUtilsTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ConversionUtilsTest.java
index 90d00e4..7c825a1 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ConversionUtilsTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ConversionUtilsTest.java
@@ -22,6 +22,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 public class ConversionUtilsTest {
+
   @Test
   public void testIntegerConversions() {
     Object o = 1;
@@ -29,4 +30,5 @@ public class ConversionUtilsTest {
     Assert.assertEquals(Integer.valueOf(1), ConversionUtils.convert("1", Integer.class));
     Assert.assertNull(ConversionUtils.convert("foo", Integer.class));
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c5bbf5ac/metron-platform/metron-data-management/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/README.md b/metron-platform/metron-data-management/README.md
index 60c0283..42e4b91 100644
--- a/metron-platform/metron-data-management/README.md
+++ b/metron-platform/metron-data-management/README.md
@@ -89,7 +89,7 @@ for the value will be 'meta'.  For instance, given an input string of `123.45.12
 would be extracted:
 * Indicator : `123.45.123.12`
 * Type : `malicious_ip`
-* Value : `{ "source" : "the grapevine" }`
+* Value : `{ "ip" : "123.45.123.12", "source" : "the grapevine" }`
 
 ### STIX Extractor
 
@@ -127,6 +127,85 @@ addresses from the set of all possible addresses.  Note that if no categories ar
 Also, only address and domain types allow filtering via `stix_address_categories` and `stix_domain_categories` config
 parameters.
 
+### Common Extractor Properties
+
+Users also have the ability to transform and filter enrichment and threat intel data using Stellar as it is loaded into HBase. This feature is available to all extractor types.
+
+As an example, we will be providing a CSV list of top domains as an enrichment and filtering the value metadata, as well as the indicator column, with Stellar expressions.
+
+````
+{
+  "config" : {
+    "zk_quorum" : "node1:2181",
+    "columns" : {
+       "rank" : 0,
+       "domain" : 1
+    },
+    "value_transform" : {
+       "domain" : "DOMAIN_REMOVE_TLD(domain)"
+    },
+    "value_filter" : "LENGTH(domain) > 0",
+    "indicator_column" : "domain",
+    "indicator_transform" : {
+       "indicator" : "DOMAIN_REMOVE_TLD(indicator)"
+    },
+    "indicator_filter" : "LENGTH(indicator) > 0",
+    "type" : "top_domains",
+    "separator" : ","
+  },
+  "extractor" : "CSV"
+}
+````
+
+There are 2 property maps that work with full Stellar expressions, and 2 properties that will work with Stellar predicates.
+
+| Property            | Description |
+|---------------------|-------------|
+| value_transform     | Transform fields defined in the "columns" mapping with Stellar transformations. New keys introduced in the transform will be added to the key metadata. |
+| value_filter        | Allows additional filtering with Stellar predicates based on results from the value transformations. In this example, records whose domain property is empty after removing the TLD will be omitted. |
+| indicator_transform | Transform the indicator column independent of the value transformations. You can refer to the original indicator value by using "indicator" as the variable name, as shown in the example above. In addition, if you prefer to piggyback your transformations, you can refer to the variable "domain", which will allow your indicator transforms to inherit transformations done to this value during the value transformations. |
+| indicator_filter    | Allows additional filtering with Stellar predicates based on results from the value transformations. In this example, records whose indicator value is empty after removing the TLD will be omitted. |
+
+top-list.csv
+````
+1,google.com
+2,youtube.com
+...
+````
+
+Running a file import with the above data and extractor configuration would result in the following 2 extracted data records:
+
+| Indicator | Type | Value |
+|-----------|------|-------|
+| google    | top_domains | { "rank" : "1", "domain" : "google" } |
+| yahoo     | top_domains | { "rank" : "2", "domain" : "yahoo" } |
+
+Similar to the parser framework, providing a Zookeeper quorum via the zk_quorum property will enable Stellar to access properties that reside in the global config.
+Expanding on our example above, if the global config looks as follows:
+````
+{
+    "global_property" : "metron-ftw"
+}
+````
+
+And we expand our value_tranform:
+````
+...
+    "value_transform" : {
+       "domain" : "DOMAIN_REMOVE_TLD(domain)",
+       "a-new-prop" : "global_property"
+    },
+...
+
+````
+
+The resulting value data would look like the following:
+
+| Indicator | Type | Value |
+|-----------|------|-------|
+| google    | top_domains | { "rank" : "1", "domain" : "google", "a-new-prop" : "metron-ftw" } |
+| yahoo     | top_domains | { "rank" : "2", "domain" : "yahoo", "a-new-prop" : "metron-ftw" } |
+
 ## Enrichment Config
 
 In order to automatically add new enrichment and threat intel types to existing, running enrichment topologies, you will

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c5bbf5ac/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorDecorator.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorDecorator.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorDecorator.java
new file mode 100644
index 0000000..bf42760
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorDecorator.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor;
+
+import org.apache.metron.enrichment.lookup.LookupKV;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class ExtractorDecorator implements Extractor {
+
+  protected final Extractor decoratedExtractor;
+
+  public ExtractorDecorator(Extractor decoratedExtractor) {
+    this.decoratedExtractor = decoratedExtractor;
+  }
+
+  @Override
+  public Iterable<LookupKV> extract(String line) throws IOException {
+    return decoratedExtractor.extract(line);
+  }
+
+  @Override
+  public void initialize(Map<String, Object> config) {
+    decoratedExtractor.initialize(config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c5bbf5ac/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java
index 2e2f799..a9df2fd 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java
@@ -26,55 +26,77 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.lang.reflect.InvocationTargetException;
 import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
 import java.util.Map;
 
 public class ExtractorHandler {
-    final static ObjectMapper _mapper = new ObjectMapper();
-    private Map<String, Object> config;
-    private Extractor extractor;
-    private InputFormatHandler inputFormat = Formats.BY_LINE;
+  final static ObjectMapper _mapper = new ObjectMapper();
+  private Map<String, Object> config;
+  private Extractor extractor;
+  private InputFormatHandler inputFormat = Formats.BY_LINE;
 
-    public Map<String, Object> getConfig() {
-        return config;
-    }
+  public Map<String, Object> getConfig() {
+    return config;
+  }
 
-    public void setConfig(Map<String, Object> config) {
-        this.config = config;
-    }
+  /**
+   * Set by jackson. Extractor configuration from JSON
+   */
+  public void setConfig(Map<String, Object> config) {
+    this.config = config;
+  }
 
-    public InputFormatHandler getInputFormat() {
-        return inputFormat;
-    }
+  public InputFormatHandler getInputFormat() {
+    return inputFormat;
+  }
 
-    public void setInputFormat(String handler) {
-        try {
-            this.inputFormat= Formats.create(handler);
-        } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
-            throw new IllegalStateException("Unable to create an inputformathandler", e);
-        }
+  /**
+   * Set by jackson
+   */
+  public void setInputFormat(String handler) {
+    try {
+      this.inputFormat = Formats.create(handler);
+    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
+      throw new IllegalStateException("Unable to create an inputformathandler", e);
     }
+  }
 
-    public Extractor getExtractor() {
-        return extractor;
-    }
-    public void setExtractor(String extractor) {
-        try {
-            this.extractor = Extractors.create(extractor);
-        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
-            throw new IllegalStateException("Unable to create an extractor", e);
-        }
-    }
+  public Extractor getExtractor() {
+    return extractor;
+  }
 
-    public static synchronized ExtractorHandler load(InputStream is) throws IOException {
-        ExtractorHandler ret = _mapper.readValue(is, ExtractorHandler.class);
-        ret.getExtractor().initialize(ret.getConfig());
-        return ret;
-    }
-    public static synchronized ExtractorHandler load(String s, Charset c) throws IOException {
-        return load( new ByteArrayInputStream(s.getBytes(c)));
-    }
-    public static synchronized ExtractorHandler load(String s) throws IOException {
-        return load( s, Charset.defaultCharset());
+  /**
+   * Set by jackson.
+   *
+   * @param extractor Name of extractor to instantiate
+   */
+  public void setExtractor(String extractor) {
+    try {
+      this.extractor = Extractors.create(extractor);
+    } catch (ClassNotFoundException | IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
+      throw new IllegalStateException("Unable to create an extractor", e);
     }
+  }
+
+  /**
+   * Load json configuration
+   */
+  public static synchronized ExtractorHandler load(InputStream is) throws IOException {
+    ExtractorHandler ret = _mapper.readValue(is, ExtractorHandler.class);
+    ret.getExtractor().initialize(ret.getConfig());
+    return ret;
+  }
+
+  /**
+   * Load json configuration
+   */
+  public static synchronized ExtractorHandler load(String s, Charset c) throws IOException {
+    return load(new ByteArrayInputStream(s.getBytes(c)));
+  }
+
+  /**
+   * Load json configuration
+   */
+  public static synchronized ExtractorHandler load(String s) throws IOException {
+    return load(s, Charset.defaultCharset());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c5bbf5ac/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/Extractors.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/Extractors.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/Extractors.java
index 771a1e3..93438d3 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/Extractors.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/Extractors.java
@@ -21,7 +21,6 @@ import org.apache.metron.dataloads.extractor.csv.CSVExtractor;
 import org.apache.metron.dataloads.extractor.stix.StixExtractor;
 
 import java.lang.reflect.InvocationTargetException;
-import java.util.Map;
 
 public enum Extractors implements ExtractorCreator {
     CSV(new ExtractorCreator() {
@@ -49,11 +48,11 @@ public enum Extractors implements ExtractorCreator {
     public static Extractor create(String extractorName) throws ClassNotFoundException, IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
         try {
             ExtractorCreator ec = Extractors.valueOf(extractorName);
-            return ec.create();
+            return new TransformFilterExtractorDecorator(ec.create());
         }
         catch(IllegalArgumentException iae) {
             Extractor ex = (Extractor) Class.forName(extractorName).getConstructor().newInstance();
-            return ex;
+            return new TransformFilterExtractorDecorator(ex);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c5bbf5ac/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/TransformFilterExtractorDecorator.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/TransformFilterExtractorDecorator.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/TransformFilterExtractorDecorator.java
new file mode 100644
index 0000000..a1448d9
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/TransformFilterExtractorDecorator.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.log4j.Logger;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.dsl.Context;
+import org.apache.metron.common.dsl.MapVariableResolver;
+import org.apache.metron.common.dsl.StellarFunctions;
+import org.apache.metron.common.stellar.StellarPredicateProcessor;
+import org.apache.metron.common.stellar.StellarProcessor;
+import org.apache.metron.common.utils.ConversionUtils;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.enrichment.lookup.LookupKV;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.*;
+
+import static org.apache.metron.dataloads.extractor.TransformFilterExtractorDecorator.ExtractorOptions.*;
+
+public class TransformFilterExtractorDecorator extends ExtractorDecorator {
+  private static final Logger LOG = Logger.getLogger(TransformFilterExtractorDecorator.class);
+
+  protected enum ExtractorOptions {
+    VALUE_TRANSFORM("value_transform"),
+    VALUE_FILTER("value_filter"),
+    INDICATOR_TRANSFORM("indicator_transform"),
+    INDICATOR_FILTER("indicator_filter"),
+    ZK_QUORUM("zk_quorum"),
+    INDICATOR("indicator");
+
+    private String key;
+
+    ExtractorOptions(String key) {
+      this.key = key;
+    }
+
+    @Override
+    public String toString() {
+      return key;
+    }
+
+    public boolean existsIn(Map<String, Object> config) {
+      return config.containsKey(key);
+    }
+  }
+
+  private Optional<CuratorFramework> zkClient;
+  private Map<String, String> valueTransforms;
+  private Map<String, String> indicatorTransforms;
+  private String valueFilter;
+  private String indicatorFilter;
+  private Context stellarContext;
+  private StellarProcessor transformProcessor;
+  private StellarPredicateProcessor filterProcessor;
+  private Map<String, Object> globalConfig;
+
+  public TransformFilterExtractorDecorator(Extractor decoratedExtractor) {
+    super(decoratedExtractor);
+    this.zkClient = Optional.empty();
+    this.valueTransforms = new LinkedHashMap<>();
+    this.indicatorTransforms = new LinkedHashMap<>();
+    this.valueFilter = "";
+    this.indicatorFilter = "";
+    this.transformProcessor = new StellarProcessor();
+    this.filterProcessor = new StellarPredicateProcessor();
+  }
+
+  @Override
+  public void initialize(Map<String, Object> config) {
+    super.initialize(config);
+    if (VALUE_TRANSFORM.existsIn(config)) {
+      this.valueTransforms = getTransforms(config, VALUE_TRANSFORM.toString());
+    }
+    if (INDICATOR_TRANSFORM.existsIn(config)) {
+      this.indicatorTransforms = getTransforms(config, INDICATOR_TRANSFORM.toString());
+    }
+    if (VALUE_FILTER.existsIn(config)) {
+      this.valueFilter = getFilter(config, VALUE_FILTER.toString());
+    }
+    if (INDICATOR_FILTER.existsIn(config)) {
+      this.indicatorFilter = getFilter(config, INDICATOR_FILTER.toString());
+    }
+    String zkClientUrl = "";
+    if (ZK_QUORUM.existsIn(config)) {
+      zkClientUrl = ConversionUtils.convert(config.get(ZK_QUORUM.toString()), String.class);
+    }
+    zkClient = setupClient(zkClient, zkClientUrl);
+    this.globalConfig = getGlobalConfig(zkClient);
+    this.stellarContext = createContext(zkClient);
+    StellarFunctions.initialize(stellarContext);
+    this.transformProcessor = new StellarProcessor();
+    this.filterProcessor = new StellarPredicateProcessor();
+  }
+
+  private String getFilter(Map<String, Object> config, String valueFilter) {
+    return (String) config.get(valueFilter);
+  }
+
+  /**
+   * Get a map of the transformations from the config of the specified type
+   * @param config main config map
+   * @param type the transformation type to get from config
+   * @return map of transformations.
+   */
+  private Map<String, String> getTransforms(Map<String, Object> config, String type) {
+    // If this isn't a Map of Strings, let an exception be thrown
+    @SuppressWarnings("unchecked") Map<Object, Object> transformsConfig = (Map) config.get(type);
+    Map<String, String> transforms = new LinkedHashMap<>();
+    for (Map.Entry<Object, Object> e : transformsConfig.entrySet()) {
+      transforms.put((String) e.getKey(), (String) e.getValue());
+    }
+    return transforms;
+  }
+
+  /**
+   * Creates a Zookeeper client if it doesn't exist and a url for zk is provided.
+   * @param zookeeperUrl The Zookeeper URL.
+   */
+  private Optional<CuratorFramework> setupClient(Optional<CuratorFramework> zkClient, String zookeeperUrl) {
+    // can only create client if we have a valid zookeeper URL
+    if (!zkClient.isPresent()) {
+      if (StringUtils.isNotBlank(zookeeperUrl)) {
+        CuratorFramework client = ConfigurationsUtils.getClient(zookeeperUrl);
+        client.start();
+        return Optional.of(client);
+      } else {
+        LOG.warn("Unable to setup zookeeper client - zk_quorum url not provided. **This will limit some Stellar functionality**");
+        return Optional.empty();
+      }
+    } else {
+      return zkClient;
+    }
+  }
+
+  private Map<String, Object> getGlobalConfig(Optional<CuratorFramework> zkClient) {
+    if (zkClient.isPresent()) {
+      try {
+        return JSONUtils.INSTANCE.load(
+                new ByteArrayInputStream(ConfigurationsUtils.readGlobalConfigBytesFromZookeeper(zkClient.get())),
+                new TypeReference<Map<String, Object>>() {
+                });
+      } catch (Exception e) {
+        LOG.warn("Exception thrown while attempting to get global config from Zookeeper.", e);
+      }
+    }
+    return new LinkedHashMap<>();
+  }
+
+  private Context createContext(Optional<CuratorFramework> zkClient) {
+    Context.Builder builder = new Context.Builder();
+    if (zkClient.isPresent()) {
+      builder.with(Context.Capabilities.ZOOKEEPER_CLIENT, zkClient::get);
+    }
+    builder.with(Context.Capabilities.GLOBAL_CONFIG, () -> globalConfig);
+    return builder.build();
+  }
+
+  @Override
+  public Iterable<LookupKV> extract(String line) throws IOException {
+    List<LookupKV> lkvs = new ArrayList<>();
+    for (LookupKV lkv : super.extract(line)) {
+      if (updateLookupKV(lkv)) {
+        lkvs.add(lkv);
+      }
+    }
+    return lkvs;
+  }
+
+  /**
+   * Returns true if lookupkv is not null after transforms and filtering on the value and indicator key
+   * @param lkv LookupKV to transform and filter
+   * @return true if lkv is not null after transform/filter
+   */
+  private boolean updateLookupKV(LookupKV lkv) {
+    Map<String, Object> ret = lkv.getValue().getMetadata();
+    Map<String, Object> ind = new LinkedHashMap<>();
+    String indicator = lkv.getKey().getIndicator();
+    // add indicator as a resolvable variable. Also enable using resolved/transformed variables and values from operating on the value metadata
+    ind.put(INDICATOR.toString(), indicator);
+    MapVariableResolver resolver = new MapVariableResolver(ret, ind, globalConfig);
+    transform(valueTransforms, ret, resolver);
+    transform(indicatorTransforms, ind, resolver);
+    // update indicator
+    Object updatedIndicator = ind.get(INDICATOR.toString());
+    if (updatedIndicator != null) {
+      if (!(updatedIndicator instanceof String)) {
+        throw new UnsupportedOperationException("Indicator transform must return String type");
+      }
+      lkv.getKey().setIndicator((String) updatedIndicator);
+      return filter(indicatorFilter, resolver) && filter(valueFilter, resolver);
+    } else {
+      return false;
+    }
+  }
+
+  private void transform(Map<String, String> transforms, Map<String, Object> variableMap, MapVariableResolver variableResolver) {
+    for (Map.Entry<String, String> entry : transforms.entrySet()) {
+      Object o = transformProcessor.parse(entry.getValue(), variableResolver, StellarFunctions.FUNCTION_RESOLVER(), stellarContext);
+      if (o == null) {
+        variableMap.remove(entry.getKey());
+      } else {
+        variableMap.put(entry.getKey(), o);
+      }
+    }
+  }
+
+  private Boolean filter(String filterPredicate, MapVariableResolver variableResolver) {
+    return filterProcessor.parse(filterPredicate, variableResolver, StellarFunctions.FUNCTION_RESOLVER(), stellarContext);
+  }
+
+  protected void setZkClient(Optional<CuratorFramework> zkClient) {
+    this.zkClient = zkClient;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c5bbf5ac/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
index 502b46a..005225e 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/csv/CSVExtractor.java
@@ -32,14 +32,14 @@ public class CSVExtractor extends CSVConverter implements Extractor {
   public static final String TYPE_KEY="type";
   public static final String LOOKUP_CONVERTER = "lookup_converter";
 
-  private int typeColumn;
+  private int typeColumnIndex;
   private String type;
   private int indicatorColumn;
 
   private LookupConverter converter = LookupConverters.ENRICHMENT.getConverter();
 
-  public int getTypeColumn() {
-    return typeColumn;
+  public int getTypeColumnIndex() {
+    return typeColumnIndex;
   }
 
   public String getType() {
@@ -50,10 +50,10 @@ public class CSVExtractor extends CSVConverter implements Extractor {
     return indicatorColumn;
   }
 
-
   public LookupConverter getConverter() {
     return converter;
   }
+
   @Override
   public Iterable<LookupKV> extract(String line) throws IOException {
     if(ignore(line)) {
@@ -69,19 +69,15 @@ public class CSVExtractor extends CSVConverter implements Extractor {
     return Arrays.asList(new LookupKV(key, converter.toValue(values)));
   }
 
-
-
   private String getType(String[] tokens) {
     if(type == null) {
-      return tokens[typeColumn];
+      return tokens[typeColumnIndex];
     }
     else {
       return type;
     }
   }
 
-
-
   @Override
   public void initialize(Map<String, Object> config) {
     super.initialize(config);
@@ -93,7 +89,7 @@ public class CSVExtractor extends CSVConverter implements Extractor {
       type = config.get(TYPE_KEY).toString();
     }
     else if(config.containsKey(TYPE_COLUMN_KEY)) {
-      typeColumn = columnMap.get(config.get(TYPE_COLUMN_KEY).toString());
+      typeColumnIndex = columnMap.get(config.get(TYPE_COLUMN_KEY).toString());
     }
     if(config.containsKey(LOOKUP_CONVERTER)) {
       converter = LookupConverters.getConverter((String) config.get(LOOKUP_CONVERTER));

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c5bbf5ac/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java
index d0f1e46..82b4d3a 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/hbase/mr/PrunerMapper.java
@@ -72,6 +72,16 @@ public class PrunerMapper extends TableMapper<ImmutableBytesWritable, Delete> {
             public void fromBytes(byte[] in) {
 
             }
+
+            @Override
+            public String getIndicator() {
+                return null;
+            }
+
+            @Override
+            public void setIndicator(String indicator) {
+
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c5bbf5ac/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java
index 8ee11aa..2635426 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java
@@ -17,19 +17,21 @@
  */
 package org.apache.metron.dataloads.nonbulk.flatfile;
 
-import org.apache.commons.cli.*;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.PosixParser;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.log4j.PropertyConfigurator;
-import org.apache.metron.dataloads.extractor.ExtractorHandler;
 import org.apache.metron.common.configuration.enrichment.SensorEnrichmentUpdateConfig;
-import org.apache.metron.dataloads.nonbulk.flatfile.importer.ImportStrategy;
 import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.dataloads.nonbulk.flatfile.importer.ImportStrategy;
 
-import java.io.*;
-import java.util.*;
+import java.io.File;
+import java.util.EnumMap;
+import java.util.Optional;
 
 public class SimpleEnrichmentFlatFileLoader {
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c5bbf5ac/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/extractor/ExtractorDecoratorTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/extractor/ExtractorDecoratorTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/extractor/ExtractorDecoratorTest.java
new file mode 100644
index 0000000..93c8098
--- /dev/null
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/extractor/ExtractorDecoratorTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.notNullValue;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.verify;
+
+public class ExtractorDecoratorTest {
+
+  @Mock
+  Extractor extractor;
+
+  @Before
+  public void before() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @Test
+  public void sets_member_variables() {
+    ExtractorDecorator decorator = new ExtractorDecorator(extractor);
+    Assert.assertThat(decorator.decoratedExtractor, notNullValue());
+  }
+
+  @Test
+  public void calls_extractor_methods() throws IOException {
+    ExtractorDecorator decorator = new ExtractorDecorator(extractor);
+    decorator.initialize(new HashMap());
+    decorator.extract("line");
+    verify(extractor).initialize(isA(Map.class));
+    verify(extractor).extract("line");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c5bbf5ac/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/extractor/TransformFilterExtractorDecoratorTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/extractor/TransformFilterExtractorDecoratorTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/extractor/TransformFilterExtractorDecoratorTest.java
new file mode 100644
index 0000000..61443c2
--- /dev/null
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/extractor/TransformFilterExtractorDecoratorTest.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.lookup.LookupKV;
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.io.IOException;
+import java.util.*;
+
+public class TransformFilterExtractorDecoratorTest {
+
+  @Mock
+  CuratorFramework zkClient;
+  @Mock
+  Extractor extractor;
+  LinkedHashMap<String, Object> config1;
+  TransformFilterExtractorDecorator decorator;
+
+  @Before
+  public void setup() throws Exception {
+    MockitoAnnotations.initMocks(this);
+    config1 = new ObjectMapper().readValue(config1Contents, LinkedHashMap.class);
+    decorator = new TransformFilterExtractorDecorator(extractor);
+    decorator.setZkClient(Optional.of(zkClient));
+    decorator.initialize(config1);
+  }
+
+  /**
+   *{
+   *  "zk_quorum" : "blah",
+   *  "columns" : {
+   *    "foo" : 0,
+   *    "bar" : 1,
+   *    "baz" : 2
+   *  },
+   *  "value_transform" : {
+   *    "foo" : "TO_UPPER(foo)",
+   *    "newvar" : "foo",
+   *    "lowernewvar" : "TO_LOWER(newvar)"
+   *  },
+   *  "value_filter" : "LENGTH(baz) > 0",
+   *  "indicator_column" : "bar",
+   *  "indicator_transform" : {
+   *    "somevar" : "indicator",
+   *    "indicator" : "TO_UPPER(somevar)"
+   *  },
+   *  "indicator_filter" : "LENGTH(indicator) > 0",
+   *  "type" : "testenrichment",
+   *  "separator" : ","
+   *}
+   */
+  @Multiline
+  public static String config1Contents;
+
+  @Test
+  public void transforms_values_and_indicators() throws IOException {
+    final String indicatorVal = "val2";
+    EnrichmentKey lookupKey = new EnrichmentKey("testenrichment", indicatorVal);
+    EnrichmentValue lookupValue = new EnrichmentValue(new HashMap<String, Object>() {{
+      put("foo", "val1");
+      put("bar", indicatorVal);
+      put("baz", "val3");
+    }});
+    LookupKV lkv = new LookupKV<>(lookupKey, lookupValue);
+    List<LookupKV> extractedLkvs = new ArrayList<>();
+    extractedLkvs.add(lkv);
+    Mockito.when(extractor.extract("val1,val2,val3")).thenReturn(extractedLkvs);
+    Iterable<LookupKV> extracted = decorator.extract("val1,val2,val3");
+
+    EnrichmentKey expectedLookupKey = new EnrichmentKey("testenrichment", "VAL2");
+    EnrichmentValue expectedLookupValue = new EnrichmentValue(new HashMap<String, Object>() {{
+      put("foo", "VAL1");
+      put("bar", "val2");
+      put("baz", "val3");
+      put("newvar", "VAL1");
+      put("lowernewvar", "val1");
+    }});
+    LookupKV expectedLkv = new LookupKV<>(expectedLookupKey, expectedLookupValue);
+    List<LookupKV> expectedLkvs = new ArrayList<>();
+    expectedLkvs.add(expectedLkv);
+    Assert.assertThat(extracted, CoreMatchers.equalTo(expectedLkvs));
+  }
+
+  @Test
+  public void filters_values() throws Exception {
+    final String indicatorVal = "val2";
+    EnrichmentKey lookupKey = new EnrichmentKey("testenrichment", indicatorVal);
+    EnrichmentValue lookupValue = new EnrichmentValue(new HashMap<String, Object>() {{
+      put("foo", "val1");
+      put("bar", indicatorVal);
+      put("baz", "");
+    }});
+    LookupKV lkv = new LookupKV<>(lookupKey, lookupValue);
+    List<LookupKV> extractedLkvs = new ArrayList<>();
+    extractedLkvs.add(lkv);
+    Mockito.when(extractor.extract("val1,val2,")).thenReturn(extractedLkvs);
+    Iterable<LookupKV> extracted = decorator.extract("val1,val2,");
+    Assert.assertThat(extracted, CoreMatchers.equalTo(new ArrayList<>()));
+  }
+
+  @Test
+  public void filters_indicators() throws Exception {
+    EnrichmentKey lookupKey = new EnrichmentKey("testenrichment", "");
+    EnrichmentValue lookupValue = new EnrichmentValue(new HashMap<String, Object>() {{
+      put("foo", "val1");
+      put("bar", "");
+      put("baz", "val3");
+    }});
+    LookupKV lkv = new LookupKV<>(lookupKey, lookupValue);
+    List<LookupKV> extractedLkvs = new ArrayList<>();
+    extractedLkvs.add(lkv);
+    Mockito.when(extractor.extract("val1,,val3")).thenReturn(extractedLkvs);
+    Iterable<LookupKV> extracted = decorator.extract("val1,,val3");
+    Assert.assertThat(extracted, CoreMatchers.equalTo(new ArrayList<>()));
+  }
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @Test
+  public void bad_value_transform_causes_exception() throws Exception {
+    final int badValue = 5;
+    exception.expect(ClassCastException.class);
+    config1.put(TransformFilterExtractorDecorator.ExtractorOptions.VALUE_TRANSFORM.toString(), badValue);
+    decorator = new TransformFilterExtractorDecorator(extractor);
+    decorator.setZkClient(Optional.of(zkClient));
+    decorator.initialize(config1);
+  }
+
+  @Test
+  public void bad_value_filter_causes_exception() throws Exception {
+    final int badValue = 5;
+    exception.expect(ClassCastException.class);
+    config1.put(TransformFilterExtractorDecorator.ExtractorOptions.VALUE_FILTER.toString(), badValue);
+    decorator = new TransformFilterExtractorDecorator(extractor);
+    decorator.setZkClient(Optional.of(zkClient));
+    decorator.initialize(config1);
+  }
+
+  @Test
+  public void bad_indicator_transform_causes_exception() throws Exception {
+    final int badValue = 5;
+    exception.expect(ClassCastException.class);
+    config1.put(TransformFilterExtractorDecorator.ExtractorOptions.INDICATOR_TRANSFORM.toString(), badValue);
+    decorator = new TransformFilterExtractorDecorator(extractor);
+    decorator.setZkClient(Optional.of(zkClient));
+    decorator.initialize(config1);
+  }
+
+  @Test
+  public void bad_indicator_filter_causes_exception() throws Exception {
+    final int badValue = 5;
+    exception.expect(ClassCastException.class);
+    config1.put(TransformFilterExtractorDecorator.ExtractorOptions.INDICATOR_FILTER.toString(), badValue);
+    decorator = new TransformFilterExtractorDecorator(extractor);
+    decorator.setZkClient(Optional.of(zkClient));
+    decorator.initialize(config1);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c5bbf5ac/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/extractor/csv/CSVExtractorTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/extractor/csv/CSVExtractorTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/extractor/csv/CSVExtractorTest.java
index 4e482ad..fee504f 100644
--- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/extractor/csv/CSVExtractorTest.java
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/extractor/csv/CSVExtractorTest.java
@@ -36,11 +36,11 @@ public class CSVExtractorTest {
         "columns" : {
             "host" : 0
            ,"meta" : 2
-                    }
+       }
        ,"indicator_column" : "host"
        ,"type" : "threat"
        ,"separator" : ","
-               }
+     }
      ,"extractor" : "CSV"
    }
    */
@@ -56,7 +56,7 @@ public class CSVExtractorTest {
 
     Assert.assertEquals(0, (int)ex.getColumnMap().get("host") );
     Assert.assertEquals(2, (int)ex.getColumnMap().get("meta") );
-    Assert.assertEquals(0, ex.getTypeColumn() );
+    Assert.assertEquals(0, ex.getTypeColumnIndex() );
     Assert.assertEquals(0, ex.getIndicatorColumn());
     Assert.assertEquals("threat", ex.getType() );
     Assert.assertEquals(',', ex.getParser().getSeparator());

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c5bbf5ac/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderIntegrationTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderIntegrationTest.java
index d0d637d..443d39d 100644
--- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderIntegrationTest.java
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderIntegrationTest.java
@@ -17,14 +17,14 @@
  */
 package org.apache.metron.dataloads.nonbulk.flatfile;
 
-import com.google.common.collect.ImmutableList;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.PosixParser;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTable;
@@ -32,32 +32,35 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.metron.dataloads.extractor.Extractor;
-import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.dataloads.extractor.csv.CSVExtractor;
 import org.apache.metron.dataloads.hbase.mr.HBaseUtil;
 import org.apache.metron.enrichment.converter.EnrichmentConverter;
 import org.apache.metron.enrichment.converter.EnrichmentKey;
 import org.apache.metron.enrichment.converter.EnrichmentValue;
 import org.apache.metron.enrichment.lookup.LookupKV;
 import org.apache.metron.test.utils.UnitTestHelper;
-import org.junit.*;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
 
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.PrintWriter;
 import java.nio.file.Files;
-import java.nio.file.OpenOption;
 import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.logging.Level;
-import java.util.stream.Stream;
 import java.util.zip.GZIPOutputStream;
 import java.util.zip.ZipEntry;
-import java.util.zip.ZipFile;
 import java.util.zip.ZipOutputStream;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.startsWith;
+
 public class SimpleEnrichmentFlatFileLoaderIntegrationTest {
 
   private static HBaseTestingUtility testUtil;
@@ -65,6 +68,9 @@ public class SimpleEnrichmentFlatFileLoaderIntegrationTest {
   /** The test table. */
   private static HTable testTable;
   private static Configuration config = null;
+  private static TestingServer testZkServer;
+  private static String zookeeperUrl;
+  private static CuratorFramework client;
   private static final String tableName = "enrichment";
   private static final String cf = "cf";
   private static final String csvFile="input.csv";
@@ -78,9 +84,19 @@ public class SimpleEnrichmentFlatFileLoaderIntegrationTest {
   private static final File multilineGzFile= new File("target/sefflt_data_2.csv.gz");
   private static final File lineByLineExtractorConfigFile = new File("target/sefflt_extractorConfig_lbl.json");
   private static final File wholeFileExtractorConfigFile = new File("target/sefflt_extractorConfig_wf.json");
+  private static final File stellarExtractorConfigFile = new File("target/sefflt_extractorConfig_stellar.json");
+  private static final File customLineByLineExtractorConfigFile = new File("target/sefflt_extractorConfig_custom.json");
   private static final int NUM_LINES = 1000;
 
   /**
+   * {
+   *   "enrichment_property" : "valfromglobalconfig"
+   * }
+   */
+  @Multiline
+  public static String globalConfig;
+
+  /**
    {
       "config" : {
         "columns" : {
@@ -115,6 +131,59 @@ public class SimpleEnrichmentFlatFileLoaderIntegrationTest {
   @Multiline
   private static String wholeFileExtractorConfig;
 
+  /**
+   *{
+   *  "config" : {
+   *    "zk_quorum" : "%ZK_QUORUM%",
+   *    "columns" : {
+   *      "host" : 0,
+   *      "empty" : 1,
+   *      "meta" : 2
+   *    },
+   *    "value_transform" : {
+   *      "host" : "TO_UPPER(host)",
+   *      "empty" : "enrichment_property"
+   *    },
+   *    "value_filter" : "LENGTH(host) > 0",
+   *    "indicator_column" : "host",
+   *    "indicator_transform" : {
+   *      "indicator" : "TO_UPPER(indicator)"
+   *    },
+   *    "indicator_filter" : "LENGTH(indicator) > 0",
+   *    "type" : "enrichment",
+   *    "separator" : ","
+   *  },
+   *  "extractor" : "CSV"
+   *}
+   */
+  @Multiline
+  public static String stellarExtractorConfig;
+
+  /**
+   *{
+   *  "config" : {
+   *    "columns" : {
+   *      "host" : 0,
+   *      "meta" : 2
+   *    },
+   *    "value_transform" : {
+   *      "host" : "TO_UPPER(host)"
+   *    },
+   *    "value_filter" : "LENGTH(host) > 0",
+   *    "indicator_column" : "host",
+   *    "indicator_transform" : {
+   *      "indicator" : "TO_UPPER(indicator)"
+   *    },
+   *    "indicator_filter" : "LENGTH(indicator) > 0",
+   *    "type" : "enrichment",
+   *    "separator" : ","
+   *  },
+   *  "extractor" : "%EXTRACTOR_CLASS%"
+   *}
+   */
+  @Multiline
+  private static String customLineByLineExtractorConfig;
+
   @BeforeClass
   public static void setup() throws Exception {
     UnitTestHelper.setJavaLoggingLevel(Level.SEVERE);
@@ -122,6 +191,8 @@ public class SimpleEnrichmentFlatFileLoaderIntegrationTest {
     config = kv.getValue();
     testUtil = kv.getKey();
     testTable = testUtil.createTable(Bytes.toBytes(tableName), Bytes.toBytes(cf));
+    zookeeperUrl = getZookeeperUrl(config.get("hbase.zookeeper.quorum"), testUtil.getZkCluster().getClientPort());
+    setupGlobalConfig(zookeeperUrl);
 
     for(Result r : testTable.getScanner(Bytes.toBytes(cf))) {
       Delete d = new Delete(r.getRow());
@@ -142,6 +213,20 @@ public class SimpleEnrichmentFlatFileLoaderIntegrationTest {
                , wholeFileExtractorConfig.getBytes()
                , StandardOpenOption.CREATE_NEW , StandardOpenOption.TRUNCATE_EXISTING
     );
+    if(stellarExtractorConfigFile.exists()) {
+      stellarExtractorConfigFile.delete();
+    }
+    Files.write( stellarExtractorConfigFile.toPath()
+            , stellarExtractorConfig.replace("%ZK_QUORUM%", zookeeperUrl).getBytes()
+            , StandardOpenOption.CREATE_NEW , StandardOpenOption.TRUNCATE_EXISTING
+    );
+    if(customLineByLineExtractorConfigFile.exists()) {
+      customLineByLineExtractorConfigFile.delete();
+    }
+    Files.write( customLineByLineExtractorConfigFile.toPath()
+               , customLineByLineExtractorConfig.replace("%EXTRACTOR_CLASS%", CSVExtractor.class.getName()).getBytes()
+               , StandardOpenOption.CREATE_NEW , StandardOpenOption.TRUNCATE_EXISTING
+    );
     if(file1.exists()) {
       file1.delete();
     }
@@ -190,6 +275,16 @@ public class SimpleEnrichmentFlatFileLoaderIntegrationTest {
 
   }
 
+  private static String getZookeeperUrl(String host, int port) {
+    return host + ":" + port;
+  }
+
+  private static void setupGlobalConfig(String zookeeperUrl) throws Exception {
+    client = ConfigurationsUtils.getClient(zookeeperUrl);
+    client.start();
+    ConfigurationsUtils.writeGlobalConfigToZookeeper(globalConfig.getBytes(), zookeeperUrl);
+  }
+
   @AfterClass
   public static void teardown() throws Exception {
     HBaseUtil.INSTANCE.teardown(testUtil);
@@ -200,6 +295,8 @@ public class SimpleEnrichmentFlatFileLoaderIntegrationTest {
     multilineZipFile.delete();
     lineByLineExtractorConfigFile.delete();
     wholeFileExtractorConfigFile.delete();
+    stellarExtractorConfigFile.delete();
+    customLineByLineExtractorConfigFile.delete();
   }
 
 
@@ -245,7 +342,6 @@ public class SimpleEnrichmentFlatFileLoaderIntegrationTest {
     Assert.assertEquals(results.get(0).getValue().getMetadata().size(), 2);
     Assert.assertTrue(results.get(0).getValue().getMetadata().get("meta").toString().startsWith("foo"));
     Assert.assertTrue(results.get(0).getValue().getMetadata().get("host").toString().startsWith("google"));
-
   }
 
   @Test
@@ -346,4 +442,51 @@ public class SimpleEnrichmentFlatFileLoaderIntegrationTest {
     Assert.assertTrue(results.get(0).getValue().getMetadata().get("host").toString().startsWith("google"));
   }
 
+  @Test
+  public void stellar_transforms_and_filters_indicators_and_value_metadata() throws Exception {
+    String[] argv = {"-c cf", "-t enrichment"
+            , "-e " + stellarExtractorConfigFile.getPath()
+            , "-i " + multilineFile.getPath()
+            , "-p 2", "-b 128", "-q"
+    };
+    SimpleEnrichmentFlatFileLoader.main(config, argv);
+    EnrichmentConverter converter = new EnrichmentConverter();
+    ResultScanner scanner = testTable.getScanner(Bytes.toBytes(cf));
+    List<LookupKV<EnrichmentKey, EnrichmentValue>> results = new ArrayList<>();
+    for (Result r : scanner) {
+      results.add(converter.fromResult(r, cf));
+      testTable.delete(new Delete(r.getRow()));
+    }
+    Assert.assertEquals(NUM_LINES, results.size());
+    Assert.assertThat(results.get(0).getKey().getIndicator(), startsWith("GOOGLE"));
+    Assert.assertThat(results.get(0).getKey().type, equalTo("enrichment"));
+    Assert.assertThat(results.get(0).getValue().getMetadata().size(), equalTo(3));
+    Assert.assertThat(results.get(0).getValue().getMetadata().get("meta").toString(), startsWith("foo"));
+    Assert.assertThat(results.get(0).getValue().getMetadata().get("empty").toString(), startsWith("valfromglobalconfig"));
+    Assert.assertThat(results.get(0).getValue().getMetadata().get("host").toString(), startsWith("GOOGLE"));
+  }
+
+  @Test
+  public void custom_extractor_transforms_and_filters_indicators_and_value_metadata() throws Exception {
+    String[] argv = {"-c cf", "-t enrichment"
+            , "-e " + customLineByLineExtractorConfigFile.getPath()
+            , "-i " + multilineFile.getPath()
+            , "-p 2", "-b 128", "-q"
+    };
+    SimpleEnrichmentFlatFileLoader.main(config, argv);
+    EnrichmentConverter converter = new EnrichmentConverter();
+    ResultScanner scanner = testTable.getScanner(Bytes.toBytes(cf));
+    List<LookupKV<EnrichmentKey, EnrichmentValue>> results = new ArrayList<>();
+    for (Result r : scanner) {
+      results.add(converter.fromResult(r, cf));
+      testTable.delete(new Delete(r.getRow()));
+    }
+    Assert.assertEquals(NUM_LINES, results.size());
+    Assert.assertThat(results.get(0).getKey().getIndicator(), startsWith("GOOGLE"));
+    Assert.assertThat(results.get(0).getKey().type, equalTo("enrichment"));
+    Assert.assertThat(results.get(0).getValue().getMetadata().size(), equalTo(2));
+    Assert.assertThat(results.get(0).getValue().getMetadata().get("meta").toString(), startsWith("foo"));
+    Assert.assertThat(results.get(0).getValue().getMetadata().get("host").toString(), startsWith("GOOGLE"));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c5bbf5ac/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/EnrichmentKey.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/EnrichmentKey.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/EnrichmentKey.java
index 6201ad1..cab0dfc 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/EnrichmentKey.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/converter/EnrichmentKey.java
@@ -113,4 +113,14 @@ public class EnrichmentKey implements LookupKey {
             ", type='" + type + '\'' +
             '}';
   }
+
+  @Override
+  public String getIndicator() {
+    return indicator;
+  }
+
+  @Override
+  public void setIndicator(String indicator) {
+    this.indicator = indicator;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c5bbf5ac/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/LookupKey.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/LookupKey.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/LookupKey.java
index b7ea00c..5a258e7 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/LookupKey.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/LookupKey.java
@@ -18,6 +18,8 @@
 package org.apache.metron.enrichment.lookup;
 
 public interface LookupKey {
-    byte[] toBytes();
-    void fromBytes(byte[] in);
+  byte[] toBytes();
+  void fromBytes(byte[] in);
+  String getIndicator();
+  void setIndicator(String indicator);
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/c5bbf5ac/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/LookupValue.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/LookupValue.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/LookupValue.java
index 24fbffd..6cbad02 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/LookupValue.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/LookupValue.java
@@ -20,9 +20,9 @@
 package org.apache.metron.enrichment.lookup;
 
 import java.util.Map;
-import java.util.NavigableMap;
 
 public interface LookupValue {
     Iterable<Map.Entry<byte[], byte[]>> toColumns();
     void fromColumns(Iterable<Map.Entry<byte[], byte[]>> values);
+    Map<String, Object> getMetadata();
 }


Mime
View raw message