metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ceste...@apache.org
Subject [5/5] incubator-metron git commit: METRON-154: Decouple enrichment and indexing closes apache/incubator-metron#192
Date Wed, 20 Jul 2016 13:14:28 GMT
METRON-154: Decouple enrichment and indexing closes apache/incubator-metron#192


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/5ffcef8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/5ffcef8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/5ffcef8d

Branch: refs/heads/master
Commit: 5ffcef8d4a9ec0788dca094f54b07d99c44c6bef
Parents: 1c568bf
Author: cstella <cestella@gmail.com>
Authored: Wed Jul 20 09:14:16 2016 -0400
Committer: cstella <cestella@gmail.com>
Committed: Wed Jul 20 09:14:16 2016 -0400

----------------------------------------------------------------------
 metron-deployment/amazon-ec2/conf/defaults.yml  |   1 +
 .../inventory/full-dev-platform/group_vars/all  |   3 +-
 .../inventory/metron_example/group_vars/all     |   1 +
 .../roles/metron_kafka_topics/defaults/main.yml |  12 +-
 .../roles/metron_streaming/defaults/main.yml    |   4 +
 .../metron_streaming/tasks/copy_bundles.yml     |  10 +
 .../roles/metron_streaming/tasks/topologies.yml |  28 +-
 .../roles/monit/tasks/monit-definitions.yml     |   8 +-
 metron-deployment/roles/monit/tasks/scripts.yml |   2 +
 .../monit/enrichment-elasticsearch.monit        |  26 -
 .../monit/templates/monit/enrichment-solr.monit |  26 -
 .../monit/templates/monit/enrichment.monit      |  26 +
 .../monit/indexing-elasticsearch.monit          |  26 +
 .../monit/templates/monit/indexing-solr.monit   |  26 +
 metron-platform/metron-common/pom.xml           |   4 +
 .../org/apache/metron/common/Constants.java     |   2 +
 .../apache/metron/common/utils/BloomFilter.java |  63 +++
 .../apache/metron/common/utils/KafkaUtils.java  |  54 ++
 .../metron/common/writer/AbstractWriter.java    |  26 -
 .../common/writer/BulkWriterComponent.java      | 130 -----
 .../apache/metron/common/writer/NoopWriter.java | 143 ------
 .../common/writer/WriterToBulkWriter.java       |  54 --
 .../metron/common/writer/NoopWriterTest.java    |  48 --
 metron-platform/metron-elasticsearch/pom.xml    |  29 ++
 .../src/main/config/elasticsearch.properties    |  55 +--
 .../scripts/start_elasticsearch_topology.sh     |   2 +-
 .../ElasticsearchEnrichmentIntegrationTest.java |  98 ----
 .../ElasticsearchIndexingIntegrationTest.java   | 100 ++++
 metron-platform/metron-enrichment/pom.xml       |  72 ++-
 .../src/main/assembly/assembly.xml              |   8 +
 .../src/main/config/enrichment.properties       |  82 ++++
 .../src/main/flux/enrichment/remote.yaml        |  69 +--
 .../src/main/flux/enrichment/test.yaml          |  64 +--
 .../enrichment/bolt/BulkMessageWriterBolt.java  |  83 ----
 .../accesstracker/BloomAccessTracker.java       |  32 +-
 .../writer/SimpleHbaseEnrichmentWriter.java     | 275 +++++++++++
 .../hbase/SimpleHbaseEnrichmentWriter.java      | 282 -----------
 .../apache/metron/writer/hdfs/HdfsWriter.java   |  96 ----
 .../writer/hdfs/SourceAwareMoveAction.java      |  48 --
 .../writer/hdfs/SourceFileNameFormat.java       |  48 --
 .../metron/writer/hdfs/SourceHandler.java       | 160 ------
 .../main/scripts/start_enrichment_topology.sh   |  22 +
 .../bolt/BulkMessageWriterBoltTest.java         |   3 +-
 .../integration/EnrichmentIntegrationTest.java  | 426 ++++++++++++++++
 .../components/ConfigUploadComponent.java       | 114 +++++
 .../integration/mock/MockGeoAdapter.java        |  64 +++
 .../integration/mock/MockHBaseConnector.java    |  52 ++
 .../integration/mock/MockTableProvider.java     |  45 ++
 .../integration/utils/SampleUtil.java           |  57 +++
 metron-platform/metron-indexing/pom.xml         | 253 ++++++++++
 .../src/main/assembly/assembly.xml              |  57 +++
 .../src/main/flux/indexing/remote.yaml          | 152 ++++++
 .../integration/IndexingIntegrationTest.java    | 240 +++++++++
 metron-platform/metron-integration-test/pom.xml |   5 -
 .../metron/integration/BaseIntegrationTest.java |   1 -
 .../integration/EnrichmentIntegrationTest.java  | 489 -------------------
 .../components/ConfigUploadComponent.java       | 114 -----
 .../metron/integration/mock/MockGeoAdapter.java |  64 ---
 .../integration/mock/MockHBaseConnector.java    |  52 --
 .../integration/mock/MockTableProvider.java     |  45 --
 .../metron/integration/utils/SampleUtil.java    |  57 ---
 metron-platform/metron-parsers/pom.xml          |  16 +-
 .../src/main/flux/asa/remote.yaml               |   2 +-
 .../metron-parsers/src/main/flux/asa/test.yaml  |   2 +-
 .../src/main/flux/fireeye/remote.yaml           |   2 +-
 .../src/main/flux/fireeye/test.yaml             |   2 +-
 .../src/main/flux/ise/remote.yaml               |   2 +-
 .../metron-parsers/src/main/flux/ise/test.yaml  |   2 +-
 .../src/main/flux/lancope/remote.yaml           |   2 +-
 .../src/main/flux/lancope/test.yaml             |   2 +-
 .../src/main/flux/paloalto/remote.yaml          |   2 +-
 .../src/main/flux/paloalto/test.yaml            |   2 +-
 .../src/main/flux/sourcefire/remote.yaml        |   2 +-
 .../src/main/flux/sourcefire/test.yaml          |   2 +-
 .../src/main/flux/websphere/remote.yaml         |   2 +-
 .../src/main/flux/websphere/test.yaml           |   2 +-
 .../apache/metron/parsers/bolt/ParserBolt.java  |  10 -
 .../metron/parsers/bolt/WriterHandler.java      |   4 +-
 .../parsers/topology/ParserTopologyBuilder.java |   4 +-
 .../metron/parsers/writer/KafkaWriter.java      | 164 -------
 .../integration/ParserIntegrationTest.java      |   6 +-
 .../metron/parsers/writer/KafkaWriterTest.java  |  95 ----
 .../SimpleHBaseEnrichmentWriterTest.java        |   5 +-
 ...pleHbaseEnrichmentWriterIntegrationTest.java |  14 +-
 .../integration/WriterBoltIntegrationTest.java  |  17 +-
 metron-platform/metron-solr/pom.xml             |  50 +-
 .../metron-solr/src/main/config/solr.properties |  61 +--
 .../src/main/scripts/start_solr_topology.sh     |   2 +-
 .../SolrEnrichmentIntegrationTest.java          | 114 -----
 .../SolrIndexingIntegrationTest.java            | 114 +++++
 .../metron/solr/writer/SolrWriterTest.java      |   4 +-
 metron-platform/metron-writer/pom.xml           | 231 +++++++++
 .../apache/metron/writer/AbstractWriter.java    |  26 +
 .../metron/writer/BulkWriterComponent.java      | 127 +++++
 .../org/apache/metron/writer/NoopWriter.java    | 141 ++++++
 .../metron/writer/WriterToBulkWriter.java       |  58 +++
 .../writer/bolt/BulkMessageWriterBolt.java      | 114 +++++
 .../apache/metron/writer/hdfs/HdfsWriter.java   |  96 ++++
 .../writer/hdfs/SourceAwareMoveAction.java      |  48 ++
 .../writer/hdfs/SourceFileNameFormat.java       |  48 ++
 .../metron/writer/hdfs/SourceHandler.java       | 160 ++++++
 .../apache/metron/writer/kafka/KafkaWriter.java | 181 +++++++
 .../metron/writer/message/MessageGetter.java    |  26 +
 .../metron/writer/message/MessageGetters.java   |  37 ++
 .../writer/message/NamedMessageGetter.java      |  34 ++
 .../metron/writer/message/RawMessageGetter.java |  50 ++
 .../apache/metron/writer/NoopWriterTest.java    |  48 ++
 .../metron/writer/kafka/KafkaWriterTest.java    |  88 ++++
 metron-platform/pom.xml                         |   2 +
 109 files changed, 4044 insertions(+), 2817 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-deployment/amazon-ec2/conf/defaults.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/amazon-ec2/conf/defaults.yml b/metron-deployment/amazon-ec2/conf/defaults.yml
index 2874b2c..9e05bbc 100644
--- a/metron-deployment/amazon-ec2/conf/defaults.yml
+++ b/metron-deployment/amazon-ec2/conf/defaults.yml
@@ -30,6 +30,7 @@ services_to_start:
   - bro-parser
   - snort-parser
   - enrichment
+  - indexing
 
 # the ami for cent6 by region
 amis_by_region:

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-deployment/inventory/full-dev-platform/group_vars/all
----------------------------------------------------------------------
diff --git a/metron-deployment/inventory/full-dev-platform/group_vars/all b/metron-deployment/inventory/full-dev-platform/group_vars/all
index 2e87a29..6afe4ae 100644
--- a/metron-deployment/inventory/full-dev-platform/group_vars/all
+++ b/metron-deployment/inventory/full-dev-platform/group_vars/all
@@ -21,15 +21,14 @@ services_to_start:
   - elasticsearch
   - pcap-service
   - kibana
-  - yaf
   - snort
   - snort-logs
   - bro
   - pcap-replay
-  - yaf-parser
   - bro-parser
   - snort-parser
   - enrichment
+  - indexing
 
 # ambari
 ambari_host: "{{ groups.ambari_master[0] }}"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-deployment/inventory/metron_example/group_vars/all
----------------------------------------------------------------------
diff --git a/metron-deployment/inventory/metron_example/group_vars/all b/metron-deployment/inventory/metron_example/group_vars/all
index bfa2469..a0d9257 100644
--- a/metron-deployment/inventory/metron_example/group_vars/all
+++ b/metron-deployment/inventory/metron_example/group_vars/all
@@ -30,6 +30,7 @@ services_to_start:
   - bro-parser
   - snort-parser
   - enrichment
+  - indexing
 
 #Ansible Variables
 ansible_ssh_private_key_file: /Path/to/private/key/file #Change This

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-deployment/roles/metron_kafka_topics/defaults/main.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/metron_kafka_topics/defaults/main.yml b/metron-deployment/roles/metron_kafka_topics/defaults/main.yml
index aac9a06..fa0ee02 100644
--- a/metron-deployment/roles/metron_kafka_topics/defaults/main.yml
+++ b/metron-deployment/roles/metron_kafka_topics/defaults/main.yml
@@ -17,10 +17,12 @@
 ---
 kafka_home: /usr/hdp/current/kafka-broker/
 topics_to_create:
-  - { topic: "pcap",            num_partitions: 1, replication_factor: 1, retention_gb: 10 }
-  - { topic: "bro",             num_partitions: 1, replication_factor: 1, retention_gb: 10 }
-  - { topic: "yaf",             num_partitions: 1, replication_factor: 1, retention_gb: 10 }
-  - { topic: "snort",           num_partitions: 1, replication_factor: 1, retention_gb: 10 }
-  - { topic: "enrichments",     num_partitions: 1, replication_factor: 1, retention_gb: 10 }
+  - { topic: "pcap",        num_partitions: 1, replication_factor: 1, retention_gb: 10 }
+  - { topic: "bro",         num_partitions: 1, replication_factor: 1, retention_gb: 10 }
+  - { topic: "yaf",         num_partitions: 1, replication_factor: 1, retention_gb: 10 }
+  - { topic: "snort",       num_partitions: 1, replication_factor: 1, retention_gb: 10 }
+  - { topic: "enrichments", num_partitions: 1, replication_factor: 1, retention_gb: 10 }
   - { topic: "parser_invalid",  num_partitions: 1, replication_factor: 1, retention_gb: 10 }
   - { topic: "parser_error",    num_partitions: 1, replication_factor: 1, retention_gb: 10 }
+  - { topic: "indexing", num_partitions: 1, replication_factor: 1, retention_gb: 10 }
+  - { topic: "indexing_error", num_partitions: 1, replication_factor: 1, retention_gb: 10 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-deployment/roles/metron_streaming/defaults/main.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/metron_streaming/defaults/main.yml b/metron-deployment/roles/metron_streaming/defaults/main.yml
index cbec601..8618c38 100644
--- a/metron-deployment/roles/metron_streaming/defaults/main.yml
+++ b/metron-deployment/roles/metron_streaming/defaults/main.yml
@@ -20,11 +20,13 @@ metron_solr_jar_name: metron-solr-{{ metron_version }}.jar
 metron_elasticsearch_jar_name: metron-elasticsearch-{{ metron_version }}.jar
 metron_pcap_jar_name: metron-pcap-backend-{{ metron_version }}.jar
 metron_parsers_jar_name: metron-parsers-{{ metron_version }}.jar
+metron_enrichment_jar_name: metron-enrichment-{{ metron_version }}-uber.jar
 
 metron_common_bundle_name: metron-common-{{ metron_version }}-archive.tar.gz
 metron_pcap_bundle_name: metron-pcap-backend-{{ metron_version }}-archive.tar.gz
 metron_data_management_bundle_name: metron-data-management-{{ metron_version }}-archive.tar.gz
 metron_enrichment_bundle_name: metron-enrichment-{{ metron_version }}-archive.tar.gz
+metron_indexing_bundle_name: metron-indexing-{{ metron_version }}-archive.tar.gz
 metron_solr_bundle_name: metron-solr-{{ metron_version }}-archive.tar.gz
 metron_elasticsearch_bundle_name: metron-elasticsearch-{{ metron_version }}-archive.tar.gz
 metron_parsers_bundle_name: metron-parsers-{{ metron_version }}-archive.tar.gz
@@ -32,6 +34,7 @@ metron_pcap_bundle_path: "{{ playbook_dir }}/../../metron-platform/metron-pcap-b
 metron_common_bundle_path: "{{ playbook_dir }}/../../metron-platform/metron-common/target/{{ metron_common_bundle_name }}"
 metron_data_management_bundle_path: "{{ playbook_dir }}/../../metron-platform/metron-data-management/target/{{ metron_data_management_bundle_name }}"
 metron_enrichment_bundle_path: "{{ playbook_dir }}/../../metron-platform/metron-enrichment/target/{{ metron_enrichment_bundle_name }}"
+metron_indexing_bundle_path: "{{ playbook_dir }}/../../metron-platform/metron-indexing/target/{{ metron_indexing_bundle_name }}"
 metron_solr_bundle_path: "{{ playbook_dir }}/../../metron-platform/metron-solr/target/{{ metron_solr_bundle_name }}"
 metron_elasticsearch_bundle_path: "{{ playbook_dir }}/../../metron-platform/metron-elasticsearch/target/{{ metron_elasticsearch_bundle_name }}"
 metron_parsers_bundle_path: "{{ playbook_dir }}/../../metron-platform/metron-parsers/target/{{ metron_parsers_bundle_name }}"
@@ -42,6 +45,7 @@ zookeeper_global_config_path: "{{ zookeeper_config_path }}/global.json"
 metron_pcap_properties_config_path: "{{ metron_directory }}/config/pcap.properties"
 metron_solr_properties_config_path: "{{ metron_directory }}/config/solr.properties"
 metron_elasticsearch_properties_config_path: "{{ metron_directory }}/config/elasticsearch.properties"
+metron_enrichment_properties_config_path: "{{ metron_directory }}/config/enrichment.properties"
 metron_parsers_properties_config_path: "{{ metron_directory }}/config/parsers.properties"
 hbase_config_path: "/etc/hbase/conf"
 hdfs_config_path: "/etc/hadoop/conf"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-deployment/roles/metron_streaming/tasks/copy_bundles.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/metron_streaming/tasks/copy_bundles.yml b/metron-deployment/roles/metron_streaming/tasks/copy_bundles.yml
index 97fb259..519b7d3 100644
--- a/metron-deployment/roles/metron_streaming/tasks/copy_bundles.yml
+++ b/metron-deployment/roles/metron_streaming/tasks/copy_bundles.yml
@@ -30,6 +30,12 @@
     src: "{{ metron_enrichment_bundle_path }}"
     dest: "{{ metron_directory }}"
 
+- name: Copy Metron Indexing bundle
+  copy:
+    src: "{{ metron_indexing_bundle_path }}"
+    dest: "{{ metron_directory }}"
+
+
 - name: Copy Metron Parsers bundle
   copy:
     src: "{{ metron_parsers_bundle_path }}"
@@ -58,6 +64,7 @@
     - tar xzvf metron-solr*.tar.gz
     - tar xzvf metron-elasticsearch*.tar.gz
     - tar xzvf metron-enrichment*.tar.gz
+    - tar xzvf metron-indexing*.tar.gz
     - tar xzvf metron-parsers*.tar.gz
     - tar xzvf metron-data-management*.tar.gz
     - tar xzvf metron-common*.tar.gz
@@ -67,6 +74,9 @@
 - name: Add *-site.xml files to topology jars
   shell: "cd {{ item.config_path }} && jar -uf {{ metron_directory }}/lib/{{ item.jar_name }} {{ item.file_name }}"
   with_items:
+      - { config_path: "{{ hbase_config_path }}", jar_name: "{{ metron_enrichment_jar_name }}", file_name: "hbase-site.xml" }
+      - { config_path: "{{ hdfs_config_path }}", jar_name: "{{ metron_enrichment_jar_name }}", file_name: "core-site.xml" }
+      - { config_path: "{{ hdfs_config_path }}", jar_name: "{{ metron_enrichment_jar_name }}", file_name: "hdfs-site.xml" }
       - { config_path: "{{ hbase_config_path }}", jar_name: "{{ metron_solr_jar_name }}", file_name: "hbase-site.xml" }
       - { config_path: "{{ hdfs_config_path }}", jar_name: "{{ metron_solr_jar_name }}", file_name: "core-site.xml" }
       - { config_path: "{{ hdfs_config_path }}", jar_name: "{{ metron_solr_jar_name }}", file_name: "hdfs-site.xml" }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-deployment/roles/metron_streaming/tasks/topologies.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/metron_streaming/tasks/topologies.yml b/metron-deployment/roles/metron_streaming/tasks/topologies.yml
index 2010eae..0ca0b3d 100644
--- a/metron-deployment/roles/metron_streaming/tasks/topologies.yml
+++ b/metron-deployment/roles/metron_streaming/tasks/topologies.yml
@@ -29,18 +29,14 @@
     - { regexp: "kafka.pcap.out=", line: "kafka.pcap.out={{ pcap_hdfs_path }}" }
     - { regexp: "spout.kafka.topic.pcap=", line: "spout.kafka.topic.pcap={{ pycapa_topic }}" }
 
-- name: Configure Metron Solr topology
+- name: Configure Metron Enrichment topology
   lineinfile: >
-    dest={{ metron_solr_properties_config_path }}
+    dest={{ metron_enrichment_properties_config_path }}
     regexp="{{ item.regexp }}"
     line="{{ item.line }}"
   with_items:
     - { regexp: "kafka.zk=", line: "kafka.zk={{ zookeeper_url }}" }
     - { regexp: "kafka.broker=", line: "kafka.broker={{ kafka_broker_url }}" }
-    - { regexp: "es.ip=", line: "es.ip={{ groups.search[0] }}" }
-    - { regexp: "es.port=", line: "es.port={{ elasticsearch_transport_port }}" }
-    - { regexp: "es.clustername=", line: "es.clustername={{ elasticsearch_cluster_name }}" }
-    - { regexp: "bolt.hdfs.file.system.url=", line: "bolt.hdfs.file.system.url={{ hdfs_url }}" }
     - { regexp: "spout.kafka.topic.bro=", line: "spout.kafka.topic.bro={{ bro_topic }}" }
     - { regexp: "threat.intel.tracker.table=", line: "threat.intel.tracker.table={{ tracker_hbase_table }}" }
     - { regexp: "threat.intel.tracker.cf=", line: "threat.intel.tracker.cf=t" }
@@ -50,6 +46,17 @@
     - { regexp: "enrichment.simple.hbase.cf=", line: "enrichment.simple.hbase.cf=t" }
     - { regexp: "mysql.ip=", line: "mysql.ip={{ groups.mysql[0] }}" }
     - { regexp: "mysql.password=", line: "mysql.password={{ mysql_root_password }}" }
+
+
+- name: Configure Metron Solr topology
+  lineinfile: >
+    dest={{ metron_solr_properties_config_path }}
+    regexp="{{ item.regexp }}"
+    line="{{ item.line }}"
+  with_items:
+    - { regexp: "kafka.zk=", line: "kafka.zk={{ zookeeper_url }}" }
+    - { regexp: "kafka.broker=", line: "kafka.broker={{ kafka_broker_url }}" }
+    - { regexp: "bolt.hdfs.file.system.url=", line: "bolt.hdfs.file.system.url={{ hdfs_url }}" }
     - { regexp: "index.hdfs.output=", line: "index.hdfs.output={{ metron_hdfs_output_dir }}/enrichment/indexed" }
     - { regexp: "bolt.hdfs.rotation.policy=", line: "bolt.hdfs.rotation.policy={{ metron_hdfs_rotation_policy }}" }
     - { regexp: "bolt.hdfs.rotation.policy.count=", line: "bolt.hdfs.rotation.policy.count={{ metron_hdfs_rotation_policy_count}}" }
@@ -67,15 +74,6 @@
     - { regexp: "es.port=", line: "es.port={{ elasticsearch_transport_port }}" }
     - { regexp: "es.clustername=", line: "es.clustername={{ elasticsearch_cluster_name }}" }
     - { regexp: "bolt.hdfs.file.system.url=", line: "bolt.hdfs.file.system.url={{ hdfs_url }}" }
-    - { regexp: "spout.kafka.topic.bro=", line: "spout.kafka.topic.bro={{ bro_topic }}" }
-    - { regexp: "threat.intel.tracker.table=", line: "threat.intel.tracker.table={{ tracker_hbase_table }}" }
-    - { regexp: "threat.intel.tracker.cf=", line: "threat.intel.tracker.cf=t" }
-    - { regexp: "threat.intel.simple.hbase.table=", line: "threat.intel.simple.hbase.table={{ threatintel_hbase_table }}" }
-    - { regexp: "threat.intel.simple.hbase.cf=", line: "threat.intel.simple.hbase.cf=t" }
-    - { regexp: "enrichment.simple.hbase.table=", line: "enrichment.simple.hbase.table={{ enrichment_hbase_table }}" }
-    - { regexp: "enrichment.simple.hbase.cf=", line: "enrichment.simple.hbase.cf=t" }
-    - { regexp: "mysql.ip=", line: "mysql.ip={{ groups.mysql[0] }}" }
-    - { regexp: "mysql.password=", line: "mysql.password={{ mysql_root_password }}" }
     - { regexp: "index.hdfs.output=", line: "index.hdfs.output={{ metron_hdfs_output_dir }}/enrichment/indexed" }
     - { regexp: "bolt.hdfs.rotation.policy=", line: "bolt.hdfs.rotation.policy={{ metron_hdfs_rotation_policy }}" }
     - { regexp: "bolt.hdfs.rotation.policy.count=", line: "bolt.hdfs.rotation.policy.count={{ metron_hdfs_rotation_policy_count}}" }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-deployment/roles/monit/tasks/monit-definitions.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/monit/tasks/monit-definitions.yml b/metron-deployment/roles/monit/tasks/monit-definitions.yml
index 3778e14..f9e3897 100644
--- a/metron-deployment/roles/monit/tasks/monit-definitions.yml
+++ b/metron-deployment/roles/monit/tasks/monit-definitions.yml
@@ -24,10 +24,14 @@
   template: src=monit/elasticsearch.monit dest={{ monit_config_home }}/elasticsearch.monit
   when: ("search" in group_names) and (install_elasticsearch | default(True))
 
-- name: Create monit definition for enrichment with elasticsearch
-  template: src=monit/enrichment-elasticsearch.monit dest={{ monit_config_home }}/enrichment-elasticsearch.monit
+- name: Create monit definition for indexing with elasticsearch
+  template: src=monit/indexing-elasticsearch.monit dest={{ monit_config_home }}/indexing-elasticsearch.monit
   when: ("enrichment" in group_names) and (install_elasticsearch | default(True))
 
+- name: Create monit definition for enrichment
+  template: src=monit/enrichment.monit dest={{ monit_config_home }}/enrichment.monit
+  when: ("enrichment" in group_names)
+
 - name: Create monit definition for kibana
   template: src=monit/kibana.monit dest={{ monit_config_home }}/kibana.monit
   when: ("web" in group_names) and (install_elasticsearch | default(True))

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-deployment/roles/monit/tasks/scripts.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/monit/tasks/scripts.yml b/metron-deployment/roles/monit/tasks/scripts.yml
index df1a443..43901d2 100644
--- a/metron-deployment/roles/monit/tasks/scripts.yml
+++ b/metron-deployment/roles/monit/tasks/scripts.yml
@@ -39,6 +39,7 @@
     - bro
     - pcap
     - enrichment
+    - indexing
 
 - name: Deploy topology status scripts
   template:
@@ -51,3 +52,4 @@
     - bro
     - pcap
     - enrichment
+    - indexing

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-deployment/roles/monit/templates/monit/enrichment-elasticsearch.monit
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/monit/templates/monit/enrichment-elasticsearch.monit b/metron-deployment/roles/monit/templates/monit/enrichment-elasticsearch.monit
deleted file mode 100644
index ceae475..0000000
--- a/metron-deployment/roles/monit/templates/monit/enrichment-elasticsearch.monit
+++ /dev/null
@@ -1,26 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-check program enrichment with path "{{ monit_home }}/status_enrichment_topology.sh"
-  start program "{{ metron_directory }}/bin/start_elasticsearch_topology.sh"
-  stop program "{{ monit_home }}/stop_enrichment_topology.sh"
-  if status != 0 then restart
-  group yaf
-  group bro
-  group snort
-  group enrichments
-  group metron

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-deployment/roles/monit/templates/monit/enrichment-solr.monit
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/monit/templates/monit/enrichment-solr.monit b/metron-deployment/roles/monit/templates/monit/enrichment-solr.monit
deleted file mode 100644
index 94f4858..0000000
--- a/metron-deployment/roles/monit/templates/monit/enrichment-solr.monit
+++ /dev/null
@@ -1,26 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-check program enrichment with path "{{ monit_home }}/status_enrichment_topology.sh"
-  start program "{{ metron_directory }}/bin/start_solr_topology.sh"
-  stop program "{{ monit_home }}/stop_enrichment_topology.sh"
-  if status != 0 then restart
-  group yaf
-  group bro
-  group snort
-  group enrichments
-  group metron

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-deployment/roles/monit/templates/monit/enrichment.monit
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/monit/templates/monit/enrichment.monit b/metron-deployment/roles/monit/templates/monit/enrichment.monit
new file mode 100644
index 0000000..43d197e
--- /dev/null
+++ b/metron-deployment/roles/monit/templates/monit/enrichment.monit
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+check program enrichment with path "{{ monit_home }}/status_enrichment_topology.sh"
+  start program "{{ metron_directory }}/bin/start_enrichment_topology.sh"
+  stop program "{{ monit_home }}/stop_enrichment_topology.sh"
+  if status != 0 then restart
+  group yaf
+  group bro
+  group snort
+  group enrichments
+  group metron

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-deployment/roles/monit/templates/monit/indexing-elasticsearch.monit
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/monit/templates/monit/indexing-elasticsearch.monit b/metron-deployment/roles/monit/templates/monit/indexing-elasticsearch.monit
new file mode 100644
index 0000000..86992e4
--- /dev/null
+++ b/metron-deployment/roles/monit/templates/monit/indexing-elasticsearch.monit
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+check program indexing with path "{{ monit_home }}/status_indexing_topology.sh"
+  start program "{{ metron_directory }}/bin/start_elasticsearch_topology.sh"
+  stop program "{{ monit_home }}/stop_indexing_topology.sh"
+  if status != 0 then restart
+  group yaf
+  group bro
+  group snort
+  group enrichments
+  group metron

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-deployment/roles/monit/templates/monit/indexing-solr.monit
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/monit/templates/monit/indexing-solr.monit b/metron-deployment/roles/monit/templates/monit/indexing-solr.monit
new file mode 100644
index 0000000..432a0e6
--- /dev/null
+++ b/metron-deployment/roles/monit/templates/monit/indexing-solr.monit
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+check program indexing with path "{{ monit_home }}/status_indexing_topology.sh"
+  start program "{{ metron_directory }}/bin/start_solr_topology.sh"
+  stop program "{{ monit_home }}/stop_indexing_topology.sh"
+  if status != 0 then restart
+  group yaf
+  group bro
+  group snort
+  group enrichments
+  group metron

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/pom.xml b/metron-platform/metron-common/pom.xml
index 30162dd..d625402 100644
--- a/metron-platform/metron-common/pom.xml
+++ b/metron-platform/metron-common/pom.xml
@@ -294,6 +294,10 @@
                                     <pattern>com.google.common</pattern>
                                     <shadedPattern>org.apache.metron.guava</shadedPattern>
                                 </relocation>
+                                <relocation>
+                                    <pattern>org.apache.commons.beanutils</pattern>
+                                    <shadedPattern>org.apache.metron.beanutils</shadedPattern>
+                                </relocation>
                             </relocations>
                             <transformers>
                                 <transformer

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
index 1175e8b..d1f5748 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
@@ -27,6 +27,8 @@ public class Constants {
   public static final long DEFAULT_CONFIGURED_BOLT_TIMEOUT = 5000;
   public static final String SENSOR_TYPE = "source.type";
   public static final String ENRICHMENT_TOPIC = "enrichments";
+  public static final String INDEXING_TOPIC = "indexing";
+  public static final String INDEXING_ERROR_TOPIC = "indexing_error";
   public static final String DEFAULT_PARSER_ERROR_TOPIC = "parser_error";
   public static final String DEFAULT_PARSER_INVALID_TOPIC = "parser_invalid";
   public static final String ERROR_STREAM = "error";

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/BloomFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/BloomFilter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/BloomFilter.java
new file mode 100644
index 0000000..82172f5
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/BloomFilter.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.utils;
+
+import com.google.common.hash.Funnel;
+import com.google.common.hash.PrimitiveSink;
+
+import java.io.Serializable;
+import java.util.function.Function;
+
+public class BloomFilter<T> implements Serializable {
+  // Adapts a caller-supplied byte serializer into the Funnel handed to the wrapped Guava Bloom filter.
+  private static class BloomFunnel<T> implements Funnel<T>, Serializable {
+    Function<T, byte[]> serializer;
+    public BloomFunnel(Function<T, byte[]> serializer) {
+      this.serializer = serializer;
+    }
+    @Override
+    public void funnel(T obj, PrimitiveSink primitiveSink) {
+      primitiveSink.putBytes(serializer.apply(obj));
+    }
+    // Class-based equality (the serializer is not compared); presumably so merge()'s putAll sees equal funnels.
+    @Override
+    public boolean equals(Object obj) {
+      return obj != null && this.getClass().equals(obj.getClass());
+    }
+    // Derived from the class so that funnels which compare equal also hash equally.
+    @Override
+    public int hashCode() {
+      return this.getClass().hashCode() * 31;
+    }
+  }
+  private com.google.common.hash.BloomFilter<T> filter;
+  public BloomFilter(Function<T, byte[]> serializer, int expectedInsertions, double falsePositiveRate) {
+    filter = com.google.common.hash.BloomFilter.create(new BloomFunnel<T>(serializer), expectedInsertions, falsePositiveRate);
+  }
+
+  public boolean mightContain(T key) {
+    return filter.mightContain(key);
+  }
+  public void add(T key) {
+    filter.put(key);
+  }
+  public void merge(BloomFilter<T> filter2) {
+    filter.putAll(filter2.filter);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java
new file mode 100644
index 0000000..6d2af72
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.utils;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public enum KafkaUtils {
+  INSTANCE;
+  // Lists Kafka brokers as "host:port" using a short-lived Curator client opened on zkQuorum.
+  public List<String> getBrokersFromZookeeper(String zkQuorum) throws Exception {
+    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+    // try-with-resources closes the client even when start() or the lookup throws.
+    try (CuratorFramework framework = CuratorFrameworkFactory.newClient(zkQuorum, retryPolicy)) {
+      framework.start();
+      return getBrokersFromZookeeper(framework);
+    }
+  }
+  // Reads each child of /brokers/ids through the caller's client and returns one "host:port" per broker.
+  public List<String> getBrokersFromZookeeper(CuratorFramework client) throws Exception {
+    List<String> ret = new ArrayList<>();
+    for(String id : client.getChildren().forPath("/brokers/ids")) {
+      byte[] data = client.getData().forPath("/brokers/ids/" + id);
+      // Decode explicitly as UTF-8 instead of relying on the platform default charset.
+      String brokerInfoStr = new String(data, java.nio.charset.StandardCharsets.UTF_8);
+      Map<String, Object> brokerInfo = JSONUtils.INSTANCE.load(brokerInfoStr, new TypeReference<Map<String, Object>>() {});
+      ret.add(brokerInfo.get("host") + ":" + brokerInfo.get("port"));
+    }
+    return ret;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/AbstractWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/AbstractWriter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/AbstractWriter.java
deleted file mode 100644
index 56a4e48..0000000
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/AbstractWriter.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.common.writer;
-
-import org.apache.metron.common.configuration.writer.WriterConfiguration;
-
-public abstract class AbstractWriter {
-  public AbstractWriter() {}
-  public abstract void configure(String sensorName, WriterConfiguration configuration);
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java
deleted file mode 100644
index c0e4f37..0000000
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.common.writer;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.tuple.Tuple;
-import com.google.common.collect.Iterables;
-import org.apache.metron.common.Constants;
-import org.apache.metron.common.configuration.Configurations;
-import org.apache.metron.common.configuration.writer.WriterConfiguration;
-import org.apache.metron.common.interfaces.BulkMessageWriter;
-import org.apache.metron.common.utils.ErrorUtils;
-import org.apache.metron.common.utils.MessageUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.function.Function;
-
-public class BulkWriterComponent<MESSAGE_T> {
-  public static final Logger LOG = LoggerFactory
-            .getLogger(BulkWriterComponent.class);
-  private Map<String, Collection<Tuple>> sensorTupleMap = new HashMap<>();
-  private Map<String, List<MESSAGE_T>> sensorMessageMap = new HashMap<>();
-  private OutputCollector collector;
-  private boolean handleCommit = true;
-  private boolean handleError = true;
-  public BulkWriterComponent(OutputCollector collector) {
-    this.collector = collector;
-  }
-
-  public BulkWriterComponent(OutputCollector collector, boolean handleCommit, boolean handleError) {
-    this(collector);
-    this.handleCommit = handleCommit;
-    this.handleError = handleError;
-  }
-
-  public void commit(Iterable<Tuple> tuples) {
-    tuples.forEach(t -> collector.ack(t));
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Acking " + Iterables.size(tuples) + " tuples");
-    }
-  }
-
-  public void error(Throwable e, Iterable<Tuple> tuples) {
-    tuples.forEach(t -> collector.ack(t));
-    if(!Iterables.isEmpty(tuples)) {
-      LOG.error("Failing " + Iterables.size(tuples) + " tuples", e);
-      ErrorUtils.handleError(collector, e, Constants.ERROR_STREAM);
-    }
-  }
-
-  protected Collection<Tuple> createTupleCollection() {
-    return new ArrayList<>();
-  }
-
-
-  public void errorAll(Throwable e) {
-    for(Map.Entry<String, Collection<Tuple>> kv : sensorTupleMap.entrySet()) {
-      error(e, kv.getValue());
-      sensorTupleMap.remove(kv.getKey());
-      sensorMessageMap.remove(kv.getKey());
-    }
-  }
-
-  public void errorAll(String sensorType, Throwable e) {
-    error(e, Optional.ofNullable(sensorTupleMap.get(sensorType)).orElse(new ArrayList<>()));
-    sensorTupleMap.remove(sensorType);
-    sensorMessageMap.remove(sensorType);
-  }
-  public void write( String sensorType
-                   , Tuple tuple
-                   , MESSAGE_T message
-                   , BulkMessageWriter<MESSAGE_T> bulkMessageWriter
-                   , WriterConfiguration configurations
-                   ) throws Exception
-  {
-    int batchSize = configurations.getBatchSize(sensorType);
-    Collection<Tuple> tupleList = sensorTupleMap.get(sensorType);
-    if (tupleList == null) {
-      tupleList = createTupleCollection();
-    }
-    tupleList.add(tuple);
-    List<MESSAGE_T> messageList = sensorMessageMap.get(sensorType);
-    if (messageList == null) {
-      messageList = new ArrayList<>();
-    }
-    messageList.add(message);
-
-    if (tupleList.size() < batchSize) {
-      sensorTupleMap.put(sensorType, tupleList);
-      sensorMessageMap.put(sensorType, messageList);
-    } else {
-      try {
-        bulkMessageWriter.write(sensorType, configurations, tupleList, messageList);
-        if(handleCommit) {
-          commit(tupleList);
-        }
-
-      } catch (Throwable e) {
-        if(handleError) {
-          error(e, tupleList);
-        }
-        else {
-          throw e;
-        }
-      }
-      finally {
-        sensorTupleMap.remove(sensorType);
-        sensorMessageMap.remove(sensorType);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/NoopWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/NoopWriter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/NoopWriter.java
deleted file mode 100644
index 3b27f75..0000000
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/NoopWriter.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.common.writer;
-
-import backtype.storm.tuple.Tuple;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-import org.apache.metron.common.configuration.writer.WriterConfiguration;
-import org.apache.metron.common.interfaces.BulkMessageWriter;
-import org.apache.metron.common.utils.ConversionUtils;
-
-import java.io.Closeable;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.function.Function;
-
-public class NoopWriter extends AbstractWriter implements BulkMessageWriter<Object> {
-
-  public static class RandomLatency implements Function<Void, Void> {
-    private int min;
-    private int max;
-
-    public RandomLatency(int min, int max) {
-      this.min = min;
-      this.max = max;
-    }
-
-    public int getMin() {
-      return min;
-    }
-    public int getMax() {
-      return max;
-    }
-
-    @Override
-    public Void apply(Void aVoid) {
-      int sleepMs = ThreadLocalRandom.current().nextInt(min, max + 1);
-      try {
-        Thread.sleep(sleepMs);
-      } catch (InterruptedException e) {
-      }
-      return null;
-    }
-  }
-
-  public static class FixedLatency implements Function<Void, Void> {
-    private int latency;
-    public FixedLatency(int latency) {
-      this.latency = latency;
-    }
-    public int getLatency() {
-      return latency;
-    }
-
-    @Override
-    public Void apply(Void aVoid) {
-      if(latency > 0) {
-        try {
-          Thread.sleep(latency);
-        } catch (InterruptedException e) {
-        }
-      }
-      return null;
-    }
-  }
-  Function<Void, Void> sleepFunction = null;
-
-  public NoopWriter withLatency(String sleepConfig) {
-    sleepFunction = getSleepFunction(sleepConfig);
-    return this;
-  }
-
-
-  private Function<Void, Void> getSleepFunction(String sleepConfig) {
-    String usageMessage = "Unexpected: " + sleepConfig + " Expected value: integer for a fixed sleep duration in milliseconds (e.g. 10) " +
-            "or a range of latencies separated by a comma (e.g. \"10, 20\") to sleep a random amount in that range.";
-    try {
-      if (sleepConfig.contains(",")) {
-        // random latency within a range.
-        Iterable<String> it = Splitter.on(',').split(sleepConfig);
-        Integer min = ConversionUtils.convert(Iterables.getFirst(it, "").trim(), Integer.class);
-        Integer max= ConversionUtils.convert(Iterables.getLast(it, "").trim(), Integer.class);
-        if (min != null && max != null) {
-          return new RandomLatency(min, max);
-        }
-      } else {
-        //fixed latency
-        Integer latency = ConversionUtils.convert(sleepConfig.trim(), Integer.class);
-        if(latency != null) {
-          return new FixedLatency(latency);
-        }
-      }
-    }
-    catch(Throwable t) {
-      throw new IllegalArgumentException(usageMessage, t);
-    }
-    throw new IllegalArgumentException(usageMessage);
-  }
-
-  @Override
-  public void configure(String sensorName, WriterConfiguration configuration) {
-    Map<String, Object> config = configuration.getSensorConfig(sensorName);
-    if(config != null) {
-      Object noopLatency = config.get("noopLatency");
-      if(noopLatency != null) {
-        sleepFunction = getSleepFunction(noopLatency.toString());
-      }
-    }
-  }
-
-  @Override
-  public void init(Map stormConf, WriterConfiguration config) throws Exception {
-  }
-
-  @Override
-  public void write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<Object> messages) throws Exception {
-    if(sleepFunction != null) {
-      sleepFunction.apply(null);
-    }
-  }
-
-  @Override
-  public void close() throws Exception {
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/WriterToBulkWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/WriterToBulkWriter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/WriterToBulkWriter.java
deleted file mode 100644
index b0bde6c..0000000
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/WriterToBulkWriter.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.common.writer;
-
-import backtype.storm.tuple.Tuple;
-import com.google.common.collect.Iterables;
-import org.apache.metron.common.configuration.writer.SingleBatchConfigurationFacade;
-import org.apache.metron.common.configuration.writer.WriterConfiguration;
-import org.apache.metron.common.interfaces.BulkMessageWriter;
-import org.apache.metron.common.interfaces.MessageWriter;
-
-import java.util.List;
-import java.util.Map;
-
-public class WriterToBulkWriter<MESSAGE_T> implements BulkMessageWriter<MESSAGE_T> {
-  MessageWriter<MESSAGE_T> messageWriter;
-
-  public WriterToBulkWriter(MessageWriter<MESSAGE_T> messageWriter) {
-    this.messageWriter = messageWriter;
-  }
-  @Override
-  public void init(Map stormConf, WriterConfiguration config) throws Exception {
-    messageWriter.init();
-  }
-
-  @Override
-  public void write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<MESSAGE_T> messages) throws Exception {
-    if(messages.size() > 1) {
-      throw new IllegalStateException("WriterToBulkWriter expects a batch of exactly 1");
-    }
-    messageWriter.write(sensorType, configurations, Iterables.getFirst(tuples, null), Iterables.getFirst(messages, null));
-  }
-
-  @Override
-  public void close() throws Exception {
-    messageWriter.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/NoopWriterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/NoopWriterTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/NoopWriterTest.java
deleted file mode 100644
index e40ce7c..0000000
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/NoopWriterTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.common.writer;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-public class NoopWriterTest {
-  @Test
-  public void testFixedLatencyConfig() {
-    NoopWriter writer = new NoopWriter().withLatency("10");
-    Assert.assertTrue(writer.sleepFunction instanceof NoopWriter.FixedLatency);
-    NoopWriter.FixedLatency sleepFunction = (NoopWriter.FixedLatency)writer.sleepFunction;
-    Assert.assertEquals(10, sleepFunction.getLatency());
-  }
-
-  private void ensureRandomLatencyConfig(String latencyConfig, int min, int max) {
-    NoopWriter writer = new NoopWriter().withLatency(latencyConfig);
-    Assert.assertTrue(writer.sleepFunction instanceof NoopWriter.RandomLatency);
-    NoopWriter.RandomLatency sleepFunction = (NoopWriter.RandomLatency)writer.sleepFunction;
-    Assert.assertEquals(min, sleepFunction.getMin());
-    Assert.assertEquals(max, sleepFunction.getMax());
-  }
-
-  @Test
-  public void testRandomLatencyConfig() {
-    ensureRandomLatencyConfig("10,20", 10, 20);
-    ensureRandomLatencyConfig("10, 20", 10, 20);
-    ensureRandomLatencyConfig("10 ,20", 10, 20);
-    ensureRandomLatencyConfig("10 , 20", 10, 20);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/pom.xml b/metron-platform/metron-elasticsearch/pom.xml
index 4db3c07..3ecd84d 100644
--- a/metron-platform/metron-elasticsearch/pom.xml
+++ b/metron-platform/metron-elasticsearch/pom.xml
@@ -40,6 +40,16 @@
             <groupId>org.apache.metron</groupId>
             <artifactId>metron-enrichment</artifactId>
             <version>${project.parent.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-annotations</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.storm</groupId>
@@ -132,6 +142,25 @@
             <version>${global_mockito_version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-indexing</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-indexing</artifactId>
+            <version>${project.parent.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-enrichment</artifactId>
+            <version>${project.parent.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <reporting>
         <plugins>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties b/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
index e2370ab..a1516d6 100644
--- a/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
+++ b/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
@@ -14,22 +14,19 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
+##### Storm #####
+indexing.workers=1
+indexing.executors=0
 
 ##### Kafka #####
 
 kafka.zk=node1:2181
 kafka.broker=node1:6667
-spout.kafka.topic.asa=asa
-spout.kafka.topic.bro=bro
-spout.kafka.topic.fireeye=fireeye
-spout.kafka.topic.ise=ise
-spout.kafka.topic.lancope=lancope
-spout.kafka.topic.paloalto=paloalto
-spout.kafka.topic.pcap=pcap
-spout.kafka.topic.snort=snort
-spout.kafka.topic.yaf=yaf
+kafka.start=WHERE_I_LEFT_OFF
 
 ##### Indexing #####
+index.input.topic=indexing
+index.error.topic=indexing_error
 writer.class.name=org.apache.metron.elasticsearch.writer.ElasticsearchWriter
 
 ##### ElasticSearch #####
@@ -38,13 +35,6 @@ es.ip=10.22.0.214
 es.port=9300
 es.clustername=elasticsearch
 
-##### MySQL #####
-
-mysql.ip=10.22.0.214
-mysql.port=3306
-mysql.username=root
-mysql.password=hadoop123
-
 ##### Metrics #####
 
 #reporters
@@ -63,23 +53,6 @@ org.apache.metron.metrics.TelemetryParserBolt.emits=true
 org.apache.metron.metrics.TelemetryParserBolt.fails=true
 
 
-#GenericEnrichmentBolt
-org.apache.metron.metrics.GenericEnrichmentBolt.acks=true
-org.apache.metron.metrics.GenericEnrichmentBolt.emits=true
-org.apache.metron.metrics.GenericEnrichmentBolt.fails=true
-
-
-#TelemetryIndexingBolt
-org.apache.metron.metrics.TelemetryIndexingBolt.acks=true
-org.apache.metron.metrics.TelemetryIndexingBolt.emits=true
-org.apache.metron.metrics.TelemetryIndexingBolt.fails=true
-
-##### Host Enrichment #####
-
-org.apache.metron.enrichment.host.known_hosts=[{"ip":"10.1.128.236", "local":"YES", "type":"webserver", "asset_value" : "important"},\
-{"ip":"10.1.128.237", "local":"UNKNOWN", "type":"unknown", "asset_value" : "important"},\
-{"ip":"10.60.10.254", "local":"YES", "type":"printer", "asset_value" : "important"}]
-
 ##### HDFS #####
 
 bolt.hdfs.batch.size=5000
@@ -91,19 +64,3 @@ bolt.hdfs.finished.file.path=/paloalto/rotated
 bolt.hdfs.compression.codec.class=org.apache.hadoop.io.compress.SnappyCodec
 index.hdfs.output=/tmp/metron/enriched
 
-##### HBase #####
-bolt.hbase.table.name=pcap
-bolt.hbase.table.fields=t:value
-bolt.hbase.table.key.tuple.field.name=key
-bolt.hbase.table.timestamp.tuple.field.name=timestamp
-bolt.hbase.enable.batching=false
-bolt.hbase.write.buffer.size.in.bytes=2000000
-bolt.hbase.durability=SKIP_WAL
-bolt.hbase.partitioner.region.info.refresh.interval.mins=60
-
-##### Threat Intel #####
-
-threat.intel.tracker.table=
-threat.intel.tracker.cf=
-threat.intel.ip.table=
-threat.intel.ip.cf=

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-elasticsearch/src/main/scripts/start_elasticsearch_topology.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/scripts/start_elasticsearch_topology.sh b/metron-platform/metron-elasticsearch/src/main/scripts/start_elasticsearch_topology.sh
index 9149d09..16f83d3 100755
--- a/metron-platform/metron-elasticsearch/src/main/scripts/start_elasticsearch_topology.sh
+++ b/metron-platform/metron-elasticsearch/src/main/scripts/start_elasticsearch_topology.sh
@@ -19,4 +19,4 @@
 METRON_VERSION=${project.version}
 METRON_HOME=/usr/metron/$METRON_VERSION
 TOPOLOGY_JAR=${project.artifactId}-$METRON_VERSION.jar
-storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/flux/enrichment/remote.yaml --filter $METRON_HOME/config/elasticsearch.properties
+storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/flux/indexing/remote.yaml --filter $METRON_HOME/config/elasticsearch.properties

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchEnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchEnrichmentIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchEnrichmentIntegrationTest.java
deleted file mode 100644
index e0bbb89..0000000
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchEnrichmentIntegrationTest.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.elasticsearch.integration;
-
-import org.apache.metron.TestConstants;
-import org.apache.metron.common.interfaces.FieldNameConverter;
-import org.apache.metron.elasticsearch.writer.ElasticsearchFieldNameConverter;
-import org.apache.metron.integration.EnrichmentIntegrationTest;
-import org.apache.metron.integration.ComponentRunner;
-import org.apache.metron.integration.InMemoryComponent;
-import org.apache.metron.integration.Processor;
-import org.apache.metron.integration.ReadinessState;
-import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent;
-
-import java.io.File;
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-public class ElasticsearchEnrichmentIntegrationTest extends EnrichmentIntegrationTest {
-
-  private String indexDir = "target/elasticsearch";
-  private String dateFormat = "yyyy.MM.dd.HH";
-  private String index = "yaf_index_" + new SimpleDateFormat(dateFormat).format(new Date());
-  private FieldNameConverter fieldNameConverter = new ElasticsearchFieldNameConverter();
-
-  @Override
-  public FieldNameConverter getFieldNameConverter() {
-    return fieldNameConverter;
-  }
-
-  @Override
-  public InMemoryComponent getSearchComponent(final Properties topologyProperties) {
-    return new ElasticSearchComponent.Builder()
-            .withHttpPort(9211)
-            .withIndexDir(new File(indexDir))
-            .build();
-  }
-
-  @Override
-  public Processor<List<Map<String, Object>>> getProcessor(final List<byte[]> inputMessages) {
-    return new Processor<List<Map<String, Object>>>() {
-      List<Map<String, Object>> docs = null;
-      public ReadinessState process(ComponentRunner runner) {
-        ElasticSearchComponent elasticSearchComponent = runner.getComponent("search", ElasticSearchComponent.class);
-        if (elasticSearchComponent.hasIndex(index)) {
-          List<Map<String, Object>> docsFromDisk;
-          try {
-            docs = elasticSearchComponent.getAllIndexedDocs(index, testSensorType + "_doc");
-            docsFromDisk = readDocsFromDisk(hdfsDir);
-            System.out.println(docs.size() + " vs " + inputMessages.size() + " vs " + docsFromDisk.size());
-          } catch (IOException e) {
-            throw new IllegalStateException("Unable to retrieve indexed documents.", e);
-          }
-          if (docs.size() < inputMessages.size() || docs.size() != docsFromDisk.size()) {
-            return ReadinessState.NOT_READY;
-          } else {
-            return ReadinessState.READY;
-          }
-        } else {
-          return ReadinessState.NOT_READY;
-        }
-      }
-
-      public List<Map<String, Object>> getResult() {
-        return docs;
-      }
-    };
-  }
-
-  @Override
-  public void setAdditionalProperties(Properties topologyProperties) {
-    topologyProperties.setProperty("writer.class.name", "org.apache.metron.elasticsearch.writer.ElasticsearchWriter");
-  }
-
-  @Override
-  public String cleanField(String field) {
-    return field;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
new file mode 100644
index 0000000..b27c545
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.integration;
+
+import org.apache.metron.common.interfaces.FieldNameConverter;
+import org.apache.metron.elasticsearch.writer.ElasticsearchFieldNameConverter;
+import org.apache.metron.indexing.integration.IndexingIntegrationTest;
+import org.apache.metron.integration.ComponentRunner;
+import org.apache.metron.integration.InMemoryComponent;
+import org.apache.metron.integration.Processor;
+import org.apache.metron.integration.ReadinessState;
+import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class ElasticsearchIndexingIntegrationTest extends IndexingIntegrationTest {
+
+  private String indexDir = "target/elasticsearch";
+  private String dateFormat = "yyyy.MM.dd.HH";
+  private String index = "yaf_index_" + new SimpleDateFormat(dateFormat).format(new Date());
+  private FieldNameConverter fieldNameConverter = new ElasticsearchFieldNameConverter();
+
+  @Override
+  public FieldNameConverter getFieldNameConverter() {
+    return fieldNameConverter;
+  }
+
+  @Override
+  public InMemoryComponent getSearchComponent(final Properties topologyProperties) {
+    return new ElasticSearchComponent.Builder()
+            .withHttpPort(9211)
+            .withIndexDir(new File(indexDir))
+            .build();
+  }
+
+  @Override
+  public Processor<List<Map<String, Object>>> getProcessor(final List<byte[]> inputMessages) {
+    return new Processor<List<Map<String, Object>>>() {
+      List<Map<String, Object>> docs = null;
+      public ReadinessState process(ComponentRunner runner) {
+        ElasticSearchComponent elasticSearchComponent = runner.getComponent("search", ElasticSearchComponent.class);
+        if (elasticSearchComponent.hasIndex(index)) {
+          List<Map<String, Object>> docsFromDisk;
+          try {
+            docs = elasticSearchComponent.getAllIndexedDocs(index, testSensorType + "_doc");
+            docsFromDisk = readDocsFromDisk(hdfsDir);
+            System.out.println(docs.size() + " vs " + inputMessages.size() + " vs " + docsFromDisk.size());
+          } catch (IOException e) {
+            throw new IllegalStateException("Unable to retrieve indexed documents.", e);
+          }
+          if (docs.size() < inputMessages.size() || docs.size() != docsFromDisk.size()) {
+            return ReadinessState.NOT_READY;
+          } else {
+            return ReadinessState.READY;
+          }
+        } else {
+          return ReadinessState.NOT_READY;
+        }
+      }
+
+      public List<Map<String, Object>> getResult() {
+        return docs;
+      }
+    };
+  }
+
+  @Override
+  public void setAdditionalProperties(Properties topologyProperties) {
+    topologyProperties.setProperty("es.clustername", "metron");
+    topologyProperties.setProperty("es.port", "9300");
+    topologyProperties.setProperty("es.ip", "localhost");
+    topologyProperties.setProperty("writer.class.name", "org.apache.metron.elasticsearch.writer.ElasticsearchWriter");
+  }
+
+  @Override
+  public String cleanField(String field) {
+    return field;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-enrichment/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/pom.xml b/metron-platform/metron-enrichment/pom.xml
index 54c8fd5..84891e8 100644
--- a/metron-platform/metron-enrichment/pom.xml
+++ b/metron-platform/metron-enrichment/pom.xml
@@ -37,10 +37,25 @@
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>
+            <artifactId>metron-writer</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
             <artifactId>metron-hbase</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
         <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>${global_jackson_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+            <version>${global_jackson_version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
             <version>${slf4j.version}</version>
@@ -65,7 +80,6 @@
                     <artifactId>log4j</artifactId>
                 </exclusion>
             </exclusions>
-            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
@@ -77,7 +91,6 @@
                     <groupId>javax.servlet</groupId>
                 </exclusion>
             </exclusions>
-            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.storm</groupId>
@@ -95,36 +108,36 @@
                 </exclusion>
             </exclusions>
         </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${global_hbase_guava_version}</version>
+        </dependency>
         <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-hdfs</artifactId>
-            <version>${global_storm_version}</version>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.9.2</artifactId>
+            <version>${global_kafka_version}</version>
             <exclusions>
                 <exclusion>
-                    <groupId>org.apache.storm</groupId>
-                    <artifactId>storm-core</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-client</artifactId>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>
         <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-            <version>${global_guava_version}</version>
-        </dependency>
-        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
             <version>${global_hadoop_version}</version>
-            <scope>provided</scope>
             <exclusions>
                 <exclusion>
                     <artifactId>servlet-api</artifactId>
                     <groupId>javax.servlet</groupId>
                 </exclusion>
+                <exclusion>
+                    <artifactId>commons-httpclient</artifactId>
+                    <groupId>commons-httpclient</groupId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
@@ -144,9 +157,16 @@
             <version>${project.parent.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-integration-test</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <reporting>
         <plugins>
+
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-surefire-plugin</artifactId>
@@ -170,6 +190,7 @@
                     <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
                 </configuration>
             </plugin>
+
             <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>emma-maven-plugin</artifactId>
@@ -188,6 +209,17 @@
         <plugins>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-compiler-plugin</artifactId>
                 <version>3.1</version>
                 <configuration>
@@ -214,10 +246,12 @@
                             <goal>shade</goal>
                         </goals>
                         <configuration>
+                            <shadedArtifactAttached>true</shadedArtifactAttached>
+                            <shadedClassifierName>uber</shadedClassifierName>
                             <relocations>
                                 <relocation>
-                                    <pattern>com.google.common</pattern>
-                                    <shadedPattern>org.apache.metron.guava</shadedPattern>
+                                    <pattern>com.fasterxml.jackson</pattern>
+                                    <shadedPattern>org.apache.metron.jackson</shadedPattern>
                                 </relocation>
                             </relocations>
                             <transformers>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-enrichment/src/main/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/assembly/assembly.xml b/metron-platform/metron-enrichment/src/main/assembly/assembly.xml
index 6c9165c..4358370 100644
--- a/metron-platform/metron-enrichment/src/main/assembly/assembly.xml
+++ b/metron-platform/metron-enrichment/src/main/assembly/assembly.xml
@@ -53,5 +53,13 @@
       <fileMode>0644</fileMode>
       <lineEnding>unix</lineEnding>
     </fileSet>
+    <fileSet>
+      <directory>${project.basedir}/target</directory>
+      <includes>
+        <include>${project.artifactId}-${project.version}-uber.jar</include>
+      </includes>
+      <outputDirectory>/lib</outputDirectory>
+      <useDefaultExcludes>true</useDefaultExcludes>
+    </fileSet>
   </fileSets>
 </assembly>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-enrichment/src/main/config/enrichment.properties
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/config/enrichment.properties b/metron-platform/metron-enrichment/src/main/config/enrichment.properties
new file mode 100644
index 0000000..e32915c
--- /dev/null
+++ b/metron-platform/metron-enrichment/src/main/config/enrichment.properties
@@ -0,0 +1,82 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+
+##### Kafka #####
+
+kafka.zk=node1:2181
+kafka.broker=node1:6667
+enrichment.output.topic=indexing
+
+##### MySQL #####
+
+mysql.ip=10.22.0.214
+mysql.port=3306
+mysql.username=root
+mysql.password=hadoop123
+
+##### Metrics #####
+
+#reporters
+org.apache.metron.metrics.reporter.graphite=true
+org.apache.metron.metrics.reporter.console=false
+org.apache.metron.metrics.reporter.jmx=false
+
+#Graphite Addresses
+
+org.apache.metron.metrics.graphite.address=localhost
+org.apache.metron.metrics.graphite.port=2023
+
+#TelemetryParserBolt
+org.apache.metron.metrics.TelemetryParserBolt.acks=true
+org.apache.metron.metrics.TelemetryParserBolt.emits=true
+org.apache.metron.metrics.TelemetryParserBolt.fails=true
+
+
+#GenericEnrichmentBolt
+org.apache.metron.metrics.GenericEnrichmentBolt.acks=true
+org.apache.metron.metrics.GenericEnrichmentBolt.emits=true
+org.apache.metron.metrics.GenericEnrichmentBolt.fails=true
+
+
+#TelemetryIndexingBolt
+org.apache.metron.metrics.TelemetryIndexingBolt.acks=true
+org.apache.metron.metrics.TelemetryIndexingBolt.emits=true
+org.apache.metron.metrics.TelemetryIndexingBolt.fails=true
+
+##### Host Enrichment #####
+
+org.apache.metron.enrichment.host.known_hosts=[{"ip":"10.1.128.236", "local":"YES", "type":"webserver", "asset_value" : "important"},\
+{"ip":"10.1.128.237", "local":"UNKNOWN", "type":"unknown", "asset_value" : "important"},\
+{"ip":"10.60.10.254", "local":"YES", "type":"printer", "asset_value" : "important"}]
+
+
+##### HBase #####
+bolt.hbase.table.name=pcap
+bolt.hbase.table.fields=t:value
+bolt.hbase.table.key.tuple.field.name=key
+bolt.hbase.table.timestamp.tuple.field.name=timestamp
+bolt.hbase.enable.batching=false
+bolt.hbase.write.buffer.size.in.bytes=2000000
+bolt.hbase.durability=SKIP_WAL
+bolt.hbase.partitioner.region.info.refresh.interval.mins=60
+
+##### Threat Intel #####
+
+threat.intel.tracker.table=
+threat.intel.tracker.cf=
+threat.intel.ip.table=
+threat.intel.ip.cf=

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml
index b499b24..69d5f66 100644
--- a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml
+++ b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml
@@ -129,37 +129,16 @@ components:
                 args:
                     - ref: "simpleHBaseThreatIntelEnrichment"
 
-    -   id: "fileNameFormat"
-        className: "org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat"
-        configMethods:
-            -   name: "withPrefix"
-                args:
-                    - "enrichment-"
-            -   name: "withExtension"
-                args:
-                  - ".json"
-            -   name: "withPath"
-                args:
-                    - "${index.hdfs.output}"
-
-    -   id: "hdfsRotationPolicy"
-        className: "${bolt.hdfs.rotation.policy}"
-        constructorArgs:
-          -  ${bolt.hdfs.rotation.policy.count}
-          - "${bolt.hdfs.rotation.policy.units}"
 #indexing
-    -   id: "hdfsWriter"
-        className: "org.apache.metron.writer.hdfs.HdfsWriter"
+    -   id: "kafkaWriter"
+        className: "org.apache.metron.writer.kafka.KafkaWriter"
         configMethods:
-            -   name: "withFileNameFormat"
+            -   name: "withTopic"
                 args:
-                    - ref: "fileNameFormat"
-            -   name: "withRotationPolicy"
+                    - "${enrichment.output.topic}"
+            -   name: "withZkQuorum"
                 args:
-                    - ref: "hdfsRotationPolicy"
-
-    -   id: "indexWriter"
-        className: "${writer.class.name}"
+                    - "${kafka.zk}"
 
 #kafka/zookeeper
     -   id: "zkHosts"
@@ -277,22 +256,15 @@ bolts:
             -   name: "withMaxTimeRetain"
                 args: [10]
 # Indexing Bolts
-    -   id: "indexingBolt"
-        className: "org.apache.metron.enrichment.bolt.BulkMessageWriterBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withBulkMessageWriter"
-                args:
-                    - ref: "indexWriter"
-    -   id: "hdfsIndexingBolt"
-        className: "org.apache.metron.enrichment.bolt.BulkMessageWriterBolt"
+
+    -   id: "outputBolt"
+        className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
         constructorArgs:
             - "${kafka.zk}"
         configMethods:
-            -   name: "withBulkMessageWriter"
+            -   name: "withMessageWriter"
                 args:
-                    - ref: "hdfsWriter"
+                    - ref: "kafkaWriter"
 
 
 streams:
@@ -389,25 +361,12 @@ streams:
             streamId: "message"
             type: FIELDS
             args: ["key"]
-#indexing
-    -   name: "threatIntelJoin -> indexing"
+#output
+    -   name: "threatIntelJoin -> output"
         from: "threatIntelJoinBolt"
-        to: "indexingBolt"
+        to: "outputBolt"
         grouping:
             streamId: "message"
             type: FIELDS
             args: ["key"]
 
-    -   name: "threatIntelJoin -> hdfs"
-        from: "threatIntelJoinBolt"
-        to: "hdfsIndexingBolt"
-        grouping:
-            streamId: "message"
-            type: SHUFFLE
-
-    -   name: "indexingBolt -> errorIndexingBolt"
-        from: "indexingBolt"
-        to: "indexingBolt"
-        grouping:
-            streamId: "error"
-            type: SHUFFLE


Mime
View raw message