metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject [17/25] metron git commit: METRON-877 Extract core implementation and UDF support, create metron-stellar module (mattf-horton) closes apache/metron#616
Date Sun, 02 Jul 2017 22:43:43 GMT
http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-platform/metron-common/src/main/java/org/apache/metron/common/stellar/shell/PausableInput.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/stellar/shell/PausableInput.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/stellar/shell/PausableInput.java
deleted file mode 100644
index ee7ed0a..0000000
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/stellar/shell/PausableInput.java
+++ /dev/null
@@ -1,372 +0,0 @@
-/*
- *
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-package org.apache.metron.common.stellar.shell;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * An input stream which mirrors System.in, but allows you to 'pause' and 'unpause' it.
- * The Aeshell has an external thread which is constantly polling System.in.  If you
- * need to spawn a program externally (i.e. an editor) which shares stdin, this thread
- * and the spawned program both share a buffer.  This causes contention and unpredictable
- * results (e.g. an input may be consumed by either the aeshell thread or the spawned program)
- *
- * Because you can inject an input stream into the console, we create this which can act as a
- * facade to System.in under normal 'unpaused' circumstances, and when paused, turn off the
- * access to System.in.  This allows us to turn off access to aeshell while maintaining access
- * to the external program.
- *
- */
-public class PausableInput extends InputStream {
-  InputStream in = System.in;
-  boolean paused = false;
-  private PausableInput() {
-    super();
-  }
-
-  /**
-   * Stop mirroring stdin
-   */
-  public void pause() {
-    paused = true;
-  }
-
-  /**
-   * Resume mirroring stdin.
-   * @throws IOException
-   */
-  public void unpause() throws IOException {
-    in.read(new byte[in.available()]);
-    paused = false;
-  }
-
-  public final static PausableInput INSTANCE = new PausableInput();
-
-  /**
-   * Reads the next byte of data from the input stream. The value byte is
-   * returned as an <code>int</code> in the range <code>0</code> to
-   * <code>255</code>. If no byte is available because the end of the stream
-   * has been reached, the value <code>-1</code> is returned. This method
-   * blocks until input data is available, the end of the stream is detected,
-   * or an exception is thrown.
-   * <p>
-   * <p> A subclass must provide an implementation of this method.
-   *
-   * @return the next byte of data, or <code>-1</code> if the end of the
-   * stream is reached.
-   * @throws IOException if an I/O error occurs.
-   */
-  @Override
-  public int read() throws IOException {
-
-    return in.read();
-  }
-
-  /**
-   * Reads some number of bytes from the input stream and stores them into
-   * the buffer array <code>b</code>. The number of bytes actually read is
-   * returned as an integer.  This method blocks until input data is
-   * available, end of file is detected, or an exception is thrown.
-   * <p>
-   * <p> If the length of <code>b</code> is zero, then no bytes are read and
-   * <code>0</code> is returned; otherwise, there is an attempt to read at
-   * least one byte. If no byte is available because the stream is at the
-   * end of the file, the value <code>-1</code> is returned; otherwise, at
-   * least one byte is read and stored into <code>b</code>.
-   * <p>
-   * <p> The first byte read is stored into element <code>b[0]</code>, the
-   * next one into <code>b[1]</code>, and so on. The number of bytes read is,
-   * at most, equal to the length of <code>b</code>. Let <i>k</i> be the
-   * number of bytes actually read; these bytes will be stored in elements
-   * <code>b[0]</code> through <code>b[</code><i>k</i><code>-1]</code>,
-   * leaving elements <code>b[</code><i>k</i><code>]</code> through
-   * <code>b[b.length-1]</code> unaffected.
-   * <p>
-   * <p> The <code>read(b)</code> method for class <code>InputStream</code>
-   * has the same effect as: <pre><code> read(b, 0, b.length) </code></pre>
-   *
-   * @param b the buffer into which the data is read.
-   * @return the total number of bytes read into the buffer, or
-   * <code>-1</code> if there is no more data because the end of
-   * the stream has been reached.
-   * @throws IOException          If the first byte cannot be read for any reason
-   *                              other than the end of the file, if the input stream has been closed, or
-   *                              if some other I/O error occurs.
-   * @throws NullPointerException if <code>b</code> is <code>null</code>.
-   * @see InputStream#read(byte[], int, int)
-   */
-  @Override
-  public int read(byte[] b) throws IOException {
-
-    if(paused) {
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-      return 0;
-    }
-    int ret = in.read(b);
-    return ret;
-  }
-
-  /**
-   * Reads up to <code>len</code> bytes of data from the input stream into
-   * an array of bytes.  An attempt is made to read as many as
-   * <code>len</code> bytes, but a smaller number may be read.
-   * The number of bytes actually read is returned as an integer.
-   * <p>
-   * <p> This method blocks until input data is available, end of file is
-   * detected, or an exception is thrown.
-   * <p>
-   * <p> If <code>len</code> is zero, then no bytes are read and
-   * <code>0</code> is returned; otherwise, there is an attempt to read at
-   * least one byte. If no byte is available because the stream is at end of
-   * file, the value <code>-1</code> is returned; otherwise, at least one
-   * byte is read and stored into <code>b</code>.
-   * <p>
-   * <p> The first byte read is stored into element <code>b[off]</code>, the
-   * next one into <code>b[off+1]</code>, and so on. The number of bytes read
-   * is, at most, equal to <code>len</code>. Let <i>k</i> be the number of
-   * bytes actually read; these bytes will be stored in elements
-   * <code>b[off]</code> through <code>b[off+</code><i>k</i><code>-1]</code>,
-   * leaving elements <code>b[off+</code><i>k</i><code>]</code> through
-   * <code>b[off+len-1]</code> unaffected.
-   * <p>
-   * <p> In every case, elements <code>b[0]</code> through
-   * <code>b[off]</code> and elements <code>b[off+len]</code> through
-   * <code>b[b.length-1]</code> are unaffected.
-   * <p>
-   * <p> The <code>read(b,</code> <code>off,</code> <code>len)</code> method
-   * for class <code>InputStream</code> simply calls the method
-   * <code>read()</code> repeatedly. If the first such call results in an
-   * <code>IOException</code>, that exception is returned from the call to
-   * the <code>read(b,</code> <code>off,</code> <code>len)</code> method.  If
-   * any subsequent call to <code>read()</code> results in a
-   * <code>IOException</code>, the exception is caught and treated as if it
-   * were end of file; the bytes read up to that point are stored into
-   * <code>b</code> and the number of bytes read before the exception
-   * occurred is returned. The default implementation of this method blocks
-   * until the requested amount of input data <code>len</code> has been read,
-   * end of file is detected, or an exception is thrown. Subclasses are encouraged
-   * to provide a more efficient implementation of this method.
-   *
-   * @param b   the buffer into which the data is read.
-   * @param off the start offset in array <code>b</code>
-   *            at which the data is written.
-   * @param len the maximum number of bytes to read.
-   * @return the total number of bytes read into the buffer, or
-   * <code>-1</code> if there is no more data because the end of
-   * the stream has been reached.
-   * @throws IOException               If the first byte cannot be read for any reason
-   *                                   other than end of file, or if the input stream has been closed, or if
-   *                                   some other I/O error occurs.
-   * @throws NullPointerException      If <code>b</code> is <code>null</code>.
-   * @throws IndexOutOfBoundsException If <code>off</code> is negative,
-   *                                   <code>len</code> is negative, or <code>len</code> is greater than
-   *                                   <code>b.length - off</code>
-   * @see InputStream#read()
-   */
-  @Override
-  public int read(byte[] b, int off, int len) throws IOException {
-    if(paused) {
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-      return 0;
-    }
-    int ret = in.read(b, off, len);
-    return ret;
-  }
-
-  /**
-   * Skips over and discards <code>n</code> bytes of data from this input
-   * stream. The <code>skip</code> method may, for a variety of reasons, end
-   * up skipping over some smaller number of bytes, possibly <code>0</code>.
-   * This may result from any of a number of conditions; reaching end of file
-   * before <code>n</code> bytes have been skipped is only one possibility.
-   * The actual number of bytes skipped is returned. If {@code n} is
-   * negative, the {@code skip} method for class {@code InputStream} always
-   * returns 0, and no bytes are skipped. Subclasses may handle the negative
-   * value differently.
-   * <p>
-   * <p> The <code>skip</code> method of this class creates a
-   * byte array and then repeatedly reads into it until <code>n</code> bytes
-   * have been read or the end of the stream has been reached. Subclasses are
-   * encouraged to provide a more efficient implementation of this method.
-   * For instance, the implementation may depend on the ability to seek.
-   *
-   * @param n the number of bytes to be skipped.
-   * @return the actual number of bytes skipped.
-   * @throws IOException if the stream does not support seek,
-   *                     or if some other I/O error occurs.
-   */
-  @Override
-  public long skip(long n) throws IOException {
-
-    return in.skip(n);
-  }
-
-  /**
-   * Returns an estimate of the number of bytes that can be read (or
-   * skipped over) from this input stream without blocking by the next
-   * invocation of a method for this input stream. The next invocation
-   * might be the same thread or another thread.  A single read or skip of this
-   * many bytes will not block, but may read or skip fewer bytes.
-   * <p>
-   * <p> Note that while some implementations of {@code InputStream} will return
-   * the total number of bytes in the stream, many will not.  It is
-   * never correct to use the return value of this method to allocate
-   * a buffer intended to hold all data in this stream.
-   * <p>
-   * <p> A subclass' implementation of this method may choose to throw an
-   * {@link IOException} if this input stream has been closed by
-   * invoking the {@link #close()} method.
-   * <p>
-   * <p> The {@code available} method for class {@code InputStream} always
-   * returns {@code 0}.
-   * <p>
-   * <p> This method should be overridden by subclasses.
-   *
-   * @return an estimate of the number of bytes that can be read (or skipped
-   * over) from this input stream without blocking or {@code 0} when
-   * it reaches the end of the input stream.
-   * @throws IOException if an I/O error occurs.
-   */
-  @Override
-  public int available() throws IOException {
-
-    return in.available();
-  }
-
-  /**
-   * Closes this input stream and releases any system resources associated
-   * with the stream.
-   * <p>
-   * <p> The <code>close</code> method of <code>InputStream</code> does
-   * nothing.
-   *
-   * @throws IOException if an I/O error occurs.
-   */
-  @Override
-  public void close() throws IOException {
-    in.close();
-  }
-
-  /**
-   * Marks the current position in this input stream. A subsequent call to
-   * the <code>reset</code> method repositions this stream at the last marked
-   * position so that subsequent reads re-read the same bytes.
-   * <p>
-   * <p> The <code>readlimit</code> arguments tells this input stream to
-   * allow that many bytes to be read before the mark position gets
-   * invalidated.
-   * <p>
-   * <p> The general contract of <code>mark</code> is that, if the method
-   * <code>markSupported</code> returns <code>true</code>, the stream somehow
-   * remembers all the bytes read after the call to <code>mark</code> and
-   * stands ready to supply those same bytes again if and whenever the method
-   * <code>reset</code> is called.  However, the stream is not required to
-   * remember any data at all if more than <code>readlimit</code> bytes are
-   * read from the stream before <code>reset</code> is called.
-   * <p>
-   * <p> Marking a closed stream should not have any effect on the stream.
-   * <p>
-   * <p> The <code>mark</code> method of <code>InputStream</code> does
-   * nothing.
-   *
-   * @param readlimit the maximum limit of bytes that can be read before
-   *                  the mark position becomes invalid.
-   * @see InputStream#reset()
-   */
-  @Override
-  public synchronized void mark(int readlimit) {
-    in.mark(readlimit);
-  }
-
-  /**
-   * Repositions this stream to the position at the time the
-   * <code>mark</code> method was last called on this input stream.
-   * <p>
-   * <p> The general contract of <code>reset</code> is:
-   * <p>
-   * <ul>
-   * <li> If the method <code>markSupported</code> returns
-   * <code>true</code>, then:
-   * <p>
-   * <ul><li> If the method <code>mark</code> has not been called since
-   * the stream was created, or the number of bytes read from the stream
-   * since <code>mark</code> was last called is larger than the argument
-   * to <code>mark</code> at that last call, then an
-   * <code>IOException</code> might be thrown.
-   * <p>
-   * <li> If such an <code>IOException</code> is not thrown, then the
-   * stream is reset to a state such that all the bytes read since the
-   * most recent call to <code>mark</code> (or since the start of the
-   * file, if <code>mark</code> has not been called) will be resupplied
-   * to subsequent callers of the <code>read</code> method, followed by
-   * any bytes that otherwise would have been the next input data as of
-   * the time of the call to <code>reset</code>. </ul>
-   * <p>
-   * <li> If the method <code>markSupported</code> returns
-   * <code>false</code>, then:
-   * <p>
-   * <ul><li> The call to <code>reset</code> may throw an
-   * <code>IOException</code>.
-   * <p>
-   * <li> If an <code>IOException</code> is not thrown, then the stream
-   * is reset to a fixed state that depends on the particular type of the
-   * input stream and how it was created. The bytes that will be supplied
-   * to subsequent callers of the <code>read</code> method depend on the
-   * particular type of the input stream. </ul></ul>
-   * <p>
-   * <p>The method <code>reset</code> for class <code>InputStream</code>
-   * does nothing except throw an <code>IOException</code>.
-   *
-   * @throws IOException if this stream has not been marked or if the
-   *                     mark has been invalidated.
-   * @see InputStream#mark(int)
-   * @see IOException
-   */
-  @Override
-  public synchronized void reset() throws IOException {
-    in.reset();
-  }
-
-  /**
-   * Tests if this input stream supports the <code>mark</code> and
-   * <code>reset</code> methods. Whether or not <code>mark</code> and
-   * <code>reset</code> are supported is an invariant property of a
-   * particular input stream instance. The <code>markSupported</code> method
-   * of <code>InputStream</code> returns <code>false</code>.
-   *
-   * @return <code>true</code> if this stream instance supports the mark
-   * and reset methods; <code>false</code> otherwise.
-   * @see InputStream#mark(int)
-   * @see InputStream#reset()
-   */
-  @Override
-  public boolean markSupported() {
-    return in.markSupported();
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-platform/metron-common/src/main/java/org/apache/metron/common/stellar/shell/StellarExecutor.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/stellar/shell/StellarExecutor.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/stellar/shell/StellarExecutor.java
deleted file mode 100644
index 2a13a0b..0000000
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/stellar/shell/StellarExecutor.java
+++ /dev/null
@@ -1,322 +0,0 @@
-/*
- *
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.metron.common.stellar.shell;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import org.apache.commons.collections4.trie.PatriciaTrie;
-import org.apache.commons.lang.StringUtils;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.metron.common.configuration.ConfigurationsUtils;
-import org.apache.metron.common.dsl.Context;
-import org.apache.metron.common.dsl.MapVariableResolver;
-import org.apache.metron.common.dsl.StellarFunctionInfo;
-import org.apache.metron.common.dsl.StellarFunctions;
-import org.apache.metron.common.dsl.VariableResolver;
-import org.apache.metron.common.dsl.functions.resolver.FunctionResolver;
-import org.apache.metron.common.stellar.StellarProcessor;
-import org.apache.metron.common.utils.JSONUtils;
-import org.jboss.aesh.console.Console;
-
-import java.io.ByteArrayInputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.SortedMap;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static org.apache.metron.common.configuration.ConfigurationsUtils.readGlobalConfigBytesFromZookeeper;
-import static org.apache.metron.common.stellar.shell.StellarExecutor.OperationType.DOC;
-import static org.apache.metron.common.stellar.shell.StellarExecutor.OperationType.NORMAL;
-
-import static org.apache.metron.common.dsl.Context.Capabilities.*;
-
-/**
- * Executes Stellar expressions and maintains state across multiple invocations.
- */
-public class StellarExecutor {
-
-  public static String SHELL_VARIABLES = "shellVariables";
-  public static String CONSOLE = "console";
-
-  private ReadWriteLock indexLock = new ReentrantReadWriteLock();
-
-  public static class VariableResult {
-    private String expression;
-    private Object result;
-
-    public VariableResult(String expression, Object result) {
-      this.expression = expression;
-      this.result = result;
-    }
-
-    public String getExpression() {
-      return expression;
-    }
-
-    public Object getResult() {
-      return result;
-    }
-
-    @Override
-    public String toString() {
-      String ret = "" + result;
-      if(expression != null) {
-        ret += " via " + expression;
-      }
-      return ret;
-    }
-  }
-
-  /**
-   * prefix tree index of autocompletes
-   */
-  private PatriciaTrie<AutoCompleteType> autocompleteIndex;
-  /**
-   * The variables known by Stellar.
-   */
-  private Map<String, VariableResult> variables;
-
-  /**
-   * The function resolver.
-   */
-  private FunctionResolver functionResolver;
-
-  /**
-   * A Zookeeper client. Only defined if given a valid Zookeeper URL.
-   */
-  private Optional<CuratorFramework> client;
-
-  /**
-   * The Stellar execution context.
-   */
-  private Context context;
-
-  private Console console;
-
-  public enum OperationType {
-    DOC,MAGIC,NORMAL;
-  }
-
-  public interface AutoCompleteTransformation {
-    String transform(OperationType type, String key);
-  }
-
-  public enum AutoCompleteType implements AutoCompleteTransformation{
-      FUNCTION((type, key) -> {
-        if(type == DOC) {
-          return StellarShell.DOC_PREFIX + key;
-        }
-        else if(type == NORMAL) {
-          return key + "(";
-        }
-        return key;
-      })
-    , VARIABLE((type, key) -> key )
-    , TOKEN((type, key) -> key)
-    ;
-    AutoCompleteTransformation transform;
-    AutoCompleteType(AutoCompleteTransformation transform) {
-      this.transform = transform;
-    }
-
-    @Override
-    public String transform(OperationType type, String key) {
-      return transform.transform(type, key);
-    }
-
-  }
-
-  /**
-   * @param console The console used to drive the REPL.
-   * @param properties The Stellar properties.
-   * @throws Exception
-   */
-  public StellarExecutor(Console console, Properties properties) throws Exception {
-    this(null, console, properties);
-  }
-
-  /**
-   * @param console The console used to drive the REPL.
-   * @param properties The Stellar properties.
-   * @throws Exception
-   */
-  public StellarExecutor(String zookeeperUrl, Console console, Properties properties) throws Exception {
-    this.variables = new HashMap<>();
-    this.client = createClient(zookeeperUrl);
-    this.context = createContext(properties);
-
-    // initialize the default function resolver
-    StellarFunctions.initialize(this.context);
-    this.functionResolver = StellarFunctions.FUNCTION_RESOLVER();
-
-    this.autocompleteIndex = initializeIndex();
-    this.console = console;
-
-    // asynchronously update the index with function names found from a classpath scan.
-    new Thread( () -> {
-        Iterable<StellarFunctionInfo> functions = functionResolver.getFunctionInfo();
-        indexLock.writeLock().lock();
-        try {
-          for(StellarFunctionInfo info: functions) {
-            String functionName = info.getName();
-            autocompleteIndex.put(functionName, AutoCompleteType.FUNCTION);
-          }
-        }
-          finally {
-            System.out.println("Functions loaded, you may refer to functions now...");
-            indexLock.writeLock().unlock();
-          }
-    }).start();
-  }
-
-  private PatriciaTrie<AutoCompleteType> initializeIndex() {
-    Map<String, AutoCompleteType> index = new HashMap<>();
-
-    index.put("==", AutoCompleteType.TOKEN);
-    index.put(">=", AutoCompleteType.TOKEN);
-    index.put("<=", AutoCompleteType.TOKEN);
-    index.put(":=", AutoCompleteType.TOKEN);
-    index.put("quit", AutoCompleteType.TOKEN);
-    index.put(StellarShell.MAGIC_FUNCTIONS, AutoCompleteType.FUNCTION);
-    index.put(StellarShell.MAGIC_VARS, AutoCompleteType.FUNCTION);
-    return new PatriciaTrie<>(index);
-  }
-
-  public Iterable<String> autoComplete(String buffer, final OperationType opType) {
-    indexLock.readLock().lock();
-    try {
-      SortedMap<String, AutoCompleteType> ret = autocompleteIndex.prefixMap(buffer);
-      if (ret.isEmpty()) {
-        return new ArrayList<>();
-      }
-      return Iterables.transform(ret.entrySet(), kv -> kv.getValue().transform(opType, kv.getKey()));
-    }
-    finally {
-      indexLock.readLock().unlock();
-    }
-  }
-
-  /**
-   * Creates a Zookeeper client.
-   * @param zookeeperUrl The Zookeeper URL.
-   */
-  private Optional<CuratorFramework> createClient(String zookeeperUrl) {
-
-    // can only create client, if have valid zookeeper URL
-    if(StringUtils.isNotBlank(zookeeperUrl)) {
-      CuratorFramework client = ConfigurationsUtils.getClient(zookeeperUrl);
-      client.start();
-      return Optional.of(client);
-
-    } else {
-      return Optional.empty();
-    }
-  }
-
-  /**
-   * Creates a Context initialized with configuration stored in Zookeeper.
-   */
-  private Context createContext(Properties properties) throws Exception {
-
-    Context.Builder contextBuilder = new Context.Builder()
-            .with(SHELL_VARIABLES, () -> variables)
-            .with(CONSOLE, () -> console)
-            .with(STELLAR_CONFIG, () -> properties);
-
-    // load global configuration from zookeeper
-    if (client.isPresent()) {
-
-      // fetch the global configuration
-      Map<String, Object> global = JSONUtils.INSTANCE.load(
-              new ByteArrayInputStream(readGlobalConfigBytesFromZookeeper(client.get())),
-              new TypeReference<Map<String, Object>>() {});
-
-      contextBuilder
-              .with(GLOBAL_CONFIG, () -> global)
-              .with(ZOOKEEPER_CLIENT, () -> client.get())
-              .with(STELLAR_CONFIG, () -> getStellarConfig(global, properties));
-    }
-
-    return contextBuilder.build();
-  }
-
-  private Map<String, Object> getStellarConfig(Map<String, Object> globalConfig, Properties props) {
-    Map<String, Object> ret = new HashMap<>();
-    ret.putAll(globalConfig);
-    if(props != null) {
-      for (Map.Entry<Object, Object> kv : props.entrySet()) {
-        ret.put(kv.getKey().toString(), kv.getValue());
-      }
-    }
-    return ret;
-  }
-
-  /**
-   * Executes the Stellar expression and returns the result.
-   * @param expression The Stellar expression to execute.
-   * @return The result of the expression.
-   */
-  public Object execute(String expression) {
-    VariableResolver variableResolver = new MapVariableResolver(Maps.transformValues(variables, result -> result.getResult())
-                                                               , Collections.emptyMap());
-    StellarProcessor processor = new StellarProcessor();
-    return processor.parse(expression, variableResolver, functionResolver, context);
-  }
-
-  /**
-   * Assigns a value to a variable.
-   * @param variable The name of the variable.
-   * @param value The value of the variable
-   */
-  public void assign(String variable, String expression, Object value) {
-    this.variables.put(variable, new VariableResult(expression, value));
-    indexLock.writeLock().lock();
-    try {
-      if (value != null) {
-        this.autocompleteIndex.put(variable, AutoCompleteType.VARIABLE);
-      } else {
-        this.autocompleteIndex.remove(variable);
-      }
-    }
-    finally {
-      indexLock.writeLock().unlock();
-    }
-  }
-
-  public Map<String, VariableResult> getVariables() {
-    return this.variables;
-  }
-
-  public FunctionResolver getFunctionResolver() {
-    return functionResolver;
-  }
-
-  public Context getContext() {
-    return context;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-platform/metron-common/src/main/java/org/apache/metron/common/stellar/shell/StellarShell.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/stellar/shell/StellarShell.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/stellar/shell/StellarShell.java
deleted file mode 100644
index f13f1e3..0000000
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/stellar/shell/StellarShell.java
+++ /dev/null
@@ -1,443 +0,0 @@
-/*
- *
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.metron.common.stellar.shell;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.metron.common.dsl.Context;
-import org.apache.metron.common.dsl.StellarFunctionInfo;
-import org.apache.metron.common.dsl.functions.resolver.ClasspathFunctionResolver;
-import org.apache.metron.common.stellar.StellarAssignment;
-import org.apache.metron.common.utils.JSONUtils;
-import org.jboss.aesh.complete.CompleteOperation;
-import org.jboss.aesh.complete.Completion;
-import org.jboss.aesh.console.AeshConsoleCallback;
-import org.jboss.aesh.console.Console;
-import org.jboss.aesh.console.ConsoleOperation;
-import org.jboss.aesh.console.Prompt;
-import org.jboss.aesh.console.settings.SettingsBuilder;
-import org.jboss.aesh.terminal.CharacterType;
-import org.jboss.aesh.terminal.Color;
-import org.jboss.aesh.terminal.TerminalCharacter;
-import org.jboss.aesh.terminal.TerminalColor;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-
-import static org.apache.metron.common.dsl.functions.resolver.ClasspathFunctionResolver.Config.STELLAR_SEARCH_INCLUDES_KEY;
-
-/**
- * A REPL environment for Stellar.
- *
- * Useful for debugging Stellar expressions.
- */
-public class StellarShell extends AeshConsoleCallback implements Completion {
-
-  private static final String WELCOME = "Stellar, Go!\n" +
-          "Please note that functions are loading lazily in the background and will be unavailable until loaded fully.";
-  private List<TerminalCharacter> EXPRESSION_PROMPT = new ArrayList<TerminalCharacter>()
-  {{
-    add(new TerminalCharacter('[', new TerminalColor(Color.RED, Color.DEFAULT)));
-    add(new TerminalCharacter('S', new TerminalColor(Color.GREEN, Color.DEFAULT), CharacterType.BOLD));
-    add(new TerminalCharacter('t', new TerminalColor(Color.GREEN, Color.DEFAULT), CharacterType.BOLD));
-    add(new TerminalCharacter('e', new TerminalColor(Color.GREEN, Color.DEFAULT), CharacterType.BOLD));
-    add(new TerminalCharacter('l', new TerminalColor(Color.GREEN, Color.DEFAULT), CharacterType.BOLD));
-    add(new TerminalCharacter('l', new TerminalColor(Color.GREEN, Color.DEFAULT), CharacterType.BOLD));
-    add(new TerminalCharacter('a', new TerminalColor(Color.GREEN, Color.DEFAULT), CharacterType.BOLD));
-    add(new TerminalCharacter('r', new TerminalColor(Color.GREEN, Color.DEFAULT), CharacterType.BOLD));
-    add(new TerminalCharacter(']', new TerminalColor(Color.RED, Color.DEFAULT)));
-    add(new TerminalCharacter('>', new TerminalColor(Color.GREEN, Color.DEFAULT), CharacterType.UNDERLINE));
-    add(new TerminalCharacter('>', new TerminalColor(Color.GREEN, Color.DEFAULT), CharacterType.UNDERLINE));
-    add(new TerminalCharacter('>', new TerminalColor(Color.GREEN, Color.DEFAULT), CharacterType.UNDERLINE));
-    add(new TerminalCharacter(' ', new TerminalColor(Color.DEFAULT, Color.DEFAULT)));
-  }};
-
-  public static final String ERROR_PROMPT = "[!] ";
-  public static final String MAGIC_PREFIX = "%";
-  public static final String MAGIC_FUNCTIONS = MAGIC_PREFIX + "functions";
-  public static final String MAGIC_VARS = MAGIC_PREFIX + "vars";
-  public static final String DOC_PREFIX = "?";
-  public static final String STELLAR_PROPERTIES_FILENAME = "stellar.properties";
-
-  private StellarExecutor executor;
-
-  private Console console;
-
-  /**
-   * Execute the Stellar REPL.
-   */
-  public static void main(String[] args) throws Exception {
-    StellarShell shell = new StellarShell(args);
-    shell.execute();
-  }
-
-  /**
-   * Create a Stellar REPL.
-   * @param args The commmand-line arguments.
-   */
-  public StellarShell(String[] args) throws Exception {
-
-    // define valid command-line options
-    Options options = new Options();
-    options.addOption("z", "zookeeper", true, "Zookeeper URL");
-    options.addOption("v", "variables", true, "File containing a JSON Map of variables");
-    options.addOption("irc", "inputrc", true, "File containing the inputrc if not the default ~/.inputrc");
-    options.addOption("na", "no_ansi", false, "Make the input prompt not use ANSI colors.");
-    options.addOption("h", "help", false, "Print help");
-    options.addOption("p", "properties", true, "File containing Stellar properties");
-
-    CommandLineParser parser = new PosixParser();
-    CommandLine commandLine = parser.parse(options, args);
-
-    // print help
-    if(commandLine.hasOption("h")) {
-      HelpFormatter formatter = new HelpFormatter();
-      formatter.printHelp("stellar", options);
-      System.exit(0);
-    }
-
-    console = createConsole(commandLine);
-    executor = createExecutor(commandLine, console, getStellarProperties(commandLine));
-    loadVariables(commandLine, executor);
-    console.setPrompt(new Prompt(EXPRESSION_PROMPT));
-    console.addCompletion(this);
-    console.setConsoleCallback(this);
-  }
-
-  /**
-   * Loads any variables defined in an external file.
-   * @param commandLine The command line arguments.
-   * @param executor The stellar executor.
-   * @throws IOException
-   */
-  private static void loadVariables(CommandLine commandLine, StellarExecutor executor) throws IOException {
-    if(commandLine.hasOption("v")) {
-
-      Map<String, Object> variables = JSONUtils.INSTANCE.load(
-              new File(commandLine.getOptionValue("v")),
-              new TypeReference<Map<String, Object>>() {});
-
-      for(Map.Entry<String, Object> kv : variables.entrySet()) {
-        executor.assign(kv.getKey(), null, kv.getValue());
-      }
-    }
-  }
-
-  /**
-   * Creates the Stellar execution environment.
-   * @param commandLine The command line arguments.
-   * @param console The console which drives the REPL.
-   * @param properties Stellar properties.
-   */
-  private static StellarExecutor createExecutor(CommandLine commandLine, Console console, Properties properties) throws Exception {
-    StellarExecutor executor;
-
-    // create the executor
-    if(commandLine.hasOption("z")) {
-      String zookeeperUrl = commandLine.getOptionValue("z");
-      executor = new StellarExecutor(zookeeperUrl, console, properties);
-
-    } else {
-      executor = new StellarExecutor(console, properties);
-    }
-
-    return executor;
-  }
-
-  /**
-   * Creates the REPL's console.
-   * @param commandLine The command line options.
-   */
-  private Console createConsole(CommandLine commandLine) {
-
-    // console settings
-    boolean useAnsi = !commandLine.hasOption("na");
-    SettingsBuilder settings = new SettingsBuilder().enableAlias(true)
-                                                    .enableMan(true)
-                                                    .ansi(useAnsi)
-                                                    .parseOperators(false)
-                                                    .inputStream(PausableInput.INSTANCE);
-
-    if(commandLine.hasOption("irc")) {
-      settings = settings.inputrc(new File(commandLine.getOptionValue("irc")));
-    }
-
-    return new Console(settings.create());
-  }
-
-  /**
-   * Retrieves the Stellar properties. The properties are either loaded from a file in
-   * the classpath or a set of defaults are used.
-   */
-  private Properties getStellarProperties(CommandLine commandLine) throws IOException {
-    Properties properties = new Properties();
-
-    if (commandLine.hasOption("p")) {
-
-      // first attempt to load properties from a file specified on the command-line
-      try (InputStream in = new FileInputStream(commandLine.getOptionValue("p"))) {
-        if(in != null) {
-          properties.load(in);
-        }
-      }
-
-    } else {
-
-      // otherwise attempt to load properties from the classpath
-      try (InputStream in = getClass().getClassLoader().getResourceAsStream(STELLAR_PROPERTIES_FILENAME)) {
-        if(in != null) {
-          properties.load(in);
-        }
-      }
-    }
-
-    return properties;
-  }
-
-  /**
-   * Handles the main loop for the REPL.
-   */
-  public void execute() {
-
-    // welcome message and print globals
-    writeLine(WELCOME);
-    executor.getContext()
-            .getCapability(Context.Capabilities.GLOBAL_CONFIG, false)
-            .ifPresent(conf -> writeLine(conf.toString()));
-
-    console.start();
-  }
-
-  /**
-   * Handles user interaction when executing a Stellar expression.
-   * @param expression The expression to execute.
-   */
-  private void handleStellar(String expression) {
-
-    String stellarExpression = expression;
-    String variable = null;
-    if(StellarAssignment.isAssignment(expression)) {
-      StellarAssignment expr = StellarAssignment.from(expression);
-      variable = expr.getVariable();
-      stellarExpression = expr.getStatement();
-    }
-    else {
-      if (!stellarExpression.isEmpty()) {
-        stellarExpression = stellarExpression.trim();
-      }
-    }
-    Object result = executeStellar(stellarExpression);
-    if(result != null && variable == null) {
-      writeLine(result.toString());
-    }
-    if(variable != null) {
-      executor.assign(variable, stellarExpression, result);
-    }
-  }
-
-  /**
-   * Handles user interaction when executing a Magic command.
-   * @param rawExpression The expression to execute.
-   */
-  private void handleMagic( String rawExpression) {
-    String expression = rawExpression.trim();
-    if(MAGIC_FUNCTIONS.equals(expression)) {
-
-      // list all functions
-      String functions = StreamSupport
-              .stream(executor.getFunctionResolver().getFunctionInfo().spliterator(), false)
-              .map(info -> String.format("%s", info.getName()))
-              .sorted()
-              .collect(Collectors.joining(", "));
-      writeLine(functions);
-
-    } else if(MAGIC_VARS.equals(expression)) {
-
-      // list all variables
-
-      executor.getVariables()
-              .forEach((k,v) -> writeLine(String.format("%s = %s", k, v)));
-
-    } else {
-      writeLine(ERROR_PROMPT + "undefined magic command: " + expression);
-    }
-  }
-
-  /**
-   * Handles user interaction when executing a doc command.
-   * @param expression The expression to execute.
-   */
-  private void handleDoc(String expression) {
-
-    String functionName = StringUtils.substring(expression, 1);
-    StreamSupport
-            .stream(executor.getFunctionResolver().getFunctionInfo().spliterator(), false)
-            .filter(info -> StringUtils.equals(functionName, info.getName()))
-            .map(info -> format(info))
-            .forEach(doc -> write(doc));
-  }
-
-  /**
-   * Formats the Stellar function info object into a readable string.
-   * @param info The stellar function info object.
-   * @return A readable string.
-   */
-  private String format(StellarFunctionInfo info) {
-    StringBuffer ret = new StringBuffer();
-    ret.append(info.getName() + "\n");
-    ret.append(String.format("Description: %-60s\n\n", info.getDescription()));
-    if(info.getParams().length > 0) {
-      ret.append("Arguments:\n");
-      for(String param : info.getParams()) {
-        ret.append(String.format("\t%-60s\n", param));
-      }
-      ret.append("\n");
-    }
-    ret.append(String.format("Returns: %-60s\n", info.getReturns()));
-
-    return ret.toString();
-  }
-
-  /**
-   * Is a given expression a built-in magic?
-   * @param expression The expression.
-   */
-  private boolean isMagic(String expression) {
-    return StringUtils.startsWith(expression, MAGIC_PREFIX);
-  }
-
-  /**
-   * Is a given expression asking for function documentation?
-   * @param expression The expression.
-   */
-  private boolean isDoc(String expression) {
-    return StringUtils.startsWith(expression, DOC_PREFIX);
-  }
-
-  /**
-   * Executes a Stellar expression.
-   * @param expression The expression to execute.
-   * @return The result of the expression.
-   */
-  private Object executeStellar(String expression) {
-    Object result = null;
-
-    try {
-      result = executor.execute(expression);
-
-    } catch(Throwable t) {
-      writeLine(ERROR_PROMPT + t.getMessage());
-      t.printStackTrace();
-    }
-
-    return result;
-  }
-
-  private void write(String out) {
-    System.out.print(out);
-  }
-
-  private void writeLine(String out) {
-    console.getShell().out().println(out);
-  }
-
-  @Override
-  public int execute(ConsoleOperation output) throws InterruptedException {
-    String expression = output.getBuffer().trim();
-    if(StringUtils.isNotBlank(expression) ) {
-      if(isMagic(expression)) {
-        handleMagic( expression);
-
-      } else if(isDoc(expression)) {
-        handleDoc(expression);
-
-      } else if (expression.equals("quit")) {
-        try {
-          console.stop();
-        } catch (Throwable e) {
-          e.printStackTrace();
-        }
-      }
-      else if(expression.charAt(0) == '#') {
-        return 0;
-      }
-      else {
-        handleStellar(expression);
-      }
-    }
-
-    return 0;
-  }
-
-  @Override
-  public void complete(CompleteOperation completeOperation) {
-    if(!completeOperation.getBuffer().isEmpty()) {
-      String lastToken = Iterables.getLast(Splitter.on(" ").split(completeOperation.getBuffer()), null);
-      if(lastToken != null && !lastToken.isEmpty()) {
-        lastToken = lastToken.trim();
-        final String lastBit = lastToken;
-        final boolean isDocRequest = isDoc(lastToken);
-        if(isDocRequest) {
-          lastToken = lastToken.substring(1);
-        }
-        StellarExecutor.OperationType opType = StellarExecutor.OperationType.NORMAL;
-        if(isDocRequest) {
-          opType = StellarExecutor.OperationType.DOC;
-        }
-        else if(isMagic(lastToken)) {
-          opType = StellarExecutor.OperationType.MAGIC;
-        }
-        Iterable<String> candidates = executor.autoComplete(lastToken, opType);
-        if(candidates != null && !Iterables.isEmpty(candidates)) {
-          completeOperation.setCompletionCandidates( Lists.newArrayList(
-                  Iterables.transform(candidates, s -> stripOff(completeOperation.getBuffer(), lastBit) + s )
-                  )
-          );
-        }
-      }
-    }
-
-  }
-
-  private static String stripOff(String baseString, String lastBit) {
-    int index = baseString.lastIndexOf(lastBit);
-    if(index < 0) {
-      return baseString;
-    }
-    return baseString.substring(0, index);
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/BloomFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/BloomFilter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/BloomFilter.java
deleted file mode 100644
index ec2ecfd..0000000
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/BloomFilter.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.common.utils;
-
-import com.google.common.hash.Funnel;
-import com.google.common.hash.PrimitiveSink;
-
-import java.io.Serializable;
-import java.util.function.Function;
-
-public class BloomFilter<T> implements Serializable {
-
-  private static class BloomFunnel<T> implements Funnel<T>, Serializable {
-    Function<T, byte[]> serializer;
-    public BloomFunnel(Function<T, byte[]> serializer) {
-      this.serializer = serializer;
-    }
-    @Override
-    public void funnel(T obj, PrimitiveSink primitiveSink) {
-      primitiveSink.putBytes(serializer.apply(obj));
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      return this.getClass().equals(obj.getClass());
-    }
-
-    @Override
-    public int hashCode() {
-      return super.hashCode() * 31;
-    }
-  }
-
-  public static class DefaultSerializer<T> implements Function<T, byte[]> {
-    @Override
-    public byte[] apply(T t) {
-      return SerDeUtils.toBytes(t);
-    }
-  }
-  private com.google.common.hash.BloomFilter<T> filter;
-
-  public BloomFilter(Function<T, byte[]> serializer, int expectedInsertions, double falsePositiveRate) {
-    filter = com.google.common.hash.BloomFilter.create(new BloomFunnel<T>(serializer), expectedInsertions, falsePositiveRate);
-  }
-
-  public boolean mightContain(T key) {
-    return filter.mightContain(key);
-  }
-  public void add(T key) {
-    filter.put(key);
-  }
-  public void merge(BloomFilter<T> filter2) {
-    filter.putAll(filter2.filter);
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-
-    BloomFilter<?> that = (BloomFilter<?>) o;
-
-    return filter != null ? filter.equals(that.filter) : that.filter == null;
-
-  }
-
-  @Override
-  public int hashCode() {
-    return filter != null ? filter.hashCode() : 0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ConversionUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ConversionUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ConversionUtils.java
deleted file mode 100644
index 97b36ee..0000000
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ConversionUtils.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.common.utils;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.beanutils.BeanUtilsBean2;
-import org.apache.commons.beanutils.ConvertUtilsBean;
-
-import java.util.List;
-
-public class ConversionUtils {
-  private static ThreadLocal<ConvertUtilsBean> UTILS_BEAN = new ThreadLocal<ConvertUtilsBean>() {
-    @Override
-    protected ConvertUtilsBean initialValue() {
-      ConvertUtilsBean ret = BeanUtilsBean2.getInstance().getConvertUtils();
-      ret.deregister();
-      ret.register(false, true, 1);
-      return ret;
-    }
-  };
-
-  public static <T> T convert(Object o, Class<T> clazz) {
-    if (o == null) {
-      return null;
-    }
-    return clazz.cast(UTILS_BEAN.get().convert(o, clazz));
-  }
-
-  /**
-   * Performs naive List type conversion.
-   *
-   * @param from Source list
-   * @param clazz Class type to cast the List elements to
-   * @param <T> Source element type
-   * @param <U> Desired element type
-   * @return New List with the elements cast to the desired type
-   */
-  public static <T, U> List<U> convertList(List<T> from, Class<U> clazz) {
-    return Lists.transform(from, s -> convert(s, clazz));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java
index bbd8b30..a26a3bb 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java
@@ -26,8 +26,6 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;

http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/VFSClassloaderUtil.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/VFSClassloaderUtil.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/VFSClassloaderUtil.java
deleted file mode 100644
index 1a02fef..0000000
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/VFSClassloaderUtil.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.common.utils;
-
-import org.apache.accumulo.start.classloader.vfs.UniqueFileReplicator;
-import org.apache.commons.vfs2.*;
-import org.apache.commons.vfs2.cache.SoftRefFilesCache;
-import org.apache.commons.vfs2.impl.DefaultFileSystemManager;
-import org.apache.commons.vfs2.impl.FileContentInfoFilenameFactory;
-import org.apache.commons.vfs2.impl.VFSClassLoader;
-import org.apache.commons.vfs2.provider.hdfs.HdfsFileProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.*;
-
-public class VFSClassloaderUtil {
-  private static final Logger LOG = LoggerFactory.getLogger(VFSClassloaderUtil.class);
-
-  /**
-   * Create a FileSystem manager suitable for our purposes.
-   * This manager supports files of the following types:
-   * * res - resource files
-   * * jar
-   * * tar
-   * * bz2
-   * * tgz
-   * * zip
-   * * HDFS
-   * * FTP
-   * * HTTP/S
-   * * file
-   * @return
-   * @throws FileSystemException
-   */
-  public static FileSystemManager generateVfs() throws FileSystemException {
-    DefaultFileSystemManager vfs = new DefaultFileSystemManager();
-    vfs.addProvider("res", new org.apache.commons.vfs2.provider.res.ResourceFileProvider());
-    vfs.addProvider("zip", new org.apache.commons.vfs2.provider.zip.ZipFileProvider());
-    vfs.addProvider("gz", new org.apache.commons.vfs2.provider.gzip.GzipFileProvider());
-    vfs.addProvider("ram", new org.apache.commons.vfs2.provider.ram.RamFileProvider());
-    vfs.addProvider("file", new org.apache.commons.vfs2.provider.local.DefaultLocalFileProvider());
-    vfs.addProvider("jar", new org.apache.commons.vfs2.provider.jar.JarFileProvider());
-    vfs.addProvider("http", new org.apache.commons.vfs2.provider.http.HttpFileProvider());
-    vfs.addProvider("https", new org.apache.commons.vfs2.provider.https.HttpsFileProvider());
-    vfs.addProvider("ftp", new org.apache.commons.vfs2.provider.ftp.FtpFileProvider());
-    vfs.addProvider("ftps", new org.apache.commons.vfs2.provider.ftps.FtpsFileProvider());
-    vfs.addProvider("war", new org.apache.commons.vfs2.provider.jar.JarFileProvider());
-    vfs.addProvider("par", new org.apache.commons.vfs2.provider.jar.JarFileProvider());
-    vfs.addProvider("ear", new org.apache.commons.vfs2.provider.jar.JarFileProvider());
-    vfs.addProvider("sar", new org.apache.commons.vfs2.provider.jar.JarFileProvider());
-    vfs.addProvider("ejb3", new org.apache.commons.vfs2.provider.jar.JarFileProvider());
-    vfs.addProvider("tmp", new org.apache.commons.vfs2.provider.temp.TemporaryFileProvider());
-    vfs.addProvider("tar", new org.apache.commons.vfs2.provider.tar.TarFileProvider());
-    vfs.addProvider("tbz2", new org.apache.commons.vfs2.provider.tar.TarFileProvider());
-    vfs.addProvider("tgz", new org.apache.commons.vfs2.provider.tar.TarFileProvider());
-    vfs.addProvider("bz2", new org.apache.commons.vfs2.provider.bzip2.Bzip2FileProvider());
-    vfs.addProvider("hdfs", new HdfsFileProvider());
-    vfs.addExtensionMap("jar", "jar");
-    vfs.addExtensionMap("zip", "zip");
-    vfs.addExtensionMap("gz", "gz");
-    vfs.addExtensionMap("tar", "tar");
-    vfs.addExtensionMap("tbz2", "tar");
-    vfs.addExtensionMap("tgz", "tar");
-    vfs.addExtensionMap("bz2", "bz2");
-    vfs.addMimeTypeMap("application/x-tar", "tar");
-    vfs.addMimeTypeMap("application/x-gzip", "gz");
-    vfs.addMimeTypeMap("application/zip", "zip");
-    vfs.setFileContentInfoFactory(new FileContentInfoFilenameFactory());
-    vfs.setFilesCache(new SoftRefFilesCache());
-    vfs.setReplicator(new UniqueFileReplicator(new File(System.getProperty("java.io.tmpdir"))));
-    vfs.setCacheStrategy(CacheStrategy.ON_RESOLVE);
-    vfs.init();
-    return vfs;
-  }
-
-  /**
-   * Create a classloader backed by a virtual filesystem which can handle the following URI types:
-   * * res - resource files
-   * * jar
-   * * tar
-   * * bz2
-   * * tgz
-   * * zip
-   * * HDFS
-   * * FTP
-   * * HTTP/S
-   * * file
-   * @param paths A set of comma separated paths.  The paths are URIs or URIs with a regex pattern at the end.
-   * @return A classloader object if it can create it
-   * @throws FileSystemException
-   */
-  public static Optional<ClassLoader> configureClassloader(String paths) throws FileSystemException {
-    if(paths.trim().isEmpty()) {
-      return Optional.empty();
-    }
-    FileSystemManager vfs = generateVfs();
-    FileObject[] objects = resolve(vfs, paths);
-    if(objects == null || objects.length == 0) {
-      return Optional.empty();
-    }
-    return Optional.of(new VFSClassLoader(objects, vfs, vfs.getClass().getClassLoader()));
-  }
-
-  /**
-   * Resolve a set of URIs into FileObject objects.
-   * This is not recursive. The URIs can refer directly to a file or directory or an optional regex at the end.
-   * (NOTE: This is NOT a glob).
-   * @param vfs The file system manager to use to resolve URIs
-   * @param uris comma separated URIs and URI + globs
-   * @return
-   * @throws FileSystemException
-   */
-  static FileObject[] resolve(FileSystemManager vfs, String uris) throws FileSystemException {
-    if (uris == null) {
-      return new FileObject[0];
-    }
-
-    ArrayList<FileObject> classpath = new ArrayList<>();
-    for (String path : uris.split(",")) {
-      path = path.trim();
-      if (path.equals("")) {
-        continue;
-      }
-      FileObject fo = vfs.resolveFile(path);
-      switch (fo.getType()) {
-        case FILE:
-        case FOLDER:
-          classpath.add(fo);
-          break;
-        case IMAGINARY:
-          // assume its a pattern
-          String pattern = fo.getName().getBaseName();
-          if (fo.getParent() != null && fo.getParent().getType() == FileType.FOLDER) {
-            FileObject[] children = fo.getParent().getChildren();
-            for (FileObject child : children) {
-              if (child.getType() == FileType.FILE && child.getName().getBaseName().matches(pattern)) {
-                classpath.add(child);
-              }
-            }
-          } else {
-            LOG.warn("ignoring classpath entry " + fo);
-          }
-          break;
-        default:
-          LOG.warn("ignoring classpath entry " + fo);
-          break;
-      }
-    }
-    return classpath.toArray(new FileObject[classpath.size()]);
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-platform/metron-common/src/main/scripts/stellar
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/scripts/stellar b/metron-platform/metron-common/src/main/scripts/stellar
index 2145350..72d3f74 100644
--- a/metron-platform/metron-common/src/main/scripts/stellar
+++ b/metron-platform/metron-common/src/main/scripts/stellar
@@ -31,4 +31,4 @@ fi
 export HBASE_CONFIGS=/etc/hbase/conf
 export METRON_VERSION=${project.version}
 export METRON_HOME=/usr/metron/$METRON_VERSION
-java $JVMFLAGS -cp "$HBASE_CONFIGS:$METRON_HOME/lib/*" org.apache.metron.common.stellar.shell.StellarShell "$@"
+java $JVMFLAGS -cp "$HBASE_CONFIGS:$METRON_HOME/lib/*" org.apache.metron.stellar.stellar.shell.StellarShell "$@"

http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-platform/metron-common/src/test/java/org/apache/metron/common/dsl/functions/ConversionFunctionsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/dsl/functions/ConversionFunctionsTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/dsl/functions/ConversionFunctionsTest.java
deleted file mode 100644
index b82ee49..0000000
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/dsl/functions/ConversionFunctionsTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.common.dsl.functions;
-
-import org.junit.Test;
-
-import java.util.Collections;
-
-import static org.junit.Assert.assertEquals;
-
-public class ConversionFunctionsTest {
-
-  @Test
-  public void conversionFunctionsShouldProperlyConvertToSpecificType() throws Exception {
-    assertEquals(1D, new ConversionFunctions.TO_DOUBLE().apply(Collections.singletonList(1)));
-    assertEquals(1F, new ConversionFunctions.TO_FLOAT().apply(Collections.singletonList(1.0D)));
-    assertEquals(1, new ConversionFunctions.TO_INTEGER().apply(Collections.singletonList(1.0D)));
-    assertEquals(1L, new ConversionFunctions.TO_LONG().apply(Collections.singletonList(1F)));
-  }
-
-  @Test
-  public void conversionFunctionsShouldProperlyHandleNull() throws Exception {
-    assertEquals(null, new ConversionFunctions.TO_DOUBLE().apply(Collections.singletonList(null)));
-    assertEquals(null, new ConversionFunctions.TO_FLOAT().apply(Collections.singletonList(null)));
-    assertEquals(null, new ConversionFunctions.TO_INTEGER().apply(Collections.singletonList(null)));
-    assertEquals(null, new ConversionFunctions.TO_LONG().apply(Collections.singletonList(null)));
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-platform/metron-common/src/test/java/org/apache/metron/common/dsl/functions/DataStructureFunctionsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/dsl/functions/DataStructureFunctionsTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/dsl/functions/DataStructureFunctionsTest.java
deleted file mode 100644
index 4426608..0000000
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/dsl/functions/DataStructureFunctionsTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.common.dsl.functions;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import org.hamcrest.CoreMatchers;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.metron.common.utils.StellarProcessorUtils.run;
-
-public class DataStructureFunctionsTest {
-
-  @Test
-  public void is_empty_handles_happy_path() {
-    DataStructureFunctions.IsEmpty isEmpty = new DataStructureFunctions.IsEmpty();
-    {
-      boolean empty = (boolean) isEmpty.apply(ImmutableList.of("hello"));
-      Assert.assertThat("should be false", empty, CoreMatchers.equalTo(false));
-    }
-    {
-      boolean empty = (boolean) isEmpty.apply(ImmutableList.of(ImmutableList.of("hello", "world")));
-      Assert.assertThat("should be false", empty, CoreMatchers.equalTo(false));
-    }
-    {
-      boolean empty = (boolean) isEmpty.apply(ImmutableList.of(1));
-      Assert.assertThat("should be false", empty, CoreMatchers.equalTo(false));
-    }
-  }
-
-  @Test
-  public void is_empty_handles_empty_values() {
-    DataStructureFunctions.IsEmpty isEmpty = new DataStructureFunctions.IsEmpty();
-    {
-      boolean empty = (boolean) isEmpty.apply(ImmutableList.of());
-      Assert.assertThat("should be true", empty, CoreMatchers.equalTo(true));
-    }
-    {
-      boolean empty = (boolean) isEmpty.apply(null);
-      Assert.assertThat("should be true", empty, CoreMatchers.equalTo(true));
-    }
-    {
-      boolean empty = (boolean) isEmpty.apply(ImmutableList.of(""));
-      Assert.assertThat("should be true", empty, CoreMatchers.equalTo(true));
-    }
-  }
-
-  @Test
-  public void listAdd_number() {
-    for(String expr : ImmutableList.of("LIST_ADD(my_list, 1)"
-                                      ,"LIST_ADD([], 1)"
-                                      ,"LIST_ADD([], val)"
-                                      )
-       )
-    {
-      Object o = run(expr, ImmutableMap.of("my_list", new ArrayList<>(), "val", 1));
-      Assert.assertTrue(o instanceof List);
-      List<Number> result = (List<Number>) o;
-      Assert.assertEquals(1, result.size());
-      Assert.assertEquals(1, result.get(0));
-    }
-  }
-
-  @Test
-  public void listAdd_mixed() {
-    for(String expr : ImmutableList.of("LIST_ADD(my_list, 1)"
-                                      ,"LIST_ADD(['foo'], 1)"
-                                      ,"LIST_ADD(['foo'], val)"
-                                      )
-       )
-    {
-      ArrayList<Object> list = new ArrayList<>();
-      list.add("foo");
-      Object o = run(expr, ImmutableMap.of("my_list", list, "val", 1));
-      Assert.assertTrue(o instanceof List);
-      List<Object> result = (List<Object>) o;
-      Assert.assertEquals(2, result.size());
-      Assert.assertEquals("foo", result.get(0));
-      Assert.assertEquals(1, result.get(1));
-    }
-  }
-
-  @Test
-  public void listAdd_number_nonempty() {
-    for(String expr : ImmutableList.of("LIST_ADD(my_list, 2)"
-                                      ,"LIST_ADD([1], 2)"
-                                      ,"LIST_ADD([1], val)"
-                                      )
-       )
-    {
-      ArrayList<Integer> list = new ArrayList<>();
-      list.add(1);
-      Object o = run(expr, ImmutableMap.of("my_list", list, "val", 2));
-      Assert.assertTrue(o instanceof List);
-      List<Number> result = (List<Number>) o;
-      Assert.assertEquals(2, result.size());
-      Assert.assertEquals(1, result.get(0));
-      Assert.assertEquals(2, result.get(1));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/a5b13777/metron-platform/metron-common/src/test/java/org/apache/metron/common/dsl/functions/FunctionalFunctionsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/dsl/functions/FunctionalFunctionsTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/dsl/functions/FunctionalFunctionsTest.java
deleted file mode 100644
index 81a9adc..0000000
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/dsl/functions/FunctionalFunctionsTest.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.common.dsl.functions;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import org.hamcrest.CoreMatchers;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.List;
-
-import static org.apache.metron.common.utils.StellarProcessorUtils.run;
-
-public class FunctionalFunctionsTest {
-
-  @Test
-  public void testRecursive() {
-    for (String expr : ImmutableList.of( "MAP(list, inner_list -> REDUCE(inner_list, (x, y) -> x + y, 0) )"
-                                       , "MAP(list, (inner_list) -> REDUCE(inner_list, (x, y) -> x + y, 0) )"
-                                       )
-        )
-    {
-      Object o = run(expr, ImmutableMap.of("list", ImmutableList.of(ImmutableList.of(1, 2, 3), ImmutableList.of(4, 5, 6))));
-      Assert.assertTrue(o instanceof List);
-      List<Number> result = (List<Number>) o;
-      Assert.assertEquals(2, result.size());
-      Assert.assertEquals(6, result.get(0));
-      Assert.assertEquals(15, result.get(1));
-    }
-  }
-
-  @Test
-  public void testMap_null() {
-    for (String expr : ImmutableList.of( "MAP([ 1, 2, null], x -> if x == null then 0 else 2*x )"
-                                       , "MAP([ 1, 2, null], x -> x == null ? 0 : 2*x )"
-                                       , "MAP([ 1, foo, baz], x -> x == null ? 0 : 2*x )"
-    )
-            )
-    {
-      Object o = run(expr, ImmutableMap.of("foo", 2, "bar", 3));
-      Assert.assertTrue(o instanceof List);
-      List<String> result = (List<String>) o;
-      Assert.assertEquals(3, result.size());
-      Assert.assertEquals(2, result.get(0));
-      Assert.assertEquals(4, result.get(1));
-      Assert.assertEquals(0, result.get(2));
-    }
-  }
-
-
-  @Test
-  public void testMap() {
-    for (String expr : ImmutableList.of( "MAP([ 'foo', 'bar'], (x) -> TO_UPPER(x) )"
-                                       , "MAP([ foo, 'bar'], (x) -> TO_UPPER(x) )"
-                                       , "MAP([ foo, bar], (x) -> TO_UPPER(x) )"
-                                       , "MAP([ foo, bar], x -> TO_UPPER(x) )"
-                                       , "MAP([ foo, bar], x -> true?TO_UPPER(x):THROW('error') )"
-                                       , "MAP([ foo, bar], x -> false?THROW('error'):TO_UPPER(x) )"
-                                       )
-        )
-    {
-      Object o = run(expr, ImmutableMap.of("foo", "foo", "bar", "bar"));
-      Assert.assertTrue(o instanceof List);
-      List<String> result = (List<String>) o;
-      Assert.assertEquals(2, result.size());
-      Assert.assertEquals("FOO", result.get(0));
-      Assert.assertEquals("BAR", result.get(1));
-    }
-  }
-
-
-  @Test
-  public void testMap_conditional() {
-    for (String expr : ImmutableList.of("MAP([ 'foo', 'bar'], (item) -> item == 'foo' )"
-                                       ,"MAP([ foo, bar], (item) -> item == 'foo' )"
-                                       ,"MAP([ foo, bar], (item) -> item == foo )"
-                                       ,"MAP([ foo, bar], item -> item == foo )"
-                                       )
-        )
-    {
-      Object o = run(expr, ImmutableMap.of("foo", "foo", "bar", "bar"));
-      Assert.assertTrue(o instanceof List);
-      List<Boolean> result = (List<Boolean>) o;
-      Assert.assertEquals(2, result.size());
-      Assert.assertEquals(true, result.get(0));
-      Assert.assertEquals(false, result.get(1));
-    }
-  }
-
-  @Test
-  public void testFilter() {
-    for (String expr : ImmutableList.of("FILTER([ 'foo', 'bar'], (item) -> item == 'foo' )"
-                                       ,"FILTER([ 'foo', bar], (item) -> item == 'foo' )"
-                                       ,"FILTER([ foo, bar], (item) -> item == 'foo' )"
-                                       ,"FILTER([ foo, bar], (item) -> (item == 'foo' && true) )"
-                                       ,"FILTER([ foo, bar], (item) -> if item == 'foo' then true else false )"
-                                       ,"FILTER([ foo, bar], item -> if item == 'foo' then true else false )"
-                                       )
-        )
-    {
-      Object o = run(expr, ImmutableMap.of("foo", "foo", "bar", "bar"));
-      Assert.assertTrue(o instanceof List);
-      List<String> result = (List<String>) o;
-      Assert.assertEquals(1, result.size());
-      Assert.assertEquals("foo", result.get(0));
-    }
-  }
-
-
-  @Test
-  public void testFilter_shortcircuit() {
-    for (String expr : ImmutableList.of("FILTER([ 'foo'], item -> item == 'foo' or THROW('exception') )"
-                                       ,"FILTER([ 'foo'], (item) -> item == 'foo' or THROW('exception') )"
-                                       )
-        )
-    {
-      Object o = run(expr, ImmutableMap.of("foo", "foo", "bar", "bar"));
-      Assert.assertTrue(o instanceof List);
-      List<String> result = (List<String>) o;
-      Assert.assertEquals(1, result.size());
-      Assert.assertEquals("foo", result.get(0));
-    }
-  }
-
-  @Test
-  public void testFilter_null() {
-    for (String expr : ImmutableList.of("FILTER([ 'foo', null], item -> item == null )"
-                                       ,"FILTER([ 'foo', baz], (item) -> item == null )"
-                                       )
-        )
-    {
-      Object o = run(expr, ImmutableMap.of("foo", "foo", "bar", "bar"));
-      Assert.assertTrue(o instanceof List);
-      List<String> result = (List<String>) o;
-      Assert.assertEquals(1, result.size());
-      Assert.assertEquals(null, result.get(0));
-    }
-  }
-
-  @Test
-  public void testFilter_notnull() {
-    for (String expr : ImmutableList.of("FILTER([ 'foo', null], item -> item != null )"
-                                       ,"FILTER([ 'foo', baz], (item) -> item != null )"
-                                       ,"FILTER([ foo, baz], (item) -> item != null )"
-                                       )
-        )
-    {
-      Object o = run(expr, ImmutableMap.of("foo", "foo", "bar", "bar"));
-      Assert.assertTrue(o instanceof List);
-      List<String> result = (List<String>) o;
-      Assert.assertEquals(1, result.size());
-      Assert.assertEquals("foo", result.get(0));
-    }
-  }
-
-  @Test
-  public void testFilter_none() {
-    for (String expr : ImmutableList.of( "FILTER([ foo, bar], () -> false  )"
-                                       , "FILTER([ 'foo', 'bar'], (item)-> false )"
-                                       ,"FILTER([ 'foo', bar], (item ) -> false )"
-                                       ,"FILTER([ foo, bar], (item) -> false )"
-                                       ,"FILTER([ foo, bar], item -> false )"
-
-                                       )
-        )
-    {
-      Object o = run(expr, ImmutableMap.of("foo", "foo", "bar", "bar"));
-      Assert.assertTrue(o instanceof List);
-      List<String> result = (List<String>) o;
-      Assert.assertEquals(0, result.size());
-    }
-  }
-
-  @Test
-  public void testFilter_all() {
-    for (String expr : ImmutableList.of("FILTER([ 'foo', 'bar'], (item) -> true )"
-                                       ,"FILTER([ 'foo', bar], (item) -> true )"
-                                       ,"FILTER([ foo, bar], (item) -> true )"
-                                       ,"FILTER([ foo, bar], item -> true )"
-                                       ,"FILTER([ foo, bar], ()-> true )"
-                                       )
-        )
-    {
-      Object o = run(expr, ImmutableMap.of("foo", "foo", "bar", "bar"));
-      Assert.assertTrue(o instanceof List);
-      List<String> result = (List<String>) o;
-      Assert.assertEquals(2, result.size());
-      Assert.assertEquals("foo", result.get(0));
-      Assert.assertEquals("bar", result.get(1));
-    }
-  }
-
-  @Test
-  public void testReduce_null() {
-    for (String expr : ImmutableList.of("REDUCE([ 1, 2, 3, null], (x, y) -> if y != null then x + y else x , 0 )"
-                                       ,"REDUCE([ foo, bar, 3, baz], (sum, y) -> if y != null then sum + y else sum, 0 )"
-                                       )
-        )
-    {
-      Object o = run(expr, ImmutableMap.of("foo", 1, "bar", 2));
-      Assert.assertTrue(o instanceof Number);
-      Number result = (Number) o;
-      Assert.assertEquals(6, result.intValue());
-    }
-  }
-
-  @Test
-  public void testReduce() {
-    for (String expr : ImmutableList.of("REDUCE([ 1, 2, 3 ], (x, y) -> x + y , 0 )"
-                                       ,"REDUCE([ foo, bar, 3 ], (x, y) -> x + y , 0 )"
-                                       )
-        )
-    {
-      Object o = run(expr, ImmutableMap.of("foo", 1, "bar", 2));
-      Assert.assertTrue(o instanceof Number);
-      Number result = (Number) o;
-      Assert.assertEquals(6, result.intValue());
-    }
-  }
-
-  @Test
-  public void testReduce_on_various_list_sizes() {
-    {
-      String expr = "REDUCE([ 1, 2, 3, 4 ], (x, y) -> x + y , 0 )";
-      Object o = run(expr, ImmutableMap.of());
-      Assert.assertTrue(o instanceof Number);
-      Number result = (Number) o;
-      Assert.assertEquals(10, result.intValue());
-    }
-    {
-      String expr = "REDUCE([ 1, 2 ], (x, y) -> x + y , 0 )";
-      Object o = run(expr, ImmutableMap.of());
-      Assert.assertTrue(o instanceof Number);
-      Number result = (Number) o;
-      Assert.assertEquals(3, result.intValue());
-    }
-    {
-      String expr = "REDUCE([ 1 ], (x, y) -> x + y , 0 )";
-      Object o = run(expr, ImmutableMap.of());
-      Assert.assertTrue(o instanceof Number);
-      Number result = (Number) o;
-      Assert.assertEquals(1, result.intValue());
-    }
-  }
-
-  @Test
-  public void testReduce_NonNumeric() {
-    for (String expr : ImmutableList.of("REDUCE([ 'foo', 'bar', 'grok'], (x, y) -> LIST_ADD(x, y), [] )"
-                                       )
-        )
-    {
-      Object o = run(expr, ImmutableMap.of("foo", 1, "bar", 2));
-      Assert.assertTrue(o instanceof List);
-      List<String> result = (List<String>) o;
-      Assert.assertEquals(3, result.size());
-      Assert.assertEquals("foo", result.get(0));
-      Assert.assertEquals("bar", result.get(1));
-      Assert.assertEquals("grok", result.get(2));
-    }
-  }
-
-  @Test
-  public void testReduce_returns_null_when_less_than_3_args() {
-    {
-      String expr = "REDUCE([ 1, 2, 3 ], (x, y) -> LIST_ADD(x, y))";
-      Assert.assertThat(run(expr, ImmutableMap.of()), CoreMatchers.equalTo(null));
-    }
-    {
-      String expr = "REDUCE([ 1, 2, 3 ])";
-      Assert.assertThat(run(expr, ImmutableMap.of()), CoreMatchers.equalTo(null));
-    }
-  }
-
-}


Mime
View raw message