metron-commits mailing list archives

From rmerri...@apache.org
Subject [34/51] [partial] incubator-metron git commit: METRON-113 Project Reorganization (merrimanr) closes apache/incubator-metron#88
Date Tue, 26 Apr 2016 14:46:22 GMT
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/README.md b/metron-platform/metron-data-management/README.md
new file mode 100644
index 0000000..a123cc3
--- /dev/null
+++ b/metron-platform/metron-data-management/README.md
@@ -0,0 +1,252 @@
+# metron-data-management
+
+This project is a collection of classes to assist with loading of
+various enrichment and threat intelligence sources into Metron.
+
+## Simple HBase Enrichments/Threat Intelligence
+
+The vast majority of enrichments and threat intelligence processing tend
+toward the following pattern:
+* Take a field
+* Look up the field in a key/value store
+* If the key exists, then either it's a threat to be alerted or it should be enriched with the value associated with the key.
+
+As such, we have created this capability as a default threat intel and enrichment adapter.  The basic primitive for simple enrichments and threat intelligence sources
+is a complex key (type, indicator) and an associated value:
+* Type : The type of threat intel or enrichment (e.g. malicious_ip)
+* Indicator : The indicator in question
+* Value : The value to associate with the type, indicator pair.  This is a JSON map.
+
+At present, all of the dataload utilities work by converting raw data
+sources to this primitive key (type, indicator) and value to be placed in HBase.
+
+In the case of threat intel, a hit on the threat intel table will result
+in:
+* The `is_alert` field being set to `true` in the index
+* A field named `threatintels.hbaseThreatIntel.$field.$threatintel_type` being set to `alert`
+   * `$field` is the field in the original document that was a match (e.g. `ip_src_addr`)
+   * `$threatintel_type` is the type of threat intel imported (defined in the Extractor configuration below).
+
+In the case of simple HBase enrichment, a hit on the enrichments table
+will result in the following new field for each key in the value: `enrichments.hbaseEnrichment.$field.$enrichment_type.$key`
+* `$field` is the field in the original document that was a match (e.g. `ip_src_addr`)
+* `$enrichment_type` is the type of enrichment imported (defined in the Extractor configuration below).
+* `$key` is a key in the JSON map associated with the row in HBase.
+
+For instance, in the situation where we had the following very silly key/value in
+HBase in the enrichment table:
+* indicator: `127.0.0.1`
+* type : `important_addresses`
+* value: `{ "name" : "localhost", "location" : "home" }`
+
+If we had a document whose `ip_src_addr` came through with a value of
+`127.0.0.1`, we would have the following fields added to the indexed
+document:
+* `enrichments.hbaseEnrichment.ip_src_addr.important_addresses.name` : `localhost`
+* `enrichments.hbaseEnrichment.ip_src_addr.important_addresses.location` : `home`
+
+## Extractor Framework
+
+For the purpose of ingesting data of a variety of formats, we have
+created an Extractor framework which allows for common data formats to
+be interpreted as enrichment or threat intelligence sources.  The
+formats supported at present are:
+* CSV (both threat intel and enrichment)
+* STIX (threat intel only)
+* Custom (pass your own class)
+
+All of the current utilities take a JSON file to configure how to
+interpret input data.  This JSON describes the type of data and the
+schema if necessary for the data if it is not fixed (as in STIX, e.g.).
+
+### CSV Extractor
+
+Consider the following example configuration file which
+describes how to process a CSV file.
+
+````
+{
+  "config" : {
+    "columns" : {
+         "ip" : 0
+        ,"source" : 2
+    }
+    ,"indicator_column" : "ip"
+    ,"type" : "malicious_ip"
+    ,"separator" : ","
+  }
+  ,"extractor" : "CSV"
+}
+````
+
+In this example, we have told the extractor the schema (i.e. the `columns` field):
+two columns, `ip` at the first position and `source` at the third.  We have indicated that the `ip` column is the indicator
+and that the enrichment type is named `malicious_ip`.  We have also indicated that the extractor to use is the CSV Extractor.
+The other options are the STIX extractor or a fully qualified classname for your own extractor.
+
+The `source` column values will show up in the value in HBase because it is called out as a non-indicator column.  The key
+for the value will be `source`.  For instance, given an input string of `123.45.123.12,something,the grapevine`, the following key, value
+would be extracted:
+* Indicator : `123.45.123.12`
+* Type : `malicious_ip`
+* Value : `{ "source" : "the grapevine" }`
+
+### STIX Extractor
+
+Consider the following config for importing STIX documents.  This is a threat intelligence interchange
+format, so it is particularly relevant and attractive data to import for our purposes.  Because STIX is
+a standard format, there is no need to specify the schema or how to interpret the documents.
+
+We support a subset of STIX messages for importation:
+
+| STIX Type | Specific Type | Enrichment Type Name |
+|-----------|---------------|----------------------|
+| Address   | IPV_4_ADDR    | address:IPV_4_ADDR   |
+| Address   | IPV_6_ADDR    | address:IPV_6_ADDR   |
+| Address   | E_MAIL        | address:E_MAIL       |
+| Address   | MAC           | address:MAC          |
+| Domain    | FQDN          | domain:FQDN          |
+| Hostname  |               | hostname             |
+
+
+NOTE: The enrichment type will be used as the type above.
+
+Consider the following configuration for an Extractor:
+
+````
+{
+  "config" : {
+    "stix_address_categories" : "IPV_4_ADDR"
+  }
+  ,"extractor" : "STIX"
+}
+````
+
+Here, we're configuring the STIX extractor to load from a series of STIX files; however, we only want to bring in IPv4
+addresses from the set of all possible addresses.  Note that if no categories are specified for import, all are assumed.
+Also, only the address and domain types allow filtering, via the `stix_address_categories` and `stix_domain_categories` config
+parameters respectively.
+
+## Enrichment Config
+
+In order to automatically add new enrichment and threat intel types to existing, running enrichment topologies, you will
+need to add new fields and new types to the zookeeper configuration.  A convenience parameter has been made to assist in this
+when doing an import.  Namely, you can specify the enrichment configs and how they associate with the fields of the 
+documents flowing through the enrichment topology.
+
+Consider the following Enrichment Configuration JSON.  This one is for a threat intelligence type:
+
+````
+{
+  "zkQuorum" : "localhost:2181"
+ ,"sensorToFieldList" : {
+    "bro" : {
+           "type" : "THREAT_INTEL"
+          ,"fieldToEnrichmentTypes" : {
+             "ip_src_addr" : [ "malicious_ip" ]
+            ,"ip_dst_addr" : [ "malicious_ip" ]
+                                      }
+           }
+                        }
+}
+````
+
+We have to specify the following:
+* The zookeeper quorum which holds the cluster configuration
+* The mapping between the fields in the enriched documents and the enrichment types.
+
+This configuration allows the ingestion tools to update zookeeper post-ingestion so that the enrichment topology can
+immediately take advantage of the new type.
+
+
+## Loading Utilities
+
+The two configurations above are used by three separate ingestion tools:
+* Taxii Loader
+* Bulk load from HDFS via MapReduce
+* Flat File ingestion
+
+### Taxii Loader
+
+The shell script `$METRON_HOME/bin/threatintel_taxii_load.sh` can be used to poll a Taxii server for STIX documents and ingest them into HBase.  
+It is quite common for this Taxii server to be an aggregation server such as Soltra Edge.
+
+In addition to the Enrichment and Extractor configs described above, this loader requires a configuration file describing the connection information
+to the Taxii server.  An illustrative example of such a configuration file is:
+
+````
+{
+   "endpoint" : "http://localhost:8282/taxii-discovery-service"
+  ,"type" : "DISCOVER"
+  ,"collection" : "guest.Abuse_ch"
+  ,"table" : "threat_intel"
+  ,"columnFamily" : "cf"
+  ,"allowedIndicatorTypes" : [ "domainname:FQDN", "address:IPV_4_ADDR" ]
+}
+````
+
+As you can see, we are specifying the following information:
+* endpoint : The URL of the endpoint
+* type : `POLL` or `DISCOVER` depending on the endpoint.
+* collection : The Taxii collection to ingest
+* table : The HBase table to import into
+* columnFamily : The column family to import into
+* allowedIndicatorTypes : an array of acceptable threat intel types (see the "Enrichment Type Name" column of the STIX table above for the possibilities).
+
+The parameters for the utility are as follows:
+
+| Short Code | Long Code                 | Is Required? | Description                                                                                                                                        |
+|------------|---------------------------|--------------|----------------------------------------------------------------------------------------------------------------------------------------------------|
+| -h         |                           | No           | Generate the help screen/set of options                                                                                                            |
+| -e         | --extractor_config        | Yes          | JSON Document describing the extractor for this input data source                                                                                  |
+| -c         | --taxii_connection_config | Yes          | The JSON config file to configure the connection                                                                                                   |
+| -p         | --time_between_polls      | No           | The time between polling the Taxii server in milliseconds. (default: 1 hour)                                                                       |
+| -b         | --begin_time              | No           | Start time to poll the Taxii server (all data from that point will be gathered in the first pull).  The format for the date is yyyy-MM-dd HH:mm:ss |
+| -l         | --log4j                   | No           | The Log4j Properties to load                                                                                                                       |
+| -n         | --enrichment_config       | No           | The JSON document describing the enrichments to configure.  Unlike other loaders, this is run first if specified.                                  |
+
+
+### Bulk Load from HDFS
+
+The shell script `$METRON_HOME/bin/threatintel_bulk_load.sh` will kick off a MapReduce job to load data staged in HDFS into an HBase table.  Note: despite what
+the naming may suggest, this utility works for enrichment as well as threat intel because the underlying infrastructure is the same.
+
+The parameters for the utility are as follows:
+
+| Short Code | Long Code           | Is Required? | Description                                                                                                       |
+|------------|---------------------|--------------|-------------------------------------------------------------------------------------------------------------------|
+| -h         |                     | No           | Generate the help screen/set of options                                                                           |
+| -e         | --extractor_config  | Yes          | JSON Document describing the extractor for this input data source                                                 |
+| -t         | --table             | Yes          | The HBase table to import into                                                                                    |
+| -f         | --column_family     | Yes          | The HBase table column family to import into                                                                      |
+| -i         | --input             | Yes          | The input data location on HDFS                                                                                   |
+| -n         | --enrichment_config | No           | The JSON document describing the enrichments to configure.  Unlike other loaders, this is run first if specified. |
+
+### Flatfile Loader
+
+The shell script `$METRON_HOME/bin/flatfile_loader.sh` will read data from local disk and load the enrichment or threat intel data into an HBase table.  
+Note: This utility works for enrichment as well as threat intel due to the underlying infrastructure being the same.
+
+Note that the Extractor config accepts one additional configuration
+parameter that is only considered by this
+loader:
+* inputFormatHandler : This specifies how to consider the data.  The two implementations are `BY_LINE` and `org.apache.metron.dataloads.extractor.inputformat.WholeFileFormat`.
+
+The default is `BY_LINE`, which makes sense for a CSV file where
+each line indicates a unit of information which can be imported.
+However, if you are importing a set of STIX documents, then you want
+each document to be considered as input to the Extractor, so use the whole-file handler instead.
+
+The parameters for the utility are as follows:
+
+| Short Code | Long Code           | Is Required? | Description                                                                                                                                                                          |
+|------------|---------------------|--------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| -h         |                     | No           | Generate the help screen/set of options                                                                                                                                              |
+| -e         | --extractor_config  | Yes          | JSON Document describing the extractor for this input data source                                                                                                                    |
+| -t         | --hbase_table       | Yes          | The HBase table to import into                                                                                                                                                       |
+| -c         | --hbase_cf          | Yes          | The HBase table column family to import into                                                                                                                                         |
+| -i         | --input             | Yes          | The input data location on local disk.  If this is a file, then that file will be loaded.  If this is a directory, then the files will be loaded recursively under that directory. |
+| -l         | --log4j             | No           | The log4j properties file to load                                                                                                                                                    |
+| -n         | --enrichment_config | No           | The JSON document describing the enrichments to configure.  Unlike other loaders, this is run first if specified.                                                                    |
+
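
As a reading aid for the CSV Extractor section of the README above, here is a minimal Python sketch of how one CSV line might be turned into the (type, indicator, value) primitive. It is not Metron's Java implementation; the function name `extract_csv_line` and the shape of the returned dictionary are assumptions made for illustration. Only the config keys (`columns`, `indicator_column`, `type`, `separator`) and the sample input come from the README.

```python
import csv
import json

# Extractor config copied from the README's CSV example.
EXTRACTOR_CONFIG = json.loads("""
{
  "config": {
    "columns": {"ip": 0, "source": 2},
    "indicator_column": "ip",
    "type": "malicious_ip",
    "separator": ","
  },
  "extractor": "CSV"
}
""")


def extract_csv_line(line, extractor_config):
    """Hypothetical helper: map one CSV line to a type/indicator/value record."""
    cfg = extractor_config["config"]
    columns = cfg["columns"]
    indicator_column = cfg["indicator_column"]
    # Parse the single line with the configured separator.
    row = next(csv.reader([line], delimiter=cfg.get("separator", ",")))
    # Every declared column other than the indicator goes into the value map.
    value = {
        name: row[position]
        for name, position in columns.items()
        if name != indicator_column
    }
    return {
        "indicator": row[columns[indicator_column]],
        "type": cfg["type"],
        "value": value,
    }


result = extract_csv_line("123.45.123.12,something,the grapevine", EXTRACTOR_CONFIG)
print(json.dumps(result, sort_keys=True))
```

The second CSV column (`something`) is dropped because the `columns` map does not name it; only `source` at position 2 ends up in the value.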

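The field-naming rule for simple HBase enrichments described in the README above can also be sketched in a few lines of Python. This is an illustration under stated assumptions, not the enrichment topology's actual code: the helper name `enrichment_fields` is hypothetical, and only the `enrichments.hbaseEnrichment.$field.$enrichment_type.$key` pattern and the sample key/value are taken from the README.

```python
def enrichment_fields(field, enrichment_type, value):
    """Hypothetical helper: name the fields added to a document on an enrichment hit."""
    prefix = "enrichments.hbaseEnrichment.{}.{}".format(field, enrichment_type)
    # One new field per key in the JSON map stored in HBase.
    return {"{}.{}".format(prefix, key): val for key, val in value.items()}


# Sample data from the README: 127.0.0.1 matched in the ip_src_addr field.
fields = enrichment_fields(
    "ip_src_addr",
    "important_addresses",
    {"name": "localhost", "location": "home"},
)
for name in sorted(fields):
    print(name, ":", fields[name])
```
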
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/pom.xml b/metron-platform/metron-data-management/pom.xml
new file mode 100644
index 0000000..6c3f866
--- /dev/null
+++ b/metron-platform/metron-data-management/pom.xml
@@ -0,0 +1,327 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- 
+  Licensed to the Apache Software 
+  Foundation (ASF) under one or more contributor license agreements. See the 
+  NOTICE file distributed with this work for additional information regarding 
+  copyright ownership. The ASF licenses this file to You under the Apache License, 
+  Version 2.0 (the "License"); you may not use this file except in compliance 
+  with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 
+  Unless required by applicable law or agreed to in writing, software distributed 
+  under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
+  OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
+  the specific language governing permissions and limitations under the License. 
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.metron</groupId>
+        <artifactId>metron-platform</artifactId>
+        <version>0.1BETA</version>
+    </parent>
+    <artifactId>metron-data-management</artifactId>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <httpcore.version>4.3.2</httpcore.version>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${global_hbase_guava_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>javax.xml.bind</groupId>
+            <artifactId>jaxb-api</artifactId>
+            <version>2.2.11</version>
+        </dependency>
+        <dependency>
+            <groupId>net.sf.saxon</groupId>
+            <artifactId>Saxon-HE</artifactId>
+            <version>9.5.1-5</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>javax.xml.bind</groupId>
+                    <artifactId>jaxb-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.xml.bind</groupId>
+            <artifactId>jaxb-impl</artifactId>
+            <version>2.2.5-2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.mitre</groupId>
+            <artifactId>stix</artifactId>
+            <version>1.2.0.2</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>javax.xml.bind</groupId>
+                    <artifactId>jaxb-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-common</artifactId>
+            <version>${project.parent.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.googlecode.disruptor</groupId>
+                    <artifactId>disruptor</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-hdfs</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-enrichment</artifactId>
+            <version>0.1BETA</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-hdfs</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>junit</groupId>
+                    <artifactId>junit</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-hbase</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.mitre.taxii</groupId>
+            <artifactId>taxii</artifactId>
+            <version>1.1.0.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-common</artifactId>
+            <version>${global_hbase_version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-server</artifactId>
+            <version>${global_hbase_version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.opencsv</groupId>
+            <artifactId>opencsv</artifactId>
+            <version>${global_opencsv_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpcore</artifactId>
+            <version>${httpcore.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>${httpcore.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.elasticsearch</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <version>${global_elasticsearch_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.carrotsearch.randomizedtesting</groupId>
+            <artifactId>randomizedtesting-runner</artifactId>
+            <version>2.1.14</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.elasticsearch</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <version>${global_elasticsearch_version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.lucene</groupId>
+            <artifactId>lucene-test-framework</artifactId>
+            <version>4.10.4</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.lucene</groupId>
+            <artifactId>lucene-core</artifactId>
+            <version>4.10.4</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-testing-util</artifactId>
+            <version>${global_hbase_version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-client</artifactId>
+            <version>1.19</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-integration-test</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+            <version>1.6.2</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito</artifactId>
+            <version>1.6.2</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-easymock</artifactId>
+            <version>1.6.2</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.easymock</groupId>
+            <artifactId>easymock</artifactId>
+            <version>3.4</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <resources>
+            <resource>
+                <directory>src</directory>
+                <excludes>
+                    <exclude>**/*.java</exclude>
+                </excludes>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <argLine>-Xmx2048m -XX:MaxPermSize=256m -XX:-UseSplitVerifier</argLine>
+                    <skip>true</skip>
+                    <trimStackTrace>false</trimStackTrace>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.1</version>
+                <configuration>
+                    <source>1.7</source>
+                    <target>1.7</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>2.3</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <relocations>
+                                <relocation>
+                                    <pattern>com.google.common</pattern>
+                                    <shadedPattern>org.apache.metron.guava.dataload</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.http</pattern>
+                                    <shadedPattern>org.apache.metron.httpcore.dataload</shadedPattern>
+                                </relocation>
+                            </relocations>
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>classworlds:classworlds</exclude>
+                                    <exclude>junit:junit</exclude>
+                                    <exclude>jmock:*</exclude>
+                                    <exclude>*:xml-apis</exclude>
+                                    <exclude>*slf4j*</exclude>
+                                    <exclude>org.apache.maven:lib:tests</exclude>
+                                    <exclude>log4j:log4j:jar:</exclude>
+                                    <exclude>*:hbase:*</exclude>
+                                    <exclude>org.apache.hadoop.yarn.util.package-info*</exclude>
+                                </excludes>
+                            </artifactSet>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptor>src/main/assembly/assembly.xml</descriptor>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
+                        <phase>package</phase> <!-- bind to the packaging phase -->
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+        </plugins>
+    </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/assembly/assembly.xml b/metron-platform/metron-data-management/src/main/assembly/assembly.xml
new file mode 100644
index 0000000..c2c384b
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/assembly/assembly.xml
@@ -0,0 +1,42 @@
+<!--
+  Licensed to the Apache Software
+	Foundation (ASF) under one or more contributor license agreements. See the
+	NOTICE file distributed with this work for additional information regarding
+	copyright ownership. The ASF licenses this file to You under the Apache License,
+	Version 2.0 (the "License"); you may not use this file except in compliance
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+	Unless required by applicable law or agreed to in writing, software distributed
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+  the specific language governing permissions and limitations under the License.
+  -->
+
+<assembly>
+  <id>archive</id>
+  <formats>
+    <format>tar.gz</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>${project.basedir}/src/main/bash</directory>
+      <outputDirectory>/bin</outputDirectory>
+      <useDefaultExcludes>true</useDefaultExcludes>
+      <excludes>
+        <exclude>**/*.formatted</exclude>
+        <exclude>**/*.filtered</exclude>
+      </excludes>
+      <fileMode>0755</fileMode>
+      <lineEnding>unix</lineEnding>
+      <filtered>true</filtered>
+    </fileSet>
+    <fileSet>
+      <directory>${project.basedir}/target</directory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+      <outputDirectory>/lib</outputDirectory>
+      <useDefaultExcludes>true</useDefaultExcludes>
+    </fileSet>
+  </fileSets>
+</assembly>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/bash/Whois_CSV_to_JSON.py
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/bash/Whois_CSV_to_JSON.py b/metron-platform/metron-data-management/src/main/bash/Whois_CSV_to_JSON.py
new file mode 100755
index 0000000..2091418
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/bash/Whois_CSV_to_JSON.py
@@ -0,0 +1,208 @@
+#!/usr/bin/python
+
+"""
+Copyright 2014 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+import sys
+import os
+import csv
+import json
+import multiprocessing
+import logging
+logging.basicConfig(level=logging.DEBUG)
+
+
+def is_field_excluded(fieldname=None):
+    """
+    Checks to see if a field name is a member of a list of names to exclude. Modify to suit your own list.
+
+    :param fieldname: A string representing a field name
+    :return: True or False
+    """
+    import re
+
+    # List of fields names to exclude
+    excluded_fields = [
+        'Audit_auditUpdatedDate',
+        #'domainName'
+    ]
+
+    if fieldname in excluded_fields:
+        return True
+
+    # Regexes to match for exclusion
+    excluded_regexes = [
+        ['_rawText$', re.IGNORECASE],
+    ]
+
+    for regex in excluded_regexes:
+        if re.search(regex[0], fieldname, regex[1]):
+            return True
+
+    return False
+
+
+def process_csv(in_filename, out_filename):
+    """
+    Processes a CSV file of WHOIS data and converts each line to a JSON element, skipping specific fields that
+    are not deemed necessary (*_rawText, Audit_auditUpdatedDate; see is_field_excluded)
+
+    :param in_filename: Input CSV filename with full path
+    :param out_filename: Output JSON filename with full path
+    :return: None
+    """
+    if out_filename:
+        out_fh = open(out_filename, 'wb')
+        logging.debug('%s: Converting %s to %s' % (multiprocessing.current_process().name, in_filename, out_filename))
+    else:
+        logging.debug('%s: Analyzing %s' % (multiprocessing.current_process().name, in_filename))
+
+    with open(in_filename, 'rb') as f:
+        reader = csv.DictReader(f, delimiter=',', quotechar='"')
+        line_num = 0
+        try:
+            for row in reader:
+                line_num += 1
+                try:
+                    if out_filename:
+                        # json conversion and output
+                        new_row = {}
+                        for field in reader.fieldnames:
+                            # fields we don't want include these + anything with rawText
+                            #if field not in ['Audit_auditUpdatedDate', 'domainName'] and not field.endswith('_rawText'):
+                            if not is_field_excluded(field):
+                                new_row[field] = row.get(field)
+                        json.dump(new_row, out_fh)
+                        out_fh.write('\n')
+                    else:
+                        # analysis .. check to be sure fileheader and csv row counts match
+                        if len(row) != len(reader.fieldnames):
+                            raise Exception('Field count mismatch: row: %s / fields: %s' % (len(row), len(reader.fieldnames)))
+                except Exception as e:
+                    logging.warning("Error with file %s, line %s: %s" % (in_filename, line_num, e))
+
+            if not out_filename:
+                logging.info('Analyzed %s: OK' % in_filename)
+        except Exception as e:
+            logging.warning(e)
+        if out_filename:
+            out_fh.close()
+
+
+##-------------------------------------------------------------------------
+
+def process_files(source_dir, output_dir, max_processes=10, overwrite=False):
+    """
+    Generates a multiprocessing.Pool() queue with a list of input and output files to be processed with process_csv.
+    Files are added by walking the source_dir and adding any file with a CSV extension. Output is placed into a single
+    directory for processing. Output filenames are generated using the last part of the directory name, so a file
+    named source_dir/com/1.csv would become output_dir/com_1.json
+
+    :param source_dir: Source directory of CSV files
+    :param output_dir: Output directory for resultant JSON files
+    :param max_processes: Maximum number of processes run
+    :return:
+    """
+    logging.info("Processing Whois files from %s" % source_dir)
+
+    if output_dir and not os.path.exists(output_dir):
+        logging.debug("Creating output directory %s" % output_dir)
+        os.makedirs(output_dir)
+
+    logging.info("Starting %s pool workers" % max_processes)
+
+    if sys.version.startswith('2.6'):
+        # maxtasksperchild is not available in Python 2.6
+        pool = multiprocessing.Pool(processes=max_processes)
+    else:
+        pool = multiprocessing.Pool(processes=max_processes, maxtasksperchild=4)
+
+    filecount = 0
+    for dirname, dirnames, filenames in os.walk(source_dir):
+        for filename in filenames:
+            if filename.endswith('.csv'):
+                # output files go to outputDir and are named using the last subdirectory from the dirname
+                if output_dir:
+                    out_filename = os.path.splitext(filename)[0] + '.json'
+                    out_filename = os.path.join(output_dir, '%s_%s' % (os.path.split(dirname)[-1], out_filename))
+
+                    # if file does not exist or if overwrite is true, add file process to the pool
+                    if not os.path.isfile(out_filename) or overwrite:
+                        pool.apply_async(process_csv, args=(os.path.join(dirname, filename), out_filename))
+                        filecount += 1
+                    else:
+                        logging.info("Skipping %s, %s exists and overwrite is false" % (filename, out_filename))
+                else:
+                    # no outputdir so we just analyze the files
+                    pool.apply_async(process_csv, args=(os.path.join(dirname, filename), None))
+                    filecount += 1
+
+    try:
+        pool.close()
+        logging.info("Starting activities on %s CSV files" % filecount)
+        pool.join()
+    except KeyboardInterrupt:
+        logging.info("Aborting")
+        pool.terminate()
+
+    logging.info("Completed")
+
+
+##-------------------------------------------------------------------------
+
+if __name__ == "__main__":
+
+    max_cpu = multiprocessing.cpu_count()
+
+    from optparse import OptionParser
+    parser = OptionParser()
+    parser.add_option('-s', '--source', dest='source_dir', action='store',
+                      help='Source directory to walk for CSV files')
+    parser.add_option('-o', '--output', dest='out_dir', action='store',
+                      help='Output directory for JSON files')
+    parser.add_option('-O', '--overwrite', dest='overwrite', action='store_true',
+                      help='Overwrite existing files in output directory')
+    parser.add_option('-p', '--processes', dest='max_processes', action='store', default=max_cpu, type='int',
+                      help='Max number of processes to spawn')
+    parser.add_option('-a', '--analyze', dest='analyze', action='store_true',
+                      help='Analyze CSV files for validity, no file output')
+    parser.add_option('-d', '--debug', dest='debug', action='store_true',
+                      help='Enable debug messages')
+
+    (options, args) = parser.parse_args()
+
+    if not options.source_dir:
+        logging.error("Source directory required")
+        sys.exit(-1)
+
+    if options.analyze:
+        out_dir = None
+    elif not options.out_dir:
+        logging.error("Output directory or analysis option required")
+        sys.exit(-1)
+    else:
+        out_dir = options.out_dir
+
+    if options.max_processes > max_cpu:
+        logging.warn('Max Processes (%s) is greater than available Processors (%s)' % (options.max_processes, max_cpu))
+
+    if options.debug:
+        # enable debug level and multiprocessing debugging
+        logging.basicConfig(level=logging.DEBUG)
+        multiprocessing.log_to_stderr(logging.DEBUG)
+
+    process_files(options.source_dir, out_dir, options.max_processes, options.overwrite)
+
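The field filtering and row conversion in Whois_CSV_to_JSON.py can be sketched in isolation as follows. This is a minimal Python 3 sketch, not the committed script: it works on an in-memory string rather than files, the helper name `csv_to_json_lines` and the sample row are hypothetical, and the exclusion rules are assumed to match the script's defaults (`Audit_auditUpdatedDate` and anything ending in `_rawText`).

```python
import csv
import io
import json
import re

# Assumed exclusion rules, mirroring the defaults in is_field_excluded().
EXCLUDED_FIELDS = {'Audit_auditUpdatedDate'}
EXCLUDED_PATTERNS = [re.compile(r'_rawText$', re.IGNORECASE)]


def is_field_excluded(fieldname):
    """Return True if the field should be dropped from the JSON output."""
    if fieldname in EXCLUDED_FIELDS:
        return True
    return any(pattern.search(fieldname) for pattern in EXCLUDED_PATTERNS)


def csv_to_json_lines(csv_text):
    """Convert CSV text into a list of JSON strings, one per data row.

    Hypothetical helper: the committed script writes to a file instead.
    """
    reader = csv.DictReader(io.StringIO(csv_text), delimiter=',', quotechar='"')
    lines = []
    for row in reader:
        kept = {}
        for field in reader.fieldnames:
            if not is_field_excluded(field):
                kept[field] = row.get(field)
        lines.append(json.dumps(kept, sort_keys=True))
    return lines


# Hypothetical sample data, not taken from a real WHOIS feed.
SAMPLE = (
    'domainName,registrant_name,registrant_rawText,Audit_auditUpdatedDate\n'
    'example.com,"Doe, Jane",raw blob,2014-01-01\n'
)

if __name__ == '__main__':
    for line in csv_to_json_lines(SAMPLE):
        print(line)
```

Each output line is a standalone JSON object, which matches the one-record-per-line layout the script writes to its output files.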

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/bash/flatfile_loader.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/bash/flatfile_loader.sh b/metron-platform/metron-data-management/src/main/bash/flatfile_loader.sh
new file mode 100755
index 0000000..e464984
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/bash/flatfile_loader.sh
@@ -0,0 +1,39 @@
+#!/bin/bash
+# 
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#     http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# 
+
+BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
+[ -n "${BIGTOP_DEFAULTS_DIR}" -a -r ${BIGTOP_DEFAULTS_DIR}/hbase ] && . ${BIGTOP_DEFAULTS_DIR}/hbase
+
+# Autodetect JAVA_HOME if not defined
+if [ -e /usr/libexec/bigtop-detect-javahome ]; then
+  . /usr/libexec/bigtop-detect-javahome
+elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
+  . /usr/lib/bigtop-utils/bigtop-detect-javahome
+fi
+
+export HBASE_HOME=${HBASE_HOME:-/usr/hdp/current/hbase-client}
+CP=/usr/metron/0.1BETA/lib/metron-data-management-0.1BETA.jar:/usr/metron/0.1BETA/lib/taxii-1.1.0.1.jar:`${HBASE_HOME}/bin/hbase classpath`
+HADOOP_CLASSPATH=$(echo $CP )
+for jar in $(echo $HADOOP_CLASSPATH | sed 's/:/ /g');do
+  if [ -f $jar ];then
+    LIBJARS="$jar,$LIBJARS"
+  fi
+done
+export HADOOP_CLASSPATH
+hadoop jar /usr/metron/0.1BETA/lib/metron-data-management-0.1BETA.jar org.apache.metron.dataloads.nonbulk.flatfile.SimpleEnrichmentFlatFileLoader "$@"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/bash/prune_elasticsearch_indices.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/bash/prune_elasticsearch_indices.sh b/metron-platform/metron-data-management/src/main/bash/prune_elasticsearch_indices.sh
new file mode 100644
index 0000000..aed6782
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/bash/prune_elasticsearch_indices.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+yarn jar /usr/metron/${project.version}/lib/metron-data-management-${project.version}.jar org.apache.metron.dataloads.bulk.ElasticsearchDataPrunerRunner "$@"
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/bash/prune_hdfs_files.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/bash/prune_hdfs_files.sh b/metron-platform/metron-data-management/src/main/bash/prune_hdfs_files.sh
new file mode 100644
index 0000000..b37e022
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/bash/prune_hdfs_files.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+yarn jar /usr/metron/${project.version}/lib/metron-data-management-${project.version}.jar org.apache.metron.dataloads.bulk.HDFSDataPruner "$@"
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/bash/threatintel_bulk_load.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/bash/threatintel_bulk_load.sh b/metron-platform/metron-data-management/src/main/bash/threatintel_bulk_load.sh
new file mode 100755
index 0000000..2df4ee3
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/bash/threatintel_bulk_load.sh
@@ -0,0 +1,38 @@
+#!/bin/bash
+# 
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#     http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# 
+
+BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
+[ -n "${BIGTOP_DEFAULTS_DIR}" -a -r ${BIGTOP_DEFAULTS_DIR}/hbase ] && . ${BIGTOP_DEFAULTS_DIR}/hbase
+
+# Autodetect JAVA_HOME if not defined
+if [ -e /usr/libexec/bigtop-detect-javahome ]; then
+  . /usr/libexec/bigtop-detect-javahome
+elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
+  . /usr/lib/bigtop-utils/bigtop-detect-javahome
+fi
+
+export HBASE_HOME=${HBASE_HOME:-/usr/hdp/current/hbase-client}
+HADOOP_CLASSPATH=${HBASE_HOME}/lib/hbase-server.jar:`${HBASE_HOME}/bin/hbase classpath`
+for jar in $(echo $HADOOP_CLASSPATH | sed 's/:/ /g');do
+  if [ -f $jar ];then
+    LIBJARS="$jar,$LIBJARS"
+  fi
+done
+export HADOOP_CLASSPATH
+hadoop jar /usr/metron/0.1BETA/lib/metron-data-management-0.1BETA.jar org.apache.metron.dataloads.bulk.ThreatIntelBulkLoader -libjars ${LIBJARS} "$@"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/bash/threatintel_bulk_prune.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/bash/threatintel_bulk_prune.sh b/metron-platform/metron-data-management/src/main/bash/threatintel_bulk_prune.sh
new file mode 100755
index 0000000..c3ec233
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/bash/threatintel_bulk_prune.sh
@@ -0,0 +1,37 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#     http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# 
+BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
+[ -n "${BIGTOP_DEFAULTS_DIR}" -a -r ${BIGTOP_DEFAULTS_DIR}/hbase ] && . ${BIGTOP_DEFAULTS_DIR}/hbase
+
+# Autodetect JAVA_HOME if not defined
+if [ -e /usr/libexec/bigtop-detect-javahome ]; then
+  . /usr/libexec/bigtop-detect-javahome
+elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
+  . /usr/lib/bigtop-utils/bigtop-detect-javahome
+fi
+
+export HBASE_HOME=${HBASE_HOME:-/usr/hdp/current/hbase-client}
+HADOOP_CLASSPATH=${HBASE_HOME}/lib/hbase-server.jar:`${HBASE_HOME}/bin/hbase classpath`
+for jar in $(echo $HADOOP_CLASSPATH | sed 's/:/ /g');do
+  if [ -f $jar ];then
+    LIBJARS="$jar,$LIBJARS"
+  fi
+done
+export HADOOP_CLASSPATH
+hadoop jar /usr/metron/0.1BETA/lib/metron-data-management-0.1BETA.jar org.apache.metron.dataloads.bulk.LeastRecentlyUsedPruner -libjars ${LIBJARS} "$@"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/bash/threatintel_taxii_load.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/bash/threatintel_taxii_load.sh b/metron-platform/metron-data-management/src/main/bash/threatintel_taxii_load.sh
new file mode 100755
index 0000000..321041a
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/bash/threatintel_taxii_load.sh
@@ -0,0 +1,39 @@
+#!/bin/bash
+# 
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#     http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# 
+
+BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
+[ -n "${BIGTOP_DEFAULTS_DIR}" -a -r ${BIGTOP_DEFAULTS_DIR}/hbase ] && . ${BIGTOP_DEFAULTS_DIR}/hbase
+
+# Autodetect JAVA_HOME if not defined
+if [ -e /usr/libexec/bigtop-detect-javahome ]; then
+  . /usr/libexec/bigtop-detect-javahome
+elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
+  . /usr/lib/bigtop-utils/bigtop-detect-javahome
+fi
+
+export HBASE_HOME=${HBASE_HOME:-/usr/hdp/current/hbase-client}
+CP=/usr/metron/0.1BETA/lib/metron-data-management-0.1BETA.jar:/usr/metron/0.1BETA/lib/taxii-1.1.0.1.jar:`${HBASE_HOME}/bin/hbase classpath`
+HADOOP_CLASSPATH=$(echo $CP )
+for jar in $(echo $HADOOP_CLASSPATH | sed 's/:/ /g');do
+  if [ -f $jar ];then
+    LIBJARS="$jar,$LIBJARS"
+  fi
+done
+export HADOOP_CLASSPATH
+hadoop jar /usr/metron/0.1BETA/lib/metron-data-management-0.1BETA.jar org.apache.metron.dataloads.nonbulk.taxii.TaxiiLoader "$@"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/DataPruner.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/DataPruner.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/DataPruner.java
new file mode 100644
index 0000000..98e3b52
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/DataPruner.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.bulk;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+public abstract class DataPruner {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(DataPruner.class);
+    protected long firstTimeMillis;
+    protected long lastTimeMillis;
+    protected String wildCard;
+
+    public DataPruner(Date startDate, Integer numDays, String wildCard) throws StartDateException {
+
+        Date startAtMidnight = dateAtMidnight(startDate);
+        this.lastTimeMillis = startDate.getTime();
+        this.firstTimeMillis = lastTimeMillis - TimeUnit.DAYS.toMillis(numDays);
+        this.wildCard = wildCard;
+
+        Date today = dateAtMidnight(new Date());
+
+        if (!today.after(startAtMidnight)) {
+            throw new StartDateException("Prune Start Date must be prior to today");
+        }
+    }
+
+    protected Date dateAtMidnight(Date date) {
+
+        Calendar calendar = Calendar.getInstance();
+
+        calendar.setTime(date);
+        calendar.set(Calendar.HOUR_OF_DAY, 0);
+        calendar.set(Calendar.MINUTE, 0);
+        calendar.set(Calendar.SECOND, 0);
+        calendar.set(Calendar.MILLISECOND, 0);
+        return calendar.getTime();
+
+    }
+
+
+    public abstract Long prune() throws IOException;
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPruner.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPruner.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPruner.java
new file mode 100644
index 0000000..ddbb61b
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPruner.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.bulk;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.metron.common.configuration.Configuration;
+import org.elasticsearch.client.AdminClient;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.Iterator;
+
+public class ElasticsearchDataPruner extends DataPruner {
+
+    private String indexPattern;
+    private SimpleDateFormat dateFormat;
+    protected Client indexClient = null;
+    protected Configuration configuration;
+
+    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchDataPruner.class);
+    private static final String defaultDateFormat = "yyyy.MM.dd.HH";
+
+
+
+    private Predicate<String> filterWithRegex = new Predicate<String>() {
+
+        @Override
+        public boolean apply(String str) {
+
+            try {
+                String dateString = str.substring(indexPattern.length());
+                Date indexCreateDate = dateFormat.parse(dateString);
+                long indexCreatedDate = indexCreateDate.getTime();
+                if (indexCreatedDate >= firstTimeMillis && indexCreatedDate < lastTimeMillis) {
+                    return true;
+                }
+            } catch (ParseException e) {
+                LOG.error("Unable to parse date from " + str.substring(indexPattern.length()), e);
+            }
+
+            return false;
+        }
+
+    };
+
+    public ElasticsearchDataPruner(Date startDate, Integer numDays,Configuration configuration, Client indexClient, String indexPattern) throws Exception {
+
+        super(startDate, numDays, indexPattern);
+
+        this.indexPattern = indexPattern;
+        this.dateFormat = new SimpleDateFormat(defaultDateFormat);
+        this.configuration = configuration;
+        this.indexClient = indexClient;
+
+
+    }
+
+    @Override
+    public Long prune() throws IOException {
+
+        try {
+
+            configuration.update();
+
+        }
+        catch(Exception e) {
+
+            LOG.error("Unable to update configs",e);
+
+        }
+
+        Object dateFormatConfig = configuration.getGlobalConfig().get("es.date.format");
+
+        if( null != dateFormatConfig ){
+            dateFormat = new SimpleDateFormat(dateFormatConfig.toString());
+        }
+
+        ImmutableOpenMap<String, IndexMetaData> allIndices = indexClient.admin().cluster().prepareState().get().getState().getMetaData().getIndices();
+        Iterable indicesForDeletion = getFilteredIndices(allIndices);
+        Object[] indexArray = IteratorUtils.toArray(indicesForDeletion.iterator());
+
+        if(indexArray.length > 0) {
+            String[] indexStringArray = new String[indexArray.length];
+            System.arraycopy(indexArray, 0, indexStringArray, 0, indexArray.length);
+            deleteIndex(indexClient.admin(), indexStringArray);
+        }
+
+        return Long.valueOf(indexArray.length);
+
+    }
+
+    public Boolean deleteIndex(AdminClient adminClient, String... index) {
+
+        boolean isAcknowledged = adminClient.indices().delete(adminClient.indices().prepareDelete(index).request()).actionGet().isAcknowledged();
+        return Boolean.valueOf(isAcknowledged);
+
+    }
+
+    protected Iterable<String> getFilteredIndices(ImmutableOpenMap<String, IndexMetaData> indices) {
+
+        String[] returnedIndices = new String[indices.size()];
+        Iterator it = indices.keysIt();
+        System.arraycopy(IteratorUtils.toArray(it), 0, returnedIndices, 0, returnedIndices.length);
+        Iterable<String> matches = Iterables.filter(Arrays.asList(returnedIndices), filterWithRegex);
+
+        return matches;
+
+    }
+
+}
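The selection rule applied by ElasticsearchDataPruner (strip the index prefix, parse the remaining date suffix, keep indices whose date falls in the half-open window from `startDate - numDays` up to `startDate`) can be illustrated with a short sketch. It is written in Python for brevity and is only an approximation of the Java predicate: the function name `indices_to_prune` and the sample index names are hypothetical, and `strptime` is stricter than `SimpleDateFormat.parse`, which tolerates trailing text.

```python
from datetime import datetime, timedelta


def indices_to_prune(index_names, prefix, start, num_days,
                     date_format='%Y.%m.%d.%H'):
    """Return the index names whose date suffix lies in [start - num_days, start).

    Hypothetical helper; date_format is the strptime equivalent of the
    default "yyyy.MM.dd.HH" pattern used in the Java class.
    """
    first = start - timedelta(days=num_days)
    selected = []
    for name in index_names:
        # As in the Java predicate, only the prefix length is used here.
        suffix = name[len(prefix):]
        try:
            created = datetime.strptime(suffix, date_format)
        except ValueError:
            # Names without a parsable date suffix are never selected.
            continue
        if first <= created < start:
            selected.append(name)
    return selected


if __name__ == '__main__':
    names = [
        'bro_index_2016.04.24.23',
        'bro_index_2016.04.25.00',
        'bro_index_2016.04.25.23',
        'bro_index_2016.04.26.00',
        'kibana',
    ]
    print(indices_to_prune(names, 'bro_index_', datetime(2016, 4, 26), 1))
```

Because the suffix is taken by prefix length rather than by matching the prefix text, a name with a different prefix of the same length would also be considered; the Java `substring(indexPattern.length())` call appears to share this property.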

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerRunner.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerRunner.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerRunner.java
new file mode 100644
index 0000000..f0a4d3b
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerRunner.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.bulk;
+
+import org.apache.commons.cli.*;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.metron.common.configuration.Configuration;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+
+public class ElasticsearchDataPrunerRunner {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchDataPrunerRunner.class);
+
+    public static void main(String... argv) throws IOException, java.text.ParseException, ClassNotFoundException, InterruptedException {
+
+        /**
+         * Example
+         * start=$(date -d '30 days ago' +%m/%d/%Y)
+         * yarn jar Metron-DataLoads-{VERSION}.jar org.apache.metron.dataloads.bulk.ElasticsearchDataPrunerRunner -z zkhost1:2181 -p '/bro_index_' -s $(date -d '30 days ago' +%m/%d/%Y) -n 1;
+         * echo ${start}
+         **/
+
+        Options options = buildOptions();
+        Options help = new Options();
+        TransportClient client = null;
+
+        Option o = new Option("h", "help", false, "This screen");
+        o.setRequired(false);
+        help.addOption(o);
+
+
+
+        try {
+
+            CommandLine cmd = checkOptions(help,options, argv);
+
+            String start = cmd.getOptionValue("s");
+            Date startDate = new SimpleDateFormat("MM/dd/yyyy").parse(start);
+
+            Integer numDays = Integer.parseInt(cmd.getOptionValue("n"));
+            String indexPrefix = cmd.getOptionValue("p");
+
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Running prune with args: " + startDate + " " + numDays);
+            }
+
+            Configuration configuration = null;
+
+            if( cmd.hasOption("z")){
+
+                RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+                CuratorFramework framework = CuratorFrameworkFactory.newClient(cmd.getOptionValue("z"),retryPolicy);
+                framework.start();
+                configuration = new Configuration(framework);
+
+            } else if ( cmd.hasOption("c") ){
+
+                String resourceFile = cmd.getOptionValue("c");
+                configuration = new Configuration(Paths.get(resourceFile));
+
+            }
+
+            configuration.update();
+
+            Map<String, Object> globalConfiguration = configuration.getGlobalConfig();
+            ImmutableSettings.Builder builder = ImmutableSettings.settingsBuilder();
+            builder.put("cluster.name", globalConfiguration.get("es.clustername"));
+            builder.put("client.transport.ping_timeout","500s");
+            client = new TransportClient(builder.build())
+                    .addTransportAddress(new InetSocketTransportAddress(globalConfiguration.get("es.ip").toString(), Integer.parseInt(globalConfiguration.get("es.port").toString())));
+
+            DataPruner pruner = new ElasticsearchDataPruner(startDate, numDays, configuration, client, indexPrefix);
+
+            LOG.info("Pruned " + pruner.prune() + " indices from " +  globalConfiguration.get("es.ip") + ":" + globalConfiguration.get("es.port") + "/" + indexPrefix);
+
+
+        } catch (Exception e) {
+
+            e.printStackTrace();
+            System.exit(-1);
+
+        } finally {
+
+            if( null != client) {
+                client.close();
+            }
+
+        }
+
+    }
+
+    public static CommandLine checkOptions(Options help, Options options, String ... argv) throws ParseException {
+
+        CommandLine cmd = null;
+        CommandLineParser parser = new PosixParser();
+
+
+        try {
+
+            cmd = parser.parse(help,argv,true);
+
+            if( cmd.getOptions().length > 0){
+                final HelpFormatter usageFormatter = new HelpFormatter();
+                usageFormatter.printHelp("ElasticsearchDataPrunerRunner", null, options, null, true);
+                System.exit(0);
+            }
+
+            cmd = parser.parse(options, argv);
+
+        } catch (ParseException e) {
+
+            final HelpFormatter usageFormatter = new HelpFormatter();
+            usageFormatter.printHelp("ElasticsearchDataPrunerRunner", null, options, null, true);
+            throw e;
+
+        }
+
+
+        if( (cmd.hasOption("z") && cmd.hasOption("c")) || (!cmd.hasOption("z") && !cmd.hasOption("c")) ){
+
+            System.err.println("One (only) of zookeeper-hosts or config-location is required");
+            final HelpFormatter usageFormatter = new HelpFormatter();
+            usageFormatter.printHelp("ElasticsearchDataPrunerRunner", null, options, null, true);
+            throw new RuntimeException("Must specify zookeeper-hosts or config-location, but not both");
+
+        }
+
+        return cmd;
+    }
+
+    public static Options buildOptions(){
+
+        Options options = new Options();
+
+        Option o = new Option("s", "start-date", true, "Starting Date (MM/DD/YYYY)");
+        o.setArgName("START_DATE");
+        o.setRequired(true);
+        options.addOption(o);
+
+        o = new Option("n", "numdays", true, "Number of days back to purge");
+        o.setArgName("NUMDAYS");
+        o.setRequired(true);
+        options.addOption(o);
+
+        o = new Option("p", "index-prefix", true, "Index prefix  - e.g. bro_index_");
+        o.setArgName("PREFIX");
+        o.setRequired(true);
+        options.addOption(o);
+
+        o = new Option("c", "config-location", true, "Directory Path - e.g. /path/to/config/dir");
+        o.setArgName("CONFIG");
+        o.setRequired(false);
+        options.addOption(o);
+
+        o = new Option("z", "zookeeper-hosts", true, "Zookeeper URL - e.g. zkhost1:2181,zkhost2:2181,zkhost3:2181");
+        o.setArgName("ZOOKEEPER_HOSTS");
+        o.setRequired(false);
+        options.addOption(o);
+
+        return options;
+    }
+}
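
A note on the `-s` handling in the runner above: `SimpleDateFormat` is lenient by default, so an out-of-range value such as 02/30/2016 is rolled forward to a valid date instead of being rejected. The following is a minimal sketch of strict parsing and is not part of this commit. The class and method names (`StartDateParsing`, `parseStrict`, `isValid`) are hypothetical; only the `MM/dd/yyyy` pattern comes from the code above.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

// Hypothetical helper, not part of the commit: strict MM/dd/yyyy parsing.
class StartDateParsing {

    // Same pattern as the runner, but with leniency switched off so that
    // impossible dates raise ParseException instead of rolling over.
    static Date parseStrict(String value) throws ParseException {
        SimpleDateFormat format = new SimpleDateFormat("MM/dd/yyyy");
        format.setLenient(false);
        return format.parse(value);
    }

    // Convenience wrapper that reports validity instead of throwing.
    static boolean isValid(String value) {
        try {
            parseStrict(value);
            return true;
        } catch (ParseException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValid("04/26/2016"));
        System.out.println(isValid("02/30/2016"));
    }
}
```

Whether the runner should fail fast on such input is a design choice; for a tool that deletes indices, rejecting a mistyped date seems the safer default.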

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/HDFSDataPruner.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/HDFSDataPruner.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/HDFSDataPruner.java
new file mode 100644
index 0000000..097253c
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/HDFSDataPruner.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.bulk;
+
+
+import org.apache.commons.cli.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+public class HDFSDataPruner extends DataPruner {
+
+
+    private Path globPath;
+    protected FileSystem fileSystem;
+    protected static final Logger LOG = LoggerFactory.getLogger(HDFSDataPruner.class);
+
+    HDFSDataPruner(Date startDate, Integer numDays, String fsUri, String globPath) throws IOException, StartDateException {
+
+        super(startDate,numDays,globPath);
+        this.globPath = new Path(wildCard);
+        Configuration conf = new Configuration();
+        conf.set("fs.defaultFS", fsUri);
+        this.fileSystem = FileSystem.get(conf);
+
+    }
+
+
+    public static void main(String... argv) throws IOException, java.text.ParseException, ClassNotFoundException, InterruptedException {
+
+        /**
+         * Example
+         * start=$(date -d '30 days ago' +%m/%d/%Y)
+         * yarn jar Metron-DataLoads-0.1BETA.jar org.apache.metron.dataloads.bulk.HDFSDataPruner -f hdfs://ec2-52-36-25-217.us-west-2.compute.amazonaws.com:8020 -g '/apps/metron/enrichment/indexed/bro_doc/*enrichment-*' -s $(date -d '30 days ago' +%m/%d/%Y) -n 1;
+         * echo ${start}
+         **/
+
+        Options options = new Options();
+        Options help = new Options();
+
+        {
+            Option o = new Option("h", "help", false, "This screen");
+            o.setRequired(false);
+            help.addOption(o);
+        }
+        {
+            Option o = new Option("s", "start-date", true, "Starting Date (MM/DD/YYYY)");
+            o.setArgName("START_DATE");
+            o.setRequired(true);
+            options.addOption(o);
+        }
+        {
+            Option o = new Option("f", "filesystem", true, "Filesystem uri - e.g. hdfs://host:8020 or file:///");
+            o.setArgName("FILESYSTEM");
+            o.setRequired(true);
+            options.addOption(o);
+        }
+        {
+            Option o = new Option("n", "numdays", true, "Number of days back to purge");
+            o.setArgName("NUMDAYS");
+            o.setRequired(true);
+            options.addOption(o);
+        }
+        {
+            Option o = new Option("g", "glob-string", true, "Glob filemask for files to delete - e.g. /apps/metron/enrichment/bro_doc/file-*");
+            o.setArgName("GLOBSTRING");
+            o.setRequired(true);
+            options.addOption(o);
+        }
+
+        try {
+
+            CommandLineParser parser = new PosixParser();
+            CommandLine cmd = null;
+
+            try {
+
+                cmd = parser.parse(help,argv,true);
+                if( cmd.getOptions().length > 0){
+                    final HelpFormatter usageFormatter = new HelpFormatter();
+                    usageFormatter.printHelp("HDFSDataPruner", null, options, null, true);
+                    System.exit(0);
+                }
+
+                cmd = parser.parse(options, argv);
+
+            } catch (ParseException pe) {
+
+                final HelpFormatter usageFormatter = new HelpFormatter();
+                usageFormatter.printHelp("HDFSDataPruner", null, options, null, true);
+                System.exit(-1);
+
+            }
+
+            String start = cmd.getOptionValue("s");
+            Date startDate = new SimpleDateFormat("MM/dd/yyyy").parse(start);
+            String fileSystemUri = cmd.getOptionValue("f");
+            Integer numDays = Integer.parseInt(cmd.getOptionValue("n"));
+            String globString = cmd.getOptionValue("g");
+
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Running prune with args: " + startDate + " " + numDays + " " + fileSystemUri + " " + globString);
+            }
+
+            DataPruner pruner = new HDFSDataPruner(startDate, numDays, fileSystemUri, globString);
+
+            LOG.info("Pruned " + pruner.prune() + " files from " + fileSystemUri + globString);
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            System.exit(-1);
+        }
+
+    }
+
+    public Long prune() throws IOException {
+
+        Long filesPruned = 0L;
+
+        FileStatus[] filesToDelete = fileSystem.globStatus(globPath, new HDFSDataPruner.DateFileFilter(this));
+
+        for (FileStatus fileStatus : filesToDelete) {
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Deleting File: " + fileStatus.getPath());
+            }
+
+            fileSystem.delete(fileStatus.getPath(), false);
+
+            filesPruned++;
+        }
+
+        return filesPruned;
+    }
+
+    class DateFileFilter extends Configured implements PathFilter {
+
+        HDFSDataPruner pruner;
+        Boolean failOnError = false;
+
+        DateFileFilter(HDFSDataPruner pruner) {
+            this.pruner = pruner;
+        }
+
+        DateFileFilter(HDFSDataPruner pruner, Boolean failOnError) {
+
+            this(pruner);
+            this.failOnError = failOnError;
+
+        }
+
+        @Override
+        public boolean accept(Path path) {
+            try {
+
+                if(pruner.LOG.isDebugEnabled()) {
+                    pruner.LOG.debug("ACCEPT - working with file: " + path);
+                }
+
+                if (pruner.fileSystem.isDirectory(path)) {
+                    return false;
+
+                }
+            } catch (IOException e) {
+
+                pruner.LOG.error("IOException", e);
+
+                if (failOnError) {
+                    throw new RuntimeException(e);
+                }
+
+                return false;
+            }
+
+            try {
+
+                FileStatus file = pruner.fileSystem.getFileStatus(path);
+                long fileModificationTime = file.getModificationTime();
+                boolean accept = false;
+
+                if (fileModificationTime >= pruner.firstTimeMillis && fileModificationTime < pruner.lastTimeMillis) {
+
+                    accept = true;
+                }
+
+                return accept;
+
+            } catch (IOException e) {
+
+                pruner.LOG.error("IOException", e);
+
+                if (failOnError) {
+                    throw new RuntimeException(e);
+                }
+
+                return false;
+            }
+
+        }
+    }
+}
+
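
The `DateFileFilter` above accepts a file when its modification time falls in the half-open interval from `firstTimeMillis` (inclusive) to `lastTimeMillis` (exclusive). Both bounds are set in `DataPruner`, which does not appear in this part of the message. The sketch below therefore assumes that the window ends at the start date and reaches back `numDays` days; the class `PruneWindow` is hypothetical and exists only to illustrate the boundary behaviour.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of the half-open time window used by the filter.
// ASSUMPTION: the window is [start - numDays, start); DataPruner may differ.
class PruneWindow {

    final long firstTimeMillis; // inclusive lower bound
    final long lastTimeMillis;  // exclusive upper bound

    PruneWindow(long startMillis, int numDays) {
        this.lastTimeMillis = startMillis;
        this.firstTimeMillis = startMillis - TimeUnit.DAYS.toMillis(numDays);
    }

    // Same comparison as DateFileFilter.accept applies to a file's mtime.
    boolean accepts(long modificationTime) {
        return modificationTime >= firstTimeMillis && modificationTime < lastTimeMillis;
    }

    public static void main(String[] args) {
        long day = TimeUnit.DAYS.toMillis(1);
        PruneWindow window = new PruneWindow(10 * day, 1);
        System.out.println(window.accepts(9 * day));   // lower bound is included
        System.out.println(window.accepts(10 * day));  // upper bound is excluded
    }
}
```

With a half-open interval, consecutive runs whose windows touch never select the same file twice.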

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/LeastRecentlyUsedPruner.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/LeastRecentlyUsedPruner.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/LeastRecentlyUsedPruner.java
new file mode 100644
index 0000000..7acc96c
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/LeastRecentlyUsedPruner.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.bulk;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.metron.dataloads.hbase.mr.PrunerMapper;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+public class LeastRecentlyUsedPruner {
+    private static abstract class OptionHandler implements Function<String, Option> {}
+    private enum BulkLoadOptions {
+        HELP("h", new OptionHandler() {
+
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                return new Option(s, "help", false, "Generate Help screen");
+            }
+        }), TABLE("t", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "table", true, "HBase table to prune");
+                o.setRequired(true);
+                o.setArgName("HBASE_TABLE");
+                return o;
+            }
+        }), COLUMN_FAMILY("f", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "column_family", true, "Column family of the HBase table to prune");
+                o.setRequired(false);
+                o.setArgName("CF_NAME");
+                return o;
+            }
+        })
+        ,AS_OF_TIME("a", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "as_of", true, "The earliest access tracker you want to use.");
+                o.setArgName("datetime");
+                o.setRequired(true);
+                return o;
+            }
+        })
+        ,AS_OF_TIME_FORMAT("v", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                String defaultFormat = new SimpleDateFormat().toLocalizedPattern();
+                Option o = new Option(s, "as_of_format", true, "The format of the as_of time (only used in conjunction with the as_of option) (Default is: " + defaultFormat + ")");
+                o.setArgName("format");
+                o.setRequired(false);
+                return o;
+            }
+        })
+        ,ACCESS_TABLE("u", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "access_table", true, "HBase table containing the access trackers.");
+                o.setRequired(true);
+                o.setArgName("HBASE_TABLE");
+                return o;
+            }
+        }), ACCESS_COLUMN_FAMILY("z", new OptionHandler() {
+            @Nullable
+            @Override
+            public Option apply(@Nullable String s) {
+                Option o = new Option(s, "access_column_family", true, "Column family of the HBase table containing the access trackers");
+                o.setRequired(true);
+                o.setArgName("CF_NAME");
+                return o;
+            }
+        });
+        Option option;
+        String shortCode;
+        BulkLoadOptions(String shortCode, OptionHandler optionHandler) {
+            this.shortCode = shortCode;
+            this.option = optionHandler.apply(shortCode);
+        }
+
+        public boolean has(CommandLine cli) {
+            return cli.hasOption(shortCode);
+        }
+
+        public String get(CommandLine cli) {
+            return cli.getOptionValue(shortCode);
+        }
+        private static long getTimestamp(CommandLine cli) throws java.text.ParseException {
+            Date d = getFormat(cli).parse(BulkLoadOptions.AS_OF_TIME.get(cli));
+            return d.getTime();
+        }
+
+        private static DateFormat getFormat(CommandLine cli) {
+            DateFormat format = new SimpleDateFormat();
+            if (BulkLoadOptions.AS_OF_TIME_FORMAT.has(cli)) {
+                 format = new SimpleDateFormat(BulkLoadOptions.AS_OF_TIME_FORMAT.get(cli));
+            }
+            return format;
+        }
+
+        public static CommandLine parse(CommandLineParser parser, String[] args) {
+            try {
+                CommandLine cli = parser.parse(getOptions(), args);
+                if(BulkLoadOptions.HELP.has(cli)) {
+                    printHelp();
+                    System.exit(0);
+                }
+                return cli;
+            } catch (ParseException e) {
+                System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
+                e.printStackTrace(System.err);
+                printHelp();
+                System.exit(-1);
+                return null;
+            }
+        }
+
+        public static void printHelp() {
+            HelpFormatter formatter = new HelpFormatter();
+            formatter.printHelp( "LeastRecentlyUsedPruner", getOptions());
+        }
+
+        public static Options getOptions() {
+            Options ret = new Options();
+            for(BulkLoadOptions o : BulkLoadOptions.values()) {
+               ret.addOption(o.option);
+            }
+            return ret;
+        }
+    }
+
+    public static void setupHBaseJob(Job job, String sourceTable, String cf) throws IOException {
+        Scan scan = new Scan();
+        if(cf != null) {
+            scan.addFamily(Bytes.toBytes(cf));
+        }
+        scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
+        scan.setCacheBlocks(false);  // don't set to true for MR jobs
+        // set other scan attrs
+
+        TableMapReduceUtil.initTableMapperJob(
+                sourceTable,      // input table
+                scan,	          // Scan instance to control CF and attribute selection
+                PrunerMapper.class,   // mapper class
+                null,	          // mapper output key
+                null,	          // mapper output value
+                job);
+        TableMapReduceUtil.initTableReducerJob(
+                sourceTable,      // output table
+                null,             // reducer class
+                job);
+    }
+
+    public static Job createJob( Configuration conf
+                               , String table
+                               , String cf
+                               , String accessTrackerTable
+                               , String accessTrackerColumnFamily
+                               , Long ts
+                               ) throws IOException
+    {
+        Job job = new Job(conf);
+        job.setJobName("LeastRecentlyUsedPruner: Pruning " +  table + ":" + cf + " since " + new SimpleDateFormat().format(new Date(ts)));
+        System.out.println("Configuring " + job.getJobName());
+        job.setJarByClass(LeastRecentlyUsedPruner.class);
+        job.getConfiguration().setLong(PrunerMapper.TIMESTAMP_CONF, ts);
+        job.getConfiguration().set(PrunerMapper.ACCESS_TRACKER_NAME_CONF, table);
+        job.getConfiguration().set(PrunerMapper.ACCESS_TRACKER_CF_CONF, accessTrackerColumnFamily);
+        job.getConfiguration().set(PrunerMapper.ACCESS_TRACKER_TABLE_CONF, accessTrackerTable);
+        setupHBaseJob(job, table, cf);
+        job.setNumReduceTasks(0);
+        return job;
+    }
+
+    public static void main(String... argv) throws IOException, java.text.ParseException, ClassNotFoundException, InterruptedException {
+        Configuration conf = HBaseConfiguration.create();
+        String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
+
+        CommandLine cli = BulkLoadOptions.parse(new PosixParser(), otherArgs);
+        Long ts = BulkLoadOptions.getTimestamp(cli);
+        String table = BulkLoadOptions.TABLE.get(cli);
+        String cf = BulkLoadOptions.COLUMN_FAMILY.get(cli);
+        String accessTrackerTable = BulkLoadOptions.ACCESS_TABLE.get(cli);
+        String accessTrackerCF = BulkLoadOptions.ACCESS_COLUMN_FAMILY.get(cli);
+        Job job = createJob(conf, table, cf, accessTrackerTable, accessTrackerCF, ts);
+        System.exit(job.waitForCompletion(true) ? 0 : 1);
+    }
+}
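
Because every constant in `BulkLoadOptions` carries its own short code, nothing in the enum stops two constants from sharing one, and a later registration under the same code would likely shadow the earlier one when the `Options` object is built. The following is a minimal sketch of a uniqueness check that could run in a unit test. It is not part of this commit; `ShortCodeCheck` and `duplicates` are hypothetical names, and the codes passed in `main` are illustrative only.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of the commit: finds repeated option short codes.
class ShortCodeCheck {

    // Returns every short code that occurs more than once, in sorted order.
    static Set<String> duplicates(String... shortCodes) {
        Set<String> seen = new HashSet<>();
        Set<String> repeated = new TreeSet<>();
        for (String code : shortCodes) {
            if (!seen.add(code)) {
                repeated.add(code);
            }
        }
        return repeated;
    }

    public static void main(String[] args) {
        // Illustrative input containing one collision.
        System.out.println(duplicates("h", "t", "f", "a", "t", "u", "z")); // prints [t]
    }
}
```

In a test, the short codes could be gathered by iterating over `BulkLoadOptions.values()` and asserting that the returned set is empty.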

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0117987e/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/StartDateException.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/StartDateException.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/StartDateException.java
new file mode 100644
index 0000000..d3a0549
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/StartDateException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.bulk;
+
+
+public class StartDateException extends Exception {
+
+    public StartDateException(String message){
+        super(message);
+    }
+
+    public StartDateException(String message, Throwable t){
+        super(message,t);
+    }
+
+}


