metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ceste...@apache.org
Subject [2/2] metron git commit: METRON-1378: Create a summarizer closes apache/incubator-metron#879
Date Tue, 16 Jan 2018 17:32:04 GMT
METRON-1378: Create a summarizer closes apache/incubator-metron#879


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/5d3e73ab
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/5d3e73ab
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/5d3e73ab

Branch: refs/heads/master
Commit: 5d3e73ab95adf0c8f49c3f821975740e365df91a
Parents: 6afd95f
Author: cstella <cestella@gmail.com>
Authored: Tue Jan 16 12:31:53 2018 -0500
Committer: cstella <cestella@gmail.com>
Committed: Tue Jan 16 12:31:53 2018 -0500

----------------------------------------------------------------------
 .../docker/rpm-docker/SPECS/metron.spec         |   1 +
 .../metron/common/utils/cli/CLIOptions.java     |  31 ++
 .../metron/common/utils/cli/OptionHandler.java  |  48 ++-
 .../metron-data-management/README.md            | 103 ++++++-
 .../metron/dataloads/extractor/Extractor.java   |  10 +-
 .../extractor/ExtractorCapabilities.java        |  23 ++
 .../dataloads/extractor/StatefulExtractor.java  |  31 ++
 .../TransformFilterExtractorDecorator.java      |  99 +++++--
 .../nonbulk/flatfile/CommonOptions.java         | 225 ++++++++++++++
 .../nonbulk/flatfile/ExtractorState.java        |  60 ----
 .../nonbulk/flatfile/HBaseExtractorState.java   |  66 +++++
 .../dataloads/nonbulk/flatfile/LoadOptions.java | 206 +++----------
 .../flatfile/SimpleFlatFileSummarizer.java      |  53 ++++
 .../nonbulk/flatfile/SummarizeOptions.java      | 125 ++++++++
 .../importer/AbstractLocalImporter.java         | 162 ++++++++++
 .../flatfile/importer/ImportStrategy.java       |   2 +-
 .../nonbulk/flatfile/importer/Importer.java     |   5 +-
 .../flatfile/importer/LocalImporter.java        | 161 ++++------
 .../flatfile/importer/LocalSummarizer.java      | 150 ++++++++++
 .../flatfile/importer/MapReduceImporter.java    |   2 +-
 .../nonbulk/flatfile/importer/Summarizers.java  |  46 +++
 .../nonbulk/flatfile/writer/ConsoleWriter.java  |  41 +++
 .../nonbulk/flatfile/writer/HDFSWriter.java     |  50 ++++
 .../flatfile/writer/InvalidWriterOutput.java    |  28 ++
 .../nonbulk/flatfile/writer/LocalWriter.java    |  53 ++++
 .../nonbulk/flatfile/writer/Writer.java         |  34 +++
 .../nonbulk/flatfile/writer/Writers.java        |  56 ++++
 .../src/main/scripts/flatfile_summarizer.sh     |  51 ++++
 .../flatfile/SimpleFlatFileSummarizerTest.java  | 293 +++++++++++++++++++
 .../stellar/dsl/functions/NetworkFunctions.java |   9 +-
 30 files changed, 1860 insertions(+), 364 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec b/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
index 82e0a23..6fd2f5a 100644
--- a/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
+++ b/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
@@ -201,6 +201,7 @@ This package installs the Metron Parser files
 %{metron_home}/bin/Whois_CSV_to_JSON.py
 %{metron_home}/bin/geo_enrichment_load.sh
 %{metron_home}/bin/flatfile_loader.sh
+%{metron_home}/bin/flatfile_summarizer.sh
 %{metron_home}/bin/prune_elasticsearch_indices.sh
 %{metron_home}/bin/prune_hdfs_files.sh
 %{metron_home}/bin/threatintel_bulk_prune.sh

http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/cli/CLIOptions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/cli/CLIOptions.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/cli/CLIOptions.java
new file mode 100644
index 0000000..c43b30b
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/cli/CLIOptions.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.utils.cli;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+
+public interface CLIOptions<OPT_T extends Enum<OPT_T> & CLIOptions<OPT_T>> {
+  Option getOption();
+
+  boolean has(CommandLine cli);
+
+  String get(CommandLine cli);
+
+  OptionHandler<OPT_T> getHandler();
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/cli/OptionHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/cli/OptionHandler.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/cli/OptionHandler.java
index 85e7520..6dfebb8 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/cli/OptionHandler.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/cli/OptionHandler.java
@@ -18,14 +18,56 @@
 package org.apache.metron.common.utils.cli;
 
 import com.google.common.base.Function;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.*;
 
+import java.util.EnumMap;
 import java.util.Optional;
 
-public abstract class OptionHandler<OPT_T extends Enum<OPT_T>> implements Function<String, Option>
+public abstract class OptionHandler<OPT_T extends Enum<OPT_T> & CLIOptions<OPT_T>> implements Function<String, Option>
 {
   public Optional<Object> getValue(OPT_T option, CommandLine cli) {
     return Optional.empty();
   }
+
+  public abstract String getShortCode();
+
+  public static Options getOptions(CLIOptions[] values) {
+    Options ret = new Options();
+    for(CLIOptions o : values) {
+      ret.addOption(o.getOption());
+    }
+    return ret;
+  }
+
+  public static void printHelp(String name, CLIOptions[] values) {
+    HelpFormatter formatter = new HelpFormatter();
+    formatter.printHelp( name, getOptions(values));
+  }
+
+  public static <OPT_T extends Enum<OPT_T> & CLIOptions<OPT_T>>
+  EnumMap<OPT_T, Optional<Object> > createConfig(CommandLine cli, OPT_T[] values, Class<OPT_T> clazz) {
+    EnumMap<OPT_T, Optional<Object> > ret = new EnumMap<>(clazz);
+    for(OPT_T option : values) {
+      ret.put(option, option.getHandler().getValue(option, cli));
+    }
+    return ret;
+  }
+
+  public static CommandLine parse(String name, CommandLineParser parser, String[] args, CLIOptions[] values, CLIOptions helpOption) {
+    try {
+      CommandLine cli = parser.parse(getOptions(values), args);
+      if(helpOption.has(cli)) {
+        printHelp(name, values);
+        System.exit(0);
+      }
+      return cli;
+    } catch (ParseException e) {
+      System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
+      e.printStackTrace(System.err);
+      printHelp(name, values);
+      System.exit(-1);
+      return null;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/README.md b/metron-platform/metron-data-management/README.md
index f284dab..1f8e52d 100644
--- a/metron-platform/metron-data-management/README.md
+++ b/metron-platform/metron-data-management/README.md
@@ -193,12 +193,15 @@ As an example, we will be providing a CSV list of top domains as an enrichment a
 
 There are 2 property maps that work with full Stellar expressions, and 2 properties that will work with Stellar predicates.
 
-| Property            | Description
-|---------------------|---
-| value_transform     | Transform fields defined in the "columns" mapping with Stellar transformations. New keys introduced in the transform will be added to the key metadata.
-| value_filter        | Allows additional filtering with Stellar predicates based on results from the value transformations. In this example, records whose domain property is empty after removing the TLD will be omitted.
-| indicator_transform | Transform the indicator column independent of the value transformations. You can refer to the original indicator value by using "indicator" as the variable name, as shown in the example above. In addition, if you prefer to piggyback your transformations, you can refer to the variable "domain", which will allow your indicator transforms to inherit transformations done to this value during the value transformations.
-| indicator_filter    | Allows additional filtering with Stellar predicates based on results from the value transformations. In this example, records whose indicator value is empty after removing the TLD will be omitted. 
+| Property             | Description
+|----------------------|---
+| `value_transform`    | Transform fields defined in the "columns" mapping with Stellar transformations. New keys introduced in the transform will be added to the key metadata.
+| `value_filter`       | Allows additional filtering with Stellar predicates based on results from the value transformations. In this example, records whose domain property is empty after removing the TLD will be omitted.
+| `indicator_transform`| Transform the indicator column independent of the value transformations. You can refer to the original indicator value by using "indicator" as the variable name, as shown in the example above. In addition, if you prefer to piggyback your transformations, you can refer to the variable "domain", which will allow your indicator transforms to inherit transformations done to this value during the value transformations.
+| `indicator_filter`   | Allows additional filtering with Stellar predicates based on results from the value transformations. In this example, records whose indicator value is empty after removing the TLD will be omitted.
+| `state_init`         | Allows a state object to be initialized.  This is a string, so a single expression is created.  The output of this expression will be available as the `state` variable.  This is to be used with the `flatfile_summarizer.sh` rather than the loader.
+| `state_update`       | Allows a state object to be updated.  This is a map, so you can have temporary variables here.  Note that you can reference the `state` variable from this.  This is to be used with the `flatfile_summarizer.sh` rather than the loader.
+| `state_merge`        | Allows a list of states to be merged. This is a string, so a single expression.  There is a special field called `states` available, which is a list of the states (one per thread).  This is to be used with the `flatfile_summarizer.sh` rather than the loader.
 
 top-list.csv
 ```
@@ -366,6 +369,94 @@ The parameters for the utility are as follows:
 | -t         | --tmp_dir           | No           | Directory for landing the temporary GeoIP data - defaults to /tmp                                |
 | -z         | --zk_quorum         | Yes          | Zookeeper Quorum URL (zk1:port,zk2:port,...)                                                     |
 
+### Flatfile Summarizer
+
+The shell script `$METRON_HOME/bin/flatfile_summarizer.sh` will read data from local disk, HDFS or URLs and generate a summary object.
+The object will be serialized and written to disk, either HDFS or local disk depending on the output mode specified.
+
+It should be noted that this utility uses the same extractor config as the `flatfile_loader.sh`,
+but as the output target is not a key value store (but rather a summary object), it is not necessary
+to specify certain configs:
+* `indicator`, `indicator_filter` and `indicator_transform` are not required, but will be executed if present.
+As in the loader, there will be an indicator field available if you so specify it (by using `indicator` in the config).
+* `type` is neither required nor used
+
+In addition, some new configs are expected:
+* `state_init` : Executed once to initialize the state object (the object written out).
+* `state_update`: Called once per message.  The fields available are the fields for the row as well as
+  * `indicator` - the indicator value if you've specified it in the config
+  * `state` - the current state.  Useful for adding to the state (e.g. `BLOOM_ADD(state, val)` where `val` is the name of a field).
+* `state_merge` : If you are running this multi-threaded and your objects can be merged, this is the statement that will
+merge the state objects created per thread.  There is a special field available to this config:
+  * `states` - a list of the state objects
+
+One thing to note here is that there is a special configuration
+parameter to the Extractor config that is only considered by this
+utility:
+* `inputFormat` : This specifies how the input data is split into units for the Extractor.  The two implementations are `BY_LINE` and `WHOLE_FILE`.
+
+The default is `BY_LINE`, which makes sense for a list of CSVs where
+each line indicates a unit of information which can be imported.
+However, if you are importing a set of STIX documents, then you want
+each document to be considered as input to the Extractor.
+
+#### Example
+
+Consider the possibility that you want to generate a bloom filter with all of the domains in a CSV structured similarly to
+the Alexa top 1M domains, so the columns are:
+* rank
+* domain name
+
+You want to generate a bloom filter with just the domains, not considering the TLD.
+You would execute the following to:
+* read data from `./top-1m.csv`
+* write data to `./filter.ser`
+* use 5 threads
+
+```
+$METRON_HOME/bin/flatfile_summarizer.sh -i ./top-1m.csv -o ./filter.ser -e ./extractor.json -p 5 -b 128
+```
+
+To configure this, `extractor.json` would look like:
+```
+{
+  "config" : {
+    "columns" : {
+      "rank" : 0,
+      "domain" : 1
+    },
+    "value_transform" : {
+      "domain" : "DOMAIN_REMOVE_TLD(domain)"
+    },
+    "value_filter" : "LENGTH(domain) > 0",
+    "state_init" : "BLOOM_INIT()",
+    "state_update" : {
+      "state" : "BLOOM_ADD(state, domain)"
+    },
+    "state_merge" : "BLOOM_MERGE(states)",
+    "separator" : ","
+  },
+  "extractor" : "CSV"
+}
+```
+
+#### Parameters
+
+The parameters for the utility are as follows:
+
+| Short Code | Long Code           | Is Required? | Description                                                                                                                                                                         |
+|------------|---------------------|--------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| -h         |                     | No           | Generate the help screen/set of options                                                                                                                                             |
+| -q         | --quiet             | No           | Do not update progress                                                                                                                                                              |
+| -e         | --extractor_config  | Yes          | JSON Document describing the extractor for this input data source                                                                                                                   |
+| -m         | --import_mode       | No           | The Import mode to use: LOCAL, MR.  Default: LOCAL                                                                                                                                  |
+| -om        | --output_mode       | No           | The Output mode to use: LOCAL, HDFS.  Default: LOCAL                                                                                                                                  |
+| -i         | --input             | Yes          | The input data location on local disk.  If this is a file, then that file will be loaded.  If this is a directory, then the files will be loaded recursively under that directory.  |
+| -o         | --output            | Yes          | The output data location.    |
+| -l         | --log4j             | No           | The log4j properties file to load                                                                                                                                                   |
+| -p         | --threads           | No           | The number of threads to use when extracting data.  The default is the number of cores.                                                                                             |
+| -b         | --batchSize         | No           | The batch size to use for HBase puts                                                                                                                                                |
+
 ## Pruning Data from Elasticsearch
 
 **Note** - As of the Metron upgrade from Elasticsearch 2.3.3 to 5.6.2, the included Data Pruner is no longer supported. It is replaced in favor of the Curator utility

http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java
index bd490c8..7fd2741 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/Extractor.java
@@ -20,9 +20,15 @@ package org.apache.metron.dataloads.extractor;
 import org.apache.metron.enrichment.lookup.LookupKV;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.EnumSet;
 import java.util.Map;
+import java.util.Set;
 
 public interface Extractor {
-    Iterable<LookupKV> extract(String line) throws IOException;
-    void initialize(Map<String, Object> config);
+  Iterable<LookupKV> extract(String line) throws IOException;
+  void initialize(Map<String, Object> config);
+  default Set<ExtractorCapabilities> getCapabilities() {
+    return EnumSet.noneOf(ExtractorCapabilities.class);
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorCapabilities.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorCapabilities.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorCapabilities.java
new file mode 100644
index 0000000..91c6232
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorCapabilities.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor;
+
+public enum ExtractorCapabilities {
+  STATEFUL,
+  MERGEABLE;
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/StatefulExtractor.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/StatefulExtractor.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/StatefulExtractor.java
new file mode 100644
index 0000000..df2334d
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/StatefulExtractor.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor;
+
+import org.apache.metron.enrichment.lookup.LookupKV;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+public interface StatefulExtractor extends Extractor {
+  Object initializeState(Map<String, Object> config);
+  Object mergeStates(List<? extends Object> states);
+  Iterable<LookupKV> extract(String line, AtomicReference<Object> state) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/TransformFilterExtractorDecorator.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/TransformFilterExtractorDecorator.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/TransformFilterExtractorDecorator.java
index 790ea9f..c47dfc6 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/TransformFilterExtractorDecorator.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/TransformFilterExtractorDecorator.java
@@ -17,22 +17,15 @@
  */
 package org.apache.metron.dataloads.extractor;
 
-import static org.apache.metron.dataloads.extractor.TransformFilterExtractorDecorator.ExtractorOptions.INDICATOR;
-import static org.apache.metron.dataloads.extractor.TransformFilterExtractorDecorator.ExtractorOptions.INDICATOR_FILTER;
-import static org.apache.metron.dataloads.extractor.TransformFilterExtractorDecorator.ExtractorOptions.INDICATOR_TRANSFORM;
-import static org.apache.metron.dataloads.extractor.TransformFilterExtractorDecorator.ExtractorOptions.VALUE_FILTER;
-import static org.apache.metron.dataloads.extractor.TransformFilterExtractorDecorator.ExtractorOptions.VALUE_TRANSFORM;
-import static org.apache.metron.dataloads.extractor.TransformFilterExtractorDecorator.ExtractorOptions.ZK_QUORUM;
-
 import com.fasterxml.jackson.core.type.TypeReference;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.lang.ref.Reference;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.collect.ImmutableMap;
 import org.apache.commons.lang.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
@@ -47,8 +40,13 @@ import org.apache.metron.enrichment.lookup.LookupKV;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class TransformFilterExtractorDecorator extends ExtractorDecorator {
+import static org.apache.metron.dataloads.extractor.TransformFilterExtractorDecorator.ExtractorOptions.*;
+
+public class TransformFilterExtractorDecorator extends ExtractorDecorator implements StatefulExtractor {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  public static final String STATE_KEY = "state";
+  public static final String STATES_KEY = "states";
+
 
   protected enum ExtractorOptions {
     VALUE_TRANSFORM("value_transform"),
@@ -56,7 +54,10 @@ public class TransformFilterExtractorDecorator extends ExtractorDecorator {
     INDICATOR_TRANSFORM("indicator_transform"),
     INDICATOR_FILTER("indicator_filter"),
     ZK_QUORUM("zk_quorum"),
-    INDICATOR("indicator");
+    INDICATOR("indicator"),
+    STATE_INIT("state_init"),
+    STATE_UPDATE("state_update"),
+    STATE_MERGE("state_merge");
 
     private String key;
 
@@ -83,6 +84,9 @@ public class TransformFilterExtractorDecorator extends ExtractorDecorator {
   private StellarProcessor transformProcessor;
   private StellarPredicateProcessor filterProcessor;
   private Map<String, Object> globalConfig;
+  private Map<String, String> stateUpdate;
+  private String stateMerge;
+  private Set<ExtractorCapabilities> capabilities;
 
   public TransformFilterExtractorDecorator(Extractor decoratedExtractor) {
     super(decoratedExtractor);
@@ -91,8 +95,11 @@ public class TransformFilterExtractorDecorator extends ExtractorDecorator {
     this.indicatorTransforms = new LinkedHashMap<>();
     this.valueFilter = "";
     this.indicatorFilter = "";
+    this.stateUpdate = new LinkedHashMap<>();
+    this.stateMerge = "";
     this.transformProcessor = new StellarProcessor();
     this.filterProcessor = new StellarPredicateProcessor();
+    this.capabilities = EnumSet.noneOf(ExtractorCapabilities.class);
   }
 
   @Override
@@ -110,6 +117,17 @@ public class TransformFilterExtractorDecorator extends ExtractorDecorator {
     if (INDICATOR_FILTER.existsIn(config)) {
       this.indicatorFilter = getFilter(config, INDICATOR_FILTER.toString());
     }
+    if (STATE_UPDATE.existsIn(config)) {
+      capabilities.add(ExtractorCapabilities.STATEFUL);
+      this.stateUpdate = getTransforms(config, STATE_UPDATE.toString());
+    }
+    if(STATE_INIT.existsIn(config)) {
+      capabilities.add(ExtractorCapabilities.STATEFUL);
+    }
+    if (STATE_MERGE.existsIn(config)) {
+      capabilities.add(ExtractorCapabilities.MERGEABLE);
+      this.stateMerge = getFilter(config, STATE_MERGE.toString());
+    }
     String zkClientUrl = "";
     if (ZK_QUORUM.existsIn(config)) {
       zkClientUrl = ConversionUtils.convert(config.get(ZK_QUORUM.toString()), String.class);
@@ -120,6 +138,29 @@ public class TransformFilterExtractorDecorator extends ExtractorDecorator {
     StellarFunctions.initialize(stellarContext);
     this.transformProcessor = new StellarProcessor();
     this.filterProcessor = new StellarPredicateProcessor();
+
+  }
+
+  @Override
+  public Object initializeState(Map<String, Object> config) {
+    if(STATE_INIT.existsIn(config)) {
+      MapVariableResolver resolver = new MapVariableResolver(globalConfig);
+      return transformProcessor.parse( config.get(STATE_INIT.toString()).toString()
+                                      , resolver
+                                      , StellarFunctions.FUNCTION_RESOLVER()
+                                      , stellarContext
+                                      );
+    }
+    return null;
+  }
+
+  @Override
+  public Object mergeStates(List<? extends Object> states) {
+    return transformProcessor.parse( stateMerge
+                            , new MapVariableResolver(new HashMap<String, Object>() {{ put(STATES_KEY, states); }}, globalConfig)
+                            , StellarFunctions.FUNCTION_RESOLVER()
+                            , stellarContext
+                            );
   }
 
   private String getFilter(Map<String, Object> config, String valueFilter) {
@@ -187,10 +228,20 @@ public class TransformFilterExtractorDecorator extends ExtractorDecorator {
   }
 
   @Override
+  public Set<ExtractorCapabilities> getCapabilities() {
+    return capabilities;
+  }
+
+  @Override
   public Iterable<LookupKV> extract(String line) throws IOException {
+    return extract(line, new AtomicReference<>(null));
+  }
+
+  @Override
+  public Iterable<LookupKV> extract(String line, AtomicReference<Object> state) throws IOException {
     List<LookupKV> lkvs = new ArrayList<>();
     for (LookupKV lkv : super.extract(line)) {
-      if (updateLookupKV(lkv)) {
+      if (updateLookupKV(lkv, state)) {
         lkvs.add(lkv);
       }
     }
@@ -202,23 +253,30 @@ public class TransformFilterExtractorDecorator extends ExtractorDecorator {
    * @param lkv LookupKV to transform and filter
    * @return true if lkv is not null after transform/filter
    */
-  private boolean updateLookupKV(LookupKV lkv) {
+  private boolean updateLookupKV(LookupKV lkv, AtomicReference<Object> state) {
     Map<String, Object> ret = lkv.getValue().getMetadata();
     Map<String, Object> ind = new LinkedHashMap<>();
     String indicator = lkv.getKey().getIndicator();
     // add indicator as a resolvable variable. Also enable using resolved/transformed variables and values from operating on the value metadata
     ind.put(INDICATOR.toString(), indicator);
-    MapVariableResolver resolver = new MapVariableResolver(ret, ind, globalConfig);
+    Map<String, Object> stateMap = new LinkedHashMap<>();
+    stateMap.put(STATE_KEY, state.get());
+    MapVariableResolver resolver = new MapVariableResolver(ret, ind, globalConfig, stateMap);
     transform(valueTransforms, ret, resolver);
     transform(indicatorTransforms, ind, resolver);
     // update indicator
     Object updatedIndicator = ind.get(INDICATOR.toString());
-    if (updatedIndicator != null) {
+    if (updatedIndicator != null || getCapabilities().contains(ExtractorCapabilities.STATEFUL)) {
       if (!(updatedIndicator instanceof String)) {
         throw new UnsupportedOperationException("Indicator transform must return String type");
       }
       lkv.getKey().setIndicator((String) updatedIndicator);
-      return filter(indicatorFilter, resolver) && filter(valueFilter, resolver);
+      boolean update = filter(indicatorFilter, resolver) && filter(valueFilter, resolver);
+      if(update && !stateUpdate.isEmpty()) {
+        transform(stateUpdate, stateMap, resolver);
+        state.set(stateMap.get(STATE_KEY));
+      }
+      return update;
     } else {
       return false;
     }
@@ -236,6 +294,9 @@ public class TransformFilterExtractorDecorator extends ExtractorDecorator {
   }
 
   private Boolean filter(String filterPredicate, MapVariableResolver variableResolver) {
+    if(StringUtils.isEmpty(filterPredicate)) {
+      return true;
+    }
     return filterProcessor.parse(filterPredicate, variableResolver, StellarFunctions.FUNCTION_RESOLVER(), stellarContext);
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/CommonOptions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/CommonOptions.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/CommonOptions.java
new file mode 100644
index 0000000..a55d69d
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/CommonOptions.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.io.FileUtils;
+import org.apache.metron.common.utils.cli.CLIOptions;
+import org.apache.metron.common.utils.cli.OptionHandler;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+
+public class CommonOptions {
+  public static class Help<OPT_T extends Enum<OPT_T> & CLIOptions<OPT_T>> extends OptionHandler<OPT_T> {
+
+    @Override
+    public String getShortCode() {
+      return "h";
+    }
+
+
+    @Nullable
+    @Override
+    public Option apply(@Nullable String input) {
+      return new Option(getShortCode(), "help", false, "Generate Help screen");
+    }
+  }
+
+  public static class Quiet<OPT_T extends Enum<OPT_T> & CLIOptions<OPT_T>> extends OptionHandler<OPT_T> {
+
+    @Override
+    public String getShortCode() {
+      return "q";
+    }
+
+    @Nullable
+    @Override
+    public Option apply(@Nullable String input) {
+      return new Option(getShortCode(), "quiet", false, "Do not update progress");
+    }
+
+    @Override
+    public Optional<Object> getValue(OPT_T option, CommandLine cli) {
+      return Optional.of(option.has(cli));
+    }
+  }
+
+  public static class ImportMode<OPT_T extends Enum<OPT_T> & CLIOptions<OPT_T>> extends OptionHandler<OPT_T> {
+    Object[] importModes;
+    Object defaultMode;
+    Function<String, Optional<Object>> resolver;
+    public ImportMode(Object[] importModes, Object defaultMode, Function<String, Optional<Object>> resolver) {
+      this.importModes = importModes;
+      this.defaultMode = defaultMode;
+      this.resolver = resolver;
+    }
+
+    @Override
+    public String getShortCode() {
+      return "m";
+    }
+
+    @Nullable
+    @Override
+    public Option apply(@Nullable String input) {
+      Option o = new Option(getShortCode(), "import_mode", true
+                           , "The Import mode to use: " + Joiner.on(",").join(importModes)
+                           + ".  Default: " +defaultMode
+                           );
+      o.setArgName("MODE");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(OPT_T option, CommandLine cli) {
+      String mode = option.get(cli);
+      return resolver.apply(mode);
+    }
+  }
+
+  public static class ExtractorConfig<OPT_T extends Enum<OPT_T> & CLIOptions<OPT_T>> extends OptionHandler<OPT_T> {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "extractor_config", true, "JSON Document describing the extractor for this input data source");
+      o.setArgName("JSON_FILE");
+      o.setRequired(true);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(OPT_T option, CommandLine cli) {
+      try {
+        return Optional.ofNullable(FileUtils.readFileToString(new File(option.get(cli).trim())));
+      } catch (IOException e) {
+        throw new IllegalStateException("Unable to retrieve extractor config from " + option.get(cli) + ": " + e.getMessage(), e);
+      }
+    }
+
+    @Override
+    public String getShortCode() {
+      return "e";
+    }
+  }
+
+
+  public static class Log4jProperties<OPT_T extends Enum<OPT_T> & CLIOptions<OPT_T>> extends OptionHandler<OPT_T> {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "log4j", true, "The log4j properties file to load");
+      o.setArgName("FILE");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public String getShortCode() {
+      return "l";
+    }
+  }
+
+
+  public static class NumThreads<OPT_T extends Enum<OPT_T> & CLIOptions<OPT_T>> extends OptionHandler<OPT_T> {
+
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "threads", true, "The number of threads to use when extracting data.  The default is the number of cores of your machine.");
+      o.setArgName("NUM_THREADS");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(OPT_T option, CommandLine cli) {
+      int numThreads = Runtime.getRuntime().availableProcessors();
+      if(option.has(cli)) {
+        numThreads = ConversionUtils.convert(option.get(cli), Integer.class);
+      }
+      return Optional.of(numThreads);
+    }
+
+    @Override
+    public String getShortCode() {
+      return "p";
+    }
+  }
+
+  public static class BatchSize<OPT_T extends Enum<OPT_T> & CLIOptions<OPT_T>> extends OptionHandler<OPT_T> {
+
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "batchSize", true, "The batch size to use for HBase puts");
+      o.setArgName("SIZE");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(OPT_T option, CommandLine cli) {
+      int batchSize = 128;
+      if(option.has(cli)) {
+        batchSize = ConversionUtils.convert(option.get(cli), Integer.class);
+      }
+      return Optional.of(batchSize);
+    }
+
+    @Override
+    public String getShortCode() {
+      return "b";
+    }
+  }
+
+  public static class Input<OPT_T extends Enum<OPT_T> & CLIOptions<OPT_T>> extends OptionHandler<OPT_T> {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "input", true, "The CSV File to load");
+      o.setArgName("FILE");
+      o.setRequired(true);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(OPT_T option, CommandLine cli) {
+      List<String> inputs = new ArrayList<>();
+      for(String input : Splitter.on(",").split(Optional.ofNullable(option.get(cli)).orElse(""))) {
+        inputs.add(input.trim());
+      }
+      return Optional.of(inputs);
+    }
+
+    @Override
+    public String getShortCode() {
+      return "i";
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java
deleted file mode 100644
index 168d251..0000000
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.dataloads.nonbulk.flatfile;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.metron.dataloads.extractor.Extractor;
-import org.apache.metron.enrichment.converter.HbaseConverter;
-
-import java.io.IOException;
-
-public class ExtractorState {
-  private HTableInterface table;
-  private Extractor extractor;
-  private HbaseConverter converter;
-  private FileSystem fs;
-
-  public ExtractorState(HTableInterface table, Extractor extractor, HbaseConverter converter, Configuration config) {
-    this.table = table;
-    this.extractor = extractor;
-    this.converter = converter;
-    try {
-      this.fs = FileSystem.get(config);
-    } catch (IOException e) {
-      throw new IllegalStateException("Unable to retrieve hadoop file system: " + e.getMessage(), e);
-    }
-  }
-
-  public HTableInterface getTable() {
-    return table;
-  }
-
-  public Extractor getExtractor() {
-    return extractor;
-  }
-
-  public HbaseConverter getConverter() {
-    return converter;
-  }
-
-  public FileSystem getFileSystem() {
-    return fs;
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/HBaseExtractorState.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/HBaseExtractorState.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/HBaseExtractorState.java
new file mode 100644
index 0000000..f0ee3ad
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/HBaseExtractorState.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.enrichment.converter.HbaseConverter;
+
+import java.io.IOException;
+
+public class HBaseExtractorState {
+  private HTableInterface table;
+  private Extractor extractor;
+  private HbaseConverter converter;
+  private FileSystem fs;
+  private String cf;
+
+  public HBaseExtractorState(HTableInterface table, String cf, Extractor extractor, HbaseConverter converter, Configuration config) {
+    this.table = table;
+    this.extractor = extractor;
+    this.converter = converter;
+    this.cf = cf;
+    try {
+      this.fs = FileSystem.get(config);
+    } catch (IOException e) {
+      throw new IllegalStateException("Unable to retrieve hadoop file system: " + e.getMessage(), e);
+    }
+  }
+
+  public String getCf() {
+    return cf;
+  }
+
+  public HTableInterface getTable() {
+    return table;
+  }
+
+  public Extractor getExtractor() {
+    return extractor;
+  }
+
+  public HbaseConverter getConverter() {
+    return converter;
+  }
+
+  public FileSystem getFileSystem() {
+    return fs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/LoadOptions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/LoadOptions.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/LoadOptions.java
index 448f406..2967c6d 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/LoadOptions.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/LoadOptions.java
@@ -21,6 +21,7 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
 import org.apache.commons.cli.*;
 import org.apache.commons.io.FileUtils;
+import org.apache.metron.common.utils.cli.CLIOptions;
 import org.apache.metron.stellar.common.utils.ConversionUtils;
 import org.apache.metron.common.utils.cli.OptionHandler;
 import org.apache.metron.dataloads.nonbulk.flatfile.importer.ImportStrategy;
@@ -33,48 +34,14 @@ import java.util.EnumMap;
 import java.util.List;
 import java.util.Optional;
 
-public enum LoadOptions {
-  HELP("h", new OptionHandler<LoadOptions>() {
-
-    @Nullable
-    @Override
-    public Option apply(@Nullable String s) {
-      return new Option(s, "help", false, "Generate Help screen");
-    }
-  })
-  ,QUIET("q", new OptionHandler<LoadOptions>() {
-
-    @Nullable
-    @Override
-    public Option apply(@Nullable String s) {
-      return new Option(s, "quiet", false, "Do not update progress");
-    }
-
-    @Override
-    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
-      return Optional.of(option.has(cli));
-    }
-  })
-  , IMPORT_MODE("m", new OptionHandler<LoadOptions>() {
-    @Nullable
-    @Override
-    public Option apply(@Nullable String s) {
-      Option o = new Option(s, "import_mode", true
-                           , "The Import mode to use: " + Joiner.on(",").join(ImportStrategy.values())
-                           + ".  Default: " + ImportStrategy.LOCAL
-                           );
-      o.setArgName("MODE");
-      o.setRequired(false);
-      return o;
-    }
-
-    @Override
-    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
-      String mode = option.get(cli);
-      return Optional.of(ImportStrategy.getStrategy(mode).orElse(ImportStrategy.LOCAL));
-    }
-  })
-  ,HBASE_TABLE("t", new OptionHandler<LoadOptions>() {
+public enum LoadOptions implements CLIOptions<LoadOptions> {
+  HELP(new CommonOptions.Help<> ())
+  ,QUIET(new CommonOptions.Quiet<>())
+  , IMPORT_MODE(new CommonOptions.ImportMode<>( ImportStrategy.values()
+                                              , ImportStrategy.LOCAL
+                                              , mode -> Optional.ofNullable(ImportStrategy.getStrategy(mode).orElse(ImportStrategy.LOCAL)))
+               )
+  ,HBASE_TABLE(new OptionHandler<LoadOptions>() {
     @Nullable
     @Override
     public Option apply(@Nullable String s) {
@@ -88,8 +55,13 @@ public enum LoadOptions {
     public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
       return Optional.ofNullable(option.get(cli).trim());
     }
+
+    @Override
+    public String getShortCode() {
+      return "t";
+    }
   })
-  ,HBASE_CF("c", new OptionHandler<LoadOptions>() {
+  ,HBASE_CF(new OptionHandler<LoadOptions>() {
     @Nullable
     @Override
     public Option apply(@Nullable String s) {
@@ -103,27 +75,19 @@ public enum LoadOptions {
     public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
       return Optional.ofNullable(option.get(cli).trim());
     }
-  })
-  ,EXTRACTOR_CONFIG("e", new OptionHandler<LoadOptions>() {
-    @Nullable
-    @Override
-    public Option apply(@Nullable String s) {
-      Option o = new Option(s, "extractor_config", true, "JSON Document describing the extractor for this input data source");
-      o.setArgName("JSON_FILE");
-      o.setRequired(true);
-      return o;
-    }
 
     @Override
-    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
-      try {
-        return Optional.ofNullable(FileUtils.readFileToString(new File(option.get(cli).trim())));
-      } catch (IOException e) {
-        throw new IllegalStateException("Unable to retrieve extractor config from " + option.get(cli) + ": " + e.getMessage(), e);
-      }
+    public String getShortCode() {
+      return "c";
     }
   })
-  ,ENRICHMENT_CONFIG("n", new OptionHandler<LoadOptions>() {
+  ,EXTRACTOR_CONFIG(new CommonOptions.ExtractorConfig<>())
+  ,ENRICHMENT_CONFIG(new OptionHandler<LoadOptions>() {
+    @Override
+    public String getShortCode() {
+      return "n";
+    }
+
     @Nullable
     @Override
     public Option apply(@Nullable String s) {
@@ -136,126 +100,46 @@ public enum LoadOptions {
       return o;
     }
   })
-  ,LOG4J_PROPERTIES("l", new OptionHandler<LoadOptions>() {
-    @Nullable
-    @Override
-    public Option apply(@Nullable String s) {
-      Option o = new Option(s, "log4j", true, "The log4j properties file to load");
-      o.setArgName("FILE");
-      o.setRequired(false);
-      return o;
-    }
-  })
-  ,NUM_THREADS("p", new OptionHandler<LoadOptions>() {
-    @Nullable
-    @Override
-    public Option apply(@Nullable String s) {
-      Option o = new Option(s, "threads", true, "The number of threads to use when extracting data.  The default is the number of cores of your machine.");
-      o.setArgName("NUM_THREADS");
-      o.setRequired(false);
-      return o;
-    }
-
-    @Override
-    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
-      int numThreads = Runtime.getRuntime().availableProcessors();
-      if(option.has(cli)) {
-        numThreads = ConversionUtils.convert(option.get(cli), Integer.class);
-      }
-      return Optional.of(numThreads);
-    }
-  })
-  ,BATCH_SIZE("b", new OptionHandler<LoadOptions>() {
-    @Nullable
-    @Override
-    public Option apply(@Nullable String s) {
-      Option o = new Option(s, "batchSize", true, "The batch size to use for HBase puts");
-      o.setArgName("SIZE");
-      o.setRequired(false);
-      return o;
-    }
-
-    @Override
-    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
-      int batchSize = 128;
-      if(option.has(cli)) {
-        batchSize = ConversionUtils.convert(option.get(cli), Integer.class);
-      }
-      return Optional.of(batchSize);
-    }
-  })
-  ,INPUT("i", new OptionHandler<LoadOptions>() {
-    @Nullable
-    @Override
-    public Option apply(@Nullable String s) {
-      Option o = new Option(s, "input", true, "The CSV File to load");
-      o.setArgName("FILE");
-      o.setRequired(true);
-      return o;
-    }
-
-    @Override
-    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
-      List<String> inputs = new ArrayList<>();
-      for(String input : Splitter.on(",").split(Optional.ofNullable(option.get(cli)).orElse(""))) {
-        inputs.add(input.trim());
-      }
-      return Optional.of(inputs);
-    }
-  })
+  ,LOG4J_PROPERTIES(new CommonOptions.Log4jProperties<>())
+  ,NUM_THREADS(new CommonOptions.NumThreads<>())
+  ,BATCH_SIZE(new CommonOptions.BatchSize<>())
+  ,INPUT(new CommonOptions.Input<>())
   ;
   Option option;
   String shortCode;
   OptionHandler<LoadOptions> handler;
-  LoadOptions(String shortCode, OptionHandler<LoadOptions> optionHandler) {
-    this.shortCode = shortCode;
+  LoadOptions(OptionHandler<LoadOptions> optionHandler) {
+    this.shortCode = optionHandler.getShortCode();
     this.handler = optionHandler;
     this.option = optionHandler.apply(shortCode);
   }
 
+  @Override
+  public OptionHandler<LoadOptions> getHandler() {
+    return handler;
+  }
+
+
+  @Override
+  public Option getOption() {
+    return option;
+  }
+
+  @Override
   public boolean has(CommandLine cli) {
     return cli.hasOption(shortCode);
   }
 
+  @Override
   public String get(CommandLine cli) {
     return cli.getOptionValue(shortCode);
   }
 
   public static CommandLine parse(CommandLineParser parser, String[] args) {
-    try {
-      CommandLine cli = parser.parse(getOptions(), args);
-      if(HELP.has(cli)) {
-        printHelp();
-        System.exit(0);
-      }
-      return cli;
-    } catch (ParseException e) {
-      System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
-      e.printStackTrace(System.err);
-      printHelp();
-      System.exit(-1);
-      return null;
-    }
+    return OptionHandler.parse("SimpleEnrichmentFlatFileLoader", parser, args, values(), HELP);
   }
 
   public static EnumMap<LoadOptions, Optional<Object> > createConfig(CommandLine cli) {
-    EnumMap<LoadOptions, Optional<Object> > ret = new EnumMap<>(LoadOptions.class);
-    for(LoadOptions option : values()) {
-      ret.put(option, option.handler.getValue(option, cli));
-    }
-    return ret;
-  }
-
-  public static void printHelp() {
-    HelpFormatter formatter = new HelpFormatter();
-    formatter.printHelp( "SimpleEnrichmentFlatFileLoader", getOptions());
-  }
-
-  public static Options getOptions() {
-    Options ret = new Options();
-    for(LoadOptions o : LoadOptions.values()) {
-      ret.addOption(o.option);
-    }
-    return ret;
+    return OptionHandler.createConfig(cli, values(), LoadOptions.class);
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleFlatFileSummarizer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleFlatFileSummarizer.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleFlatFileSummarizer.java
new file mode 100644
index 0000000..1cd1abf
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleFlatFileSummarizer.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.dataloads.nonbulk.flatfile.importer.Summarizers;
+
+import java.io.File;
+import java.util.EnumMap;
+import java.util.Optional;
+
+public class SimpleFlatFileSummarizer {
+    public static void main(String... argv) throws Exception {
+    Configuration hadoopConfig = HBaseConfiguration.create();
+    String[] otherArgs = new GenericOptionsParser(hadoopConfig, argv).getRemainingArgs();
+    main(hadoopConfig, otherArgs);
+  }
+
+  public static void main(Configuration hadoopConfig, String[] argv) throws Exception {
+    CommandLine cli = SummarizeOptions.parse(new PosixParser(), argv);
+    EnumMap<SummarizeOptions, Optional<Object>> config = SummarizeOptions.createConfig(cli);
+    if(SummarizeOptions.LOG4J_PROPERTIES.has(cli)) {
+      PropertyConfigurator.configure(SummarizeOptions.LOG4J_PROPERTIES.get(cli));
+    }
+    ExtractorHandler handler = ExtractorHandler.load(
+            FileUtils.readFileToString(new File(SummarizeOptions.EXTRACTOR_CONFIG.get(cli).trim()))
+    );
+    Summarizers strategy = (Summarizers) config.get(SummarizeOptions.IMPORT_MODE).get();
+    strategy.getSummarizer().importData(config, handler, hadoopConfig);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SummarizeOptions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SummarizeOptions.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SummarizeOptions.java
new file mode 100644
index 0000000..4e644ba
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SummarizeOptions.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile;
+
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.metron.common.utils.cli.CLIOptions;
+import org.apache.metron.common.utils.cli.OptionHandler;
+import org.apache.metron.dataloads.nonbulk.flatfile.importer.Summarizers;
+import org.apache.metron.dataloads.nonbulk.flatfile.writer.Writers;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Optional;
+
+public enum SummarizeOptions implements CLIOptions<SummarizeOptions> {
+  HELP(new CommonOptions.Help<>())
+  ,QUIET(new CommonOptions.Quiet<>())
+  , IMPORT_MODE(new CommonOptions.ImportMode<>(Summarizers.values(), Summarizers.LOCAL, mode -> Optional.of(Summarizers.getStrategy(mode).orElse(Summarizers.LOCAL))))
+  , OUTPUT_MODE(new OptionHandler<SummarizeOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "output_mode", true
+                           , "The output mode to use: " + Joiner.on(",").join(Writers.values())
+                           + ".  Default: " + Writers.LOCAL
+                           );
+      o.setArgName("MODE");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(SummarizeOptions option, CommandLine cli) {
+      String mode = option.get(cli);
+      return Optional.of(Writers.getStrategy(mode).orElse(Writers.LOCAL));
+    }
+
+    @Override
+    public String getShortCode() {
+      return "om";
+    }
+  })
+  ,EXTRACTOR_CONFIG(new CommonOptions.ExtractorConfig<>())
+  ,LOG4J_PROPERTIES(new CommonOptions.Log4jProperties<>())
+  ,NUM_THREADS(new CommonOptions.NumThreads<>())
+  ,BATCH_SIZE(new CommonOptions.BatchSize<>())
+  ,INPUT(new CommonOptions.Input<>())
+  ,OUTPUT(new OptionHandler<SummarizeOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "output", true, "The output file to write");
+      o.setArgName("FILE");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(SummarizeOptions option, CommandLine cli) {
+      return Optional.ofNullable(option.get(cli));
+    }
+
+    @Override
+    public String getShortCode() {
+      return "o";
+    }
+  })
+  ;
+  Option option;
+  String shortCode;
+  OptionHandler<SummarizeOptions> handler;
+  SummarizeOptions(OptionHandler<SummarizeOptions> optionHandler) {
+    this.shortCode = optionHandler.getShortCode();
+    this.handler = optionHandler;
+    this.option = optionHandler.apply(shortCode);
+  }
+
+  @Override
+  public OptionHandler<SummarizeOptions> getHandler() {
+    return handler;
+  }
+
+  public Option getOption() {
+    return option;
+  }
+
+  public boolean has(CommandLine cli) {
+    return cli.hasOption(shortCode);
+  }
+
+  public String get(CommandLine cli) {
+    return cli.getOptionValue(shortCode);
+  }
+
+  public static CommandLine parse(CommandLineParser parser, String[] args) {
+    return OptionHandler.parse("SimpleFlatFileSummarizer", parser, args, values(), HELP);
+  }
+
+  public static EnumMap<SummarizeOptions, Optional<Object> > createConfig(CommandLine cli) {
+    return OptionHandler.createConfig(cli, values(), SummarizeOptions.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/AbstractLocalImporter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/AbstractLocalImporter.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/AbstractLocalImporter.java
new file mode 100644
index 0000000..1709931
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/AbstractLocalImporter.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.importer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.metron.common.utils.file.ReaderSpliterator;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.dataloads.extractor.inputformat.WholeFileFormat;
+import org.apache.metron.common.utils.cli.CLIOptions;
+import org.apache.metron.dataloads.nonbulk.flatfile.LoadOptions;
+import org.apache.metron.dataloads.nonbulk.flatfile.location.Location;
+import org.apache.metron.dataloads.nonbulk.flatfile.location.LocationStrategy;
+import org.apache.metron.dataloads.nonbulk.flatfile.writer.InvalidWriterOutput;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ForkJoinPool;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Base class for importers that read their inputs in-process and pass every record to
+ * {@code extract} together with per-thread state.
+ *
+ * <p>Inputs are processed either one whole file at a time (when the handler's input format is
+ * exactly {@link WholeFileFormat}) or line by line on a dedicated {@link ForkJoinPool}.
+ * Subclasses supply the option lookups, the state factory and the per-record work.
+ *
+ * @param <OPTIONS_T> the option enum that keys the configuration map
+ * @param <STATE_T> the per-thread state handed to {@code extract}
+ */
+public abstract class AbstractLocalImporter<OPTIONS_T extends Enum<OPTIONS_T> & CLIOptions, STATE_T>  implements Importer<OPTIONS_T> {
+
+  /**
+   * Validates the configuration, creates the per-thread state and extracts every input,
+   * whole-file or line-by-line depending on the handler's input format.
+   *
+   * @param config the parsed options
+   * @param handler supplies the input format and is passed on to the subclass hooks
+   * @param hadoopConfig used to obtain the {@link FileSystem} and passed to {@code createState}
+   * @throws IOException if the file system cannot be obtained or extraction fails with an I/O error
+   * @throws InvalidWriterOutput declared by the {@link Importer} contract
+   */
+  @Override
+  public void importData( final EnumMap<OPTIONS_T, Optional<Object>> config
+                        , final ExtractorHandler handler
+                        , final Configuration hadoopConfig
+                         ) throws IOException, InvalidWriterOutput {
+    validateState(config, handler);
+    ThreadLocal<STATE_T> state = createState(config, hadoopConfig, handler);
+    boolean quiet = isQuiet(config);
+    // Only an exact WholeFileFormat selects whole-file mode; anything else is read line by line.
+    boolean lineByLine = !handler.getInputFormat().getClass().equals(WholeFileFormat.class);
+    List<String> inputs = getInputs(config);
+    FileSystem fs = FileSystem.get(hadoopConfig);
+    if(!lineByLine) {
+      extractWholeFiles(inputs, fs, state, quiet);
+    }
+    else {
+      int batchSize = batchSize(config);
+      int numThreads = numThreads(config, handler);
+      extractLineByLine(inputs, fs, state, batchSize, numThreads, quiet);
+    }
+    if(!quiet) {
+      // The progress indicator rewrites its line with '\r'; finish that line.
+      System.out.println();
+    }
+  }
+
+  /** @return the input locations to read, taken from the configuration */
+  protected abstract List<String> getInputs(final EnumMap<OPTIONS_T, Optional<Object>> config);
+
+  /** @return true when progress output should be suppressed */
+  protected abstract boolean isQuiet(final EnumMap<OPTIONS_T, Optional<Object>> config);
+
+  /** @return the batch size passed to {@code ReaderSpliterator.lineStream} in line-by-line mode */
+  protected abstract int batchSize(final EnumMap<OPTIONS_T, Optional<Object>> config);
+
+  /** @return the parallelism of the {@link ForkJoinPool} used in line-by-line mode */
+  protected abstract int numThreads(final EnumMap<OPTIONS_T, Optional<Object>> config, ExtractorHandler handler);
+
+  /**
+   * Hook invoked first by {@code importData}, before any state is created, so that
+   * subclasses can reject an unusable configuration or handler.
+   */
+  protected abstract void validateState(final EnumMap<OPTIONS_T, Optional<Object>> config
+                                       ,final ExtractorHandler handler
+                                       );
+
+  /**
+   * Creates the holder from which each worker thread obtains its own state via
+   * {@link ThreadLocal#get()}.
+   */
+  protected abstract ThreadLocal<STATE_T> createState( final EnumMap<OPTIONS_T, Optional<Object>> config
+                                                     , final Configuration hadoopConfig
+                                                     , final ExtractorHandler handler
+                                                     );
+
+  /**
+   * Processes one record: a single line in line-by-line mode, or the concatenated
+   * content of a file in whole-file mode.
+   *
+   * @param state the calling thread's state
+   * @param line the record to process
+   * @throws IOException if the record cannot be processed
+   */
+  protected abstract void extract(STATE_T state
+                                 , String line
+                                 ) throws IOException;
+
+  /**
+   * Turns an input string into a {@link Location} via {@link LocationStrategy}.
+   */
+  protected Location resolveLocation(String input, FileSystem fs) {
+    return LocationStrategy.getLocation(input, fs);
+  }
+
+  /**
+   * Reads every input line by line and calls {@code extract} for each line, running the
+   * lines of one input in parallel on a pool of {@code numThreads} workers. Inputs are
+   * handled one after another, each with its own progress counter.
+   *
+   * @param inputs the input locations
+   * @param fs the file system used to resolve the locations
+   * @param state per-thread state source
+   * @param batchSize batch size passed to {@code ReaderSpliterator.lineStream}
+   * @param numThreads parallelism of the pool created for each input
+   * @param quiet when true, no progress is printed
+   * @throws IOException declared for compatibility; failures surface as IllegalStateException
+   * @throws IllegalStateException if opening, reading or extracting an input fails
+   */
+  public void extractLineByLine( List<String> inputs
+                               , FileSystem fs
+                               , ThreadLocal<STATE_T> state
+                               , int batchSize
+                               , int numThreads
+                               , boolean quiet
+                               ) throws IOException {
+    for(String rawInput : inputs) {
+      final Location loc = resolveLocation(rawInput, fs);
+      final Progress progress = new Progress();
+      if(!quiet) {
+        System.out.println("\nProcessing " + loc.toString());
+      }
+      // Created inside the try so that a bad numThreads is still reported as IllegalStateException.
+      ForkJoinPool forkJoinPool = null;
+      try (Stream<String> stream = ReaderSpliterator.lineStream(loc.openReader(), batchSize)) {
+        forkJoinPool = new ForkJoinPool(numThreads);
+        // Running the parallel stream from inside the pool keeps its work on that pool's workers.
+        forkJoinPool.submit(() ->
+          stream.parallel().forEach(input ->  {
+            try {
+              extract(state.get(), input);
+              if (!quiet) {
+                progress.update();
+              }
+            }
+            catch(IOException e) {
+              throw new IllegalStateException("Unable to continue: " + e.getMessage(), e);
+            }
+          })
+        ).get();
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag so callers further up can observe the interruption.
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(e.getMessage(), e);
+      } catch (Exception e) {
+        throw new IllegalStateException(e.getMessage(), e);
+      } finally {
+        // One pool is created per input; without shutdown its worker threads would linger.
+        if(forkJoinPool != null) {
+          forkJoinPool.shutdown();
+        }
+      }
+    }
+  }
+
+  /**
+   * Visits the inputs to collect their locations, then reads each location completely and
+   * calls {@code extract} once per location, in parallel. The lines of a location are
+   * concatenated without any separator before being passed to {@code extract}.
+   *
+   * @param inputs the input locations
+   * @param fs the file system used to resolve the locations
+   * @param state per-thread state source
+   * @param quiet when true, no progress is printed
+   * @throws IOException if the locations cannot be enumerated
+   * @throws IllegalStateException if a location cannot be read or extracted
+   */
+  public void extractWholeFiles(List<String> inputs, FileSystem fs, ThreadLocal<STATE_T> state, boolean quiet) throws IOException {
+    final Progress progress = new Progress();
+    final List<Location> locations = getLocationsRecursive(inputs, fs);
+    locations.parallelStream().forEach(loc -> {
+      try(BufferedReader br = loc.openReader()) {
+        String s = br.lines().collect(Collectors.joining());
+        extract(state.get(), s);
+        if(!quiet) {
+          progress.update();
+        }
+      } catch (IOException e) {
+        throw new IllegalStateException("Unable to read " + loc + ": " + e.getMessage(), e);
+      }
+    });
+  }
+
+  /**
+   * Collects every location that {@code Location.fileVisitor} reports for the given inputs.
+   *
+   * @throws IOException if visiting the inputs fails
+   */
+  protected List<Location> getLocationsRecursive(List<String> inputs, FileSystem fs) throws IOException {
+    final List<Location> locations = new ArrayList<>();
+    Location.fileVisitor(inputs, loc -> locations.add(loc), fs);
+    return locations;
+  }
+
+  /**
+   * Console progress indicator: a running count plus a spinner, redrawn in place with a
+   * carriage return. {@code update} is synchronized, so concurrent workers may share one instance.
+   */
+  public static class Progress {
+    private int count = 0;
+    private final String anim= "|/-\\";
+
+    /**
+     * Records one more processed item and redraws the indicator with the new total.
+     */
+    public synchronized void update() {
+      // Pre-increment so the number shown is the count of items processed so far (1 for the first).
+      int currentCount = ++count;
+      System.out.print("\rProcessed " + currentCount + " - " + anim.charAt(currentCount % anim.length()));
+    }
+  }
+
+  /**
+   * Fails when the given option has no entry in the configuration map.
+   *
+   * <p>NOTE(review): this only tests for the key. If the map is built with an entry for every
+   * option (possibly an empty Optional), this check can never fail -- confirm against
+   * {@code OptionHandler.createConfig} and consider testing {@code isPresent()} instead.
+   *
+   * @throws IllegalStateException if the option is not a key of {@code config}
+   */
+  protected void assertOption(EnumMap<OPTIONS_T, Optional<Object>> config, OPTIONS_T option) {
+    if(!config.containsKey(option)) {
+      throw new IllegalStateException("Expected " + option.getOption().getOpt() + " to be set");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/ImportStrategy.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/ImportStrategy.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/ImportStrategy.java
index df88640..7730bb8 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/ImportStrategy.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/ImportStrategy.java
@@ -20,7 +20,7 @@ package org.apache.metron.dataloads.nonbulk.flatfile.importer;
 import java.util.Optional;
 
 public enum ImportStrategy {
-  LOCAL(LocalImporter.INSTANCE),
+  LOCAL(new LocalImporter()),
   MR(MapReduceImporter.INSTANCE)
   ;
   private Importer importer;

http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/Importer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/Importer.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/Importer.java
index 81ede08..0c7faf6 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/Importer.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/Importer.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.metron.dataloads.extractor.ExtractorHandler;
 import org.apache.metron.dataloads.nonbulk.flatfile.LoadOptions;
+import org.apache.metron.dataloads.nonbulk.flatfile.writer.InvalidWriterOutput;
 import org.apache.metron.enrichment.converter.EnrichmentConverter;
 
 import java.io.IOException;
@@ -29,6 +30,6 @@ import java.util.EnumMap;
 import java.util.List;
 import java.util.Optional;
 
-public interface Importer {
-  void importData(EnumMap<LoadOptions, Optional<Object>> config, ExtractorHandler handler , final Configuration hadoopConfig) throws IOException;
+public interface Importer<OPTIONS_T extends Enum<OPTIONS_T>> {
+  void importData(EnumMap<OPTIONS_T, Optional<Object>> config, ExtractorHandler handler , final Configuration hadoopConfig) throws IOException, InvalidWriterOutput;
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/5d3e73ab/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/LocalImporter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/LocalImporter.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/LocalImporter.java
index 652a4c3..ec37585 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/LocalImporter.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/LocalImporter.java
@@ -18,17 +18,12 @@
 package org.apache.metron.dataloads.nonbulk.flatfile.importer;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.metron.common.utils.file.ReaderSpliterator;
 import org.apache.metron.dataloads.extractor.Extractor;
 import org.apache.metron.dataloads.extractor.ExtractorHandler;
-import org.apache.metron.dataloads.extractor.inputformat.WholeFileFormat;
-import org.apache.metron.dataloads.nonbulk.flatfile.ExtractorState;
+import org.apache.metron.dataloads.nonbulk.flatfile.HBaseExtractorState;
 import org.apache.metron.dataloads.nonbulk.flatfile.LoadOptions;
-import org.apache.metron.dataloads.nonbulk.flatfile.location.Location;
-import org.apache.metron.dataloads.nonbulk.flatfile.location.LocationStrategy;
 import org.apache.metron.enrichment.converter.EnrichmentConverter;
 import org.apache.metron.enrichment.converter.HbaseConverter;
 import org.apache.metron.enrichment.lookup.LookupKV;
@@ -36,119 +31,82 @@ import org.apache.metron.hbase.HTableProvider;
 
 import java.io.*;
 import java.util.*;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
-public enum LocalImporter implements Importer {
-  INSTANCE;
+public class LocalImporter extends AbstractLocalImporter<LoadOptions, HBaseExtractorState> {
 
   public interface HTableProviderRetriever {
     HTableProvider retrieve();
   }
 
+  HTableProviderRetriever provider;
+
+  public LocalImporter(HTableProviderRetriever provider) {
+    this.provider = provider;
+  }
+
+  public LocalImporter() {
+    this(() -> new HTableProvider());
+  }
+
+
+  @Override
+  protected List<String> getInputs(EnumMap<LoadOptions, Optional<Object>> config) {
+    return (List<String>) config.get(LoadOptions.INPUT).get();
+  }
+
+  @Override
+  protected boolean isQuiet(EnumMap<LoadOptions, Optional<Object>> config) {
+    return (boolean) config.get(LoadOptions.QUIET).get();
+  }
 
   @Override
-  public void importData( final EnumMap<LoadOptions, Optional<Object>> config
-                        , final ExtractorHandler handler
-                        , final Configuration hadoopConfig
-                         ) throws IOException {
-    importData(config, handler, hadoopConfig, () -> new HTableProvider());
+  protected int batchSize(EnumMap<LoadOptions, Optional<Object>> config) {
+    return (int) config.get(LoadOptions.BATCH_SIZE).get();
+  }
 
+  @Override
+  protected int numThreads(EnumMap<LoadOptions, Optional<Object>> config, ExtractorHandler handler) {
+    return (int) config.get(LoadOptions.NUM_THREADS).get();
   }
-  public void importData( final EnumMap<LoadOptions, Optional<Object>> config
-                        , final ExtractorHandler handler
-                        , final Configuration hadoopConfig
-                        , final HTableProviderRetriever provider
-                         ) throws IOException {
-    ThreadLocal<ExtractorState> state = new ThreadLocal<ExtractorState>() {
+
+  @Override
+  protected void validateState(EnumMap<LoadOptions, Optional<Object>> config, ExtractorHandler handler) {
+    assertOption(config, LoadOptions.HBASE_CF);
+    assertOption(config, LoadOptions.HBASE_TABLE);
+  }
+
+
+
+  @Override
+  protected ThreadLocal<HBaseExtractorState> createState(EnumMap<LoadOptions, Optional<Object>> config
+                                                   , Configuration hadoopConfig
+                                                   , final ExtractorHandler handler
+                                                   ) {
+    ThreadLocal<HBaseExtractorState> state = new ThreadLocal<HBaseExtractorState>() {
       @Override
-      protected ExtractorState initialValue() {
+      protected HBaseExtractorState initialValue() {
         try {
+          String cf = (String) config.get(LoadOptions.HBASE_CF).get();
           HTableInterface table = provider.retrieve().getTable(hadoopConfig, (String) config.get(LoadOptions.HBASE_TABLE).get());
-          return new ExtractorState(table, handler.getExtractor(), new EnrichmentConverter(), hadoopConfig);
+          return new HBaseExtractorState(table, cf, handler.getExtractor(), new EnrichmentConverter(), hadoopConfig);
         } catch (IOException e1) {
           throw new IllegalStateException("Unable to get table: " + e1);
         }
       }
     };
-    boolean quiet = (boolean) config.get(LoadOptions.QUIET).get();
-    boolean lineByLine = !handler.getInputFormat().getClass().equals(WholeFileFormat.class);
-    List<String> inputs = (List<String>) config.get(LoadOptions.INPUT).get();
-    String cf = (String) config.get(LoadOptions.HBASE_CF).get();
-    if(!lineByLine) {
-      extractWholeFiles(inputs, state, cf, quiet);
-    }
-    else {
-      int batchSize = (int) config.get(LoadOptions.BATCH_SIZE).get();
-      int numThreads = (int) config.get(LoadOptions.NUM_THREADS).get();
-      extractLineByLine(inputs, state, cf, batchSize, numThreads, quiet);
-    }
-
+    return state;
   }
 
-  public void extractLineByLine( List<String> inputs
-                               , ThreadLocal<ExtractorState> state
-                               , String cf
-                               , int batchSize
-                               , int numThreads
-                               , boolean quiet
-                               ) throws IOException {
-    inputs.stream().map(input -> LocationStrategy.getLocation(input, state.get().getFileSystem()))
-                   .forEach( loc -> {
-                      final Progress progress = new Progress();
-                      if(!quiet) {
-                        System.out.println("\nProcessing " + loc.toString());
-                      }
-                      try (Stream<String> stream = ReaderSpliterator.lineStream(loc.openReader(), batchSize)) {
-                        ForkJoinPool forkJoinPool = new ForkJoinPool(numThreads);
-                        forkJoinPool.submit(() ->
-                          stream.parallel().forEach(input -> {
-                            ExtractorState es = state.get();
-                            try {
-                              es.getTable().put(extract(input, es.getExtractor(), cf, es.getConverter(), progress, quiet));
-                            } catch (IOException e) {
-                              throw new IllegalStateException("Unable to continue: " + e.getMessage(), e);
-                            }
-                                                             }
-                                       )
-                               ).get();
-                             } catch (Exception e) {
-                               throw new IllegalStateException(e.getMessage(), e);
-                             }
-                                  }
-                   );
-  }
-
-  public void extractWholeFiles( List<String> inputs, ThreadLocal<ExtractorState> state, String cf, boolean quiet) throws IOException {
-    final Progress progress = new Progress();
-    final List<Location> locations = new ArrayList<>();
-      Location.fileVisitor(inputs, loc -> locations.add(loc), state.get().getFileSystem());
-      locations.parallelStream().forEach(loc -> {
-        try(BufferedReader br = loc.openReader()) {
-          String s = br.lines().collect(Collectors.joining());
-          state.get().getTable().put(extract( s
-                                            , state.get().getExtractor()
-                                            , cf, state.get().getConverter()
-                                            , progress
-                                            , quiet
-                                            )
-                                    );
-        } catch (IOException e) {
-          throw new IllegalStateException("Unable to read " + loc + ": " + e.getMessage(), e);
-        }
-      });
+  @Override
+  protected void extract(HBaseExtractorState state, String line) throws IOException {
+    HBaseExtractorState es = state;
+    es.getTable().put(toPut(line, es.getExtractor(), state.getCf(), es.getConverter()));
   }
 
-
-  public List<Put> extract(String line
+  public List<Put> toPut(String line
                      , Extractor extractor
                      , String cf
                      , HbaseConverter converter
-                     , final Progress progress
-                     , final boolean quiet
                      ) throws IOException
   {
     List<Put> ret = new ArrayList<>();
@@ -157,21 +115,8 @@ public enum LocalImporter implements Importer {
       Put put = converter.toPut(cf, kv.getKey(), kv.getValue());
       ret.add(put);
     }
-    if(!quiet) {
-      progress.update();
-    }
-    return ret;
-  }
 
-
-  public static class Progress {
-    private int count = 0;
-    private String anim= "|/-\\";
-
-    public synchronized void update() {
-      int currentCount = count++;
-      System.out.print("\rProcessed " + currentCount + " - " + anim.charAt(currentCount % anim.length()));
-    }
+    return ret;
   }
 
 }


Mime
View raw message