metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ceste...@apache.org
Subject incubator-metron git commit: METRON-664: Make the index configuration per-writer with enabled/disabled closes apache/incubator-metron#419
Date Thu, 26 Jan 2017 00:51:30 GMT
Repository: incubator-metron
Updated Branches:
  refs/heads/master cd3bb4f8d -> 5a5d42b85


METRON-664: Make the index configuration per-writer with enabled/disabled closes apache/incubator-metron#419


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/5a5d42b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/5a5d42b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/5a5d42b8

Branch: refs/heads/master
Commit: 5a5d42b8583fef320ddfd52131915195935b1b73
Parents: cd3bb4f
Author: cstella <cestella@gmail.com>
Authored: Wed Jan 25 19:51:04 2017 -0500
Committer: cstella <cestella@gmail.com>
Committed: Wed Jan 25 19:51:04 2017 -0500

----------------------------------------------------------------------
 .../configuration/IndexingConfigurations.java   | 46 +++++++++--
 .../writer/IndexingWriterConfiguration.java     | 23 ++++--
 .../writer/ParserWriterConfiguration.java       | 24 +++++-
 .../writer/SingleBatchConfigurationFacade.java  | 10 +++
 .../writer/WriterConfiguration.java             |  2 +
 .../metron/common/writer/BulkMessageWriter.java |  1 +
 .../metron/common/writer/MessageWriter.java     |  1 +
 .../writer/IndexingWriterConfigurationTest.java |  4 +-
 .../writer/ElasticsearchWriter.java             |  5 ++
 .../writer/SimpleHbaseEnrichmentWriter.java     |  5 ++
 .../bolt/BulkMessageWriterBoltTest.java         |  1 +
 metron-platform/metron-indexing/README.md       | 84 +++++++++++++++++--
 .../src/main/config/zookeeper/indexing/asa.json | 15 +++-
 .../src/main/config/zookeeper/indexing/bro.json | 18 +++-
 .../main/config/zookeeper/indexing/snort.json   | 17 +++-
 .../config/zookeeper/indexing/websphere.json    | 17 +++-
 .../src/main/config/zookeeper/indexing/yaf.json | 17 +++-
 .../main/config/zookeeper/indexing/test.json    | 17 +++-
 metron-platform/metron-management/README.md     |  9 ++
 .../management/IndexingConfigFunctions.java     | 87 +++++++++++++++++++-
 .../management/IndexingConfigFunctionsTest.java | 28 +++++--
 .../metron/parsers/bolt/ParserBoltTest.java     | 14 ++--
 .../metron/parsers/bolt/WriterBoltTest.java     |  3 +-
 .../SimpleHBaseEnrichmentWriterTest.java        | 10 +++
 .../apache/metron/pcap/writer/PcapWriter.java   |  5 ++
 .../apache/metron/solr/writer/SolrWriter.java   |  5 ++
 .../metron/solr/writer/SolrWriterTest.java      | 10 +--
 .../metron/writer/BulkWriterComponent.java      |  3 +
 .../org/apache/metron/writer/NoopWriter.java    | 10 ++-
 .../metron/writer/WriterToBulkWriter.java       |  5 ++
 .../writer/bolt/BulkMessageWriterBolt.java      | 11 ++-
 .../apache/metron/writer/hdfs/HdfsWriter.java   |  7 +-
 .../apache/metron/writer/kafka/KafkaWriter.java |  5 ++
 33 files changed, 458 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
index fb610c6..5f7998b 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
@@ -29,11 +29,18 @@ import java.util.Map;
 
 public class IndexingConfigurations extends Configurations {
   public static final String BATCH_SIZE_CONF = "batchSize";
+  public static final String ENABLED_CONF = "enabled";
   public static final String INDEX_CONF = "index";
 
-  public Map<String, Object> getSensorIndexingConfig(String sensorType) {
+  public Map<String, Object> getSensorIndexingConfig(String sensorType, String writerName) {
     Map<String, Object> ret = (Map<String, Object>) configurations.get(getKey(sensorType));
-    return ret != null?ret:new HashMap<>();
+    if(ret == null) {
+      return new HashMap();
+    }
+    else {
+      Map<String, Object> writerConfig = (Map<String, Object>)ret.get(writerName);
+      return writerConfig != null?writerConfig:new HashMap<>();
+    }
   }
 
   public void updateSensorIndexingConfig(String sensorType, byte[] data) throws IOException {
@@ -54,13 +61,35 @@ public class IndexingConfigurations extends Configurations {
     return ConfigurationType.INDEXING.getName() + "." + sensorType;
   }
 
+  public boolean isDefault(String sensorName, String writerName) {
+    Map<String, Object> ret = (Map<String, Object>) configurations.get(getKey(sensorName));
+    if(ret == null) {
+      return true;
+    }
+    else {
+      Map<String, Object> writerConfig = (Map<String, Object>)ret.get(writerName);
+      return writerConfig != null?false:true;
+    }
+  }
+
+  public int getBatchSize(String sensorName, String writerName ) {
+     return getBatchSize(getSensorIndexingConfig(sensorName, writerName));
+  }
+
+  public String getIndex(String sensorName, String writerName) {
+    return getIndex(getSensorIndexingConfig(sensorName, writerName), sensorName);
+  }
 
-  public int getBatchSize(String sensorName) {
-     return getBatchSize(getSensorIndexingConfig(sensorName));
+  public boolean isEnabled(String sensorName, String writerName) {
+    return isEnabled(getSensorIndexingConfig(sensorName, writerName));
   }
 
-  public String getIndex(String sensorName) {
-    return getIndex(getSensorIndexingConfig(sensorName), sensorName);
+  public static boolean isEnabled(Map<String, Object> conf) {
+    return getAs( ENABLED_CONF
+                 ,conf
+                , true
+                , Boolean.class
+                );
   }
 
   public static int getBatchSize(Map<String, Object> conf) {
@@ -79,6 +108,11 @@ public class IndexingConfigurations extends Configurations {
                 );
   }
 
+  public static Map<String, Object> setEnabled(Map<String, Object> conf, boolean enabled) {
+    Map<String, Object> ret = conf == null?new HashMap<>():conf;
+    ret.put(ENABLED_CONF, enabled);
+    return ret;
+  }
   public static Map<String, Object> setBatchSize(Map<String, Object> conf, int batchSize) {
     Map<String, Object> ret = conf == null?new HashMap<>():conf;
     ret.put(BATCH_SIZE_CONF, batchSize);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java
index 97e7977..7fca9c2 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java
@@ -27,31 +27,40 @@ import java.util.Optional;
 
 public class IndexingWriterConfiguration implements WriterConfiguration{
   private Optional<IndexingConfigurations> config;
+  private String writerName;
 
-
-  public IndexingWriterConfiguration(IndexingConfigurations config) {
+  public IndexingWriterConfiguration(String writerName, IndexingConfigurations config) {
     this.config = Optional.ofNullable(config);
+    this.writerName = writerName;
   }
 
-
-
   @Override
   public int getBatchSize(String sensorName) {
-    return config.orElse(new IndexingConfigurations()).getBatchSize(sensorName);
+    return config.orElse(new IndexingConfigurations()).getBatchSize(sensorName, writerName);
   }
 
   @Override
   public String getIndex(String sensorName) {
-    return config.orElse(new IndexingConfigurations()).getIndex(sensorName);
+    return config.orElse(new IndexingConfigurations()).getIndex(sensorName, writerName);
+  }
+
+  @Override
+  public boolean isEnabled(String sensorName) {
+    return config.orElse(new IndexingConfigurations()).isEnabled(sensorName, writerName);
   }
 
   @Override
   public Map<String, Object> getSensorConfig(String sensorName) {
-    return config.orElse(new IndexingConfigurations()).getSensorIndexingConfig(sensorName);
+    return config.orElse(new IndexingConfigurations()).getSensorIndexingConfig(sensorName, writerName);
   }
 
   @Override
   public Map<String, Object> getGlobalConfig() {
     return config.orElse(new IndexingConfigurations()).getGlobalConfig();
   }
+
+  @Override
+  public boolean isDefault(String sensorName) {
+    return config.orElse(new IndexingConfigurations()).isDefault(sensorName, writerName);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java
index b998ac1..b9595db 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java
@@ -18,14 +18,13 @@
 
 package org.apache.metron.common.configuration.writer;
 
+import org.apache.metron.common.configuration.IndexingConfigurations;
 import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.utils.ConversionUtils;
 
 import java.util.Map;
 
 public class ParserWriterConfiguration implements WriterConfiguration {
-  public static final String BATCH_CONF = "batchSize";
-  public static final String INDEX_CONF = "indexName";
   private ParserConfigurations config;
   public ParserWriterConfiguration(ParserConfigurations config) {
     this.config = config;
@@ -36,7 +35,7 @@ public class ParserWriterConfiguration implements WriterConfiguration {
     && config.getSensorParserConfig(sensorName) != null
     && config.getSensorParserConfig(sensorName).getParserConfig() != null
       ) {
-      Object batchObj = config.getSensorParserConfig(sensorName).getParserConfig().get(BATCH_CONF);
+      Object batchObj = config.getSensorParserConfig(sensorName).getParserConfig().get(IndexingConfigurations.BATCH_SIZE_CONF);
       return batchObj == null ? 1 : ConversionUtils.convert(batchObj, Integer.class);
     }
     return 1;
@@ -47,7 +46,7 @@ public class ParserWriterConfiguration implements WriterConfiguration {
     if(config != null && config.getSensorParserConfig(sensorName) != null
     && config.getSensorParserConfig(sensorName).getParserConfig() != null
       ) {
-      Object indexObj = config.getSensorParserConfig(sensorName).getParserConfig().get(INDEX_CONF);
+      Object indexObj = config.getSensorParserConfig(sensorName).getParserConfig().get(IndexingConfigurations.INDEX_CONF);
       if(indexObj != null) {
         return indexObj.toString();
       }
@@ -57,6 +56,18 @@ public class ParserWriterConfiguration implements WriterConfiguration {
   }
 
   @Override
+  public boolean isEnabled(String sensorName) {
+    if(config != null
+    && config.getSensorParserConfig(sensorName) != null
+    && config.getSensorParserConfig(sensorName).getParserConfig() != null
+      ) {
+      Object enabledObj = config.getSensorParserConfig(sensorName).getParserConfig().get(IndexingConfigurations.ENABLED_CONF);
+      return enabledObj == null ? true : ConversionUtils.convert(enabledObj, Boolean.class);
+    }
+    return true;
+  }
+
+  @Override
   public Map<String, Object> getSensorConfig(String sensorName) {
     return config.getSensorParserConfig(sensorName).getParserConfig();
   }
@@ -65,4 +76,9 @@ public class ParserWriterConfiguration implements WriterConfiguration {
   public Map<String, Object> getGlobalConfig() {
     return config.getGlobalConfig();
   }
+
+  @Override
+  public boolean isDefault(String sensorName) {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java
index 3ee25d0..69e5541 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java
@@ -37,6 +37,11 @@ public class SingleBatchConfigurationFacade implements WriterConfiguration {
   }
 
   @Override
+  public boolean isEnabled(String sensorName) {
+    return true;
+  }
+
+  @Override
   public Map<String, Object> getSensorConfig(String sensorName) {
     return config.getSensorConfig(sensorName);
   }
@@ -45,4 +50,9 @@ public class SingleBatchConfigurationFacade implements WriterConfiguration {
   public Map<String, Object> getGlobalConfig() {
     return config.getGlobalConfig();
   }
+
+  @Override
+  public boolean isDefault(String sensorName) {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java
index f155302..45271e8 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java
@@ -24,6 +24,8 @@ import java.util.Map;
 public interface WriterConfiguration extends Serializable {
   int getBatchSize(String sensorName);
   String getIndex(String sensorName);
+  boolean isEnabled(String sensorName);
   Map<String, Object> getSensorConfig(String sensorName);
   Map<String, Object> getGlobalConfig();
+  boolean isDefault(String sensorName);
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkMessageWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkMessageWriter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkMessageWriter.java
index 64e16b1..5f427f4 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkMessageWriter.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkMessageWriter.java
@@ -45,4 +45,5 @@ public interface BulkMessageWriter<MESSAGE_T> extends AutoCloseable, Serializabl
             , List<MESSAGE_T> messages
             ) throws Exception;
 
+  String getName();
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/MessageWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/MessageWriter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/MessageWriter.java
index 1ae54b1..b0e48a9 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/MessageWriter.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/MessageWriter.java
@@ -27,4 +27,5 @@ public interface MessageWriter<T> extends AutoCloseable, Serializable {
 
   void init();
   void write(String sensorType, WriterConfiguration configurations, Tuple tuple, T message) throws Exception;
+  String getName();
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/IndexingWriterConfigurationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/IndexingWriterConfigurationTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/IndexingWriterConfigurationTest.java
index 2e66818..94da965 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/IndexingWriterConfigurationTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/IndexingWriterConfigurationTest.java
@@ -27,14 +27,14 @@ import org.junit.Test;
 public class IndexingWriterConfigurationTest {
   @Test
   public void testDefaultBatchSize() {
-    IndexingWriterConfiguration config = new IndexingWriterConfiguration(
+    IndexingWriterConfiguration config = new IndexingWriterConfiguration("hdfs",
            new IndexingConfigurations()
     );
     Assert.assertEquals(1, config.getBatchSize("foo"));
   }
   @Test
   public void testDefaultIndex() {
-    IndexingWriterConfiguration config = new IndexingWriterConfiguration(
+    IndexingWriterConfiguration config = new IndexingWriterConfiguration("hdfs",
            new IndexingConfigurations()
     );
     Assert.assertEquals("foo", config.getIndex("foo"));

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
index 69708fe..c90bc8c 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
@@ -190,6 +190,11 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
     return buildWriteReponse(tuples, bulkResponse);
   }
 
+  @Override
+  public String getName() {
+    return "elasticsearch";
+  }
+
   protected BulkWriterResponse buildWriteReponse(Iterable<Tuple> tuples, BulkResponse bulkResponse) throws Exception {
     // Elasticsearch responses are in the same order as the request, giving us an implicit mapping with Tuples
     BulkWriterResponse writerResponse = new BulkWriterResponse();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/writer/SimpleHbaseEnrichmentWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/writer/SimpleHbaseEnrichmentWriter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/writer/SimpleHbaseEnrichmentWriter.java
index 13d88d5..343eecd 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/writer/SimpleHbaseEnrichmentWriter.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/writer/SimpleHbaseEnrichmentWriter.java
@@ -277,6 +277,11 @@ public class SimpleHbaseEnrichmentWriter extends AbstractWriter implements BulkM
   }
 
   @Override
+  public String getName() {
+    return "hbaseEnrichment";
+  }
+
+  @Override
   public void close() throws Exception {
     synchronized(this) {
       if(table != null) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
index 3a4f5b0..c5c1294 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
@@ -127,6 +127,7 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest {
       fail("A runtime exception should be thrown when bulkMessageWriter.init throws an exception");
     } catch(RuntimeException e) {}
     reset(bulkMessageWriter);
+    when(bulkMessageWriter.getName()).thenReturn("hdfs");
     bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector);
     verify(bulkMessageWriter, times(1)).init(eq(stormConf), any(WriterConfiguration.class));
     tupleList = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-indexing/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/README.md b/metron-platform/metron-indexing/README.md
index 63ad3a6..5611838 100644
--- a/metron-platform/metron-indexing/README.md
+++ b/metron-platform/metron-indexing/README.md
@@ -28,14 +28,88 @@ Errors during indexing are sent to a kafka queue called `index_errors`
 
 ##Sensor Indexing Configuration
 The sensor specific configuration is intended to configure the
-indexing used for a given sensor type (e.g. `snort`).
+indexing used for a given sensor type (e.g. `snort`).  
 
-Just like the global config, the format is a JSON stored in zookeeper.
-The configuration is a JSON map with the following fields:
-* `index` : The name of the index to write to (defaulted to the name of the sensor).
-* `batchSize` : The size of the batch that is written to the indices at once (defaulted to 1.
+Just like the global config, the format is a JSON stored in zookeeper and on disk at `$METRON_HOME/config/zookeeper/indexing`.  Within the sensor-specific configuration, you can configure the individual writers.  The writers currently supported are:
+* `elasticsearch`
+* `hdfs`
+* `solr`
 
+Depending on how you start the indexing topology, it will have either
+elasticsearch or solr and hdfs writers running.
 
+The configuration for an individual writer-specific configuration is a JSON map with the following fields:
+* `index` : The name of the index to write to (defaulted to the name of the sensor).
+* `batchSize` : The size of the batch that is written to the indices at once (defaulted to `1`).
+* `enabled` : Whether the writer is enabled (default `true`).
+
+### Indexing Configuration Examples
+For a given  sensor, the following scenarios would be indicated by
+the following cases:
+#### Base Case
+```
+{
+}
+```
+or no file at all.
+
+* elasticsearch writer
+  * enabled
+  * batch size of 1
+  * index name the same as the sensor
+* hdfs writer
+  * enabled
+  * batch size of 1
+  * index name the same as the sensor
+
+If a writer config is unspecified, then a warning is indicated in the
+Storm console.  e.g.:
+`WARNING: Default and (likely) unoptimized writer config used for hdfs writer and sensor squid`
+
+#### Fully specified
+```
+{
+   "elasticsearch": {
+      "index": "foo",
+      "batchSize" : 100,
+      "enabled" : true 
+    },
+   "hdfs": {
+      "index": "foo",
+      "batchSize": 1,
+      "enabled" : true
+    }
+}
+```
+* elasticsearch writer
+  * enabled
+  * batch size of 100
+  * index name of "foo"
+* hdfs writer
+  * enabled
+  * batch size of 1
+  * index name of "foo"
+
+#### HDFS Writer turned off
+```
+{
+   "elasticsearch": {
+      "index": "foo",
+      "enabled" : true 
+    },
+   "hdfs": {
+      "index": "foo",
+      "batchSize": 100,
+      "enabled" : false
+    }
+}
+```
+* elasticsearch writer
+  * enabled
+  * batch size of 1
+  * index name of "foo"
+* hdfs writer
+  * disabled
 
 # Notes on Performance Tuning
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/asa.json
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/asa.json b/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/asa.json
index afc500c..153ccff 100644
--- a/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/asa.json
+++ b/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/asa.json
@@ -1,5 +1,18 @@
 {
+  "hdfs" : {
     "index": "asa",
-    "batchSize": 5
+    "batchSize": 5,
+    "enabled" : true
+  },
+  "elasticsearch" : {
+    "index": "asa",
+    "batchSize": 5,
+    "enabled" : true
+  },
+  "solr" : {
+    "index": "asa",
+    "batchSize": 5,
+    "enabled" : true
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/bro.json
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/bro.json b/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/bro.json
index b559ca9..b0aa8e4 100644
--- a/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/bro.json
+++ b/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/bro.json
@@ -1,5 +1,17 @@
 {
-  "index": "bro",
-  "batchSize": 5
+  "hdfs" : {
+    "index": "bro",
+    "batchSize": 5,
+    "enabled" : true
+  },
+  "elasticsearch" : {
+    "index": "bro",
+    "batchSize": 5,
+    "enabled" : true
+  },
+  "solr" : {
+    "index": "bro",
+    "batchSize": 5,
+    "enabled" : true
+  }
 }
-

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/snort.json
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/snort.json b/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/snort.json
index 3c37faa..f6112a8 100644
--- a/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/snort.json
+++ b/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/snort.json
@@ -1,4 +1,17 @@
 {
-  "index": "snort",
-  "batchSize": 1
+  "hdfs" : {
+    "index": "snort",
+    "batchSize": 1,
+    "enabled" : true
+  },
+  "elasticsearch" : {
+    "index": "snort",
+    "batchSize": 1,
+    "enabled" : true
+  },
+  "solr" : {
+    "index": "snort",
+    "batchSize": 1,
+    "enabled" : true
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/websphere.json
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/websphere.json b/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/websphere.json
index 4b69f5b..1a2b53d 100644
--- a/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/websphere.json
+++ b/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/websphere.json
@@ -1,5 +1,18 @@
 {
-  "index": "websphere",
-  "batchSize": 5
+  "hdfs" : {
+    "index": "websphere",
+    "batchSize": 5,
+    "enabled" : true
+  },
+  "elasticsearch" : {
+    "index": "websphere",
+    "batchSize": 5,
+    "enabled" : true
+  },
+  "solr" : {
+    "index": "websphere",
+    "batchSize": 5,
+    "enabled" : true
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/yaf.json
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/yaf.json b/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/yaf.json
index b248524..0586497 100644
--- a/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/yaf.json
+++ b/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/yaf.json
@@ -1,4 +1,17 @@
 {
-  "index": "yaf",
-  "batchSize": 5
+  "hdfs" : {
+    "index": "yaf",
+    "batchSize": 5,
+    "enabled" : true
+  },
+  "elasticsearch" : {
+    "index": "yaf",
+    "batchSize": 5,
+    "enabled" : true
+  },
+  "solr" : {
+    "index": "yaf",
+    "batchSize": 5,
+    "enabled" : true
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-integration-test/src/main/config/zookeeper/indexing/test.json
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/config/zookeeper/indexing/test.json b/metron-platform/metron-integration-test/src/main/config/zookeeper/indexing/test.json
index 239484a..0197f0c 100644
--- a/metron-platform/metron-integration-test/src/main/config/zookeeper/indexing/test.json
+++ b/metron-platform/metron-integration-test/src/main/config/zookeeper/indexing/test.json
@@ -1,5 +1,18 @@
 {
-  "index": "yaf",
-  "batchSize": 5
+  "hdfs" : {
+    "index": "yaf",
+    "batchSize": 5,
+    "enabled" : true
+  },
+  "elasticsearch" : {
+    "index": "yaf",
+    "batchSize": 5,
+    "enabled" : true
+  },
+  "solr" : {
+    "index": "yaf",
+    "batchSize": 5,
+    "enabled" : true
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-management/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/README.md b/metron-platform/metron-management/README.md
index aee0de8..cf922ba 100644
--- a/metron-platform/metron-management/README.md
+++ b/metron-platform/metron-management/README.md
@@ -172,12 +172,21 @@ The functions are split roughly into a few sections:
   * Description: Set batch size
   * Input:
     * sensorConfig - Sensor config to add transformation to.
+    * writer - The writer to update (e.g. elasticsearch, solr or hdfs)
     * size - batch size (integer)
   * Returns: The String representation of the config in zookeeper
+* `INDEXING_SET_ENABLED`
+  * Description: Enable or disable an indexing writer for a sensor.
+  * Input:
+    * sensorConfig - Sensor config to add transformation to.
+    * writer - The writer to update (e.g. elasticsearch, solr or hdfs)
+    * enabled? - boolean indicating whether the writer is enabled.  If omitted, then it will set enabled.
+  * Returns: The String representation of the config in zookeeper
 * `INDEXING_SET_INDEX`
   * Description: Set the index for the sensor
   * Input:
     * sensorConfig - Sensor config to add transformation to.
+    * writer - The writer to update (e.g. elasticsearch, solr or hdfs)
     * sensor - sensor name
   * Returns: The String representation of the config in zookeeper
 * `ENRICHMENT_STELLAR_TRANSFORM_ADD`

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-management/src/main/java/org/apache/metron/management/IndexingConfigFunctions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/main/java/org/apache/metron/management/IndexingConfigFunctions.java b/metron-platform/metron-management/src/main/java/org/apache/metron/management/IndexingConfigFunctions.java
index f572cbf..2755dd0 100644
--- a/metron-platform/metron-management/src/main/java/org/apache/metron/management/IndexingConfigFunctions.java
+++ b/metron-platform/metron-management/src/main/java/org/apache/metron/management/IndexingConfigFunctions.java
@@ -27,6 +27,7 @@ import org.apache.metron.common.dsl.StellarFunction;
 import org.apache.metron.common.utils.ConversionUtils;
 import org.apache.metron.common.utils.JSONUtils;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -40,6 +41,7 @@ public class IndexingConfigFunctions {
           ,name = "SET_BATCH"
           ,description = "Set batch size"
           ,params = {"sensorConfig - Sensor config to add transformation to."
+                    ,"writer - The writer to update (e.g. elasticsearch, solr or hdfs)"
                     ,"size - batch size (integer)"
                     }
           ,returns = "The String representation of the config in zookeeper"
@@ -57,11 +59,79 @@ public class IndexingConfigFunctions {
       else {
         configObj = (Map<String, Object>) INDEXING.deserialize(config);
       }
-      int batchSize = 5;
+      String writer = null;
       if(args.size() > 1) {
+        writer = ConversionUtils.convert(args.get(i++), String.class);
+        if(!configObj.containsKey(writer)) {
+          configObj.put(writer, new HashMap<String, Object>());
+        }
+      }
+      if(writer == null) {
+        throw new IllegalStateException("Invalid writer name: " + config);
+      }
+      int batchSize = 5;
+      if(args.size() > 2) {
         batchSize = ConversionUtils.convert(args.get(i++), Integer.class);
       }
-      configObj = IndexingConfigurations.setBatchSize(configObj, batchSize);
+      configObj.put(writer, IndexingConfigurations.setBatchSize((Map<String, Object>) configObj.get(writer), batchSize));
+      try {
+        return JSONUtils.INSTANCE.toJSON(configObj, true);
+      } catch (JsonProcessingException e) {
+        LOG.error("Unable to convert object to JSON: " + configObj, e);
+        return config;
+      }
+    }
+
+    @Override
+    public void initialize(Context context) {
+
+    }
+
+    @Override
+    public boolean isInitialized() {
+      return true;
+    }
+  }
+
+  @Stellar(
+           namespace = "INDEXING"
+          ,name = "SET_ENABLED"
+          ,description = "Enable or disable an indexing writer for a sensor."
+          ,params = {"sensorConfig - Sensor config to add transformation to."
+                    ,"writer - The writer to update (e.g. elasticsearch, solr or hdfs)"
+                    ,"enabled? - boolean indicating whether the writer is enabled.  If omitted, then it will set enabled."
+                    }
+          ,returns = "The String representation of the config in zookeeper"
+          )
+  public static class Enabled implements StellarFunction {
+
+    @Override
+    public Object apply(List<Object> args, Context context) throws ParseException {
+      int i = 0;
+      String config = (String) args.get(i++);
+      Map<String, Object> configObj;
+      if(config == null || config.isEmpty()) {
+        throw new IllegalStateException("Invalid config: " + config);
+      }
+      else {
+        configObj = (Map<String, Object>) INDEXING.deserialize(config);
+      }
+      String writer = null;
+      if(args.size() > 1) {
+        writer = ConversionUtils.convert(args.get(i++), String.class);
+        if(!configObj.containsKey(writer)) {
+          configObj.put(writer, new HashMap<String, Object>());
+        }
+      }
+      if(writer == null) {
+        throw new IllegalStateException("Invalid writer name: " + config);
+      }
+      boolean enabled = true;
+      if(args.size() > 2) {
+        enabled = ConversionUtils.convert(args.get(i++), Boolean.class);
+      }
+
+      configObj.put(writer, IndexingConfigurations.setEnabled((Map<String, Object>) configObj.get(writer), enabled));
       try {
         return JSONUtils.INSTANCE.toJSON(configObj, true);
       } catch (JsonProcessingException e) {
@@ -86,6 +156,7 @@ public class IndexingConfigFunctions {
           ,name = "SET_INDEX"
           ,description = "Set the index for the sensor"
           ,params = {"sensorConfig - Sensor config to add transformation to."
+                    ,"writer - The writer to update (e.g. elasticsearch, solr or hdfs)"
                     ,"sensor - sensor name"
                     }
           ,returns = "The String representation of the config in zookeeper"
@@ -103,11 +174,21 @@ public class IndexingConfigFunctions {
       else {
         configObj = (Map<String, Object>) INDEXING.deserialize(config);
       }
+      String writer = null;
+      if(args.size() > 1) {
+        writer = ConversionUtils.convert(args.get(i++), String.class);
+        if(!configObj.containsKey(writer)) {
+          configObj.put(writer, new HashMap<String, Object>());
+        }
+      }
+      if(writer == null) {
+        throw new IllegalStateException("Invalid writer name: " + config);
+      }
       String sensorName = ConversionUtils.convert(args.get(i++), String.class);
       if(sensorName == null) {
         throw new IllegalStateException("Invalid sensor name: " + config);
       }
-      configObj = IndexingConfigurations.setIndex(configObj, sensorName);
+      configObj.put(writer, IndexingConfigurations.setIndex((Map<String, Object>) configObj.get(writer), sensorName));
       try {
         return JSONUtils.INSTANCE.toJSON(configObj, true);
       } catch (JsonProcessingException e) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-management/src/test/java/org/apache/metron/management/IndexingConfigFunctionsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/test/java/org/apache/metron/management/IndexingConfigFunctionsTest.java b/metron-platform/metron-management/src/test/java/org/apache/metron/management/IndexingConfigFunctionsTest.java
index 18132ed..5c73966 100644
--- a/metron-platform/metron-management/src/test/java/org/apache/metron/management/IndexingConfigFunctionsTest.java
+++ b/metron-platform/metron-management/src/test/java/org/apache/metron/management/IndexingConfigFunctionsTest.java
@@ -58,32 +58,48 @@ public class IndexingConfigFunctionsTest {
 
   @Test
   public void testSetBatch() {
-    String out = (String) run("INDEXING_SET_BATCH(config, 10)"
+    String out = (String) run("INDEXING_SET_BATCH(config, 'hdfs', 10)"
                              , toMap("config", "{}")
     );
     Map<String, Object> config = (Map<String, Object>)INDEXING.deserialize(out);
-    Assert.assertEquals(IndexingConfigurations.getBatchSize(config), 10);
+    Assert.assertEquals(IndexingConfigurations.getBatchSize((Map<String, Object>) config.get("hdfs")), 10);
   }
 
   @Test(expected=ParseException.class)
   public void testSetBatchBad() {
-    run("INDEXING_SET_BATCH(config, 10)"
+    run("INDEXING_SET_BATCH(config, 'hdfs', 10)"
+                             , new HashMap<>()
+    );
+  }
+
+  @Test
+  public void testSetEnabled() {
+    String out = (String) run("INDEXING_SET_ENABLED(config, 'hdfs', true)"
+                             , toMap("config", "{}")
+    );
+    Map<String, Object> config = (Map<String, Object>)INDEXING.deserialize(out);
+    Assert.assertTrue(IndexingConfigurations.isEnabled((Map<String, Object>) config.get("hdfs")));
+  }
+
+  @Test(expected=ParseException.class)
+  public void testSetEnabledBad() {
+    run("INDEXING_SET_ENABLED(config, 'hdfs', 10)"
                              , new HashMap<>()
     );
   }
 
   @Test
   public void testSetIndex() {
-    String out = (String) run("INDEXING_SET_INDEX(config, 'foo')"
+    String out = (String) run("INDEXING_SET_INDEX(config, 'hdfs', 'foo')"
             , toMap("config", "{}")
     );
     Map<String, Object> config = (Map<String, Object>)INDEXING.deserialize(out);
-    Assert.assertEquals("foo", IndexingConfigurations.getIndex(config, null));
+    Assert.assertEquals("foo", IndexingConfigurations.getIndex((Map<String, Object>)config.get("hdfs"), null));
   }
 
   @Test(expected= ParseException.class)
   public void testSetIndexBad() {
-    run("INDEXING_SET_INDEX(config, NULL)"
+    run("INDEXING_SET_INDEX(config, 'hdfs', NULL)"
             , new HashMap<>()
     );
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
index 1efcf2e..cf90178 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
@@ -17,12 +17,11 @@
  */
 package org.apache.metron.parsers.bolt;
 
-import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.configuration.*;
 
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.tuple.Tuple;
 import com.google.common.collect.ImmutableList;
-import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.dsl.Context;
@@ -100,6 +99,11 @@ public class ParserBoltTest extends BaseBoltTest {
     }
 
     @Override
+    public String getName() {
+      return "recording";
+    }
+
+    @Override
     public void close() throws Exception {
 
     }
@@ -400,7 +404,7 @@ public void testImplicitBatchOfOne() throws Exception {
               @Override
               public Map<String, Object> getParserConfig() {
                 return new HashMap<String, Object>() {{
-                  put(ParserWriterConfiguration.BATCH_CONF, "1");
+                  put(IndexingConfigurations.BATCH_SIZE_CONF, "1");
                 }};
               }
             };
@@ -438,7 +442,7 @@ public void testImplicitBatchOfOne() throws Exception {
               @Override
               public Map<String, Object> getParserConfig() {
                 return new HashMap<String, Object>() {{
-                  put(ParserWriterConfiguration.BATCH_CONF, 5);
+                  put(IndexingConfigurations.BATCH_SIZE_CONF, 5);
                 }};
               }
             };
@@ -486,7 +490,7 @@ public void testImplicitBatchOfOne() throws Exception {
               @Override
               public Map<String, Object> getParserConfig() {
                 return new HashMap<String, Object>() {{
-                  put(ParserWriterConfiguration.BATCH_CONF, 5);
+                  put(IndexingConfigurations.BATCH_SIZE_CONF, 5);
                 }};
               }
             };

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
index 8dbbf36..4693829 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.metron.parsers.bolt;
 import org.apache.log4j.Level;
+import org.apache.metron.common.configuration.IndexingConfigurations;
 import org.apache.metron.test.utils.UnitTestHelper;
 import org.apache.metron.writer.BulkWriterComponent;
 import org.apache.storm.task.OutputCollector;
@@ -66,7 +67,7 @@ public class WriterBoltTest extends BaseBoltTest{
               @Override
               public Map<String, Object> getParserConfig() {
                 return new HashMap<String, Object>() {{
-                  put(ParserWriterConfiguration.BATCH_CONF, batchSize);
+                  put(IndexingConfigurations.BATCH_SIZE_CONF, batchSize);
                 }};
               }
             };

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java
index a21d45b..b9b3246 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java
@@ -163,6 +163,11 @@ public class SimpleHBaseEnrichmentWriterTest {
       }
 
       @Override
+      public boolean isEnabled(String sensorName) {
+        return true;
+      }
+
+      @Override
       public Map<String, Object> getSensorConfig(String sensorName) {
         return sensorConfig;
 
@@ -172,6 +177,11 @@ public class SimpleHBaseEnrichmentWriterTest {
       public Map<String, Object> getGlobalConfig() {
         return null;
       }
+
+      @Override
+      public boolean isDefault(String sensorName) {
+        return false;
+      }
     };
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/PcapWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/PcapWriter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/PcapWriter.java
index a67cd1b..153a1aa 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/PcapWriter.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/PcapWriter.java
@@ -51,4 +51,9 @@ public class PcapWriter extends HBaseWriter {
     values.put(column, tuple.getBinary(0));
     return values;
   }
+
+  @Override
+  public String getName() {
+    return "pcap";
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
index 50b11eb..7180ca9 100644
--- a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
+++ b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
@@ -86,6 +86,11 @@ public class SolrWriter implements BulkMessageWriter<JSONObject>, Serializable {
     return response;
   }
 
+  @Override
+  public String getName() {
+    return "solr";
+  }
+
   protected String getCollection(WriterConfiguration configurations) {
     String collection = (String) configurations.getGlobalConfig().get("solr.collection");
     return collection != null ? collection : DEFAULT_COLLECTION;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
index 454a340..14e5dcb 100644
--- a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
+++ b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
@@ -107,7 +107,7 @@ public class SolrWriterTest {
     String collection = "metron";
     MetronSolrClient solr = Mockito.mock(MetronSolrClient.class);
     SolrWriter writer = new SolrWriter().withMetronSolrClient(solr);
-    writer.init(null, new IndexingWriterConfiguration(configurations));
+    writer.init(null, new IndexingWriterConfiguration("solr", configurations));
     verify(solr, times(1)).createCollection(collection, 1, 1);
     verify(solr, times(1)).setDefaultCollection(collection);
 
@@ -120,18 +120,18 @@ public class SolrWriterTest {
     globalConfig.put("solr.replicationFactor", replicationFactor);
     configurations.updateGlobalConfig(globalConfig);
     writer = new SolrWriter().withMetronSolrClient(solr);
-    writer.init(null, new IndexingWriterConfiguration(configurations));
+    writer.init(null, new IndexingWriterConfiguration("solr", configurations));
     verify(solr, times(1)).createCollection(collection, numShards, replicationFactor);
     verify(solr, times(1)).setDefaultCollection(collection);
 
-    writer.write("test", new IndexingWriterConfiguration(configurations), new ArrayList<>(), messages);
+    writer.write("test", new IndexingWriterConfiguration("solr", configurations), new ArrayList<>(), messages);
     verify(solr, times(1)).add(argThat(new SolrInputDocumentMatcher(message1.toJSONString().hashCode(), "test", 100, 100.0)));
     verify(solr, times(1)).add(argThat(new SolrInputDocumentMatcher(message2.toJSONString().hashCode(), "test", 200, 200.0)));
     verify(solr, times(0)).commit(collection);
 
     writer = new SolrWriter().withMetronSolrClient(solr).withShouldCommit(true);
-    writer.init(null, new IndexingWriterConfiguration(configurations));
-    writer.write("test", new IndexingWriterConfiguration(configurations), new ArrayList<>(), messages);
+    writer.init(null, new IndexingWriterConfiguration("solr", configurations));
+    writer.write("test", new IndexingWriterConfiguration("solr", configurations), new ArrayList<>(), messages);
     verify(solr, times(2)).add(argThat(new SolrInputDocumentMatcher(message1.toJSONString().hashCode(), "test", 100, 100.0)));
     verify(solr, times(2)).add(argThat(new SolrInputDocumentMatcher(message2.toJSONString().hashCode(), "test", 200, 200.0)));
     verify(solr, times(1)).commit(collection);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
index a601987..124ffd3 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
@@ -100,6 +100,9 @@ public class BulkWriterComponent<MESSAGE_T> {
                    , WriterConfiguration configurations
                    ) throws Exception
   {
+    if(!configurations.isEnabled(sensorType)) {
+      return;
+    }
     int batchSize = configurations.getBatchSize(sensorType);
     Collection<Tuple> tupleList = sensorTupleMap.get(sensorType);
     if (tupleList == null) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
index e323056..a31f48e 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
@@ -24,13 +24,14 @@ import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.utils.ConversionUtils;
 import org.apache.metron.common.writer.BulkWriterResponse;
+import org.json.simple.JSONObject;
 
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Function;
 
-public class NoopWriter extends AbstractWriter implements BulkMessageWriter<Object> {
+public class NoopWriter extends AbstractWriter implements BulkMessageWriter<JSONObject> {
 
   public static class RandomLatency implements Function<Void, Void> {
     private int min;
@@ -129,7 +130,7 @@ public class NoopWriter extends AbstractWriter implements BulkMessageWriter<Obje
   }
 
   @Override
-  public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<Object> messages) throws Exception {
+  public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception {
     if(sleepFunction != null) {
       sleepFunction.apply(null);
     }
@@ -140,6 +141,11 @@ public class NoopWriter extends AbstractWriter implements BulkMessageWriter<Obje
   }
 
   @Override
+  public String getName() {
+    return "noop";
+  }
+
+  @Override
   public void close() throws Exception {
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
index e3e8150..47d9b02 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
@@ -64,6 +64,11 @@ public class WriterToBulkWriter<MESSAGE_T> implements BulkMessageWriter<MESSAGE_
   }
 
   @Override
+  public String getName() {
+    return messageWriter.getName();
+  }
+
+  @Override
   public void close() throws Exception {
     messageWriter.close();
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
index 14b9b16..66c4c73 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
@@ -48,6 +48,7 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
   private BulkWriterComponent<JSONObject> writerComponent;
   private String messageGetterStr = MessageGetters.NAMED.name();
   private transient MessageGetter messageGetter = null;
+  private transient OutputCollector collector;
   private transient Function<WriterConfiguration, WriterConfiguration> configurationTransformation;
   public BulkMessageWriterBolt(String zookeeperUrl) {
     super(zookeeperUrl);
@@ -71,6 +72,7 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
   @Override
   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
     this.writerComponent = new BulkWriterComponent<>(collector);
+    this.collector = collector;
     super.prepare(stormConf, context, collector);
     messageGetter = MessageGetters.valueOf(messageGetterStr);
     if(bulkMessageWriter instanceof WriterToBulkWriter) {
@@ -81,7 +83,7 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
     }
     try {
       bulkMessageWriter.init(stormConf
-                            , configurationTransformation.apply(new IndexingWriterConfiguration(getConfigurations()))
+                            , configurationTransformation.apply(new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()))
                             );
     } catch (Exception e) {
       throw new RuntimeException(e);
@@ -95,11 +97,16 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
     String sensorType = MessageUtils.getSensorType(message);
     try
     {
+      WriterConfiguration writerConfiguration = configurationTransformation.apply(new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()));
+      if(writerConfiguration.isDefault(sensorType)) {
+        //want to warn, but not fail the tuple
+        collector.reportError(new Exception("WARNING: Default and (likely) unoptimized writer config used for " + bulkMessageWriter.getName() + " writer and sensor " + sensorType));
+      }
       writerComponent.write(sensorType
                            , tuple
                            , message
                            , bulkMessageWriter
-                           , configurationTransformation.apply(new IndexingWriterConfiguration(getConfigurations()))
+                           , writerConfiguration
                            );
       LOG.trace("Writing enrichment message: {}", message);
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
index 3af0a93..4f6b4bb 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
@@ -74,7 +74,7 @@ public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable {
                    ) throws Exception
   {
     BulkWriterResponse response = new BulkWriterResponse();
-    SourceHandler handler = getSourceHandler(sourceType);
+    SourceHandler handler = getSourceHandler(configurations.getIndex(sourceType));
     try {
       handler.handle(messages);
     } catch(Exception e) {
@@ -86,6 +86,11 @@ public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable {
   }
 
   @Override
+  public String getName() {
+    return "hdfs";
+  }
+
+  @Override
   public void close() {
     for(SourceHandler handler : sourceHandlerMap.values()) {
       handler.close();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5a5d42b8/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
index 9cc8680..5c00e52 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
@@ -175,6 +175,11 @@ public class KafkaWriter extends AbstractWriter implements MessageWriter<JSONObj
   }
 
   @Override
+  public String getName() {
+    return "kafka";
+  }
+
+  @Override
   public void close() throws Exception {
     kafkaProducer.close();
   }


Mime
View raw message