metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From o...@apache.org
Subject [2/6] metron git commit: METRON-1136 Track Master in Feature Branch (ottobackwards) closes apache/metron#752
Date Wed, 13 Sep 2017 02:58:43 GMT
http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/config/ValueSupplier.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/config/ValueSupplier.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/config/ValueSupplier.java
new file mode 100644
index 0000000..0ede0f8
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/config/ValueSupplier.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.parsers.topology.config;
+
+import org.apache.metron.common.configuration.SensorParserConfig;
+
+
+/**
+ * Supplies a value given a sensor config.
+ * @param <T>
+ */
+public interface ValueSupplier<T> {
+  T get(SensorParserConfig config, Class<T> clazz);
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
index 3e8e2db..63d9e52 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
@@ -87,23 +87,29 @@ public class ParserTopologyComponent implements InMemoryComponent {
   @Override
   public void start() throws UnableToStartException {
     try {
-      TopologyBuilder topologyBuilder = ParserTopologyBuilder.build(topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY)
+      final Map<String, Object> stormConf = new HashMap<>();
+      stormConf.put(Config.TOPOLOGY_DEBUG, true);
+      ParserTopologyBuilder.ParserTopology topologyBuilder = ParserTopologyBuilder.build(topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY)
                                                                    , Optional.ofNullable(brokerUrl)
                                                                    , sensorType
-                                                                   , 1
-                                                                   , 1
-                                                                   , 1
-                                                                   , 1
-                                                                   , 1
-                                                                   , 1
-                                                                   , null
-                                                                   , Optional.empty()
+                                                                   , (x,y) -> 1
+                                                                   , (x,y) -> 1
+                                                                   , (x,y) -> 1
+                                                                   , (x,y) -> 1
+                                                                   , (x,y) -> 1
+                                                                   , (x,y) -> 1
+                                                                   , (x,y) -> new HashMap<>()
+                                                                   , (x,y) -> null
                                                                    , Optional.ofNullable(outputTopic)
+                                                                   , (x,y) -> {
+                                                                      Config c = new Config();
+                                                                      c.putAll(stormConf);
+                                                                      return c;
+                                                                      }
                                                                    );
-      Map<String, Object> stormConf = new HashMap<>();
-      stormConf.put(Config.TOPOLOGY_DEBUG, true);
+
       stormCluster = new LocalCluster();
-      stormCluster.submitTopology(sensorType, stormConf, topologyBuilder.createTopology());
+      stormCluster.submitTopology(sensorType, stormConf, topologyBuilder.getBuilder().createTopology());
     } catch (Exception e) {
       throw new UnableToStartException("Unable to start parser topology for sensorType: " + sensorType, e);
     }

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
index 5f536a5..97dac5a 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
@@ -18,8 +18,12 @@
 
 package org.apache.metron.parsers.topology;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.commons.cli.Parser;
 import org.apache.log4j.Level;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.parsers.topology.config.ValueSupplier;
 import org.apache.metron.test.utils.UnitTestHelper;
 import org.apache.storm.Config;
 import com.google.common.collect.ImmutableMap;
@@ -34,7 +38,10 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.ref.Reference;
 import java.util.*;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
 
 public class ParserTopologyCLITest {
 
@@ -142,7 +149,8 @@ public class ParserTopologyCLITest {
                                      .with(ParserTopologyCLI.ParserOptions.NUM_MAX_TASK_PARALLELISM, "3")
                                      .with(ParserTopologyCLI.ParserOptions.MESSAGE_TIMEOUT, "4")
                                      .build(longOpt);
-    Config config = ParserTopologyCLI.ParserOptions.getConfig(cli);
+    Optional<Config> configOptional = ParserTopologyCLI.ParserOptions.getConfig(cli);
+    Config config = configOptional.get();
     Assert.assertEquals(1, config.get(Config.TOPOLOGY_WORKERS));
     Assert.assertEquals(2, config.get(Config.TOPOLOGY_ACKER_EXECUTORS));
     Assert.assertEquals(3, config.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM));
@@ -189,7 +197,8 @@ public class ParserTopologyCLITest {
               .with(ParserTopologyCLI.ParserOptions.MESSAGE_TIMEOUT, "4")
               .with(ParserTopologyCLI.ParserOptions.EXTRA_OPTIONS, extraFile.getAbsolutePath())
               .build(longOpt);
-      Config config = ParserTopologyCLI.ParserOptions.getConfig(cli);
+      Optional<Config> configOptional = ParserTopologyCLI.ParserOptions.getConfig(cli);
+      Config config = configOptional.get();
       Assert.assertEquals(4, config.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
       Assert.assertEquals("foo", config.get("string"));
       Assert.assertEquals(1, config.get("integer"));
@@ -197,4 +206,416 @@ public class ParserTopologyCLITest {
       extraFile.deleteOnExit();
     }
   }
+
+  private static class ParserInput {
+    private Integer spoutParallelism;
+    private Integer spoutNumTasks;
+    private Integer parserParallelism;
+    private Integer parserNumTasks;
+    private Integer errorParallelism;
+    private Integer errorNumTasks;
+    private Map<String, Object> spoutConfig;
+    private String securityProtocol;
+    private Config stormConf;
+
+    public ParserInput( ValueSupplier<Integer> spoutParallelism
+                      , ValueSupplier<Integer> spoutNumTasks
+                      , ValueSupplier<Integer> parserParallelism
+                      , ValueSupplier<Integer> parserNumTasks
+                      , ValueSupplier<Integer> errorParallelism
+                      , ValueSupplier<Integer> errorNumTasks
+                      , ValueSupplier<Map> spoutConfig
+                      , ValueSupplier<String> securityProtocol
+                      , ValueSupplier<Config> stormConf
+                      , SensorParserConfig config
+                      )
+    {
+      this.spoutParallelism = spoutParallelism.get(config, Integer.class);
+      this.spoutNumTasks = spoutNumTasks.get(config, Integer.class);
+      this.parserParallelism = parserParallelism.get(config, Integer.class);
+      this.parserNumTasks = parserNumTasks.get(config, Integer.class);
+      this.errorParallelism = errorParallelism.get(config, Integer.class);
+      this.errorNumTasks = errorNumTasks.get(config, Integer.class);
+      this.spoutConfig = spoutConfig.get(config, Map.class);
+      this.securityProtocol = securityProtocol.get(config, String.class);
+      this.stormConf = stormConf.get(config, Config.class);
+    }
+
+    public Integer getSpoutParallelism() {
+      return spoutParallelism;
+    }
+
+    public Integer getSpoutNumTasks() {
+      return spoutNumTasks;
+    }
+
+    public Integer getParserParallelism() {
+      return parserParallelism;
+    }
+
+    public Integer getParserNumTasks() {
+      return parserNumTasks;
+    }
+
+    public Integer getErrorParallelism() {
+      return errorParallelism;
+    }
+
+    public Integer getErrorNumTasks() {
+      return errorNumTasks;
+    }
+
+    public Map<String, Object> getSpoutConfig() {
+      return spoutConfig;
+    }
+
+    public String getSecurityProtocol() {
+      return securityProtocol;
+    }
+
+    public Config getStormConf() {
+      return stormConf;
+    }
+  }
+  /**
+{
+  "parserClassName": "org.apache.metron.parsers.GrokParser",
+  "sensorTopic": "squid",
+  "parserConfig": {
+    "grokPath": "/patterns/squid",
+    "patternLabel": "SQUID_DELIMITED",
+    "timestampField": "timestamp"
+  },
+  "fieldTransformations" : [
+    {
+      "transformation" : "STELLAR"
+    ,"output" : [ "full_hostname", "domain_without_subdomains" ]
+    ,"config" : {
+      "full_hostname" : "URL_TO_HOST(url)"
+      ,"domain_without_subdomains" : "DOMAIN_REMOVE_SUBDOMAINS(full_hostname)"
+                }
+    }
+                           ]
+}
+   */
+  @Multiline
+  public static String baseConfig;
+  private static SensorParserConfig getBaseConfig() {
+    try {
+      return JSONUtils.INSTANCE.load(baseConfig, SensorParserConfig.class);
+    } catch (IOException e) {
+      throw new IllegalStateException(e.getMessage(), e);
+    }
+  }
+
+  @Test
+  public void testSpoutParallelism() throws Exception {
+    testConfigOption(ParserTopologyCLI.ParserOptions.SPOUT_PARALLELISM
+                    , "10"
+                    , input -> input.getSpoutParallelism().equals(10)
+                    , () -> {
+                      SensorParserConfig config = getBaseConfig();
+                      config.setSpoutParallelism(20);
+                      return config;
+                    }
+                    , input -> input.getSpoutParallelism().equals(20)
+                    );
+  }
+
+  @Test
+  public void testSpoutNumTasks() throws Exception {
+    testConfigOption(ParserTopologyCLI.ParserOptions.SPOUT_NUM_TASKS
+                    , "10"
+                    , input -> input.getSpoutNumTasks().equals(10)
+                    , () -> {
+                      SensorParserConfig config = getBaseConfig();
+                      config.setSpoutNumTasks(20);
+                      return config;
+                    }
+                    , input -> input.getSpoutNumTasks().equals(20)
+                    );
+  }
+
+  @Test
+  public void testParserParallelism() throws Exception {
+    testConfigOption(ParserTopologyCLI.ParserOptions.PARSER_PARALLELISM
+                    , "10"
+                    , input -> input.getParserParallelism().equals(10)
+                    , () -> {
+                      SensorParserConfig config = getBaseConfig();
+                      config.setParserParallelism(20);
+                      return config;
+                    }
+                    , input -> input.getParserParallelism().equals(20)
+                    );
+  }
+
+  @Test
+  public void testParserNumTasks() throws Exception {
+    testConfigOption(ParserTopologyCLI.ParserOptions.PARSER_NUM_TASKS
+                    , "10"
+                    , input -> input.getParserNumTasks().equals(10)
+                    , () -> {
+                      SensorParserConfig config = getBaseConfig();
+                      config.setParserNumTasks(20);
+                      return config;
+                    }
+                    , input -> input.getParserNumTasks().equals(20)
+                    );
+  }
+
+  @Test
+  public void testErrorParallelism() throws Exception {
+    testConfigOption(ParserTopologyCLI.ParserOptions.ERROR_WRITER_PARALLELISM
+                    , "10"
+                    , input -> input.getErrorParallelism().equals(10)
+                    , () -> {
+                      SensorParserConfig config = getBaseConfig();
+                      config.setErrorWriterParallelism(20);
+                      return config;
+                    }
+                    , input -> input.getErrorParallelism().equals(20)
+                    );
+  }
+
+  @Test
+  public void testErrorNumTasks() throws Exception {
+    testConfigOption(ParserTopologyCLI.ParserOptions.ERROR_WRITER_NUM_TASKS
+                    , "10"
+                    , input -> input.getErrorNumTasks().equals(10)
+                    , () -> {
+                      SensorParserConfig config = getBaseConfig();
+                      config.setErrorWriterNumTasks(20);
+                      return config;
+                    }
+                    , input -> input.getErrorNumTasks().equals(20)
+                    );
+  }
+
+  @Test
+  public void testSecurityProtocol_fromCLI() throws Exception {
+    testConfigOption(ParserTopologyCLI.ParserOptions.SECURITY_PROTOCOL
+                    , "PLAINTEXT"
+                    , input -> input.getSecurityProtocol().equals("PLAINTEXT")
+                    , () -> {
+                      SensorParserConfig config = getBaseConfig();
+                      config.setSecurityProtocol("KERBEROS");
+                      return config;
+                    }
+                    , input -> input.getSecurityProtocol().equals("KERBEROS")
+                    );
+  }
+
+  @Test
+  public void testSecurityProtocol_fromSpout() throws Exception {
+    //Ultimately the order of precedence is CLI > spout config > parser config
+    File extraConfig = File.createTempFile("spoutConfig", "json");
+      extraConfig.deleteOnExit();
+      writeMap(extraConfig, new HashMap<String, Object>() {{
+        put("security.protocol", "PLAINTEXTSASL");
+      }});
+    {
+      //Ensure that the CLI spout config takes precedence
+
+      testConfigOption(new EnumMap<ParserTopologyCLI.ParserOptions, String>(ParserTopologyCLI.ParserOptions.class) {{
+                         put(ParserTopologyCLI.ParserOptions.SPOUT_CONFIG, extraConfig.getAbsolutePath());
+                         put(ParserTopologyCLI.ParserOptions.SECURITY_PROTOCOL, "PLAINTEXT");
+                       }}
+              , input -> input.getSecurityProtocol().equals("PLAINTEXT")
+              , () -> {
+                SensorParserConfig config = getBaseConfig();
+                config.setSecurityProtocol("PLAINTEXTSASL_FROM_ZK");
+                return config;
+              }
+              , input -> input.getSecurityProtocol().equals("PLAINTEXTSASL_FROM_ZK")
+      );
+    }
+    {
+      //Ensure that the spout config takes precedence
+      testConfigOption(new EnumMap<ParserTopologyCLI.ParserOptions, String>(ParserTopologyCLI.ParserOptions.class) {{
+                         put(ParserTopologyCLI.ParserOptions.SPOUT_CONFIG, extraConfig.getAbsolutePath());
+                       }}
+              , input -> input.getSecurityProtocol().equals("PLAINTEXTSASL")
+              , () -> {
+                SensorParserConfig config = getBaseConfig();
+                config.setSecurityProtocol("PLAINTEXTSASL_FROM_ZK");
+                return config;
+              }
+              , input -> input.getSecurityProtocol().equals("PLAINTEXTSASL_FROM_ZK")
+      );
+    }
+  }
+
+  @Test
+  public void testTopologyConfig_fromConfigExplicitly() throws Exception {
+    testConfigOption(new EnumMap<ParserTopologyCLI.ParserOptions, String>(ParserTopologyCLI.ParserOptions.class)
+                    {{
+                      put(ParserTopologyCLI.ParserOptions.NUM_WORKERS, "10");
+                      put(ParserTopologyCLI.ParserOptions.NUM_ACKERS, "20");
+                    }}
+                    , input -> {
+                        Config c = input.getStormConf();
+                        return (int)c.get(Config.TOPOLOGY_WORKERS) == 10
+                            && (int)c.get(Config.TOPOLOGY_ACKER_EXECUTORS) == 20;
+                      }
+                      , () -> {
+                        SensorParserConfig config = getBaseConfig();
+                        config.setNumWorkers(100);
+                        config.setNumAckers(200);
+                        return config;
+                              }
+                      , input -> {
+                          Config c = input.getStormConf();
+                          return (int)c.get(Config.TOPOLOGY_WORKERS) == 100
+                              && (int)c.get(Config.TOPOLOGY_ACKER_EXECUTORS) == 200
+                                  ;
+                                 }
+                    );
+  }
+
+  @Test
+  public void testTopologyConfig() throws Exception {
+    File extraConfig = File.createTempFile("topologyConfig", "json");
+    extraConfig.deleteOnExit();
+    writeMap(extraConfig, new HashMap<String, Object>() {{
+      put(Config.TOPOLOGY_DEBUG, true);
+    }});
+    testConfigOption(new EnumMap<ParserTopologyCLI.ParserOptions, String>(ParserTopologyCLI.ParserOptions.class)
+                    {{
+                      put(ParserTopologyCLI.ParserOptions.NUM_WORKERS, "10");
+                      put(ParserTopologyCLI.ParserOptions.NUM_ACKERS, "20");
+                      put(ParserTopologyCLI.ParserOptions.EXTRA_OPTIONS, extraConfig.getAbsolutePath());
+                    }}
+                    , input -> {
+                        Config c = input.getStormConf();
+                        return (int)c.get(Config.TOPOLOGY_WORKERS) == 10
+                            && (int)c.get(Config.TOPOLOGY_ACKER_EXECUTORS) == 20
+                            && (boolean)c.get(Config.TOPOLOGY_DEBUG);
+                      }
+                      , () -> {
+                        SensorParserConfig config = getBaseConfig();
+                        config.setStormConfig(
+                          new HashMap<String, Object>() {{
+                            put(Config.TOPOLOGY_WORKERS, 100);
+                            put(Config.TOPOLOGY_ACKER_EXECUTORS, 200);
+                          }}
+                                             );
+                        return config;
+                              }
+                      , input -> {
+                          Config c = input.getStormConf();
+                          return (int)c.get(Config.TOPOLOGY_WORKERS) == 100
+                              && (int)c.get(Config.TOPOLOGY_ACKER_EXECUTORS) == 200
+                              && !c.containsKey(Config.TOPOLOGY_DEBUG);
+                                 }
+                    );
+  }
+
+  @Test
+  public void testSpoutConfig() throws Exception {
+    File extraConfig = File.createTempFile("spoutConfig", "json");
+    extraConfig.deleteOnExit();
+    writeMap(extraConfig, new HashMap<String, Object>() {{
+      put("extra_config", "from_file");
+    }});
+    EnumMap<ParserTopologyCLI.ParserOptions, String> cliOptions = new EnumMap<ParserTopologyCLI.ParserOptions, String>(ParserTopologyCLI.ParserOptions.class)
+                    {{
+                      put(ParserTopologyCLI.ParserOptions.SPOUT_CONFIG, extraConfig.getAbsolutePath());
+                    }};
+    Predicate<ParserInput> cliOverrideExpected = input -> {
+      return input.getSpoutConfig().get("extra_config").equals("from_file");
+    };
+
+    Predicate<ParserInput> configOverrideExpected = input -> {
+      return input.getSpoutConfig().get("extra_config").equals("from_zk")
+                                  ;
+    };
+
+    Supplier<SensorParserConfig> configSupplier = () -> {
+      SensorParserConfig config = getBaseConfig();
+      config.setSpoutConfig(
+              new HashMap<String, Object>() {{
+                put("extra_config", "from_zk");
+              }}
+      );
+      return config;
+    };
+    testConfigOption( cliOptions
+                    , cliOverrideExpected
+                    , configSupplier
+                    , configOverrideExpected
+                    );
+  }
+
+  private void writeMap(File outFile, Map<String, Object> config) throws IOException {
+    FileUtils.write(outFile, JSONUtils.INSTANCE.toJSON(config, true));
+  }
+
+  private void testConfigOption( ParserTopologyCLI.ParserOptions option
+                               , String cliOverride
+                               , Predicate<ParserInput> cliOverrideCondition
+                               , Supplier<SensorParserConfig> configSupplier
+                               , Predicate<ParserInput> configOverrideCondition
+  ) throws Exception {
+    testConfigOption(
+            new EnumMap<ParserTopologyCLI.ParserOptions, String>(ParserTopologyCLI.ParserOptions.class) {{
+              put(option, cliOverride);
+            }},
+            cliOverrideCondition,
+            configSupplier,
+            configOverrideCondition
+    );
+  }
+
+  private void testConfigOption( EnumMap<ParserTopologyCLI.ParserOptions, String> options
+                               , Predicate<ParserInput> cliOverrideCondition
+                               , Supplier<SensorParserConfig> configSupplier
+                               , Predicate<ParserInput> configOverrideCondition
+  ) throws Exception {
+    //CLI Override
+    SensorParserConfig config = configSupplier.get();
+    {
+      CLIBuilder builder = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
+              .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
+              .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor");
+      for(Map.Entry<ParserTopologyCLI.ParserOptions, String> entry : options.entrySet()) {
+        builder.with(entry.getKey(), entry.getValue());
+      }
+      CommandLine cmd = builder.build(true);
+      ParserInput input = getInput(cmd, config);
+      Assert.assertTrue(cliOverrideCondition.test(input));
+    }
+    // Config Override
+    {
+      CLIBuilder builder = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
+              .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
+              .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor");
+      CommandLine cmd = builder.build(true);
+      ParserInput input = getInput(cmd, config);
+      Assert.assertTrue(configOverrideCondition.test(input));
+    }
+  }
+
+  private static ParserInput getInput(CommandLine cmd, SensorParserConfig config ) throws Exception {
+    final ParserInput[] parserInput = new ParserInput[]{null};
+    new ParserTopologyCLI() {
+      @Override
+      protected ParserTopologyBuilder.ParserTopology getParserTopology(String zookeeperUrl, Optional<String> brokerUrl, String sensorType, ValueSupplier<Integer> spoutParallelism, ValueSupplier<Integer> spoutNumTasks, ValueSupplier<Integer> parserParallelism, ValueSupplier<Integer> parserNumTasks, ValueSupplier<Integer> errorParallelism, ValueSupplier<Integer> errorNumTasks, ValueSupplier<Map> spoutConfig, ValueSupplier<String> securityProtocol, ValueSupplier<Config> stormConf, Optional<String> outputTopic) throws Exception {
+       parserInput[0] = new ParserInput( spoutParallelism
+                                        , spoutNumTasks
+                                        , parserParallelism
+                                        , parserNumTasks
+                                        , errorParallelism
+                                        , errorNumTasks
+                                        , spoutConfig
+                                        , securityProtocol
+                                        , stormConf
+                                        , config
+                                        );
+        return null;
+      }
+    }.createParserTopology(cmd);
+    return parserInput[0];
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-pcap-backend/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/pom.xml b/metron-platform/metron-pcap-backend/pom.xml
index 388b1e0..5878873 100644
--- a/metron-platform/metron-pcap-backend/pom.xml
+++ b/metron-platform/metron-pcap-backend/pom.xml
@@ -221,6 +221,16 @@
                             <goal>shade</goal>
                         </goals>
                         <configuration>
+                          <filters>
+                            <filter>
+                              <artifact>*:*</artifact>
+                              <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                              </excludes>
+                            </filter>
+                          </filters>
                             <relocations>
                                 <relocation>
                                     <pattern>com.google.common</pattern>

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-solr/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/pom.xml b/metron-platform/metron-solr/pom.xml
index be1fe33..97132c4 100644
--- a/metron-platform/metron-solr/pom.xml
+++ b/metron-platform/metron-solr/pom.xml
@@ -232,6 +232,16 @@
                         <configuration>
                             <shadedArtifactAttached>true</shadedArtifactAttached>
                             <shadedClassifierName>uber</shadedClassifierName>
+                            <filters>
+                              <filter>
+                                <artifact>*:*</artifact>
+                                <excludes>
+                                  <exclude>META-INF/*.SF</exclude>
+                                  <exclude>META-INF/*.DSA</exclude>
+                                  <exclude>META-INF/*.RSA</exclude>
+                                </excludes>
+                              </filter>
+                            </filters> 
                             <artifactSet>
                                 <excludes>
                                     <exclude>storm:storm-core:*</exclude>

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-writer/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/pom.xml b/metron-platform/metron-writer/pom.xml
index 7d3152c..de6b3b8 100644
--- a/metron-platform/metron-writer/pom.xml
+++ b/metron-platform/metron-writer/pom.xml
@@ -238,6 +238,16 @@
                             <goal>shade</goal>
                         </goals>
                         <configuration>
+                          <filters>
+                            <filter>
+                              <artifact>*:*</artifact>
+                              <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                              </excludes>
+                            </filter>
+                          </filters>
                             <relocations>
                                 <relocation>
                                     <pattern>com.google.common</pattern>

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
index fbb5561..b841249 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
@@ -64,6 +64,7 @@ public class SourceHandler {
     this.rotationPolicy = rotationPolicy;
     this.syncPolicy = syncPolicy;
     this.fileNameFormat = fileNameFormat;
+    this.cleanupCallback = cleanupCallback;
     initialize();
   }
 
@@ -71,12 +72,26 @@ public class SourceHandler {
   protected void handle(JSONObject message, String sensor, WriterConfiguration config, SyncPolicyCreator syncPolicyCreator) throws IOException {
     byte[] bytes = (message.toJSONString() + "\n").getBytes();
     synchronized (this.writeLock) {
-      out.write(bytes);
+      try {
+        out.write(bytes);
+      } catch (IOException writeException) {
+        LOG.warn("IOException while writing output", writeException);
+        // If the stream is closed, attempt to rotate the file and try again, hoping it's transient
+        if (writeException.getMessage().contains("Stream Closed")) {
+          LOG.warn("Output Stream was closed. Attempting to rotate file and continue");
+          rotateOutputFile();
+          // If this write fails, the exception will be allowed to bubble up.
+          out.write(bytes);
+        } else {
+          throw writeException;
+        }
+      }
       this.offset += bytes.length;
 
       if (this.syncPolicy.mark(null, this.offset)) {
         if (this.out instanceof HdfsDataOutputStream) {
-          ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
+          ((HdfsDataOutputStream) this.out)
+              .hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
         } else {
           this.out.hsync();
         }
@@ -146,7 +161,7 @@ public class SourceHandler {
     return path;
   }
 
-  private void closeOutputFile() throws IOException {
+  protected void closeOutputFile() throws IOException {
     this.out.close();
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
index 0118a15..832f8bf 100644
--- a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
+++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
@@ -18,11 +18,21 @@
 
 package org.apache.metron.writer.hdfs;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.metron.common.configuration.IndexingConfigurations;
 import org.apache.metron.common.configuration.writer.IndexingWriterConfiguration;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
 import org.apache.storm.hdfs.bolt.format.FileNameFormat;
+import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
+import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
@@ -32,11 +42,6 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.*;
-
 // Suppress ConstantConditions to avoid NPE warnings that only would occur on test failure anyway
 @SuppressWarnings("ConstantConditions")
 public class HdfsWriterTest {
@@ -410,6 +415,82 @@ public class HdfsWriterTest {
     }
   }
 
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testSingleFileIfNoStreamClosed() throws Exception {
+    String function = "FORMAT('test-%s/%s', test.key, test.key)";
+    WriterConfiguration config = buildWriterConfiguration(function);
+    HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
+    writer.init(new HashMap<String, String>(), createTopologyContext(), config);
+
+    JSONObject message = new JSONObject();
+    message.put("test.key", "test.value");
+    ArrayList<JSONObject> messages = new ArrayList<>();
+    messages.add(message);
+    ArrayList<Tuple> tuples = new ArrayList<>();
+
+    CountSyncPolicy basePolicy = new CountSyncPolicy(5);
+    ClonedSyncPolicyCreator creator = new ClonedSyncPolicyCreator(basePolicy);
+
+    writer.write(SENSOR_NAME, config, tuples, messages);
+    writer.write(SENSOR_NAME, config, tuples, messages);
+    writer.close();
+
+    File outputFolder = new File(folder.getAbsolutePath() + "/test-test.value/test.value/");
+
+    // The message should show up twice, once in each file
+    ArrayList<String> expected = new ArrayList<>();
+    expected.add(message.toJSONString());
+    expected.add(message.toJSONString());
+
+    // Assert both messages are in the same file, because the stream stayed open
+    Assert.assertEquals(1, outputFolder.listFiles().length);
+    for (File file : outputFolder.listFiles()) {
+      List<String> lines = Files.readAllLines(file.toPath());
+      // One line per file
+      Assert.assertEquals(2, lines.size());
+      Assert.assertEquals(expected, lines);
+    }
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testHandleAttemptsRotateIfStreamClosed() throws Exception {
+    String function = "FORMAT('test-%s/%s', test.key, test.key)";
+    WriterConfiguration config = buildWriterConfiguration(function);
+    HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
+    writer.init(new HashMap<String, String>(), createTopologyContext(), config);
+
+    JSONObject message = new JSONObject();
+    message.put("test.key", "test.value");
+    ArrayList<JSONObject> messages = new ArrayList<>();
+    messages.add(message);
+    ArrayList<Tuple> tuples = new ArrayList<>();
+
+    CountSyncPolicy basePolicy = new CountSyncPolicy(5);
+    ClonedSyncPolicyCreator creator = new ClonedSyncPolicyCreator(basePolicy);
+
+    writer.write(SENSOR_NAME, config, tuples, messages);
+    writer.getSourceHandler(SENSOR_NAME, "test-test.value/test.value", config).closeOutputFile();
+    writer.getSourceHandler(SENSOR_NAME, "test-test.value/test.value", config).handle(message, SENSOR_NAME, config, creator);
+    writer.close();
+
+    File outputFolder = new File(folder.getAbsolutePath() + "/test-test.value/test.value/");
+
+    // The message should show up twice, once in each file
+    ArrayList<String> expected = new ArrayList<>();
+    expected.add(message.toJSONString());
+
+    // Assert this went into a new file because it actually rotated
+    Assert.assertEquals(2, outputFolder.listFiles().length);
+    for (File file : outputFolder.listFiles()) {
+      List<String> lines = Files.readAllLines(file.toPath());
+      // One line per file
+      Assert.assertEquals(1, lines.size());
+      Assert.assertEquals(expected, lines);
+    }
+  }
+
   protected WriterConfiguration buildWriterConfiguration(String function) {
     IndexingConfigurations indexingConfig = new IndexingConfigurations();
     Map<String, Object> sensorIndexingConfig = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-stellar/stellar-common/README.md
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/README.md b/metron-stellar/stellar-common/README.md
index 926132e..d464d8c 100644
--- a/metron-stellar/stellar-common/README.md
+++ b/metron-stellar/stellar-common/README.md
@@ -1,24 +1,24 @@
 
+# Stellar Language
 
-# Contents
+For a variety of components (threat intelligence triage and field transformations) we have the need to do simple computation and transformation using the data from messages as variables.  For those purposes, there exists a simple, scaled down DSL created to do simple computation and transformation.
 
-* [Stellar Language](#stellar-language)
-    * [Stellar Language Keywords](#stellar-language-keywords)
-    * [Stellar Core Functions](#stellar-core-functions)
-    * [Stellar Benchmarks](#stellar-benchmarks)
-    * [Stellar Shell](#stellar-shell)
-* [Stellar Configuration](#stellar-configuration)
 
+* [Introduction](#introduction)
+* [Stellar Core Functions](#stellar-core-functions)
+* [Stellar Benchmarks](#stellar-benchmarks)
+* [Stellar Shell](#stellar-shell)
+    * [Getting Started](#getting-started)
+    * [Command Line Options](#command-line-options)
+    * [Variable Assignment](#variable-assignment)
+    * [Magic Commands](#magic-commands)
+    * [Advanced Usage](#advanced-usage)
+* [Stellar Configuration](#stellar-configuration)
 
-# Stellar Language
 
-For a variety of components (threat intelligence triage and field
-transformations) we have the need to do simple computation and
-transformation using the data from messages as variables.  
-For those purposes, there exists a simple, scaled down DSL 
-created to do simple computation and transformation.
+## Introduction
 
-The query language supports the following:
+The Stellar language supports the following:
 * Referencing fields in the enriched JSON
 * String literals are quoted with either `'` or `"`
 * String literals support escaping for `'`, `"`, `\t`, `\r`, `\n`, and backslash 
@@ -35,7 +35,7 @@ The query language supports the following:
 * The ability to have parenthesis to make order of operations explicit
 * User defined functions, including Lambda expressions 
 
-## Stellar Language Keywords
+### Stellar Language Keywords
 The following keywords need to be single quote escaped in order to be used in Stellar expressions:
 
 |               |               |             |             |             |
@@ -47,13 +47,13 @@ The following keywords need to be single quote escaped in order to be used in St
 
 Using parens such as: "foo" : "\<ok\>" requires escaping; "foo": "\'\<ok\>\'"
 
-## Stellar Language Inclusion Checks (`in` and `not in`)
+### Stellar Language Inclusion Checks (`in` and `not in`)
 1. `in` supports string contains. e.g. `'foo' in 'foobar' == true`
 2. `in` supports collection contains. e.g. `'foo' in [ 'foo', 'bar' ] == true`
 3. `in` supports map key contains. e.g. `'foo' in { 'foo' : 5} == true`
 4. `not in` is the negation of the in expression. e.g. `'grok' not in 'foobar' == true`
 
-## Stellar Language Comparisons (`<`, `<=`, `>`, `>=`)
+### Stellar Language Comparisons (`<`, `<=`, `>`, `>=`)
 
 1. If either side of the comparison is null then return false.
 2. If both values being compared implement number then the following:
@@ -64,7 +64,7 @@ Using parens such as: "foo" : "\<ok\>" requires escaping; "foo": "\'\<ok\>\'"
 3. If both sides are of the same type and are comparable then use the compareTo method to compare values.
 4. If none of the above are met then an exception is thrown.
 
-## Stellar Language Equality Check (`==`, `!=`)
+### Stellar Language Equality Check (`==`, `!=`)
 
 Below is how the `==` operator is expected to work:
 
@@ -78,7 +78,7 @@ Below is how the `==` operator is expected to work:
 
 The `!=` operator is the negation of the above.
 
-## Stellar Language Lambda Expressions
+### Stellar Language Lambda Expressions
 
 Stellar provides the capability to pass lambda expressions to functions
 which wish to support that layer of indirection.  The syntax is:
@@ -110,8 +110,8 @@ In the core language functions, we support basic functional programming primitiv
 | [ `BLOOM_EXISTS`](#bloom_exists)                                                                   |
 | [ `BLOOM_INIT`](#bloom_init)                                                                       |
 | [ `BLOOM_MERGE`](#bloom_merge)                                                                     |
-| [ `CEILING`](#ceiling)                                                           |
-| [ `COS`](#cos)                                                                   |
+| [ `CEILING`](#ceiling)                                                                             |
+| [ `COS`](#cos)                                                                                     |
 | [ `CHOP`](#chop)                                                                                   |
 | [ `CHOMP`](#chomp)                                                                                 |
 | [ `COUNT_MATCHES`](#count_matches)                                                                 |
@@ -126,20 +126,26 @@ In the core language functions, we support basic functional programming primitiv
 | [ `ENDS_WITH`](#ends_with)                                                                         |
 | [ `ENRICHMENT_EXISTS`](#enrichment_exists)                                                         |
 | [ `ENRICHMENT_GET`](#enrichment_get)                                                               |
-| [ `EXP`](#exp)                                                                   |
+| [ `EXP`](#exp)                                                                                     |
 | [ `FILL_LEFT`](#fill_left)                                                                         |
 | [ `FILL_RIGHT`](#fill_right)                                                                       |
 | [ `FILTER`](#filter)                                                                               |
-| [ `FLOOR`](#floor)                                                               |
-| [ `FUZZY_LANGS`](#fuzzy_langs)                                                   |
-| [ `FUZZY_SCORE`](#fuzzy_score)                                                   |
+| [ `FLOOR`](#floor)                                                                                 |
+| [ `FUZZY_LANGS`](#fuzzy_langs)                                                                     |
+| [ `FUZZY_SCORE`](#fuzzy_score)                                                                     |
 | [ `FORMAT`](#format)                                                                               |
 | [ `GEO_GET`](#geo_get)                                                                             |
+| [ `GEOHASH_CENTROID`](#geohash_centroid)                                                           |
+| [ `GEOHASH_DIST`](#geohash_dist)                                                                   |
+| [ `GEOHASH_FROM_LATLONG`](#geohash_from_latlong)                                                   |
+| [ `GEOHASH_FROM_LOC`](#geohash_from_loc)                                                           |
+| [ `GEOHASH_MAX_DIST`](#geohash_max_dist)                                                           |
+| [ `GEOHASH_TO_LATLONG`](#geohash_to_latlong)                                                       |
 | [ `GET`](#get)                                                                                     |
 | [ `GET_FIRST`](#get_first)                                                                         |
 | [ `GET_LAST`](#get_last)                                                                           |
-| [ `GET_SUPPORTED_ENCODINGS`](#get_supported_encodings)                                           |
-| [ `HASH`](#hash)                                                                           |
+| [ `GET_SUPPORTED_ENCODINGS`](#get_supported_encodings)                                             |
+| [ `HASH`](#hash)                                                                                   |
 | [ `HLLP_CARDINALITY`](../../metron-analytics/metron-statistics#hllp_cardinality)                   |
 | [ `HLLP_INIT`](../../metron-analytics/metron-statistics#hllp_init)                                 |
 | [ `HLLP_MERGE`](../../metron-analytics/metron-statistics#hllp_merge)                               |
@@ -160,14 +166,19 @@ In the core language functions, we support basic functional programming primitiv
 | [ `KAFKA_TAIL`](#kafka_tail)                                                                       |
 | [ `LENGTH`](#length)                                                                               |
 | [ `LIST_ADD`](#list_add)                                                                           |
-| [ `LOG2`](#log2)                                                                               |
-| [ `LOG10`](#log10)                                                                               |
-| [ `LN`](#ln)                                                                               |
+| [ `LOG2`](#log2)                                                                                   |
+| [ `LOG10`](#log10)                                                                                 |
+| [ `LN`](#ln)                                                                                       |
 | [ `MAAS_GET_ENDPOINT`](#maas_get_endpoint)                                                         |
 | [ `MAAS_MODEL_APPLY`](#maas_model_apply)                                                           |
 | [ `MAP`](#map)                                                                                     |
 | [ `MAP_EXISTS`](#map_exists)                                                                       |
 | [ `MONTH`](#month)                                                                                 |
+| [ `MULTISET_ADD`](#multiset_add)                                                                   |
+| [ `MULTISET_INIT`](#multiset_init)                                                                 |
+| [ `MULTISET_MERGE`](#multiset_merge)                                                               |
+| [ `MULTISET_REMOVE`](#multiset_remove)                                                             |
+| [ `MULTISET_TO_SET`](#multiset_to_set)                                                             |
 | [ `PREPEND_IF_MISSING`](#prepend_if_missing)                                                       |
 | [ `PROFILE_GET`](#profile_get)                                                                     |
 | [ `PROFILE_FIXED`](#profile_fixed)                                                                 |
@@ -175,11 +186,15 @@ In the core language functions, we support basic functional programming primitiv
 | [ `PROTOCOL_TO_NAME`](#protocol_to_name)                                                           |
 | [ `REDUCE`](#reduce)                                                                               |
 | [ `REGEXP_MATCH`](#regexp_match)                                                                   |
-| [ `REGEXP_GROUP_VAL`](#regexp_group_val)                                                                   |
-| [ `ROUND`](#round)                                                                   |
+| [ `REGEXP_GROUP_VAL`](#regexp_group_val)                                                           |
+| [ `ROUND`](#round)                                                                                 |
+| [ `SET_ADD`](#set_add)                                                                             |
+| [ `SET_INIT`](#set_init)                                                                           |
+| [ `SET_MERGE`](#set_merge)                                                                         |
+| [ `SET_REMOVE`](#set_remove)                                                                       |
 | [ `SPLIT`](#split)                                                                                 |
-| [ `SIN`](#sin)                                                                                 |
-| [ `SQRT`](#sqrt)                                                                                 |
+| [ `SIN`](#sin)                                                                                     |
+| [ `SQRT`](#sqrt)                                                                                   |
 | [ `STARTS_WITH`](#starts_with)                                                                     |
 | [ `STATS_ADD`](../../metron-analytics/metron-statistics#stats_add)                                 |
 | [ `STATS_BIN`](../../metron-analytics/metron-statistics#stats_bin)                                 |
@@ -201,13 +216,17 @@ In the core language functions, we support basic functional programming primitiv
 | [ `STATS_SUM_SQUARES`](../../metron-analytics/metron-statistics#stats_sum_squares)                 |
 | [ `STATS_VARIANCE`](../../metron-analytics/metron-statistics#stats_variance)                       |
 | [ `STRING_ENTROPY`](#string_entropy)                                                               |
+| [ `SUBSTRING`](#substring)                                                                         |
 | [ `SYSTEM_ENV_GET`](#system_env_get)                                                               |
 | [ `SYSTEM_PROPERTY_GET`](#system_property_get)                                                     |
-| [ `TAN`](#tan)                                                                         |
+| [ `TAN`](#tan)                                                                                     |
 | [ `TO_DOUBLE`](#to_double)                                                                         |
 | [ `TO_EPOCH_TIMESTAMP`](#to_epoch_timestamp)                                                       |
 | [ `TO_FLOAT`](#to_float)                                                                           |
 | [ `TO_INTEGER`](#to_integer)                                                                       |
+| [ `TO_JSON_LIST`](#to_json_List)                                                                   |
+| [ `TO_JSON_MAP`](#to_json_map)                                                                     |
+| [ `TO_JSON_OBJECT`](#to_json_object)                                                               |
 | [ `TO_LONG`](#to_long)                                                                             |
 | [ `TO_LOWER`](#to_lower)                                                                           |
 | [ `TO_STRING`](#to_string)                                                                         |
@@ -220,8 +239,8 @@ In the core language functions, we support basic functional programming primitiv
 | [ `WEEK_OF_MONTH`](#week_of_month)                                                                 |
 | [ `WEEK_OF_YEAR`](#week_of_year)                                                                   |
 | [ `YEAR`](#year)                                                                                   |
-| [ `ZIP`](#zip)                                                                                   |
-| [ `ZIP_JAGGED`](#zip_jagged)                                                                                   |
+| [ `ZIP`](#zip)                                                                                     |
+| [ `ZIP_JAGGED`](#zip_jagged)                                                                       |
 
 ### `APPEND_IF_MISSING`
   * Description: Appends the suffix to the end of the string if the string does not already end with any of the suffixes.
@@ -434,6 +453,50 @@ In the core language functions, we support basic functional programming primitiv
     * fields - Optional list of GeoIP fields to grab. Options are locID, country, city postalCode, dmaCode, latitude, longitude, location_point
   * Returns: If a Single field is requested a string of the field, If multiple fields a map of string of the fields, and null otherwise
 
+### `GEOHASH_CENTROID`
+  * Description: Compute the centroid (geographic midpoint or center of gravity) of a set of [geohashes](https://en.wikipedia.org/wiki/Geohash)
+  * Input:
+    * hashes - A collection of [geohashes](https://en.wikipedia.org/wiki/Geohash) or a map associating geohashes to numeric weights
+    * character_precision? - The number of characters to use in the hash. Default is 12
+  * Returns: The geohash of the centroid
+
+### `GEOHASH_DIST`
+  * Description: Compute the distance between [geohashes](https://en.wikipedia.org/wiki/Geohash)
+  * Input:
+    * hash1 - The first point as a geohash
+    * hash2 - The second point as a geohash
+    * strategy? - The great circle distance strategy to use.  One of [HAVERSINE](https://en.wikipedia.org/wiki/Haversine_formula), [LAW_OF_COSINES](https://en.wikipedia.org/wiki/Law_of_cosines#Using_the_distance_formula), or [VICENTY](https://en.wikipedia.org/wiki/Vincenty%27s_formulae).  Haversine is default.
+  * Returns: The distance in kilometers between the hashes.
+
+### `GEOHASH_FROM_LATLONG`
+  * Description: Compute [geohash](https://en.wikipedia.org/wiki/Geohash) given a lat/long
+  * Input:
+    * latitude - The latitude
+    * longitude - The longitude
+    * character_precision? - The number of characters to use in the hash. Default is 12
+  * Returns: A [geohash](https://en.wikipedia.org/wiki/Geohash) of the lat/long
+
+### `GEOHASH_FROM_LOC`
+  * Description: Compute [geohash](https://en.wikipedia.org/wiki/Geohash) given a geo enrichment location
+  * Input:
+    * map - the latitude and logitude in a map (the output of [GEO_GET](#geo_get) )
+    * longitude - The longitude
+    * character_precision? - The number of characters to use in the hash. Default is `12`
+  * Returns: A [geohash](https://en.wikipedia.org/wiki/Geohash) of the location
+
+### `GEOHASH_MAX_DIST`
+  * Description: Compute the maximum distance among a list of [geohashes](https://en.wikipedia.org/wiki/Geohash)
+  * Input:
+    * hashes - A set of [geohashes](https://en.wikipedia.org/wiki/Geohash)
+    * strategy? - The great circle distance strategy to use. One of [HAVERSINE](https://en.wikipedia.org/wiki/Haversine_formula), [LAW_OF_COSINES](https://en.wikipedia.org/wiki/Law_of_cosines#Using_the_distance_formula), or [VICENTY](https://en.wikipedia.org/wiki/Vincenty%27s_formulae).  Haversine is default.
+  * Returns: The maximum distance in kilometers between any two locations
+
+### `GEOHASH_TO_LATLONG`
+  * Description: Compute the lat/long of a given [geohash](https://en.wikipedia.org/wiki/Geohash)
+  * Input:
+    * hash - The [geohash](https://en.wikipedia.org/wiki/Geohash)
+  * Returns: A map containing the latitude and longitude of the hash (keys "latitude" and "longitude")
+
 ### `GET`
   * Description: Returns the i'th element of the list 
   * Input:
@@ -502,7 +565,7 @@ In the core language functions, we support basic functional programming primitiv
   * Description: Returns true if the passed string is encoded in one of the supported encodings and false if otherwise.
   * Input:
       * string - The string to test
-        * encoding - The name of the encoding as string.  See [ `GET_SUPPORTED_ENCODINGS`](#get_supported_encodings).
+      * encoding - The name of the encoding as string.  See [ `GET_SUPPORTED_ENCODINGS`](#get_supported_encodings).
   * Returns: True if the passed string is encoded in one of the supported encodings and false if otherwise.
 
 ### `IS_INTEGER`
@@ -578,6 +641,27 @@ In the core language functions, we support basic functional programming primitiv
   * Description: Returns a list of the encodings that are currently supported.
   * Returns: A List of String
  
+### `TO_JSON_LIST`
+  * Description: Accepts JSON string as an input and returns a List object parsed by Jackson. You need to be aware of content of JSON string that is to be parsed.
+  For e.g. `GET_FIRST( TO_JSON_LIST(  '[ "foo", 2]')` would yield `foo`
+  * Input:
+    * string - The JSON string to be parsed
+  * Returns: A parsed List object
+
+### `TO_JSON_MAP`
+  * Description: Accepts JSON string as an input and returns a Map object parsed by Jackson. You need to be aware of content of JSON string that is to be parsed.
+  For e.g. `MAP_GET( 'bar', TO_JSON_MAP(  '{ "foo" : 1, "bar" : 2}' )` would yield `2`
+  * Input:
+    * string - The JSON string to be parsed
+  * Returns: A parsed Map object
+
+### `TO_JSON_OBJECT`
+  * Description: Accepts JSON string as an input and returns a JSON Object parsed by Jackson. You need to be aware of content of JSON string that is to be parsed.
+  For e.g. `MAP_GET( 'bar', TO_JSON_OBJECT(  '{ "foo" : 1, "bar" : 2}' )` would yield `2`
+  * Input:
+    * string - The JSON string to be parsed
+  * Returns: A parsed JSON object
+
 ### `LOG2`
   * Description: Returns the log (base `2`) of a number.
   * Input:
@@ -638,6 +722,38 @@ In the core language functions, we support basic functional programming primitiv
   * Input:
     * dateTime - The datetime as a long representing the milliseconds since unix epoch
   * Returns: The current month (0-based).
+  
+### `MULTISET_ADD`
+  * Description: Adds to a multiset, which is a map associating objects to their instance counts.
+  * Input:
+    * set - The multiset to add to
+    * o - object to add to multiset
+  * Returns: A multiset
+
+### `MULTISET_INIT`
+  * Description: Creates an empty multiset, which is a map associating objects to their instance counts.
+  * Input:
+    * input? - An initialization of the multiset
+  * Returns: A multiset
+
+### `MULTISET_MERGE`
+  * Description: Merges a list of multisets, which is a map associating objects to their instance counts.
+  * Input:
+    * sets - A collection of multisets to merge
+  * Returns: A multiset
+
+### `MULTISET_REMOVE`
+  * Description: Removes from a multiset, which is a map associating objects to their instance counts.
+  * Input:
+    * set - The multiset to add to
+    * o - object to remove from multiset
+  * Returns: A multiset
+
+### `MULTISET_TO_SET`
+  * Description: Create a set out of a multiset, which is a map associating objects to their instance counts.
+  * Input:
+    * multiset - The multiset to convert.
+  * Returns: The set of objects in the multiset ignoring multiplicity
 
 ### `PREPEND_IF_MISSING`
   * Description: Prepends the prefix to the start of the string if the string does not already start with any of the prefixes.
@@ -708,6 +824,32 @@ In the core language functions, we support basic functional programming primitiv
     * number - The number to round
   * Returns: The nearest integer (based on half-up rounding).
 
+### `SET_ADD`
+  * Description: Adds to a set
+  * Input:
+    * set - The set to add to
+    * o - object to add to set
+  * Returns: A Set
+
+### `SET_INIT`
+  * Description: Creates an new set
+  * Input:
+    * input? - An initialization of the set
+  * Returns: A Set
+
+### `SET_MERGE`
+  * Description: Merges a list of sets
+  * Input:
+    * sets - A collection of sets to merge
+  * Returns: A Set
+
+### `SET_REMOVE`
+  * Description: Removes from a set
+  * Input:
+    * set - The set to add to
+    * o - object to add to set
+  * Returns: A Set
+
 ### `SIN`
   * Description: Returns the sine of a number.
   * Input:
@@ -752,6 +894,14 @@ In the core language functions, we support basic functional programming primitiv
     * key - Property to get the value for
   * Returns: String
 
+### `SUBSTRING`
+  * Description: Returns the substring of a string
+  * Input:
+    * input - The string to take the substring of
+    * start - The starting position (`0`-based and inclusive)
+    * end? - The ending position (`0`-based and exclusive)
+  * Returns: The substring of the input
+
 ### `TAN`
   * Description: Returns the tangent of a number.
   * Input:
@@ -1080,8 +1230,6 @@ IN_SUBNET
 Lists all variables in the Stellar environment.
 
 ```
-Stellar, Go!
-{es.clustername=metron, es.ip=node1, es.port=9300, es.date.format=yyyy.MM.dd.HH}
 [Stellar]>>> %vars
 [Stellar]>>> foo := 2 + 2
 4.0
@@ -1089,6 +1237,38 @@ Stellar, Go!
 foo = 4.0
 ```
 
+#### `%globals`
+
+Lists all values that are defined in the global configuration.
+
+Most of Metron's functional components have access to what is called the global configuration.  This is a key/value configuration store that can be used to customize Metron.  Many Stellar functions accept configuration values from the global configuration.  The Stellar Shell also leverages the global configuration for customizing the behavior of many Stellar functions.  
+
+```
+[Stellar]>>> %globals
+{es.clustername=metron, es.ip=node1:9300, es.date.format=yyyy.MM.dd.HH, parser.error.topic=indexing, update.hbase.table=metron_update, update.hbase.cf=t}
+```
+
+#### `%define`
+
+Defines a global configuration value in the current shell session.  This value will be forgotten once the session is ended.
+
+```
+[Stellar]>>> %define bootstrap.servers := "node1:6667"
+node1:6667
+[Stellar]>>> %globals
+{bootstrap.servers=node1:6667}
+``` 
+
+#### `%undefine`
+
+Undefine a global configuration value in the current shell session.  This will not modify the persisted global configuration.
+
+```
+[Stellar]>>> %undefine bootstrap.servers
+[Stellar]>>> %globals
+{}
+```
+
 #### `?<function>`
 
 Returns formatted documentation of the Stellar function.  Provides the description of the function along with the expected arguments.
@@ -1138,7 +1318,7 @@ Please note that functions are loading lazily in the background and will be unav
 ABS, APPEND_IF_MISSING, BIN, BLOOM_ADD, BLOOM_EXISTS, BLOOM_INIT, BLOOM_MERGE, CHOMP, CHOP, COUNT_MATCHES, DAY_OF_MONTH, DAY_OF_WEEK, DAY_OF_YEAR, DOMAIN_REMOVE_SUBDOMAINS, DOMAIN_REMOVE_TLD, DOMAIN_TO_TLD, ENDS_WITH, FILL_LEFT, FILL_RIGHT, FILTER, FORMAT, GET, GET_FIRST, GET_LAST, HLLP_ADD, HLLP_CARDINALITY, HLLP_INIT, HLLP_MERGE, IN_SUBNET, IS_DATE, IS_DOMAIN, IS_EMAIL, IS_EMPTY, IS_INTEGER, IS_IP, IS_URL, JOIN, LENGTH, LIST_ADD, MAAS_GET_ENDPOINT, MAAS_MODEL_APPLY, MAP, MAP_EXISTS, MAP_GET, MONTH, OUTLIER_MAD_ADD, OUTLIER_MAD_SCORE, OUTLIER_MAD_STATE_MERGE, PREPEND_IF_MISSING, PROFILE_FIXED, PROFILE_GET, PROFILE_WINDOW, PROTOCOL_TO_NAME, REDUCE, REGEXP_MATCH, SPLIT, STARTS_WITH, STATS_ADD, STATS_BIN, STATS_COUNT, STATS_GEOMETRIC_MEAN, STATS_INIT, STATS_KURTOSIS, STATS_MAX, STATS_MEAN, STATS_MERGE, STATS_MIN, STATS_PERCENTILE, STATS_POPULATION_VARIANCE, STATS_QUADRATIC_MEAN, STATS_SD, STATS_SKEWNESS, STATS_SUM, STATS_SUM_LOGS, STATS_SUM_SQUARES, STATS_VARIANCE, STRING_ENTROPY, SYS
 TEM_ENV_GET, SYSTEM_PROPERTY_GET, TO_DOUBLE, TO_EPOCH_TIMESTAMP, TO_FLOAT, TO_INTEGER, TO_LONG, TO_LOWER, TO_STRING, TO_UPPER, TRIM, URL_TO_HOST, URL_TO_PATH, URL_TO_PORT, URL_TO_PROTOCOL, WEEK_OF_MONTH, WEEK_OF_YEAR, YEAR 
 ```
 
-# Stellar Configuration
+## Stellar Configuration
 
 Stellar can be configured in a variety of ways from the [Global Configuration](../../metron-platform/metron-common/README.md#global-configuration).
 In particular, there are three main configuration parameters around configuring Stellar:

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-stellar/stellar-common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/pom.xml b/metron-stellar/stellar-common/pom.xml
index 5945bbd..9ec29b8 100644
--- a/metron-stellar/stellar-common/pom.xml
+++ b/metron-stellar/stellar-common/pom.xml
@@ -257,6 +257,16 @@
                         <configuration>
                             <shadedArtifactAttached>true</shadedArtifactAttached>
                             <shadedClassifierName>uber</shadedClassifierName>
+                            <filters>
+                              <filter>
+                                <artifact>*:*</artifact>
+                                <excludes>
+                                  <exclude>META-INF/*.SF</exclude>
+                                  <exclude>META-INF/*.DSA</exclude>
+                                  <exclude>META-INF/*.RSA</exclude>
+                                </excludes>
+                              </filter>
+                            </filters>
                             <relocations>
                                 <relocation>
                                     <pattern>com.fasterxml.jackson</pattern>

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/StellarCompiler.java
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/StellarCompiler.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/StellarCompiler.java
index a8bc773..b669bc7 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/StellarCompiler.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/StellarCompiler.java
@@ -121,15 +121,15 @@ public class StellarCompiler extends StellarBaseListener {
             //if we have a boolean as the current value and the next non-contextual token is a short circuit op
             //then we need to short circuit possibly
             if(token.getUnderlyingType() == BooleanArg.class) {
-              if (curr.getMultiArgContext() != null
-                      && curr.getMultiArgContext().getVariety() == FrameContext.BOOLEAN_OR
+              if (token.getMultiArgContext() != null
+                      && token.getMultiArgContext().getVariety() == FrameContext.BOOLEAN_OR
                       && (Boolean) (curr.getValue())
                       ) {
                 //short circuit the or
                 FrameContext.Context context = curr.getMultiArgContext();
                 shortCircuit(it, context);
-              } else if (curr.getMultiArgContext() != null
-                      && curr.getMultiArgContext().getVariety() == FrameContext.BOOLEAN_AND
+              } else if (token.getMultiArgContext() != null
+                      && token.getMultiArgContext().getVariety() == FrameContext.BOOLEAN_AND
                       && !(Boolean) (curr.getValue())
                       ) {
                 //short circuit the and

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/StellarExecutor.java
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/StellarExecutor.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/StellarExecutor.java
index 1181c6f..febde40 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/StellarExecutor.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/StellarExecutor.java
@@ -92,9 +92,10 @@ public class StellarExecutor {
   }
 
   /**
-   * prefix tree index of autocompletes
+   * Prefix tree index of auto-completes.
    */
   private PatriciaTrie<AutoCompleteType> autocompleteIndex;
+
   /**
    * The variables known by Stellar.
    */
@@ -118,7 +119,9 @@ public class StellarExecutor {
   private Console console;
 
   public enum OperationType {
-    DOC,MAGIC,NORMAL;
+    DOC
+    , MAGIC
+    , NORMAL
   }
 
   public interface AutoCompleteTransformation {
@@ -138,6 +141,7 @@ public class StellarExecutor {
     , VARIABLE((type, key) -> key )
     , TOKEN((type, key) -> key)
     ;
+
     AutoCompleteTransformation transform;
     AutoCompleteType(AutoCompleteTransformation transform) {
       this.transform = transform;
@@ -147,7 +151,6 @@ public class StellarExecutor {
     public String transform(OperationType type, String key) {
       return transform.transform(type, key);
     }
-
   }
 
   /**
@@ -178,18 +181,17 @@ public class StellarExecutor {
 
     // asynchronously update the index with function names found from a classpath scan.
     new Thread( () -> {
-        Iterable<StellarFunctionInfo> functions = functionResolver.getFunctionInfo();
-        indexLock.writeLock().lock();
-        try {
-          for(StellarFunctionInfo info: functions) {
-            String functionName = info.getName();
-            autocompleteIndex.put(functionName, AutoCompleteType.FUNCTION);
-          }
+      Iterable<StellarFunctionInfo> functions = functionResolver.getFunctionInfo();
+      indexLock.writeLock().lock();
+      try {
+        for(StellarFunctionInfo info: functions) {
+          String functionName = info.getName();
+          autocompleteIndex.put(functionName, AutoCompleteType.FUNCTION);
         }
-          finally {
-            System.out.println("Functions loaded, you may refer to functions now...");
-            indexLock.writeLock().unlock();
-          }
+      } finally {
+        System.out.println("Functions loaded, you may refer to functions now...");
+        indexLock.writeLock().unlock();
+      }
     }).start();
   }
 
@@ -203,6 +205,9 @@ public class StellarExecutor {
     index.put("quit", AutoCompleteType.TOKEN);
     index.put(StellarShell.MAGIC_FUNCTIONS, AutoCompleteType.FUNCTION);
     index.put(StellarShell.MAGIC_VARS, AutoCompleteType.FUNCTION);
+    index.put(StellarShell.MAGIC_GLOBALS, AutoCompleteType.FUNCTION);
+    index.put(StellarShell.MAGIC_DEFINE, AutoCompleteType.FUNCTION);
+    index.put(StellarShell.MAGIC_UNDEFINE, AutoCompleteType.FUNCTION);
     return new PatriciaTrie<>(index);
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/StellarShell.java
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/StellarShell.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/StellarShell.java
index a860778..0002c5a 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/StellarShell.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/StellarShell.java
@@ -30,10 +30,9 @@ import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.metron.stellar.dsl.Context;
-import org.apache.metron.stellar.dsl.StellarFunctionInfo;
 import org.apache.metron.stellar.common.StellarAssignment;
 import org.apache.metron.stellar.common.utils.JSONUtils;
+import org.apache.metron.stellar.dsl.StellarFunctionInfo;
 import org.jboss.aesh.complete.CompleteOperation;
 import org.jboss.aesh.complete.Completion;
 import org.jboss.aesh.console.AeshConsoleCallback;
@@ -51,13 +50,18 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
+import static org.apache.metron.stellar.dsl.Context.Capabilities.GLOBAL_CONFIG;
+
 /**
  * A REPL environment for Stellar.
  *
@@ -90,6 +94,9 @@ public class StellarShell extends AeshConsoleCallback implements Completion {
   public static final String MAGIC_VARS = MAGIC_PREFIX + "vars";
   public static final String DOC_PREFIX = "?";
   public static final String STELLAR_PROPERTIES_FILENAME = "stellar.properties";
+  public static final String MAGIC_GLOBALS = MAGIC_PREFIX + "globals";
+  public static final String MAGIC_DEFINE = MAGIC_PREFIX + "define";
+  public static final String MAGIC_UNDEFINE = MAGIC_PREFIX + "undefine";
 
   private StellarExecutor executor;
 
@@ -241,7 +248,7 @@ public class StellarShell extends AeshConsoleCallback implements Completion {
     // welcome message and print globals
     writeLine(WELCOME);
     executor.getContext()
-            .getCapability(Context.Capabilities.GLOBAL_CONFIG, false)
+            .getCapability(GLOBAL_CONFIG, false)
             .ifPresent(conf -> writeLine(conf.toString()));
 
     console.start();
@@ -287,32 +294,25 @@ public class StellarShell extends AeshConsoleCallback implements Completion {
    * Executes a magic expression.
    * @param rawExpression The expression to execute.
    */
-  private void handleMagic( String rawExpression) {
-    String[] expression = rawExpression.trim().split(" ");
+  private void handleMagic(String rawExpression) {
 
+    String[] expression = rawExpression.trim().split("\\s+");
     String command = expression[0];
-    if(MAGIC_FUNCTIONS.equals(command)) {
 
-      // if '%functions FOO' then show only functions that contain 'FOO'
-      Predicate<String> nameFilter = (name -> true);
-      if(expression.length > 1) {
-        nameFilter = (name -> name.contains(expression[1]));
-      }
+    if (MAGIC_FUNCTIONS.equals(command)) {
+      handleMagicFunctions(expression);
+
+    } else if (MAGIC_VARS.equals(command)) {
+      handleMagicVars();
 
-      // list available functions
-      String functions = StreamSupport
-              .stream(executor.getFunctionResolver().getFunctionInfo().spliterator(), false)
-              .map(info -> String.format("%s", info.getName()))
-              .filter(nameFilter)
-              .sorted()
-              .collect(Collectors.joining(", "));
-      writeLine(functions);
+    } else if (MAGIC_GLOBALS.equals(command)) {
+      handleMagicGlobals();
 
-    } else if(MAGIC_VARS.equals(command)) {
+    } else if (MAGIC_DEFINE.equals(command)) {
+      handleMagicDefine(rawExpression);
 
-      // list all variables
-      executor.getVariables()
-              .forEach((k,v) -> writeLine(String.format("%s = %s", k, v)));
+    } else if(MAGIC_UNDEFINE.equals(command)) {
+      handleMagicUndefine(expression);
 
     } else {
       writeLine(ERROR_PROMPT + "undefined magic command: " + rawExpression);
@@ -320,6 +320,106 @@ public class StellarShell extends AeshConsoleCallback implements Completion {
   }
 
   /**
+   * Handle a magic '%functions'.  Lists all of the variables in-scope.
+   * @param expression
+   */
+  private void handleMagicFunctions(String[] expression) {
+
+    // if '%functions FOO' then show only functions that contain 'FOO'
+    Predicate<String> nameFilter = (name -> true);
+    if (expression.length > 1) {
+      nameFilter = (name -> name.contains(expression[1]));
+    }
+
+    // '%functions' -> list all functions in scope
+    String functions = StreamSupport
+            .stream(executor.getFunctionResolver().getFunctionInfo().spliterator(), false)
+            .map(info -> String.format("%s", info.getName()))
+            .filter(nameFilter)
+            .sorted()
+            .collect(Collectors.joining(", "));
+    writeLine(functions);
+  }
+
+  /**
+   * Handle a magic '%vars'.  Lists all of the variables in-scope.
+   */
+  private void handleMagicVars() {
+    executor.getVariables()
+            .forEach((k, v) -> writeLine(String.format("%s = %s", k, v)));
+  }
+
+  /**
+   * Handle a magic '%globals'.  List all of the global configuration values.
+   */
+  private void handleMagicGlobals() {
+    Map<String, Object> globals = getOrCreateGlobalConfig(executor);
+    writeLine(globals.toString());
+  }
+
+  /**
+   * Handle a magic '%define var=value'.  Alter the global configuration.
+   * @param expression The expression passed to %define
+   */
+  public void handleMagicDefine(String expression) {
+
+    // grab the expression in '%define <assign-expression>'
+    String assignExpr = StringUtils.trimToEmpty(expression.substring(MAGIC_DEFINE.length()));
+    if (assignExpr.length() > 0) {
+
+      // the expression must be an assignment
+      if(StellarAssignment.isAssignment(assignExpr)) {
+        StellarAssignment expr = StellarAssignment.from(assignExpr);
+
+        // execute the expression
+        Object result = executor.execute(expr.getStatement());
+        if (result != null) {
+          writeLine(result.toString());
+
+          // alter the global configuration
+          getOrCreateGlobalConfig(executor).put(expr.getVariable(), result);
+        }
+
+      } else {
+        // the expression is not an assignment.  boo!
+        writeLine(ERROR_PROMPT + MAGIC_DEFINE + " expected assignment expression");
+      }
+    }
+  }
+
+  /**
+   * Handle a magic '%undefine var'.  Removes a variable from the global configuration.
+   * @param expression
+   */
+  private void handleMagicUndefine(String[] expression) {
+    if(expression.length > 1) {
+      Map<String, Object> globals = getOrCreateGlobalConfig(executor);
+      globals.remove(expression[1]);
+    }
+  }
+
+  /**
+   * Retrieves the GLOBAL_CONFIG, if it exists.  If it does not, it creates the GLOBAL_CONFIG
+   * and adds it to the Stellar execution context.
+   * @param executor The Stellar executor.
+   * @return The global configuration.
+   */
+  private Map<String, Object> getOrCreateGlobalConfig(StellarExecutor executor) {
+    Map<String, Object> globals;
+    Optional<Object> capability = executor.getContext().getCapability(GLOBAL_CONFIG, false);
+    if (capability.isPresent()) {
+      globals = (Map<String, Object>) capability.get();
+
+    } else {
+      // if it does not exist, create it.  this creates the global config for the current stellar executor
+      // session only.  this does not change the global config maintained externally in zookeeper
+      globals = new HashMap<>();
+      executor.getContext().addCapability(GLOBAL_CONFIG, () -> globals);
+    }
+    return globals;
+  }
+
+  /**
    * Executes a doc expression.
    * @param expression The doc expression to execute.
    */

http://git-wip-us.apache.org/repos/asf/metron/blob/24b668b0/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/DataStructureFunctions.java
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/DataStructureFunctions.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/DataStructureFunctions.java
index f3d995f..a92406b 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/DataStructureFunctions.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/DataStructureFunctions.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.stellar.dsl.functions;
 
+import com.google.common.collect.Iterables;
 import org.apache.metron.stellar.dsl.BaseStellarFunction;
 import org.apache.metron.stellar.dsl.Stellar;
 import org.apache.metron.stellar.common.utils.BloomFilter;
@@ -24,7 +25,9 @@ import org.apache.metron.stellar.common.utils.ConversionUtils;
 import org.apache.metron.stellar.common.utils.SerDeUtils;
 
 import java.util.Collection;
+import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Set;
 
 public class DataStructureFunctions {
 
@@ -217,4 +220,6 @@ public class DataStructureFunctions {
       }
     }
   }
+
+
 }


Mime
View raw message