metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nickal...@apache.org
Subject metron git commit: METRON-1529 CONFIG_GET Fails to Retrieve Latest Config When Run in Zeppelin REPL (nickwallen) closes apache/metron#997
Date Wed, 25 Apr 2018 13:27:41 GMT
Repository: metron
Updated Branches:
  refs/heads/master b5bf9a987 -> 37e3fd32c


METRON-1529 CONFIG_GET Fails to Retrieve Latest Config When Run in Zeppelin REPL (nickwallen) closes apache/metron#997


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/37e3fd32
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/37e3fd32
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/37e3fd32

Branch: refs/heads/master
Commit: 37e3fd32c256ddc129eb7c1363d78e9095a39748
Parents: b5bf9a9
Author: nickwallen <nick@nickallen.org>
Authored: Wed Apr 25 09:27:18 2018 -0400
Committer: nickallen <nickallen@apache.org>
Committed: Wed Apr 25 09:27:18 2018 -0400

----------------------------------------------------------------------
 .../configuration/ConfigurationsUtils.java      | 123 +++-
 .../management/ConfigurationFunctions.java      | 564 ++++++++++---------
 .../management/ConfigurationFunctionsTest.java  | 424 ++++++++++----
 .../shell/DefaultStellarShellExecutor.java      |   4 +-
 .../common/utils/StellarProcessorUtils.java     | 135 +++--
 5 files changed, 825 insertions(+), 425 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/37e3fd32/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
index a89db63..c7b39f0 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.JsonNode;
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.PrintStream;
 import java.lang.invoke.MethodHandles;
 import java.nio.file.Files;
@@ -45,6 +46,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.StellarFunctions;
@@ -235,12 +237,99 @@ public class ConfigurationsUtils {
                               );
   }
 
+  /**
+   * Reads the global configuration stored in Zookeeper.
+   *
+   * @param client The Zookeeper client.
+   * @return The global configuration, if one exists.  Otherwise, null.
+   * @throws Exception
+   */
+  public static Map<String, Object> readGlobalConfigFromZookeeper(CuratorFramework client) throws Exception {
+    Map<String, Object> config = null;
+
+    Optional<byte[]> bytes = readFromZookeeperSafely(GLOBAL.getZookeeperRoot(), client);
+    if(bytes.isPresent()) {
+      InputStream in = new ByteArrayInputStream(bytes.get());
+      config = JSONUtils.INSTANCE.load(in, JSONUtils.MAP_SUPPLIER);
+    }
+
+    return config;
+  }
+
+  /**
+   * Reads the Indexing configuration from Zookeeper.
+   *
+   * @param sensorType The type of sensor.
+   * @param client The Zookeeper client.
+   * @return The indexing configuration for the given sensor type, if one exists.  Otherwise, null.
+   * @throws Exception
+   */
+  public static Map<String, Object> readSensorIndexingConfigFromZookeeper(String sensorType, CuratorFramework client) throws Exception {
+    Map<String, Object> config = null;
+
+    Optional<byte[]> bytes = readFromZookeeperSafely(INDEXING.getZookeeperRoot() + "/" + sensorType, client);
+    if(bytes.isPresent()) {
+      InputStream in = new ByteArrayInputStream(bytes.get());
+      config = JSONUtils.INSTANCE.load(in, JSONUtils.MAP_SUPPLIER);
+    }
+
+    return config;
+  }
+
+  /**
+   * Reads the Enrichment configuration from Zookeeper.
+   *
+   * @param sensorType The type of sensor.
+   * @param client The Zookeeper client.
+   * @return The Enrichment configuration for the given sensor type, if one exists. Otherwise, null.
+   * @throws Exception
+   */
   public static SensorEnrichmentConfig readSensorEnrichmentConfigFromZookeeper(String sensorType, CuratorFramework client) throws Exception {
-    return JSONUtils.INSTANCE.load(new ByteArrayInputStream(readFromZookeeper(ENRICHMENT.getZookeeperRoot() + "/" + sensorType, client)), SensorEnrichmentConfig.class);
+    SensorEnrichmentConfig config = null;
+
+    Optional<byte[]> bytes = readFromZookeeperSafely(ENRICHMENT.getZookeeperRoot() + "/" + sensorType, client);
+    if (bytes.isPresent()) {
+      config = SensorEnrichmentConfig.fromBytes(bytes.get());
+    }
+
+    return config;
   }
 
+  /**
+   * Reads the Parser configuration from Zookeeper.
+   *
+   * @param sensorType The type of sensor.
+   * @param client The Zookeeper client.
+   * @return The Parser configuration for the given sensor type, if one exists. Otherwise, null.
+   * @throws Exception
+   */
   public static SensorParserConfig readSensorParserConfigFromZookeeper(String sensorType, CuratorFramework client) throws Exception {
-    return JSONUtils.INSTANCE.load(new ByteArrayInputStream(readFromZookeeper(PARSER.getZookeeperRoot() + "/" + sensorType, client)), SensorParserConfig.class);
+    SensorParserConfig config = null;
+
+    Optional<byte[]> bytes = readFromZookeeperSafely(PARSER.getZookeeperRoot() + "/" + sensorType, client);
+    if(bytes.isPresent()) {
+      config = SensorParserConfig.fromBytes(bytes.get());
+    }
+
+    return config;
+  }
+
+  /**
+   * Reads the Profiler configuration from Zookeeper.
+   *
+   * @param client The Zookeeper client.
+   * @return The Profiler configuration, if one exists.  Otherwise, null.
+   * @throws Exception
+   */
+  public static ProfilerConfig readProfilerConfigFromZookeeper(CuratorFramework client) throws Exception {
+    ProfilerConfig config = null;
+
+    Optional<byte[]> bytes = readFromZookeeperSafely(PROFILER.getZookeeperRoot(), client);
+    if(bytes.isPresent()) {
+      config = ProfilerConfig.fromBytes(bytes.get());
+    }
+
+    return config;
   }
 
   public static byte[] readGlobalConfigBytesFromZookeeper(CuratorFramework client) throws Exception {
@@ -289,6 +378,36 @@ public class ConfigurationsUtils {
     }
   }
 
+  /**
+   * Read raw bytes from Zookeeper.
+   *
+   * @param path The path to the Zookeeper node to read.
+   * @param client The Zookeeper client.
+   * @return The bytes read from Zookeeper, if the node exists.  Otherwise, an empty Optional.
+   * @throws Exception
+   */
+  public static Optional<byte[]> readFromZookeeperSafely(String path, CuratorFramework client) throws Exception {
+    Optional<byte[]> result = Optional.empty();
+
+    try {
+      byte[] bytes = readFromZookeeper(path, client);
+      result = Optional.of(bytes);
+
+    } catch(KeeperException.NoNodeException e) {
+      LOG.debug("Zookeeper node missing; path={}", e);
+    }
+
+    return result;
+  }
+
+  /**
+   * Read raw bytes from Zookeeper.
+   *
+   * @param path The path to the Zookeeper node to read.
+   * @param client The Zookeeper client.
+   * @return The bytes read from Zookeeper.
+   * @throws Exception If the path does not exist in Zookeeper.
+   */
   public static byte[] readFromZookeeper(String path, CuratorFramework client) throws Exception {
     if (client != null && client.getData() != null && path != null) {
       return client.getData().forPath(path);

http://git-wip-us.apache.org/repos/asf/metron/blob/37e3fd32/metron-platform/metron-management/src/main/java/org/apache/metron/management/ConfigurationFunctions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/main/java/org/apache/metron/management/ConfigurationFunctions.java b/metron-platform/metron-management/src/main/java/org/apache/metron/management/ConfigurationFunctions.java
index af90e14..5a1281c 100644
--- a/metron-platform/metron-management/src/main/java/org/apache/metron/management/ConfigurationFunctions.java
+++ b/metron-platform/metron-management/src/main/java/org/apache/metron/management/ConfigurationFunctions.java
@@ -18,26 +18,17 @@
 package org.apache.metron.management;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-import java.lang.invoke.MethodHandles;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.TreeCache;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.cache.TreeCacheListener;
-import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.ConfigurationType;
-import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
 import org.apache.metron.common.configuration.IndexingConfigurations;
+import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.common.configuration.profiler.ProfilerConfigurations;
 import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
 import org.apache.metron.stellar.common.utils.ConversionUtils;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.ParseException;
@@ -46,203 +37,280 @@ import org.apache.metron.stellar.dsl.StellarFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.lang.String.format;
+import static org.apache.metron.common.configuration.ConfigurationType.ENRICHMENT;
+import static org.apache.metron.common.configuration.ConfigurationType.GLOBAL;
+import static org.apache.metron.common.configuration.ConfigurationType.INDEXING;
+import static org.apache.metron.common.configuration.ConfigurationType.PARSER;
+import static org.apache.metron.common.configuration.ConfigurationType.PROFILER;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.readGlobalConfigBytesFromZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.readGlobalConfigFromZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.readProfilerConfigBytesFromZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.readProfilerConfigFromZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.readSensorEnrichmentConfigFromZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.readSensorIndexingConfigBytesFromZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.readSensorIndexingConfigFromZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.readSensorParserConfigFromZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.writeGlobalConfigToZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.writeProfilerConfigToZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.writeSensorIndexingConfigToZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.writeSensorParserConfigToZookeeper;
+
+/**
+ * Defines functions that enable retrieval and modification of Metron configuration values.
+ */
 public class ConfigurationFunctions {
+
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private static EnumMap<ConfigurationType, Object> configMap = new EnumMap<ConfigurationType, Object>(ConfigurationType.class) {{
-    for(ConfigurationType ct : ConfigurationType.values()) {
-      put(ct, Collections.synchronizedMap(new HashMap<String, String>()));
-    }
-    put(ConfigurationType.GLOBAL, "");
-    put(ConfigurationType.PROFILER, "");
-  }};
-  private static synchronized void setupTreeCache(Context context) throws Exception {
-    try {
-      Optional<Object> treeCacheOpt = context.getCapability("treeCache");
-      if (treeCacheOpt.isPresent()) {
-        return;
-      }
+
+
+  /**
+   * Retrieves the Zookeeper client from the execution context.
+   *
+   * @param context The execution context.
+   * @return A Zookeeper client, if one exists.  Otherwise, an exception is thrown.
+   */
+  private static CuratorFramework getZookeeperClient(Context context) {
+
+    Optional<Object> clientOpt = context.getCapability(Context.Capabilities.ZOOKEEPER_CLIENT, true);
+    if(clientOpt.isPresent()) {
+      return (CuratorFramework) clientOpt.get();
+
+    } else {
+      throw new IllegalStateException("Missing ZOOKEEPER_CLIENT; zookeeper connection required");
     }
-    catch(IllegalStateException ex) {
+  }
 
+  /**
+   * Get an argument from a list of arguments.
+   *
+   * @param index The index within the list of arguments.
+   * @param clazz The type expected.
+   * @param args All of the arguments.
+   * @param <T> The type of the argument expected.
+   */
+  public static <T> T getArg(int index, Class<T> clazz, List<Object> args) {
+
+    if(index >= args.size()) {
+      throw new IllegalArgumentException(format("expected at least %d argument(s), found %d", index+1, args.size()));
     }
-    Optional<Object> clientOpt = context.getCapability(Context.Capabilities.ZOOKEEPER_CLIENT);
-    if(!clientOpt.isPresent()) {
-      throw new IllegalStateException("I expected a zookeeper client to exist and it did not.  Please connect to zookeeper.");
+
+    return ConversionUtils.convert(args.get(index), clazz);
+  }
+
+  /**
+   * Serializes a configuration object to the raw JSON.
+   *
+   * @param object The configuration object to serialize
+   * @return The object serialized as a JSON string, or null if the object is null.
+   */
+  private static String toJSON(Object object) {
+
+    if(object == null) {
+      return null;
     }
-    CuratorFramework client = (CuratorFramework) clientOpt.get();
-    TreeCache cache = new TreeCache(client, Constants.ZOOKEEPER_TOPOLOGY_ROOT);
-    TreeCacheListener listener = new TreeCacheListener() {
-      @Override
-      public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
-        if (event.getType().equals(TreeCacheEvent.Type.NODE_ADDED) || event.getType().equals(TreeCacheEvent.Type.NODE_UPDATED)) {
-          String path = event.getData().getPath();
-          byte[] data = event.getData().getData();
-          String sensor = Iterables.getLast(Splitter.on("/").split(path), null);
-          if (path.startsWith(ConfigurationType.PARSER.getZookeeperRoot())) {
-            Map<String, String> sensorMap = (Map<String, String>)configMap.get(ConfigurationType.PARSER);
-            sensorMap.put(sensor, new String(data));
-          } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) {
-            configMap.put(ConfigurationType.GLOBAL, new String(data));
-          } else if (ConfigurationType.PROFILER.getZookeeperRoot().equals(path)) {
-            configMap.put(ConfigurationType.PROFILER, new String(data));
-          } else if (path.startsWith(ConfigurationType.ENRICHMENT.getZookeeperRoot())) {
-            Map<String, String> sensorMap = (Map<String, String>)configMap.get(ConfigurationType.ENRICHMENT);
-            sensorMap.put(sensor, new String(data));
-          } else if (path.startsWith(ConfigurationType.INDEXING.getZookeeperRoot())) {
-            Map<String, String> sensorMap = (Map<String, String>)configMap.get(ConfigurationType.INDEXING);
-            sensorMap.put(sensor, new String(data));
-          }
-        }
-        else if(event.getType().equals(TreeCacheEvent.Type.NODE_REMOVED)) {
-          String path = event.getData().getPath();
-          String sensor = Iterables.getLast(Splitter.on("/").split(path), null);
-          if (path.startsWith(ConfigurationType.PARSER.getZookeeperRoot())) {
-            Map<String, String> sensorMap = (Map<String, String>)configMap.get(ConfigurationType.PARSER);
-            sensorMap.remove(sensor);
-          }
-          else if (path.startsWith(ConfigurationType.ENRICHMENT.getZookeeperRoot())) {
-            Map<String, String> sensorMap = (Map<String, String>)configMap.get(ConfigurationType.ENRICHMENT);
-            sensorMap.remove(sensor);
-          }
-          else if (path.startsWith(ConfigurationType.INDEXING.getZookeeperRoot())) {
-            Map<String, String> sensorMap = (Map<String, String>)configMap.get(ConfigurationType.INDEXING);
-            sensorMap.remove(sensor);
-          }
-          else if (ConfigurationType.PROFILER.getZookeeperRoot().equals(path)) {
-            configMap.put(ConfigurationType.PROFILER, null);
-          }
-          else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) {
-            configMap.put(ConfigurationType.GLOBAL, null);
-          }
-        }
-      }
-    };
-    cache.getListenable().addListener(listener);
-    cache.start();
-    for(ConfigurationType ct : ConfigurationType.values()) {
-      switch(ct) {
-        case GLOBAL:
-        case PROFILER:
-          {
-            String data = "";
-            try {
-              byte[] bytes = ConfigurationsUtils.readFromZookeeper(ct.getZookeeperRoot(), client);
-              data = new String(bytes);
-            }
-            catch(Exception ex) {
-
-            }
-            configMap.put(ct, data);
-          }
-          break;
-        case INDEXING:
-        case ENRICHMENT:
-        case PARSER:
-          {
-            List<String> sensorTypes = client.getChildren().forPath(ct.getZookeeperRoot());
-            Map<String, String> sensorMap = (Map<String, String>)configMap.get(ct);
-            for(String sensorType : sensorTypes) {
-              sensorMap.put(sensorType, new String(ConfigurationsUtils.readFromZookeeper(ct.getZookeeperRoot() + "/" + sensorType, client)));
-            }
-          }
-          break;
-      }
+
+    try {
+      return JSONUtils.INSTANCE.toJSON(object, true);
+
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
     }
-    context.addCapability("treeCache", () -> cache);
   }
 
   @Stellar(
-           namespace = "CONFIG"
-          ,name = "GET"
-          ,description = "Retrieve a Metron configuration from zookeeper."
-          ,params = {"type - One of ENRICHMENT, INDEXING, PARSER, GLOBAL, PROFILER"
-                    , "sensor - Sensor to retrieve (required for enrichment and parser, not used for profiler and global)"
-                    , "emptyIfNotPresent - If true, then return an empty, minimally viable config"
-                    }
-          ,returns = "The String representation of the config in zookeeper"
-          )
+          namespace = "CONFIG",
+          name = "GET",
+          description = "Retrieve a Metron configuration from zookeeper.",
+          params = {
+                  "type - One of ENRICHMENT, INDEXING, PARSER, GLOBAL, PROFILER",
+                  "sensor - Sensor to retrieve (required for enrichment and parser, not used for profiler and global)",
+                  "emptyIfNotPresent - If true, then return an empty, minimally viable config"
+          },
+          returns = "The String representation of the config in zookeeper")
   public static class ConfigGet implements StellarFunction {
-    boolean initialized = false;
+
+    /**
+     * Whether the function has been initialized.
+     */
+    private boolean initialized = false;
+
+    /**
+     * The Zookeeper client.
+     */
+    private CuratorFramework zkClient;
+
     @Override
     public Object apply(List<Object> args, Context context) throws ParseException {
-      ConfigurationType type = ConfigurationType.valueOf((String)args.get(0));
-      boolean emptyIfNotPresent = true;
+      String result;
 
-      switch(type) {
-        case GLOBAL:
-        case PROFILER:
-          return configMap.get(type);
-        case PARSER: {
-          String sensor = (String) args.get(1);
-          if(args.size() > 2) {
-            emptyIfNotPresent = ConversionUtils.convert(args.get(2), Boolean.class);
-          }
-          Map<String, String> sensorMap = (Map<String, String>) configMap.get(type);
-          String ret = sensorMap.get(sensor);
-          if (ret == null && emptyIfNotPresent ) {
-            SensorParserConfig config = new SensorParserConfig();
-            config.setSensorTopic(sensor);
-            try {
-              ret = JSONUtils.INSTANCE.toJSON(config, true);
-            } catch (JsonProcessingException e) {
-              LOG.error("Unable to serialize default object: {}", e.getMessage(), e);
-              throw new ParseException("Unable to serialize default object: " + e.getMessage(), e);
-            }
-          }
-          return ret;
-        }
-        case INDEXING: {
-          String sensor = (String) args.get(1);
-          if(args.size() > 2) {
-            emptyIfNotPresent = ConversionUtils.convert(args.get(2), Boolean.class);
-          }
-          Map<String, String> sensorMap = (Map<String, String>) configMap.get(type);
-          String ret = sensorMap.get(sensor);
-          if (ret == null && emptyIfNotPresent ) {
-            Map<String, Object> config = new HashMap<>();
-            try {
-              ret = JSONUtils.INSTANCE.toJSON(config, true);
-              IndexingConfigurations.setIndex(config, sensor);
-            } catch (JsonProcessingException e) {
-              LOG.error("Unable to serialize default object: {}", e.getMessage(), e);
-              throw new ParseException("Unable to serialize default object: " + e.getMessage(), e);
-            }
-          }
-          return ret;
-        }
-        case ENRICHMENT: {
-          String sensor = (String) args.get(1);
-          if(args.size() > 2) {
-            emptyIfNotPresent = ConversionUtils.convert(args.get(2), Boolean.class);
-          }
-          Map<String, String> sensorMap = (Map<String, String>) configMap.get(type);
-          String ret = sensorMap.get(sensor);
-          if (ret == null && emptyIfNotPresent ) {
-            SensorEnrichmentConfig config = new SensorEnrichmentConfig();
-            try {
-              ret = JSONUtils.INSTANCE.toJSON(config, true);
-            } catch (JsonProcessingException e) {
-              LOG.error("Unable to serialize default object: {}", e.getMessage(), e);
-              throw new ParseException("Unable to serialize default object: " + e.getMessage(), e);
-            }
-          }
-          return ret;
+      // the configuration type to retrieve
+      String arg0 = getArg(0, String.class, args);
+      ConfigurationType type = ConfigurationType.valueOf(arg0);
+
+      try {
+
+        if (GLOBAL == type) {
+          result = getGlobalConfig(args);
+
+        } else if (PROFILER == type) {
+          result = getProfilerConfig(args);
+
+        } else if (ENRICHMENT == type) {
+          result = getEnrichmentConfig(args);
+
+        } else if (INDEXING == type) {
+          result = getIndexingConfig(args);
+
+        } else if (PARSER == type) {
+          result = getParserConfig(args);
+
+        } else {
+          throw new IllegalArgumentException("Unexpected configuration type: " + type);
         }
-        default:
-          throw new UnsupportedOperationException("Unable to support type " + type);
+
+      } catch(Exception e) {
+        throw new RuntimeException(e);
       }
+
+      return result;
     }
 
-    @Override
-    public void initialize(Context context) {
-      try {
-        setupTreeCache(context);
-      } catch (Exception e) {
-        LOG.error("Unable to initialize: {}", e.getMessage(), e);
+    /**
+     * Retrieves the Global configuration.
+     *
+     * @return The Global configuration.
+     * @throws Exception
+     */
+    private String getGlobalConfig(List<Object> args) throws Exception {
+
+      Map<String, Object> globals = readGlobalConfigFromZookeeper(zkClient);
+
+      // provide empty/default config if one is not present?
+      if(globals == null && emptyIfNotPresent(args)) {
+        globals = new HashMap<>();
       }
-      finally {
-        initialized = true;
+
+      return toJSON(globals);
+    }
+
+    /**
+     * Retrieves the Parser configuration.
+     *
+     * @param args The function arguments.
+     * @return The Parser configuration.
+     * @throws Exception
+     */
+    private String getParserConfig(List<Object> args) throws Exception {
+
+      // retrieve the parser config for the given sensor
+      String sensor = getArg(1, String.class, args);
+      SensorParserConfig sensorConfig = readSensorParserConfigFromZookeeper(sensor, zkClient);
+
+      // provide empty/default config if one is not present?
+      if(sensorConfig == null && emptyIfNotPresent(args)) {
+        sensorConfig = new SensorParserConfig();
       }
+
+     return toJSON(sensorConfig);
+    }
+
+    /**
+     * Retrieve the Enrichment configuration.
+     *
+     * @param args The function arguments.
+     * @return The Enrichment configuration as a JSON string.
+     * @throws Exception
+     */
+    private String getEnrichmentConfig(List<Object> args) throws Exception {
+
+      // retrieve the enrichment config for the given sensor
+      String sensor = getArg(1, String.class, args);
+      SensorEnrichmentConfig sensorConfig = readSensorEnrichmentConfigFromZookeeper(sensor, zkClient);
+
+      // provide empty/default config if one is not present?
+      if(sensorConfig == null && emptyIfNotPresent(args)) {
+        sensorConfig = new SensorEnrichmentConfig();
+      }
+
+      return toJSON(sensorConfig);
+    }
+
+    /**
+     * Retrieve the Indexing configuration.
+     *
+     * @param args The function arguments.
+     * @return The Indexing configuration as a JSON string.
+     * @throws Exception
+     */
+    private String getIndexingConfig(List<Object> args) throws Exception {
+
+      // retrieve the indexing config for the given sensor
+      String sensor = getArg(1, String.class, args);
+      Map<String, Object> sensorConfig = readSensorIndexingConfigFromZookeeper(sensor, zkClient);
+
+      // provide empty/default config if one is not present?
+      if(sensorConfig == null && emptyIfNotPresent(args)) {
+        sensorConfig = Collections.emptyMap();
+      }
+
+      return toJSON(sensorConfig);
+    }
+
+    /**
+     * Retrieve the Profiler configuration.
+     *
+     * @param args The function arguments.
+     * @return The Profiler configuration as a JSON string.
+     * @throws Exception
+     */
+    private String getProfilerConfig(List<Object> args) throws Exception {
+
+      ProfilerConfig profilerConfig = readProfilerConfigFromZookeeper(zkClient);
+
+      // provide empty/default config if one is not present?
+      if(profilerConfig == null && emptyIfNotPresent(args)) {
+        profilerConfig = new ProfilerConfig();
+      }
+
+      return toJSON(profilerConfig);
+    }
+
+    /**
+     * Retrieves the 'emptyIfNotPresent' argument.
+     *
+     * <p>This determines whether a default configuration should be returned, if no
+     * configuration is present.  This defaults to true.
+     *
+     * @param args The function arguments.
+     * @return The 'emptyIfNotPresent' argument.
+     * @throws Exception
+     */
+    private boolean emptyIfNotPresent(List<Object> args) {
+
+      boolean emptyIfNotPresent = true;
+      int lastIndex = args.size() - 1;
+
+      // expect 'emptyIfNotPresent' to always be the last boolean arg
+      if(args.size() >= 2 && args.get(lastIndex) instanceof Boolean) {
+        emptyIfNotPresent = getArg(lastIndex, Boolean.class, args);
+      }
+
+      return emptyIfNotPresent;
+    }
+
+    @Override
+    public void initialize(Context context) {
+      zkClient = getZookeeperClient(context);
     }
 
     @Override
@@ -250,91 +318,69 @@ public class ConfigurationFunctions {
       return initialized;
     }
   }
+
   @Stellar(
-           namespace = "CONFIG"
-          ,name = "PUT"
-          ,description = "Updates a Metron config to Zookeeper."
-          ,params = {"type - One of ENRICHMENT, INDEXING, PARSER, GLOBAL, PROFILER"
-                    ,"config - The config (a string in JSON form) to update"
-                    , "sensor - Sensor to retrieve (required for enrichment and parser, not used for profiler and global)"
-                    }
-          ,returns = "The String representation of the config in zookeeper"
-          )
+          namespace = "CONFIG",
+          name = "PUT",
+          description = "Updates a Metron config to Zookeeper.",
+          params = {
+                  "type - One of ENRICHMENT, INDEXING, PARSER, GLOBAL, PROFILER",
+                  "config - The config (a string in JSON form) to update",
+                  "sensor - Sensor to retrieve (required for enrichment and parser, not used for profiler and global)"
+          },
+          returns = "The String representation of the config in zookeeper")
   public static class ConfigPut implements StellarFunction {
-    private CuratorFramework client;
-    private boolean initialized = false;
 
     @Override
     public Object apply(List<Object> args, Context context) throws ParseException {
-      ConfigurationType type = ConfigurationType.valueOf((String)args.get(0));
-      String config = (String)args.get(1);
-      if(config == null) {
-        return null;
-      }
-      try {
-        switch (type) {
-          case GLOBAL:
-            ConfigurationsUtils.writeGlobalConfigToZookeeper(config.getBytes(), client);
-            break;
-          case PROFILER:
-            ConfigurationsUtils.writeProfilerConfigToZookeeper(config.getBytes(), client);
-            break;
-          case ENRICHMENT:
-          {
-            String sensor = (String) args.get(2);
-            if(sensor == null) {
-              return null;
-            }
-            ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensor, config.getBytes(), client);
-          }
-          break;
-          case INDEXING:
-          {
-            String sensor = (String) args.get(2);
-            if(sensor == null) {
-              return null;
-            }
-            ConfigurationsUtils.writeSensorIndexingConfigToZookeeper(sensor, config.getBytes(), client);
-          }
-          break;
-          case PARSER:
-            {
-            String sensor = (String) args.get(2);
-              if(sensor == null) {
-              return null;
-            }
-            ConfigurationsUtils.writeSensorParserConfigToZookeeper(sensor, config.getBytes(), client);
+
+      // the configuration type to write
+      String arg0 = getArg(0, String.class, args);
+      ConfigurationType type = ConfigurationType.valueOf(arg0);
+
+      // the configuration value to write
+      String value = getArg(1, String.class, args);
+      if(value != null) {
+
+        CuratorFramework client = getZookeeperClient(context);
+        try {
+
+          if(GLOBAL == type) {
+            writeGlobalConfigToZookeeper(value.getBytes(), client);
+
+          } else if(PROFILER == type) {
+            writeProfilerConfigToZookeeper(value.getBytes(), client);
+
+          } else if(ENRICHMENT == type) {
+            String sensor = getArg(2, String.class, args);
+            writeSensorEnrichmentConfigToZookeeper(sensor, value.getBytes(), client);
+
+          } else if(INDEXING == type) {
+            String sensor = getArg(2, String.class, args);
+            writeSensorIndexingConfigToZookeeper(sensor, value.getBytes(), client);
+
+          } else if (PARSER == type) {
+            String sensor = getArg(2, String.class, args);
+            writeSensorParserConfigToZookeeper(sensor, value.getBytes(), client);
           }
-          break;
+
+        } catch(Exception e) {
+          LOG.error("Unexpected exception: {}", e.getMessage(), e);
+          throw new ParseException(e.getMessage());
         }
       }
-      catch(Exception ex) {
-        LOG.error("Unable to put config: {}", ex.getMessage(), ex);
-        throw new ParseException("Unable to put config: " + ex.getMessage(), ex);
-      }
+
       return null;
     }
 
     @Override
     public void initialize(Context context) {
-      Optional<Object> clientOpt = context.getCapability(Context.Capabilities.ZOOKEEPER_CLIENT);
-      if(!clientOpt.isPresent()) {
-        throw new IllegalStateException("I expected a zookeeper client to exist and it did not.  Please connect to zookeeper.");
-      }
-      client = (CuratorFramework) clientOpt.get();
-      try {
-        setupTreeCache(context);
-      } catch (Exception e) {
-        LOG.error("Unable to initialize: {}", e.getMessage(), e);
-      }
-      finally {
-        initialized = true;
-      }
+      // nothing to do
     }
 
     @Override
     public boolean isInitialized() {
-      return initialized;
+      return true;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/37e3fd32/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
index 1920031..67e2a9d 100644
--- a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
+++ b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
@@ -19,194 +19,393 @@ package org.apache.metron.management;
 
 import com.google.common.collect.ImmutableMap;
 import org.adrianwalker.multilinestring.Multiline;
+import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.PosixParser;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.test.TestingServer;
 import org.apache.log4j.Level;
 import org.apache.metron.common.cli.ConfigurationManager;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.ParseException;
 import org.apache.metron.test.utils.UnitTestHelper;
-import org.json.simple.parser.JSONParser;
 import org.json.simple.JSONObject;
-import org.junit.Assert;
+import org.json.simple.parser.JSONParser;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.HashMap;
+import java.util.Collections;
+import java.util.Map;
 
 import static org.apache.metron.TestConstants.PARSER_CONFIGS_PATH;
 import static org.apache.metron.TestConstants.SAMPLE_CONFIG_PATH;
+import static org.apache.metron.common.configuration.ConfigurationType.GLOBAL;
+import static org.apache.metron.common.configuration.ConfigurationType.PROFILER;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.writeProfilerConfigToZookeeper;
 import static org.apache.metron.management.utils.FileUtils.slurp;
 import static org.apache.metron.stellar.common.utils.StellarProcessorUtils.run;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests the ConfigurationFunctions class.
+ */
 public class ConfigurationFunctionsTest {
+
   private static TestingServer testZkServer;
-  private static CuratorFramework client;
   private static String zookeeperUrl;
-  private Context context = new Context.Builder()
-            .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
-            .build();
+  private static CuratorFramework client;
+  private static String goodGlobalConfig = slurp( SAMPLE_CONFIG_PATH+ "/global.json");
+  private static String goodTestEnrichmentConfig = slurp( SAMPLE_CONFIG_PATH + "/enrichments/test.json");
+  private static String goodBroParserConfig = slurp(PARSER_CONFIGS_PATH + "/parsers/bro.json");
+  private static String goodTestIndexingConfig = slurp( SAMPLE_CONFIG_PATH + "/indexing/test.json");
+
+  private Context context;
+  private JSONParser parser;
+
+  /**
+   * {
+   *   "profiles" : [
+   *      {
+   *        "profile" : "counter",
+   *        "foreach" : "ip_src_addr",
+   *        "init"    : { "counter" : 0 },
+   *        "update"  : { "counter" : "counter + 1" },
+   *        "result"  : "counter"
+   *      }
+   *   ],
+   *   "timestampField" : "timestamp"
+   * }
+   */
+  @Multiline
+  private static String goodProfilerConfig;
+
   @BeforeClass
-  public static void setup() throws Exception {
+  public static void setupZookeeper() throws Exception {
+
+    // zookeeper server
     testZkServer = new TestingServer(true);
     zookeeperUrl = testZkServer.getConnectString();
+
+    // zookeeper client
     client = ConfigurationsUtils.getClient(zookeeperUrl);
     client.start();
+  }
 
-    pushConfigs(SAMPLE_CONFIG_PATH);
-    pushConfigs(PARSER_CONFIGS_PATH);
+  @Before
+  public void setup() throws Exception {
 
+    context = new Context.Builder()
+            .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
+            .build();
+
+    parser = new JSONParser();
 
+    // push configs to zookeeper
+    pushConfigs(SAMPLE_CONFIG_PATH, zookeeperUrl);
+    pushConfigs(PARSER_CONFIGS_PATH, zookeeperUrl);
+    writeProfilerConfigToZookeeper(goodProfilerConfig.getBytes(), client);
   }
 
-  private static void pushConfigs(String inputPath) throws Exception {
-    String[] args = new String[]{
-            "-z", zookeeperUrl
-            , "--mode", "PUSH"
-            , "--input_dir", inputPath
-    };
-    ConfigurationManager manager = new ConfigurationManager();
-    manager.run(ConfigurationManager.ConfigurationOptions.parse(new PosixParser(), args));
+  /**
+   * Deletes a path within Zookeeper.
+   *
+   * @param path The path within Zookeeper to delete.
+   * @throws Exception
+   */
+  private void deletePath(String path) throws Exception {
+    client.delete().forPath(path);
   }
 
+  /**
+   * Transforms a String to a {@link JSONObject}.
+   *
+   * @param input The input String to transform
+   * @return A {@link JSONObject}.
+   * @throws org.json.simple.parser.ParseException
+   */
+  private JSONObject toJSONObject(String input) throws org.json.simple.parser.ParseException {
 
-  static String goodBroParserConfig = slurp(PARSER_CONFIGS_PATH + "/parsers/bro.json");
+    if(input == null) {
+      return null;
+    }
+    return (JSONObject) parser.parse(input.trim());
+  }
 
   /**
-    {
-      "sensorTopic" : "brop",
-      "parserConfig" : { },
-      "fieldTransformations" : [ ],
-      "readMetadata":false,
-      "mergeMetadata":false,
-      "parserParallelism" : 1,
-      "errorWriterParallelism" : 1,
-      "spoutNumTasks" : 1,
-      "stormConfig" : {},
-      "errorWriterNumTasks":1,
-      "spoutConfig":{},
-      "parserNumTasks":1,
-      "spoutParallelism":1
-    }
+   * Push configuration values to Zookeeper.
+   *
+   * @param inputPath The local filesystem path to the configurations.
+   * @param zookeeperUrl The URL of Zookeeper.
+   * @throws Exception
    */
-  @Multiline
-  static String defaultBropParserConfig;
+  private static void pushConfigs(String inputPath, String zookeeperUrl) throws Exception {
+
+    String[] args = new String[] {
+            "-z", zookeeperUrl,
+            "--mode", "PUSH",
+            "--input_dir", inputPath
+    };
+    CommandLine cli = ConfigurationManager.ConfigurationOptions.parse(new PosixParser(), args);
 
+    ConfigurationManager manager = new ConfigurationManager();
+    manager.run(cli);
+  }
 
+  /**
+   * The CONFIG_GET function should be able to return the Parser configuration
+   * for a given sensor.
+   */
   @Test
-  public void testParserGetHappyPath() {
+  public void testGetParser() throws Exception {
+
+    String out = (String) run("CONFIG_GET('PARSER', 'bro')", context);
 
-    Object out = run("CONFIG_GET('PARSER', 'bro')", new HashMap<>(), context);
-    Assert.assertEquals(goodBroParserConfig, out);
+    SensorParserConfig actual = SensorParserConfig.fromBytes(out.getBytes());
+    SensorParserConfig expected = SensorParserConfig.fromBytes(goodBroParserConfig.getBytes());
+    assertEquals(expected, actual);
   }
 
+  /**
+   * The CONFIG_GET function should NOT return any configuration when the
+   * Parser configuration for a given sensor is missing AND emptyIfNotPresent = false.
+   */
   @Test
-  public void testParserGetMissWithoutDefault() {
+  public void testGetParserMissWithoutDefault() {
 
-    {
-      Object out = run("CONFIG_GET('PARSER', 'brop', false)", new HashMap<>(), context);
-      Assert.assertNull(out);
-    }
+    // expect null because emptyIfNotPresent = false
+    Object out = run("CONFIG_GET('PARSER', 'sensor', false)", context);
+    assertNull(out);
   }
 
+  /**
+   * The CONFIG_GET function should return a default configuration when none
+   * currently exists.
+   */
   @Test
-  public void testParserGetMissWithDefault() throws Exception {
-    JSONObject expected = (JSONObject) new JSONParser().parse(defaultBropParserConfig);
+  public void testGetParserMissWithDefault() throws Exception {
 
+    SensorParserConfig expected = new SensorParserConfig();
     {
-      Object out = run("CONFIG_GET('PARSER', 'brop')", new HashMap<>(), context);
-      JSONObject actual = (JSONObject) new JSONParser().parse(out.toString().trim());
-      Assert.assertEquals(expected, actual);
+      Object out = run("CONFIG_GET('PARSER', 'sensor')", context);
+      SensorParserConfig actual = SensorParserConfig.fromBytes(out.toString().getBytes());
+      assertEquals(expected, actual);
     }
     {
-      Object out = run("CONFIG_GET('PARSER', 'brop', true)", new HashMap<>(), context);
-      JSONObject actual = (JSONObject) new JSONParser().parse(out.toString().trim());
-      Assert.assertEquals(expected, actual);
+      Object out = run("CONFIG_GET('PARSER', 'sensor', true)", context);
+      SensorParserConfig actual = SensorParserConfig.fromBytes(out.toString().getBytes());
+      assertEquals(expected, actual);
     }
   }
 
-  static String goodTestEnrichmentConfig = slurp( SAMPLE_CONFIG_PATH + "/enrichments/test.json");
+  /**
+   * The CONFIG_GET function should be able to return the Enrichment configuration
+   * for a given sensor.
+   */
+  @Test
+  public void testGetEnrichment() throws Exception {
+
+    String out = (String) run("CONFIG_GET('ENRICHMENT', 'test')", context);
+
+    SensorEnrichmentConfig actual = SensorEnrichmentConfig.fromBytes(out.getBytes());
+    SensorEnrichmentConfig expected = SensorEnrichmentConfig.fromBytes(goodTestEnrichmentConfig.getBytes());
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * No default configuration should be provided in this case.
+   */
+  @Test
+  public void testGetEnrichmentMissWithoutDefault() {
+
+    // expect null because emptyIfNotPresent = false
+    Object out = run("CONFIG_GET('ENRICHMENT', 'sense', false)", context);
+    assertNull(out);
+  }
 
   /**
+   * A default empty configuration should be provided, if one does not exist.
+   */
+  @Test
+  public void testGetEnrichmentMissWithDefault() throws Exception {
+
+    // expect an empty configuration to be returned
+    SensorEnrichmentConfig expected = new SensorEnrichmentConfig();
     {
-      "enrichment" : {
-        "fieldMap" : { },
-        "fieldToTypeMap" : { },
-        "config" : { }
-      },
-      "threatIntel" : {
-        "fieldMap" : { },
-        "fieldToTypeMap" : { },
-        "config" : { },
-        "triageConfig" : {
-          "riskLevelRules" : [ ],
-          "aggregator" : "MAX",
-          "aggregationConfig" : { }
-        }
-      },
-      "configuration" : { }
+      String out = (String) run("CONFIG_GET('ENRICHMENT', 'missing-sensor')", context);
+      SensorEnrichmentConfig actual = SensorEnrichmentConfig.fromBytes(out.getBytes());
+      assertEquals(expected, actual);
+    }
+    {
+      String out = (String) run("CONFIG_GET('ENRICHMENT', 'missing-sensor', true)", context);
+      SensorEnrichmentConfig actual = SensorEnrichmentConfig.fromBytes(out.getBytes());
+      assertEquals(expected, actual);
     }
+  }
+
+  /**
+   * The CONFIG_GET function should be able to return the Indexing configuration
+   * for a given sensor.
    */
-  @Multiline
-  static String defaultBropEnrichmentConfig;
+  @Test
+  public void testGetIndexing() throws Exception {
 
+    String out = (String) run("CONFIG_GET('INDEXING', 'test')", context);
+
+    Map<String, Object> actual = toJSONObject(out);
+    Map<String, Object> expected = toJSONObject(goodTestIndexingConfig);
+    assertEquals(expected, actual);
+  }
 
+  /**
+   * No default configuration should be provided in this case.
+   */
   @Test
-  public void testEnrichmentGetHappyPath() {
+  public void testGetIndexingMissWithoutDefault() {
 
-    Object out = run("CONFIG_GET('ENRICHMENT', 'test')", new HashMap<>(), context);
-    Assert.assertEquals(goodTestEnrichmentConfig, out.toString().trim());
+    // expect null because emptyIfNotPresent = false
+    Object out = run("CONFIG_GET('INDEXING', 'sense', false)", context);
+    assertNull(out);
   }
 
+  /**
+   * A default empty configuration should be provided, if one does not exist.
+   */
   @Test
-  public void testEnrichmentGetMissWithoutDefault() {
+  public void testGetIndexingtMissWithDefault() throws Exception {
 
+    // expect an empty configuration to be returned
+    Map<String, Object> expected = Collections.emptyMap();
+    {
+      String out = (String) run("CONFIG_GET('INDEXING', 'missing-sensor')", context);
+      Map<String, Object> actual = toJSONObject(out);
+      assertEquals(expected, actual);
+    }
     {
-      Object out = run("CONFIG_GET('ENRICHMENT', 'brop', false)", new HashMap<>(), context);
-      Assert.assertNull(out);
+      String out = (String) run("CONFIG_GET('INDEXING', 'missing-sensor', true)", context);
+      Map<String, Object> actual = toJSONObject(out);
+      assertEquals(expected, actual);
     }
   }
 
+  /**
+   * The CONFIG_GET function should be able to return the Profiler configuration.
+   */
+  @Test
+  public void testGetProfiler() throws Exception {
+
+    String out = (String) run("CONFIG_GET('PROFILER')", context);
+
+    ProfilerConfig actual = ProfilerConfig.fromBytes(out.getBytes());
+    ProfilerConfig expected = ProfilerConfig.fromBytes(goodProfilerConfig.getBytes());
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * No default configuration should be provided in this case.
+   */
   @Test
-  public void testEnrichmentGetMissWithDefault() throws Exception {
-    JSONObject expected = (JSONObject) new JSONParser().parse(defaultBropEnrichmentConfig);
+  public void testGetProfilerMissWithoutDefault() throws Exception {
+
+    deletePath(PROFILER.getZookeeperRoot());
 
+    // expect null because emptyIfNotPresent = false
+    String out = (String) run("CONFIG_GET('PROFILER', false)", context);
+    assertNull(out);
+  }
+
+  /**
+   * A default empty configuration should be provided, if one does not exist.
+   */
+  @Test
+  public void testGetProfilerMissWithDefault() throws Exception {
+
+    // there is no profiler config in zookeeper
+    deletePath(PROFILER.getZookeeperRoot());
+
+    // expect an empty configuration to be returned
+    ProfilerConfig expected = new ProfilerConfig();
     {
-      Object out = run("CONFIG_GET('ENRICHMENT', 'brop')", new HashMap<>(), context);
-      JSONObject actual = (JSONObject) new JSONParser().parse(out.toString().trim());
-      Assert.assertEquals(expected, actual);
+      String out = (String) run("CONFIG_GET('PROFILER', true)", context);
+      ProfilerConfig actual = ProfilerConfig.fromJSON(out);
+      assertEquals(expected, actual);
     }
     {
-      Object out = run("CONFIG_GET('ENRICHMENT', 'brop', true)", new HashMap<>(), context);
-      JSONObject actual = (JSONObject) new JSONParser().parse(out.toString().trim());
-      Assert.assertEquals(expected, actual);
+      String out = (String) run("CONFIG_GET('PROFILER')", context);
+      ProfilerConfig actual = ProfilerConfig.fromJSON(out);
+      assertEquals(expected, actual);
     }
   }
 
-  static String goodGlobalConfig = slurp( SAMPLE_CONFIG_PATH+ "/global.json");
+  @Test
+  public void testGetGlobal() throws Exception {
+
+    String out = (String) run("CONFIG_GET('GLOBAL')", context);
+
+    Map<String, Object> actual = toJSONObject(out);
+    Map<String, Object> expected = toJSONObject(goodGlobalConfig);
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * No default configuration should be provided in this case.
+   */
+  @Test
+  public void testGetGlobalMissWithoutDefault() throws Exception {
+
+    // there is no global config in zookeeper
+    deletePath(GLOBAL.getZookeeperRoot());
+
+    // expect null because emptyIfNotPresent = false
+    Object out = run("CONFIG_GET('GLOBAL', false)", context);
+    assertNull(out);
+  }
 
+  /**
+   * A default empty configuration should be provided, if one does not exist.
+   */
   @Test
-  public void testGlobalGet() {
+  public void testGetGlobalMissWithDefault() throws Exception {
+
+    // there is no global config in zookeeper
+    deletePath(GLOBAL.getZookeeperRoot());
 
-    Object out = run("CONFIG_GET('GLOBAL')", new HashMap<>(), context);
-    Assert.assertEquals(goodGlobalConfig, out.toString().trim());
+    // expect an empty configuration to be returned
+    Map<String, Object> expected = Collections.emptyMap();
+    {
+      String out = (String) run("CONFIG_GET('GLOBAL')", context);
+      Map<String, Object> actual = toJSONObject(out);
+      assertEquals(expected, actual);
+    }
+    {
+      String out = (String) run("CONFIG_GET('GLOBAL', true)", context);
+      Map<String, Object> actual = toJSONObject(out);
+      assertEquals(expected, actual);
+    }
   }
 
   @Test
-  public void testGlobalPut() {
+  public void testPutGlobal() throws Exception {
+
+    String out = (String) run("CONFIG_GET('GLOBAL')", context);
 
-    Object out = run("CONFIG_GET('GLOBAL')", new HashMap<>(), context);
-    Assert.assertEquals(goodGlobalConfig, out.toString().trim());
+    Map<String, Object> actual = toJSONObject(out);
+    Map<String, Object> expected = toJSONObject(goodGlobalConfig);
+    assertEquals(expected, actual);
   }
 
   @Test(expected=ParseException.class)
-  public void testGlobalPutBad() {
+  public void testPutGlobalBad() {
     {
       UnitTestHelper.setLog4jLevel(ConfigurationFunctions.class, Level.FATAL);
       try {
-        run("CONFIG_PUT('GLOBAL', 'foo bar')", new HashMap<>(), context);
+        run("CONFIG_PUT('GLOBAL', 'foo bar')", context);
       } catch(ParseException e) {
         UnitTestHelper.setLog4jLevel(ConfigurationFunctions.class, Level.ERROR);
         throw e;
@@ -215,23 +414,23 @@ public class ConfigurationFunctionsTest {
   }
 
   @Test
-  public void testIndexingPut() throws InterruptedException {
-    String brop= (String) run("CONFIG_GET('INDEXING', 'testIndexingPut')", new HashMap<>(), context);
+  public void testPutIndexing() throws InterruptedException {
+    String brop= (String) run("CONFIG_GET('INDEXING', 'testIndexingPut')", context);
     run("CONFIG_PUT('INDEXING', config, 'testIndexingPut')", ImmutableMap.of("config", brop), context);
     boolean foundMatch = false;
     for(int i = 0;i < 10 && !foundMatch;++i) {
-      String bropNew = (String) run("CONFIG_GET('INDEXING', 'testIndexingPut', false)", new HashMap<>(), context);
+      String bropNew = (String) run("CONFIG_GET('INDEXING', 'testIndexingPut', false)", context);
       foundMatch =  brop.equals(bropNew);
       if(foundMatch) {
         break;
       }
       Thread.sleep(2000);
     }
-    Assert.assertTrue(foundMatch);
+    assertTrue(foundMatch);
   }
 
   @Test(expected= ParseException.class)
-  public void testIndexingPutBad() throws InterruptedException {
+  public void testPutIndexingBad() throws InterruptedException {
     {
       {
         UnitTestHelper.setLog4jLevel(ConfigurationFunctions.class, Level.FATAL);
@@ -246,23 +445,26 @@ public class ConfigurationFunctionsTest {
   }
 
   @Test
-  public void testEnrichmentPut() throws InterruptedException {
-    String brop= (String) run("CONFIG_GET('ENRICHMENT', 'testEnrichmentPut')", new HashMap<>(), context);
-    run("CONFIG_PUT('ENRICHMENT', config, 'testEnrichmentPut')", ImmutableMap.of("config", brop), context);
+  public void testPutEnrichment() throws InterruptedException {
+    String config = (String) run("CONFIG_GET('ENRICHMENT', 'sensor')", context);
+    assertNotNull(config);
+
+    run("CONFIG_PUT('ENRICHMENT', config, 'sensor')", ImmutableMap.of("config", config), context);
+
     boolean foundMatch = false;
     for(int i = 0;i < 10 && !foundMatch;++i) {
-      String bropNew = (String) run("CONFIG_GET('ENRICHMENT', 'testEnrichmentPut', false)", new HashMap<>(), context);
-      foundMatch =  brop.equals(bropNew);
+      String newConfig = (String) run("CONFIG_GET('ENRICHMENT', 'sensor', false)", context);
+      foundMatch = config.equals(newConfig);
       if(foundMatch) {
         break;
       }
       Thread.sleep(2000);
     }
-    Assert.assertTrue(foundMatch);
+    assertTrue(foundMatch);
   }
 
   @Test(expected= ParseException.class)
-  public void testEnrichmentPutBad() throws InterruptedException {
+  public void testPutEnrichmentBad() throws InterruptedException {
     {
       {
         UnitTestHelper.setLog4jLevel(ConfigurationFunctions.class, Level.FATAL);
@@ -277,23 +479,23 @@ public class ConfigurationFunctionsTest {
   }
 
   @Test
-  public void testParserPut() throws InterruptedException {
-    String brop= (String) run("CONFIG_GET('PARSER', 'testParserPut')", new HashMap<>(), context);
+  public void testPutParser() throws InterruptedException {
+    String brop= (String) run("CONFIG_GET('PARSER', 'testParserPut')", context);
     run("CONFIG_PUT('PARSER', config, 'testParserPut')", ImmutableMap.of("config", brop), context);
     boolean foundMatch = false;
     for(int i = 0;i < 10 && !foundMatch;++i) {
-      String bropNew = (String) run("CONFIG_GET('PARSER', 'testParserPut', false)", new HashMap<>(), context);
+      String bropNew = (String) run("CONFIG_GET('PARSER', 'testParserPut', false)", context);
       foundMatch =  brop.equals(bropNew);
       if(foundMatch) {
         break;
       }
       Thread.sleep(2000);
     }
-    Assert.assertTrue(foundMatch);
+    assertTrue(foundMatch);
   }
 
   @Test(expected= ParseException.class)
-  public void testParserPutBad() throws InterruptedException {
+  public void testPutParserBad() throws InterruptedException {
     {
       UnitTestHelper.setLog4jLevel(ConfigurationFunctions.class, Level.FATAL);
       try {

http://git-wip-us.apache.org/repos/asf/metron/blob/37e3fd32/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/DefaultStellarShellExecutor.java
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/DefaultStellarShellExecutor.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/DefaultStellarShellExecutor.java
index 781a0cf..352ae2b 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/DefaultStellarShellExecutor.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/DefaultStellarShellExecutor.java
@@ -52,7 +52,6 @@ import java.io.ByteArrayInputStream;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -342,15 +341,18 @@ public class DefaultStellarShellExecutor implements StellarShellExecutor {
    * @param zkClient An optional Zookeeper client.
    */
   private Context createContext(Properties properties, Optional<CuratorFramework> zkClient) throws Exception {
+
     Context.Builder contextBuilder = new Context.Builder();
     Map<String, Object> globals;
     if (zkClient.isPresent()) {
+      LOG.debug("Zookeeper client present; fetching globals from Zookeeper.");
 
       // fetch globals from zookeeper
       globals = fetchGlobalConfig(zkClient.get());
       contextBuilder.with(ZOOKEEPER_CLIENT, () -> zkClient.get());
 
     } else {
+      LOG.debug("No Zookeeper client; initializing empty globals.");
 
       // use empty globals to allow a user to '%define' their own
       globals = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/metron/blob/37e3fd32/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java
index 5912657..d5f267e 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java
@@ -18,17 +18,18 @@
 
 package org.apache.metron.stellar.common.utils;
 
+import com.google.common.collect.ImmutableList;
+import org.apache.metron.stellar.common.StellarPredicateProcessor;
+import org.apache.metron.stellar.common.StellarProcessor;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.DefaultVariableResolver;
 import org.apache.metron.stellar.dsl.MapVariableResolver;
 import org.apache.metron.stellar.dsl.StellarFunctions;
 import org.apache.metron.stellar.dsl.VariableResolver;
-import com.google.common.collect.ImmutableList;
-import org.apache.metron.stellar.common.StellarPredicateProcessor;
-import org.apache.metron.stellar.common.StellarProcessor;
 import org.junit.Assert;
 
 import java.util.AbstractMap;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Spliterators;
@@ -39,39 +40,76 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
+/**
+ * Utilities for executing and validating Stellar expressions.
+ */
 public class StellarProcessorUtils {
 
-    /**
-     * This utility class is intended for use while unit testing Stellar operators.
-     * It is included in the "main" code so third-party operators will not need
-     * a test dependency on Stellar's test-jar.
-     *
-     * This class ensures the basic contract of a stellar expression is adhered to:
-     * 1. Validate works on the expression
-     * 2. The output can be serialized and deserialized properly
-     *
-     * @param rule
-     * @param variables
-     * @param context
-     * @return ret
-     */
-    public static Object run(String rule, Map<String, Object> variables, Context context) {
-        StellarProcessor processor = new StellarProcessor();
-        Assert.assertTrue(rule + " not valid.", processor.validate(rule, context));
-        Object ret = processor.parse(rule, new DefaultVariableResolver(x -> variables.get(x),x-> variables.containsKey(x)), StellarFunctions.FUNCTION_RESOLVER(), context);
-        byte[] raw = SerDeUtils.toBytes(ret);
-        Object actual = SerDeUtils.fromBytes(raw, Object.class);
-        Assert.assertEquals(ret, actual);
-        return ret;
-    }
+  /**
+   * Execute and validate a Stellar expression.
+   *
+   * <p>This is intended for use while unit testing Stellar expressions.  This ensures that the expression
+   * validates successfully and produces a result that can be serialized correctly.
+   *
+   * @param expression The expression to execute.
+   * @param variables The variables to expose to the expression.
+   * @param context The execution context.
+   * @return The result of executing the expression.
+   */
+  public static Object run(String expression, Map<String, Object> variables, Context context) {
+
+    // validate the expression
+    StellarProcessor processor = new StellarProcessor();
+    Assert.assertTrue("Invalid expression; expr=" + expression,
+            processor.validate(expression, context));
+
+    // execute the expression
+    Object ret = processor.parse(
+            expression,
+            new DefaultVariableResolver(x -> variables.get(x), x -> variables.containsKey(x)),
+            StellarFunctions.FUNCTION_RESOLVER(),
+            context);
+
+    // ensure the result can be serialized/deserialized
+    byte[] raw = SerDeUtils.toBytes(ret);
+    Object actual = SerDeUtils.fromBytes(raw, Object.class);
+    Assert.assertEquals(ret, actual);
+
+    return ret;
+  }
+
+  /**
+   * Execute and validate a Stellar expression using an empty execution context.
+   *
+   * <p>This is intended for use while unit testing Stellar expressions.  This ensures that the expression
+   * validates successfully and produces a result that can be serialized correctly.
+   *
+   * @param expression The expression to execute.
+   * @param variables The variables to expose to the expression.
+   * @return The result of executing the expression.
+   */
+  public static Object run(String expression, Map<String, Object> variables) {
+    return run(expression, variables, Context.EMPTY_CONTEXT());
+  }
 
-  public static Object run(String rule, Map<String, Object> variables) {
-    return run(rule, variables, Context.EMPTY_CONTEXT());
+  /**
+   * Execute and validate a Stellar expression with no variables defined.
+   *
+   * <p>This is intended for use while unit testing Stellar expressions.  This ensures that the expression
+   * validates successfully and produces a result that can be serialized correctly.
+   *
+   * @param expression The expression to execute.
+   * @param context The execution context.
+   * @return The result of executing the expression.
+   */
+  public static Object run(String expression, Context context) {
+    return run(expression, Collections.emptyMap(), context);
   }
 
-  public static void validate(String rule, Context context) {
+  public static void validate(String expression, Context context) {
     StellarProcessor processor = new StellarProcessor();
-    Assert.assertTrue(rule + " not valid.", processor.validate(rule, context));
+    Assert.assertTrue("Invalid expression; expr=" + expression,
+            processor.validate(expression, context));
   }
 
   public static void validate(String rule) {
@@ -101,19 +139,18 @@ public class StellarProcessorUtils {
   }
 
   public static void runWithArguments(String function, List<Object> arguments, Object expected) {
-    Supplier<Stream<Map.Entry<String, Object>>> kvStream = () -> StreamSupport.stream(new XRange(arguments.size()), false)
-            .map( i -> new AbstractMap.SimpleImmutableEntry<>("var" + i, arguments.get(i)));
+    Supplier<Stream<Map.Entry<String, Object>>> kvStream = () -> StreamSupport
+            .stream(new XRange(arguments.size()), false)
+            .map(i -> new AbstractMap.SimpleImmutableEntry<>("var" + i, arguments.get(i)));
 
-    String args = kvStream.get().map( kv -> kv.getKey())
-                                .collect(Collectors.joining(","));
+    String args = kvStream.get().map(kv -> kv.getKey()).collect(Collectors.joining(","));
     Map<String, Object> variables = kvStream.get().collect(Collectors.toMap(kv -> kv.getKey(), kv -> kv.getValue()));
-    String stellarStatement =  function + "(" + args + ")";
+    String stellarStatement = function + "(" + args + ")";
     String reason = stellarStatement + " != " + expected + " with variables: " + variables;
 
-    if(expected instanceof Double) {
-      Assert.assertEquals(reason, (Double)expected, (Double)run(stellarStatement, variables), 1e-6);
-    }
-    else {
+    if (expected instanceof Double) {
+      Assert.assertEquals(reason, (Double) expected, (Double) run(stellarStatement, variables), 1e-6);
+    } else {
       Assert.assertEquals(reason, expected, run(stellarStatement, variables));
     }
   }
@@ -135,10 +172,9 @@ public class StellarProcessorUtils {
     @Override
     public boolean tryAdvance(IntConsumer action) {
       boolean isDone = i >= end;
-      if(isDone) {
+      if (isDone) {
         return false;
-      }
-      else {
+      } else {
         action.accept(i);
         i++;
         return true;
@@ -148,25 +184,20 @@ public class StellarProcessorUtils {
     /**
      * {@inheritDoc}
      *
-     * @param action
-     * to {@code IntConsumer} and passed to
-     * {@link #tryAdvance(IntConsumer)}; otherwise
-     * the action is adapted to an instance of {@code IntConsumer}, by
-     * boxing the argument of {@code IntConsumer}, and then passed to
-     * {@link #tryAdvance(IntConsumer)}.
+     * @param action The action to perform on the next element. Unlike the inherited default,
+     *     it is not cast or adapted to an {@code IntConsumer} and passed to
+     *     {@link #tryAdvance(IntConsumer)}; it is invoked directly with the boxed value.
      */
     @Override
     public boolean tryAdvance(Consumer<? super Integer> action) {
       boolean isDone = i >= end;
-      if(isDone) {
+      if (isDone) {
         return false;
-      }
-      else {
+      } else {
         action.accept(i);
         i++;
         return true;
       }
     }
   }
-
 }


Mime
View raw message