metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ceste...@apache.org
Subject [2/2] incubator-metron git commit: METRON-652: Extract indexing config from enrichment config closes apache/incubator-metron#415
Date Mon, 16 Jan 2017 21:50:55 GMT
METRON-652: Extract indexing config from enrichment config closes apache/incubator-metron#415


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/368e7ad6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/368e7ad6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/368e7ad6

Branch: refs/heads/master
Commit: 368e7ad63641e3c505ee6e3d335ecabfa60b8003
Parents: 56e7f41
Author: cstella <cestella@gmail.com>
Authored: Mon Jan 16 16:50:52 2017 -0500
Committer: cstella <cestella@gmail.com>
Committed: Mon Jan 16 16:50:52 2017 -0500

----------------------------------------------------------------------
 .../docker/rpm-docker/SPECS/metron.spec         |   5 +
 .../common/bolt/ConfiguredIndexingBolt.java     |  62 +++++++++
 .../common/configuration/ConfigurationType.java |   8 +-
 .../configuration/ConfigurationsUtils.java      |  58 +++++++--
 .../configuration/IndexingConfigurations.java   |  97 ++++++++++++++
 .../enrichment/SensorEnrichmentConfig.java      |  24 +---
 .../SensorEnrichmentUpdateConfig.java           |   2 -
 .../writer/EnrichmentWriterConfiguration.java   |  62 ---------
 .../writer/IndexingWriterConfiguration.java     |  57 ++++++++
 .../bolt/ConfiguredEnrichmentBoltTest.java      |   2 -
 .../SensorEnrichmentUpdateConfigTest.java       |   2 -
 .../EnrichmentWriterConfigurationTest.java      |  41 ------
 .../writer/IndexingWriterConfigurationTest.java |  42 ++++++
 metron-platform/metron-enrichment/README.md     |   6 -
 .../main/config/zookeeper/enrichments/asa.json  |   2 -
 .../main/config/zookeeper/enrichments/bro.json  |   2 -
 .../config/zookeeper/enrichments/snort.json     |   2 -
 .../config/zookeeper/enrichments/websphere.json |   2 -
 .../main/config/zookeeper/enrichments/yaf.json  |   2 -
 .../simplehbase/SimpleHBaseAdapterTest.java     |   4 -
 .../threatintel/ThreatIntelAdapterTest.java     |   2 -
 .../bolt/BulkMessageWriterBoltTest.java         |   2 +-
 .../components/ConfigUploadComponent.java       |   8 +-
 .../integration/utils/SampleUtil.java           |  14 +-
 metron-platform/metron-indexing/README.md       |  13 +-
 .../src/main/config/zookeeper/indexing/asa.json |   5 +
 .../src/main/config/zookeeper/indexing/bro.json |   5 +
 .../main/config/zookeeper/indexing/snort.json   |   4 +
 .../config/zookeeper/indexing/websphere.json    |   5 +
 .../src/main/config/zookeeper/indexing/yaf.json |   4 +
 .../integration/IndexingIntegrationTest.java    |   5 +-
 .../main/config/zookeeper/enrichments/test.json |   2 -
 .../main/config/zookeeper/indexing/test.json    |   5 +
 metron-platform/metron-management/README.md     |  14 +-
 .../management/ConfigurationFunctions.java      |  42 +++++-
 .../management/EnrichmentConfigFunctions.java   |  94 +-------------
 .../management/IndexingConfigFunctions.java     | 129 +++++++++++++++++++
 .../management/ConfigurationFunctionsTest.java  |  33 ++++-
 .../EnrichmentConfigFunctionsTest.java          |  36 ------
 .../management/IndexingConfigFunctionsTest.java |  90 +++++++++++++
 .../metron/solr/writer/SolrWriterTest.java      |  15 ++-
 .../test/bolt/BaseEnrichmentBoltTest.java       |   1 +
 .../writer/bolt/BulkMessageWriterBolt.java      |  10 +-
 43 files changed, 693 insertions(+), 327 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec b/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
index ff04154..0d56d7a 100644
--- a/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
+++ b/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
@@ -257,6 +257,11 @@ This package installs the Metron Indexing files
 %dir %{metron_home}/flux
 %dir %{metron_home}/flux/indexing
 %{metron_home}/flux/indexing/remote.yaml
+%{metron_home}/config/zookeeper/indexing/bro.json
+%{metron_home}/config/zookeeper/indexing/snort.json
+%{metron_home}/config/zookeeper/indexing/websphere.json
+%{metron_home}/config/zookeeper/indexing/yaf.json
+%{metron_home}/config/zookeeper/indexing/asa.json
 
 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java
new file mode 100644
index 0000000..966739a
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.bolt;
+
+import org.apache.log4j.Logger;
+import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.IndexingConfigurations;
+
+import java.io.IOException;
+
+public abstract class ConfiguredIndexingBolt extends ConfiguredBolt<IndexingConfigurations>{
+
+  private static final Logger LOG = Logger.getLogger(ConfiguredIndexingBolt.class);
+
+  public ConfiguredIndexingBolt(String zookeeperUrl) {
+    super(zookeeperUrl);
+  }
+
+  @Override
+  protected IndexingConfigurations defaultConfigurations() {
+    return new IndexingConfigurations();
+  }
+
+  @Override
+  public void loadConfig() {
+    try {
+      ConfigurationsUtils.updateSensorIndexingConfigsFromZookeeper(getConfigurations(), client);
+    } catch (Exception e) {
+      LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily...");
+    }
+  }
+
+  @Override
+  public void updateConfig(String path, byte[] data) throws IOException {
+    if (data.length != 0) {
+      String name = path.substring(path.lastIndexOf("/") + 1);
+      if (path.startsWith(ConfigurationType.INDEXING.getZookeeperRoot())) {
+        getConfigurations().updateSensorIndexingConfig(name, data);
+        reloadCallback(name, ConfigurationType.INDEXING);
+      } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) {
+        getConfigurations().updateGlobalConfig(data);
+        reloadCallback(name, ConfigurationType.GLOBAL);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationType.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationType.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationType.java
index 27478d4..e57ca2e 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationType.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationType.java
@@ -54,7 +54,13 @@ public enum ConfigurationType implements Function<String, Object> {
       throw new RuntimeException("Unable to load " + s, e);
     }
   }),
-
+  INDEXING("indexing","indexing", s -> {
+    try {
+      return JSONUtils.INSTANCE.load(s, new TypeReference<Map<String, Object>>() { });
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to load " + s, e);
+    }
+  }),
   PROFILER("profiler","profiler", s -> {
     try {
       return JSONUtils.INSTANCE.load(s, ProfilerConfig.class);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
index ef68977..2a51273 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
@@ -36,10 +36,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.metron.common.configuration.ConfigurationType.ENRICHMENT;
-import static org.apache.metron.common.configuration.ConfigurationType.GLOBAL;
-import static org.apache.metron.common.configuration.ConfigurationType.PARSER;
-import static org.apache.metron.common.configuration.ConfigurationType.PROFILER;
+import static org.apache.metron.common.configuration.ConfigurationType.*;
 
 public class ConfigurationsUtils {
 
@@ -92,6 +89,22 @@ public class ConfigurationsUtils {
     writeToZookeeper(PARSER.getZookeeperRoot() + "/" + sensorType, configData, client);
   }
 
+  public static void writeSensorIndexingConfigToZookeeper(String sensorType, Map<String, Object> sensorIndexingConfig, String zookeeperUrl) throws Exception {
+    writeSensorIndexingConfigToZookeeper(sensorType, JSONUtils.INSTANCE.toJSON(sensorIndexingConfig), zookeeperUrl);
+  }
+
+  public static void writeSensorIndexingConfigToZookeeper(String sensorType, byte[] configData, String zookeeperUrl) throws Exception {
+    try(CuratorFramework client = getClient(zookeeperUrl)) {
+      client.start();
+      writeSensorIndexingConfigToZookeeper(sensorType, configData, client);
+    }
+  }
+
+  public static void writeSensorIndexingConfigToZookeeper(String sensorType, byte[] configData, CuratorFramework client) throws Exception {
+    INDEXING.deserialize(new String(configData));
+    writeToZookeeper(INDEXING.getZookeeperRoot() + "/" + sensorType, configData, client);
+  }
+
   public static void writeSensorEnrichmentConfigToZookeeper(String sensorType, SensorEnrichmentConfig sensorEnrichmentConfig, String zookeeperUrl) throws Exception {
     writeSensorEnrichmentConfigToZookeeper(sensorType, JSONUtils.INSTANCE.toJSON(sensorEnrichmentConfig), zookeeperUrl);
   }
@@ -104,10 +117,7 @@ public class ConfigurationsUtils {
   }
 
   public static void writeSensorEnrichmentConfigToZookeeper(String sensorType, byte[] configData, CuratorFramework client) throws Exception {
-    SensorEnrichmentConfig c = (SensorEnrichmentConfig) ENRICHMENT.deserialize(new String(configData));
-    if(c.getIndex() == null ) {
-      throw new IllegalStateException("Attempting to write a malformed sensor config: missing index.\n" + new String(configData));
-    }
+    ENRICHMENT.deserialize(new String(configData));
     writeToZookeeper(ENRICHMENT.getZookeeperRoot() + "/" + sensorType, configData, client);
   }
 
@@ -142,6 +152,14 @@ public class ConfigurationsUtils {
     }
   }
 
+  public static void updateSensorIndexingConfigsFromZookeeper(IndexingConfigurations configurations, CuratorFramework client) throws Exception {
+    updateConfigsFromZookeeper(configurations, client);
+    List<String> sensorTypes = client.getChildren().forPath(INDEXING.getZookeeperRoot());
+    for(String sensorType: sensorTypes) {
+      configurations.updateSensorIndexingConfig(sensorType, readSensorEnrichmentConfigBytesFromZookeeper(sensorType, client));
+    }
+  }
+
   public static void updateEnrichmentConfigsFromZookeeper(EnrichmentConfigurations configurations, CuratorFramework client) throws Exception {
     updateConfigsFromZookeeper(configurations, client);
     List<String> sensorTypes = client.getChildren().forPath(ENRICHMENT.getZookeeperRoot());
@@ -166,6 +184,10 @@ public class ConfigurationsUtils {
     return readFromZookeeper(PROFILER.getZookeeperRoot(), client);
   }
 
+  public static byte[] readSensorIndexingConfigBytesFromZookeeper(String sensorType, CuratorFramework client) throws Exception {
+    return readFromZookeeper(INDEXING.getZookeeperRoot() + "/" + sensorType, client);
+  }
+
   public static byte[] readSensorParserConfigBytesFromZookeeper(String sensorType, CuratorFramework client) throws Exception {
     return readFromZookeeper(PARSER.getZookeeperRoot() + "/" + sensorType, client);
   }
@@ -185,21 +207,23 @@ public class ConfigurationsUtils {
   public static void uploadConfigsToZookeeper(String globalConfigPath,
                                               String parsersConfigPath,
                                               String enrichmentsConfigPath,
+                                              String indexingConfigPath,
                                               String profilerConfigPath,
                                               String zookeeperUrl) throws Exception {
     try(CuratorFramework client = getClient(zookeeperUrl)) {
       client.start();
-      uploadConfigsToZookeeper(globalConfigPath, parsersConfigPath, enrichmentsConfigPath, profilerConfigPath, client);
+      uploadConfigsToZookeeper(globalConfigPath, parsersConfigPath, enrichmentsConfigPath, indexingConfigPath, profilerConfigPath, client);
     }
   }
 
   public static void uploadConfigsToZookeeper(String rootFilePath, CuratorFramework client) throws Exception {
-    uploadConfigsToZookeeper(rootFilePath, rootFilePath, rootFilePath, rootFilePath, client);
+    uploadConfigsToZookeeper(rootFilePath, rootFilePath, rootFilePath, rootFilePath, rootFilePath, client);
   }
 
   public static void uploadConfigsToZookeeper(String globalConfigPath,
                                               String parsersConfigPath,
                                               String enrichmentsConfigPath,
+                                              String indexingConfigPath,
                                               String profilerConfigPath,
                                               CuratorFramework client) throws Exception {
 
@@ -219,6 +243,14 @@ public class ConfigurationsUtils {
       }
     }
 
+    // indexing
+    if (indexingConfigPath != null) {
+      Map<String, byte[]> sensorIndexingConfigs = readSensorIndexingConfigsFromFile(indexingConfigPath);
+      for (String sensorType : sensorIndexingConfigs.keySet()) {
+        ConfigurationsUtils.writeSensorIndexingConfigToZookeeper(sensorType, sensorIndexingConfigs.get(sensorType), client);
+      }
+    }
+
     // enrichments
     if (enrichmentsConfigPath != null) {
       Map<String, byte[]> sensorEnrichmentConfigs = readSensorEnrichmentConfigsFromFile(enrichmentsConfigPath);
@@ -253,6 +285,9 @@ public class ConfigurationsUtils {
     return readSensorConfigsFromFile(rootPath, ENRICHMENT);
   }
 
+  public static Map<String, byte[]> readSensorIndexingConfigsFromFile(String rootPath) throws IOException {
+    return readSensorConfigsFromFile(rootPath, INDEXING);
+  }
 
   /**
    * Read the Profiler configuration from a file.  There is only a single profiler configuration.
@@ -291,6 +326,7 @@ public class ConfigurationsUtils {
   public static void visitConfigs(CuratorFramework client, ConfigurationVisitor callback) throws Exception {
     visitConfigs(client, callback, GLOBAL);
     visitConfigs(client, callback, PARSER);
+    visitConfigs(client, callback, INDEXING);
     visitConfigs(client, callback, ENRICHMENT);
     visitConfigs(client, callback, PROFILER);
   }
@@ -303,7 +339,7 @@ public class ConfigurationsUtils {
         byte[] globalConfigData = client.getData().forPath(configType.getZookeeperRoot());
         callback.visit(configType, "global", new String(globalConfigData));
 
-      } else if (configType.equals(PARSER) || configType.equals(ENRICHMENT) || configType.equals(PROFILER)) {
+      } else if (configType.equals(PARSER) || configType.equals(ENRICHMENT) || configType.equals(PROFILER) || configType.equals(INDEXING)) {
         List<String> children = client.getChildren().forPath(configType.getZookeeperRoot());
         for (String child : children) {
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
new file mode 100644
index 0000000..fb610c6
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.configuration;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.metron.common.utils.ConversionUtils;
+import org.apache.metron.common.utils.JSONUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+public class IndexingConfigurations extends Configurations {
+  public static final String BATCH_SIZE_CONF = "batchSize";
+  public static final String INDEX_CONF = "index";
+
+  public Map<String, Object> getSensorIndexingConfig(String sensorType) {
+    Map<String, Object> ret = (Map<String, Object>) configurations.get(getKey(sensorType));
+    return ret != null?ret:new HashMap<>();
+  }
+
+  public void updateSensorIndexingConfig(String sensorType, byte[] data) throws IOException {
+    updateSensorIndexingConfig(sensorType, new ByteArrayInputStream(data));
+  }
+
+  public void updateSensorIndexingConfig(String sensorType, InputStream io) throws IOException {
+    Map<String, Object> sensorIndexingConfig = JSONUtils.INSTANCE.load(io, new TypeReference<Map<String, Object>>() {
+    });
+    updateSensorIndexingConfig(sensorType, sensorIndexingConfig);
+  }
+
+  public void updateSensorIndexingConfig(String sensorType, Map<String, Object> sensorIndexingConfig) {
+    configurations.put(getKey(sensorType), sensorIndexingConfig);
+  }
+
+  private String getKey(String sensorType) {
+    return ConfigurationType.INDEXING.getName() + "." + sensorType;
+  }
+
+
+  public int getBatchSize(String sensorName) {
+     return getBatchSize(getSensorIndexingConfig(sensorName));
+  }
+
+  public String getIndex(String sensorName) {
+    return getIndex(getSensorIndexingConfig(sensorName), sensorName);
+  }
+
+  public static int getBatchSize(Map<String, Object> conf) {
+    return getAs( BATCH_SIZE_CONF
+                 ,conf
+                , 1
+                , Integer.class
+                );
+  }
+
+  public static String getIndex(Map<String, Object> conf, String sensorName) {
+    return getAs( INDEX_CONF
+                 ,conf
+                , sensorName
+                , String.class
+                );
+  }
+
+  public static Map<String, Object> setBatchSize(Map<String, Object> conf, int batchSize) {
+    Map<String, Object> ret = conf == null?new HashMap<>():conf;
+    ret.put(BATCH_SIZE_CONF, batchSize);
+    return ret;
+  }
+
+  public static Map<String, Object> setIndex(Map<String, Object> conf, String index) {
+    Map<String, Object> ret = conf == null?new HashMap<>():conf;
+    ret.put(INDEX_CONF, index);
+    return ret;
+  }
+
+  public static <T> T getAs(String key, Map<String, Object> map, T defaultValue, Class<T> clazz) {
+    return map == null?defaultValue: ConversionUtils.convert(map.getOrDefault(key, defaultValue), clazz);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/SensorEnrichmentConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/SensorEnrichmentConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/SensorEnrichmentConfig.java
index c5538b9..2b4f5a8 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/SensorEnrichmentConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/SensorEnrichmentConfig.java
@@ -27,8 +27,6 @@ import java.util.Map;
 
 public class SensorEnrichmentConfig {
 
-  private String index;
-  private int batchSize;
   private EnrichmentConfig enrichment = new EnrichmentConfig();
   private ThreatIntelConfig threatIntel = new ThreatIntelConfig();
   private Map<String, Object> configuration = new HashMap<>();
@@ -57,28 +55,11 @@ public class SensorEnrichmentConfig {
     this.threatIntel = threatIntel;
   }
 
-  public String getIndex() {
-    return index;
-  }
-
-  public void setIndex(String index) {
-    this.index = index;
-  }
-
 
-  public int getBatchSize() {
-    return batchSize;
-  }
-
-  public void setBatchSize(int batchSize) {
-    this.batchSize = batchSize;
-  }
 
   @Override
   public String toString() {
     return "SensorEnrichmentConfig{" +
-            "index='" + index + '\'' +
-            ", batchSize=" + batchSize +
             ", enrichment=" + enrichment +
             ", threatIntel=" + threatIntel +
             ", configuration=" + configuration +
@@ -92,8 +73,6 @@ public class SensorEnrichmentConfig {
 
     SensorEnrichmentConfig that = (SensorEnrichmentConfig) o;
 
-    if (getBatchSize() != that.getBatchSize()) return false;
-    if (getIndex() != null ? !getIndex().equals(that.getIndex()) : that.getIndex() != null) return false;
     if (getEnrichment() != null ? !getEnrichment().equals(that.getEnrichment()) : that.getEnrichment() != null)
       return false;
     if (getThreatIntel() != null ? !getThreatIntel().equals(that.getThreatIntel()) : that.getThreatIntel() != null)
@@ -104,8 +83,7 @@ public class SensorEnrichmentConfig {
 
   @Override
   public int hashCode() {
-    int result = getIndex() != null ? getIndex().hashCode() : 0;
-    result = 31 * result + getBatchSize();
+    int result = getEnrichment() != null ? getEnrichment().hashCode() : 0;
     result = 31 * result + (getEnrichment() != null ? getEnrichment().hashCode() : 0);
     result = 31 * result + (getThreatIntel() != null ? getThreatIntel().hashCode() : 0);
     result = 31 * result + (getConfiguration() != null ? getConfiguration().hashCode() : 0);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/SensorEnrichmentUpdateConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/SensorEnrichmentUpdateConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/SensorEnrichmentUpdateConfig.java
index cff7d03..fd15d31 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/SensorEnrichmentUpdateConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/enrichment/SensorEnrichmentUpdateConfig.java
@@ -97,8 +97,6 @@ public class SensorEnrichmentUpdateConfig {
       try {
         sensorEnrichmentConfig = SensorEnrichmentConfig.fromBytes(ConfigurationsUtils.readSensorEnrichmentConfigBytesFromZookeeper(sensor, client));
       }catch (KeeperException.NoNodeException e) {
-        sensorEnrichmentConfig.setIndex(sensor);
-        sensorEnrichmentConfig.setBatchSize(1);
       }
       return sensorEnrichmentConfig;
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/EnrichmentWriterConfiguration.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/EnrichmentWriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/EnrichmentWriterConfiguration.java
deleted file mode 100644
index 7d65eec..0000000
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/EnrichmentWriterConfiguration.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.common.configuration.writer;
-
-import org.apache.metron.common.configuration.EnrichmentConfigurations;
-
-import java.util.Map;
-
-public class EnrichmentWriterConfiguration implements WriterConfiguration{
-  private EnrichmentConfigurations config;
-
-  public EnrichmentWriterConfiguration(EnrichmentConfigurations config) {
-    this.config = config;
-  }
-
-  @Override
-  public int getBatchSize(String sensorName) {
-    if(config != null && config.getSensorEnrichmentConfig(sensorName) != null) {
-      return config.getSensorEnrichmentConfig(sensorName).getBatchSize();
-    }
-    return 1;
-  }
-
-  @Override
-  public String getIndex(String sensorName) {
-    if(config != null && config.getSensorEnrichmentConfig(sensorName) != null) {
-      return config.getSensorEnrichmentConfig(sensorName).getIndex();
-    }
-    return sensorName;
-  }
-
-  @Override
-  public Map<String, Object> getSensorConfig(String sensorName) {
-    if(config != null && config.getSensorEnrichmentConfig(sensorName) != null) {
-      return config.getSensorEnrichmentConfig(sensorName).getConfiguration();
-    }
-    return null;
-  }
-  @Override
-  public Map<String, Object> getGlobalConfig() {
-    if(config != null) {
-      return config.getGlobalConfig();
-    }
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java
new file mode 100644
index 0000000..97e7977
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.configuration.writer;
+
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
+import org.apache.metron.common.configuration.IndexingConfigurations;
+import org.apache.metron.common.utils.ConversionUtils;
+
+import java.util.Map;
+import java.util.Optional;
+
+public class IndexingWriterConfiguration implements WriterConfiguration{
+  private Optional<IndexingConfigurations> config;
+
+
+  public IndexingWriterConfiguration(IndexingConfigurations config) {
+    this.config = Optional.ofNullable(config);
+  }
+
+
+
+  @Override
+  public int getBatchSize(String sensorName) {
+    return config.orElse(new IndexingConfigurations()).getBatchSize(sensorName);
+  }
+
+  @Override
+  public String getIndex(String sensorName) {
+    return config.orElse(new IndexingConfigurations()).getIndex(sensorName);
+  }
+
+  @Override
+  public Map<String, Object> getSensorConfig(String sensorName) {
+    return config.orElse(new IndexingConfigurations()).getSensorIndexingConfig(sensorName);
+  }
+
+  @Override
+  public Map<String, Object> getGlobalConfig() {
+    return config.orElse(new IndexingConfigurations()).getGlobalConfig();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBoltTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBoltTest.java
index 2a4ac0a..68ef604 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBoltTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBoltTest.java
@@ -117,8 +117,6 @@ public class ConfiguredEnrichmentBoltTest extends BaseConfiguredBoltTest {
     configsUpdated = new HashSet<>();
     String sensorType = "testSensorConfig";
     SensorEnrichmentConfig testSensorConfig = new SensorEnrichmentConfig();
-    testSensorConfig.setBatchSize(50);
-    testSensorConfig.setIndex("test");
     Map<String, Object> enrichmentFieldMap = new HashMap<>();
     enrichmentFieldMap.put("enrichmentTest", new ArrayList<String>() {{
       add("enrichmentField");

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/SensorEnrichmentUpdateConfigTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/SensorEnrichmentUpdateConfigTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/SensorEnrichmentUpdateConfigTest.java
index f907144..01a697b 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/SensorEnrichmentUpdateConfigTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/SensorEnrichmentUpdateConfigTest.java
@@ -32,8 +32,6 @@ import java.util.Map;
 public class SensorEnrichmentUpdateConfigTest {
   /**
    {
-      "index": "bro",
-      "batchSize": 5,
       "enrichment" : {
         "fieldMap": {
           "geo": ["ip_dst_addr", "ip_src_addr"],

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/EnrichmentWriterConfigurationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/EnrichmentWriterConfigurationTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/EnrichmentWriterConfigurationTest.java
deleted file mode 100644
index a0a59c1..0000000
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/EnrichmentWriterConfigurationTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.common.writer;
-
-import org.apache.metron.common.configuration.EnrichmentConfigurations;
-import org.apache.metron.common.configuration.writer.EnrichmentWriterConfiguration;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class EnrichmentWriterConfigurationTest {
-  @Test
-  public void testDefaultBatchSize() {
-    EnrichmentWriterConfiguration config = new EnrichmentWriterConfiguration(
-           new EnrichmentConfigurations()
-    );
-    Assert.assertEquals(1, config.getBatchSize("foo"));
-  }
-  @Test
-  public void testDefaultIndex() {
-    EnrichmentWriterConfiguration config = new EnrichmentWriterConfiguration(
-           new EnrichmentConfigurations()
-    );
-    Assert.assertEquals("foo", config.getIndex("foo"));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/IndexingWriterConfigurationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/IndexingWriterConfigurationTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/IndexingWriterConfigurationTest.java
new file mode 100644
index 0000000..2e66818
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/writer/IndexingWriterConfigurationTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.writer;
+
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
+import org.apache.metron.common.configuration.IndexingConfigurations;
+import org.apache.metron.common.configuration.writer.IndexingWriterConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class IndexingWriterConfigurationTest {
+  @Test
+  public void testDefaultBatchSize() {
+    IndexingWriterConfiguration config = new IndexingWriterConfiguration(
+           new IndexingConfigurations()
+    );
+    Assert.assertEquals(1, config.getBatchSize("foo"));
+  }
+  @Test
+  public void testDefaultIndex() {
+    IndexingWriterConfiguration config = new IndexingWriterConfiguration(
+           new IndexingConfigurations()
+    );
+    Assert.assertEquals("foo", config.getIndex("foo"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-enrichment/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/README.md b/metron-platform/metron-enrichment/README.md
index 9f96f3e..ba518cb 100644
--- a/metron-platform/metron-enrichment/README.md
+++ b/metron-platform/metron-enrichment/README.md
@@ -38,8 +38,6 @@ sensor type (e.g. `snort`).
 Just like the global config, the format is a JSON stored in zookeeper.
 The configuration is a complex JSON object with the following top level fields:
 
-* `index` : The name of the sensor
-* `batchSize` : The size of the batch that is written to the indices at once.
 * `enrichment` : A complex JSON object representing the configuration of the enrichments
 * `threatIntel` : A complex JSON object representing the configuration of the threat intelligence enrichments
 
@@ -139,8 +137,6 @@ The supported aggregation functions are:
 An example configuration for the YAF sensor is as follows:
 ```json
 {
-  "index": "yaf",
-  "batchSize": 5,
   "enrichment": {
     "fieldMap": {
       "geo": [
@@ -210,8 +206,6 @@ Let's adjust the configurations for the Squid topology to annotate the messages
 
  ```
 {
-  "index": "squid",
-  "batchSize": 1,
   "enrichment" : {
     "fieldMap": {
       "stellar" : {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-enrichment/src/main/config/zookeeper/enrichments/asa.json
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/config/zookeeper/enrichments/asa.json b/metron-platform/metron-enrichment/src/main/config/zookeeper/enrichments/asa.json
index 546923a..fa07468 100644
--- a/metron-platform/metron-enrichment/src/main/config/zookeeper/enrichments/asa.json
+++ b/metron-platform/metron-enrichment/src/main/config/zookeeper/enrichments/asa.json
@@ -1,6 +1,4 @@
 {
-    "index": "asa",
-    "batchSize": 5,
     "enrichment" : {
         "fieldMap": {
             "geo": ["ip_dst_addr", "ip_src_addr"]

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-enrichment/src/main/config/zookeeper/enrichments/bro.json
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/config/zookeeper/enrichments/bro.json b/metron-platform/metron-enrichment/src/main/config/zookeeper/enrichments/bro.json
index 0eb34b3..57d0365 100644
--- a/metron-platform/metron-enrichment/src/main/config/zookeeper/enrichments/bro.json
+++ b/metron-platform/metron-enrichment/src/main/config/zookeeper/enrichments/bro.json
@@ -1,6 +1,4 @@
 {
-  "index": "bro",
-  "batchSize": 5,
   "enrichment" : {
     "fieldMap": {
       "geo": ["ip_dst_addr", "ip_src_addr"],

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-enrichment/src/main/config/zookeeper/enrichments/snort.json
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/config/zookeeper/enrichments/snort.json b/metron-platform/metron-enrichment/src/main/config/zookeeper/enrichments/snort.json
index 9dfc80e..5bf49d7 100644
--- a/metron-platform/metron-enrichment/src/main/config/zookeeper/enrichments/snort.json
+++ b/metron-platform/metron-enrichment/src/main/config/zookeeper/enrichments/snort.json
@@ -1,6 +1,4 @@
 {
-  "index": "snort",
-  "batchSize": 1,
   "enrichment" : {
     "fieldMap":
       {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-enrichment/src/main/config/zookeeper/enrichments/websphere.json
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/config/zookeeper/enrichments/websphere.json b/metron-platform/metron-enrichment/src/main/config/zookeeper/enrichments/websphere.json
index b765808..149957b 100644
--- a/metron-platform/metron-enrichment/src/main/config/zookeeper/enrichments/websphere.json
+++ b/metron-platform/metron-enrichment/src/main/config/zookeeper/enrichments/websphere.json
@@ -1,6 +1,4 @@
 {
-  "index": "websphere",
-  "batchSize": 5,
   "enrichment": {
     "fieldMap": {
       "geo": [

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-enrichment/src/main/config/zookeeper/enrichments/yaf.json
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/config/zookeeper/enrichments/yaf.json b/metron-platform/metron-enrichment/src/main/config/zookeeper/enrichments/yaf.json
index 4e67748..d02cb85 100644
--- a/metron-platform/metron-enrichment/src/main/config/zookeeper/enrichments/yaf.json
+++ b/metron-platform/metron-enrichment/src/main/config/zookeeper/enrichments/yaf.json
@@ -1,6 +1,4 @@
 {
-  "index": "yaf",
-  "batchSize": 5,
   "enrichment" : {
     "fieldMap":
       {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java
index 98fbc89..7999a4c 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java
@@ -67,8 +67,6 @@ public class SimpleHBaseAdapterTest {
 
   /**
     {
-      "index": "bro",
-      "batchSize": 5,
       "enrichment": {
         "fieldMap": {
            "hbaseEnrichment" : [ "ip_dst_addr" ]
@@ -83,8 +81,6 @@ public class SimpleHBaseAdapterTest {
   private String sourceConfigStr;
   /**
     {
-      "index": "bro",
-      "batchSize": 5,
       "enrichment": {
         "fieldMap": {
            "hbaseEnrichment" : [ "ip_dst_addr" ]

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java
index af7102a..966538f 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java
@@ -73,8 +73,6 @@ public class ThreatIntelAdapterTest {
 
   /**
     {
-      "index": "bro",
-      "batchSize": 5,
       "enrichment": {
         "fieldMap": {
           "geo": ["ip_dst_addr", "ip_src_addr"],

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
index d6d9eca..3a4f5b0 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
@@ -117,7 +117,7 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest {
     BulkMessageWriterBolt bulkMessageWriterBolt = new BulkMessageWriterBolt("zookeeperUrl").withBulkMessageWriter(bulkMessageWriter);
     bulkMessageWriterBolt.setCuratorFramework(client);
     bulkMessageWriterBolt.setTreeCache(cache);
-    bulkMessageWriterBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType, new FileInputStream(sampleSensorEnrichmentConfigPath));
+    bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType, new FileInputStream(sampleSensorIndexingConfigPath));
     bulkMessageWriterBolt.declareOutputFields(declarer);
     verify(declarer, times(1)).declareStream(eq("error"), argThat(new FieldsMatcher("message")));
     Map stormConf = new HashMap();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java
index c2ad285..a819105 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java
@@ -36,6 +36,7 @@ public class ConfigUploadComponent implements InMemoryComponent {
   private String globalConfigPath;
   private String parserConfigsPath;
   private String enrichmentConfigsPath;
+  private String indexingConfigsPath;
   private String profilerConfigPath;
   private Optional<String> globalConfig = Optional.empty();
   private Map<String, SensorParserConfig> parserSensorConfigs = new HashMap<>();
@@ -58,6 +59,10 @@ public class ConfigUploadComponent implements InMemoryComponent {
     return this;
   }
 
+  public ConfigUploadComponent withIndexingConfigsPath(String indexingConfigsPath) {
+    this.indexingConfigsPath = indexingConfigsPath;
+    return this;
+  }
   public ConfigUploadComponent withProfilerConfigsPath(String profilerConfigsPath) {
     this.profilerConfigPath = profilerConfigsPath;
     return this;
@@ -81,9 +86,10 @@ public class ConfigUploadComponent implements InMemoryComponent {
       if(globalConfigPath != null
       || parserConfigsPath != null
       || enrichmentConfigsPath != null
+      || indexingConfigsPath != null
       || profilerConfigPath != null
         ) {
-        uploadConfigsToZookeeper(globalConfigPath, parserConfigsPath, enrichmentConfigsPath, profilerConfigPath, zookeeperUrl);
+        uploadConfigsToZookeeper(globalConfigPath, parserConfigsPath, enrichmentConfigsPath, indexingConfigsPath, profilerConfigPath, zookeeperUrl);
       }
 
       for(Map.Entry<String, SensorParserConfig> kv : parserSensorConfigs.entrySet()) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/utils/SampleUtil.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/utils/SampleUtil.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/utils/SampleUtil.java
index 40dcfe4..bffae1c 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/utils/SampleUtil.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/utils/SampleUtil.java
@@ -18,10 +18,7 @@
 package org.apache.metron.enrichment.integration.utils;
 
 import org.apache.metron.TestConstants;
-import org.apache.metron.common.configuration.Configurations;
-import org.apache.metron.common.configuration.ConfigurationsUtils;
-import org.apache.metron.common.configuration.EnrichmentConfigurations;
-import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.*;
 
 import java.io.IOException;
 import java.util.Map;
@@ -54,4 +51,13 @@ public class SampleUtil {
     return configurations;
   }
 
+  public static IndexingConfigurations getSampleIndexingConfigs() throws IOException {
+    IndexingConfigurations configurations = new IndexingConfigurations();
+    configurations.updateGlobalConfig(ConfigurationsUtils.readGlobalConfigFromFile(TestConstants.SAMPLE_CONFIG_PATH));
+    Map<String, byte[]> sensorIndexingConfigs = ConfigurationsUtils.readSensorIndexingConfigsFromFile(TestConstants.SAMPLE_CONFIG_PATH);
+    for(String sensorType: sensorIndexingConfigs.keySet()) {
+      configurations.updateSensorIndexingConfig(sensorType, sensorIndexingConfigs.get(sensorType));
+    }
+    return configurations;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-indexing/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/README.md b/metron-platform/metron-indexing/README.md
index 6a5d34d..63ad3a6 100644
--- a/metron-platform/metron-indexing/README.md
+++ b/metron-platform/metron-indexing/README.md
@@ -12,7 +12,7 @@ By default, this topology writes out to both HDFS and one of
 Elasticsearch and Solr.
 
 Indices are written in batch and the batch size is specified in the
-[Enrichment Config](../metron-enrichment) via the `batchSize` parameter.
+[Indexing Config](../metron-enrichment) via the `batchSize` parameter.
 This config is variable by sensor type.
 
 ## Indexing Architecture
@@ -26,6 +26,17 @@ and sent to
 
 Errors during indexing are sent to a kafka queue called `index_errors`
 
+##Sensor Indexing Configuration
+The sensor specific configuration is intended to configure the
+indexing used for a given sensor type (e.g. `snort`).
+
+Just like the global config, the format is a JSON stored in zookeeper.
+The configuration is a JSON map with the following fields:
+* `index` : The name of the index to write to (defaulted to the name of the sensor).
+* `batchSize` : The size of the batch that is written to the indices at once (defaulted to 1.
+
+
+
 # Notes on Performance Tuning
 
 Default installed Metron is untuned for production deployment.  By far

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/asa.json
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/asa.json b/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/asa.json
new file mode 100644
index 0000000..afc500c
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/asa.json
@@ -0,0 +1,5 @@
+{
+    "index": "asa",
+    "batchSize": 5
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/bro.json
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/bro.json b/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/bro.json
new file mode 100644
index 0000000..b559ca9
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/bro.json
@@ -0,0 +1,5 @@
+{
+  "index": "bro",
+  "batchSize": 5
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/snort.json
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/snort.json b/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/snort.json
new file mode 100644
index 0000000..3c37faa
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/snort.json
@@ -0,0 +1,4 @@
+{
+  "index": "snort",
+  "batchSize": 1
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/websphere.json
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/websphere.json b/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/websphere.json
new file mode 100644
index 0000000..4b69f5b
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/websphere.json
@@ -0,0 +1,5 @@
+{
+  "index": "websphere",
+  "batchSize": 5
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/yaf.json
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/yaf.json b/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/yaf.json
new file mode 100644
index 0000000..b248524
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/config/zookeeper/indexing/yaf.json
@@ -0,0 +1,4 @@
+{
+  "index": "yaf",
+  "batchSize": 5
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
index 8fdc4dd..f54d791 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
@@ -142,7 +142,9 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest {
     ConfigUploadComponent configUploadComponent = new ConfigUploadComponent()
             .withTopologyProperties(topologyProperties)
             .withGlobalConfigsPath(TestConstants.SAMPLE_CONFIG_PATH)
-            .withEnrichmentConfigsPath(TestConstants.SAMPLE_CONFIG_PATH);
+            .withEnrichmentConfigsPath(TestConstants.SAMPLE_CONFIG_PATH)
+            .withIndexingConfigsPath(TestConstants.SAMPLE_CONFIG_PATH)
+            ;
     FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
             .withTopologyLocation(new File(fluxPath))
             .withTopologyName("test")
@@ -158,6 +160,7 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest {
             .withComponent("search", getSearchComponent(topologyProperties))
             .withMillisecondsBetweenAttempts(15000)
             .withNumRetries(10)
+            .withMaxTimeMS(150000)
             .withCustomShutdownOrder(new String[] {"search","storm","config","kafka","zk"})
             .build();
     runner.start();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-integration-test/src/main/config/zookeeper/enrichments/test.json
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/config/zookeeper/enrichments/test.json b/metron-platform/metron-integration-test/src/main/config/zookeeper/enrichments/test.json
index c053cdc..d99f741 100644
--- a/metron-platform/metron-integration-test/src/main/config/zookeeper/enrichments/test.json
+++ b/metron-platform/metron-integration-test/src/main/config/zookeeper/enrichments/test.json
@@ -1,6 +1,4 @@
 {
-  "index": "yaf",
-  "batchSize": 5,
   "enrichment": {
     "fieldMap": {
       "geo": [

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-integration-test/src/main/config/zookeeper/indexing/test.json
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/config/zookeeper/indexing/test.json b/metron-platform/metron-integration-test/src/main/config/zookeeper/indexing/test.json
new file mode 100644
index 0000000..239484a
--- /dev/null
+++ b/metron-platform/metron-integration-test/src/main/config/zookeeper/indexing/test.json
@@ -0,0 +1,5 @@
+{
+  "index": "yaf",
+  "batchSize": 5
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-management/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/README.md b/metron-platform/metron-management/README.md
index 624a9c0..aee0de8 100644
--- a/metron-platform/metron-management/README.md
+++ b/metron-platform/metron-management/README.md
@@ -133,14 +133,14 @@ The functions are split roughly into a few sections:
 * `CONFIG_GET`
   * Description: Retrieve a Metron configuration from zookeeper.
   * Input:
-    * type - One of ENRICHMENT, PARSER, GLOBAL, PROFILER
+    * type - One of ENRICHMENT, INDEXING, PARSER, GLOBAL, PROFILER
     * sensor - Sensor to retrieve (required for enrichment and parser, not used for profiler and global)
     * emptyIfNotPresent - If true, then return an empty, minimally viable config
   * Returns: The String representation of the config in zookeeper
 * `CONFIG_PUT`
   * Description: Updates a Metron config to Zookeeper.
   * Input:
-    * type - One of ENRICHMENT, PARSER, GLOBAL, PROFILER
+    * type - One of ENRICHMENT, INDEXING, PARSER, GLOBAL, PROFILER
     * config - The config (a string in JSON form) to update
     * sensor - Sensor to retrieve (required for enrichment and parser, not used for profiler and global)
   * Returns: The String representation of the config in zookeeper
@@ -168,14 +168,14 @@ The functions are split roughly into a few sections:
 
 ### Enrichment Functions
 
-* `ENRICHMENT_SET_BATCH`
+* `INDEXING_SET_BATCH`
   * Description: Set batch size
   * Input:
     * sensorConfig - Sensor config to add transformation to.
     * size - batch size (integer)
   * Returns: The String representation of the config in zookeeper
-* `ENRICHMENT_SET_INDEX`
-  * Description: Set the index for the enrichment
+* `INDEXING_SET_INDEX`
+  * Description: Set the index for the sensor
   * Input:
     * sensorConfig - Sensor config to add transformation to.
     * sensor - sensor name
@@ -541,7 +541,7 @@ Functions loaded, you may refer to functions now...
 }
 [Stellar]>>> # Wait, that batch size looks terrible.  That is because it did not exist in zookeeper, so it is the default.
 [Stellar]>>> # We can correct it, thankfully. 
-[Stellar]>>> squid_enrichment_config := ENRICHMENT_SET_BATCH( squid_enrichment_config, 100)
+[Stellar]>>> squid_enrichment_config := INDEXING_SET_BATCH( squid_enrichment_config, 100)
 [Stellar]>>> # Now that we have a config, we can add an enrichment to the Stellar adapter
 [Stellar]>>> # We should make sure that the current enrichment does not have any already
 [Stellar]>>> ?ENRICHMENT_STELLAR_TRANSFORM_PRINT
@@ -757,7 +757,7 @@ Please note that functions are loading lazily in the background and will be unav
 26828 [Thread-1] INFO  o.a.m.c.d.FunctionResolverSingleton - Found 84 Stellar Functions...
 Functions loaded, you may refer to functions now...
 [Stellar]>>> # Just as in the previous example, we should adjust the batch size
-[Stellar]>>> squid_enrichment_config := ENRICHMENT_SET_BATCH( squid_enrichment_config, 100)
+[Stellar]>>> squid_enrichment_config := INDEXING_SET_BATCH( squid_enrichment_config, 100)
 [Stellar]>>> # We should not have any threat triage rules
 [Stellar]>>> THREAT_TRIAGE_PRINT(squid_enrichment_config)
 ╔═════════════╤═══════╗

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-management/src/main/java/org/apache/metron/management/ConfigurationFunctions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/main/java/org/apache/metron/management/ConfigurationFunctions.java b/metron-platform/metron-management/src/main/java/org/apache/metron/management/ConfigurationFunctions.java
index ea9f93a..ecebe8b 100644
--- a/metron-platform/metron-management/src/main/java/org/apache/metron/management/ConfigurationFunctions.java
+++ b/metron-platform/metron-management/src/main/java/org/apache/metron/management/ConfigurationFunctions.java
@@ -31,6 +31,7 @@ import org.apache.log4j.Logger;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.ConfigurationType;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.IndexingConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
 import org.apache.metron.common.dsl.Context;
@@ -84,6 +85,9 @@ public class ConfigurationFunctions {
           } else if (path.startsWith(ConfigurationType.ENRICHMENT.getZookeeperRoot())) {
             Map<String, String> sensorMap = (Map<String, String>)configMap.get(ConfigurationType.ENRICHMENT);
             sensorMap.put(sensor, new String(data));
+          } else if (path.startsWith(ConfigurationType.INDEXING.getZookeeperRoot())) {
+            Map<String, String> sensorMap = (Map<String, String>)configMap.get(ConfigurationType.INDEXING);
+            sensorMap.put(sensor, new String(data));
           }
         }
         else if(event.getType().equals(TreeCacheEvent.Type.NODE_REMOVED)) {
@@ -97,6 +101,10 @@ public class ConfigurationFunctions {
             Map<String, String> sensorMap = (Map<String, String>)configMap.get(ConfigurationType.ENRICHMENT);
             sensorMap.remove(sensor);
           }
+          else if (path.startsWith(ConfigurationType.INDEXING.getZookeeperRoot())) {
+            Map<String, String> sensorMap = (Map<String, String>)configMap.get(ConfigurationType.INDEXING);
+            sensorMap.remove(sensor);
+          }
           else if (ConfigurationType.PROFILER.getZookeeperRoot().equals(path)) {
             configMap.put(ConfigurationType.PROFILER, null);
           }
@@ -124,6 +132,7 @@ public class ConfigurationFunctions {
             configMap.put(ct, data);
           }
           break;
+        case INDEXING:
         case ENRICHMENT:
         case PARSER:
           {
@@ -143,7 +152,7 @@ public class ConfigurationFunctions {
            namespace = "CONFIG"
           ,name = "GET"
           ,description = "Retrieve a Metron configuration from zookeeper."
-          ,params = {"type - One of ENRICHMENT, PARSER, GLOBAL, PROFILER"
+          ,params = {"type - One of ENRICHMENT, INDEXING, PARSER, GLOBAL, PROFILER"
                     , "sensor - Sensor to retrieve (required for enrichment and parser, not used for profiler and global)"
                     , "emptyIfNotPresent - If true, then return an empty, minimally viable config"
                     }
@@ -179,6 +188,25 @@ public class ConfigurationFunctions {
           }
           return ret;
         }
+        case INDEXING: {
+          String sensor = (String) args.get(1);
+          if(args.size() > 2) {
+            emptyIfNotPresent = ConversionUtils.convert(args.get(2), Boolean.class);
+          }
+          Map<String, String> sensorMap = (Map<String, String>) configMap.get(type);
+          String ret = sensorMap.get(sensor);
+          if (ret == null && emptyIfNotPresent ) {
+            Map<String, Object> config = new HashMap<>();
+            try {
+              ret = JSONUtils.INSTANCE.toJSON(config, true);
+              IndexingConfigurations.setIndex(config, sensor);
+            } catch (JsonProcessingException e) {
+              LOG.error("Unable to serialize default object: " + e.getMessage(), e);
+              throw new ParseException("Unable to serialize default object: " + e.getMessage(), e);
+            }
+          }
+          return ret;
+        }
         case ENRICHMENT: {
           String sensor = (String) args.get(1);
           if(args.size() > 2) {
@@ -188,7 +216,6 @@ public class ConfigurationFunctions {
           String ret = sensorMap.get(sensor);
           if (ret == null && emptyIfNotPresent ) {
             SensorEnrichmentConfig config = new SensorEnrichmentConfig();
-            config.setIndex(sensor);
             try {
               ret = JSONUtils.INSTANCE.toJSON(config, true);
             } catch (JsonProcessingException e) {
@@ -224,7 +251,7 @@ public class ConfigurationFunctions {
            namespace = "CONFIG"
           ,name = "PUT"
           ,description = "Updates a Metron config to Zookeeper."
-          ,params = {"type - One of ENRICHMENT, PARSER, GLOBAL, PROFILER"
+          ,params = {"type - One of ENRICHMENT, INDEXING, PARSER, GLOBAL, PROFILER"
                     ,"config - The config (a string in JSON form) to update"
                     , "sensor - Sensor to retrieve (required for enrichment and parser, not used for profiler and global)"
                     }
@@ -258,6 +285,15 @@ public class ConfigurationFunctions {
             ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensor, config.getBytes(), client);
           }
           break;
+          case INDEXING:
+          {
+            String sensor = (String) args.get(2);
+            if(sensor == null) {
+              return null;
+            }
+            ConfigurationsUtils.writeSensorIndexingConfigToZookeeper(sensor, config.getBytes(), client);
+          }
+          break;
           case PARSER:
             {
             String sensor = (String) args.get(2);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-management/src/main/java/org/apache/metron/management/EnrichmentConfigFunctions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/main/java/org/apache/metron/management/EnrichmentConfigFunctions.java b/metron-platform/metron-management/src/main/java/org/apache/metron/management/EnrichmentConfigFunctions.java
index b1566ca..29f950e 100644
--- a/metron-platform/metron-management/src/main/java/org/apache/metron/management/EnrichmentConfigFunctions.java
+++ b/metron-platform/metron-management/src/main/java/org/apache/metron/management/EnrichmentConfigFunctions.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.jakewharton.fliptables.FlipTable;
 import org.apache.log4j.Logger;
 import org.apache.metron.common.configuration.FieldTransformer;
+import org.apache.metron.common.configuration.IndexingConfigurations;
 import org.apache.metron.common.configuration.enrichment.EnrichmentConfig;
 import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
 import org.apache.metron.common.dsl.Context;
@@ -33,10 +34,11 @@ import org.apache.metron.common.utils.JSONUtils;
 import java.util.*;
 
 import static org.apache.metron.common.configuration.ConfigurationType.ENRICHMENT;
+import static org.apache.metron.common.configuration.ConfigurationType.INDEXING;
 
 public class EnrichmentConfigFunctions {
 
-  private static final Logger LOG = Logger.getLogger(ConfigurationFunctions.class);
+  private static final Logger LOG = Logger.getLogger(EnrichmentConfigFunctions.class);
   public enum Type {
     ENRICHMENT, THREAT_INTEL, THREATINTEL;
   }
@@ -251,95 +253,5 @@ public class EnrichmentConfigFunctions {
     }
   }
 
-  @Stellar(
-           namespace = "ENRICHMENT"
-          ,name = "SET_BATCH"
-          ,description = "Set batch size"
-          ,params = {"sensorConfig - Sensor config to add transformation to."
-                    ,"size - batch size (integer)"
-                    }
-          ,returns = "The String representation of the config in zookeeper"
-          )
-  public static class SetBatchSize implements StellarFunction{
-
-    @Override
-    public Object apply(List<Object> args, Context context) throws ParseException {
-      int i = 0;
-      String config = (String) args.get(i++);
-      SensorEnrichmentConfig configObj;
-      if(config == null || config.isEmpty()) {
-        throw new IllegalStateException("Invalid config: " + config);
-      }
-      else {
-        configObj = (SensorEnrichmentConfig) ENRICHMENT.deserialize(config);
-      }
-      int batchSize = 5;
-      if(args.size() > 1) {
-        batchSize = ConversionUtils.convert(args.get(i++), Integer.class);
-      }
-      configObj.setBatchSize(batchSize);
-      try {
-        return JSONUtils.INSTANCE.toJSON(configObj, true);
-      } catch (JsonProcessingException e) {
-        LOG.error("Unable to convert object to JSON: " + configObj, e);
-        return config;
-      }
-    }
-
-    @Override
-    public void initialize(Context context) {
-
-    }
-
-    @Override
-    public boolean isInitialized() {
-      return true;
-    }
-  }
 
-  @Stellar(
-           namespace = "ENRICHMENT"
-          ,name = "SET_INDEX"
-          ,description = "Set the index for the enrichment"
-          ,params = {"sensorConfig - Sensor config to add transformation to."
-                    ,"sensor - sensor name"
-                    }
-          ,returns = "The String representation of the config in zookeeper"
-          )
-  public static class SetIndex implements StellarFunction{
-
-    @Override
-    public Object apply(List<Object> args, Context context) throws ParseException {
-      int i = 0;
-      String config = (String) args.get(i++);
-      SensorEnrichmentConfig configObj;
-      if(config == null || config.isEmpty()) {
-        throw new IllegalStateException("Invalid config: " + config);
-      }
-      else {
-        configObj = (SensorEnrichmentConfig) ENRICHMENT.deserialize(config);
-      }
-      String sensorName = ConversionUtils.convert(args.get(i++), String.class);
-      if(sensorName == null) {
-        throw new IllegalStateException("Invalid sensor name: " + config);
-      }
-      configObj.setIndex(sensorName);
-      try {
-        return JSONUtils.INSTANCE.toJSON(configObj, true);
-      } catch (JsonProcessingException e) {
-        LOG.error("Unable to convert object to JSON: " + configObj, e);
-        return config;
-      }
-    }
-
-    @Override
-    public void initialize(Context context) {
-
-    }
-
-    @Override
-    public boolean isInitialized() {
-      return true;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-management/src/main/java/org/apache/metron/management/IndexingConfigFunctions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/main/java/org/apache/metron/management/IndexingConfigFunctions.java b/metron-platform/metron-management/src/main/java/org/apache/metron/management/IndexingConfigFunctions.java
new file mode 100644
index 0000000..f572cbf
--- /dev/null
+++ b/metron-platform/metron-management/src/main/java/org/apache/metron/management/IndexingConfigFunctions.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.management;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.log4j.Logger;
+import org.apache.metron.common.configuration.IndexingConfigurations;
+import org.apache.metron.common.dsl.Context;
+import org.apache.metron.common.dsl.ParseException;
+import org.apache.metron.common.dsl.Stellar;
+import org.apache.metron.common.dsl.StellarFunction;
+import org.apache.metron.common.utils.ConversionUtils;
+import org.apache.metron.common.utils.JSONUtils;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.metron.common.configuration.ConfigurationType.INDEXING;
+
+
+public class IndexingConfigFunctions {
+  private static final Logger LOG = Logger.getLogger(IndexingConfigFunctions.class);
+  @Stellar(
+           namespace = "INDEXING"
+          ,name = "SET_BATCH"
+          ,description = "Set batch size"
+          ,params = {"sensorConfig - Sensor config to add transformation to."
+                    ,"size - batch size (integer)"
+                    }
+          ,returns = "The String representation of the config in zookeeper"
+          )
+  public static class SetBatchSize implements StellarFunction {
+
+    @Override
+    public Object apply(List<Object> args, Context context) throws ParseException {
+      int i = 0;
+      String config = (String) args.get(i++);
+      Map<String, Object> configObj;
+      if(config == null || config.isEmpty()) {
+        throw new IllegalStateException("Invalid config: " + config);
+      }
+      else {
+        configObj = (Map<String, Object>) INDEXING.deserialize(config);
+      }
+      int batchSize = 5;
+      if(args.size() > 1) {
+        batchSize = ConversionUtils.convert(args.get(i++), Integer.class);
+      }
+      configObj = IndexingConfigurations.setBatchSize(configObj, batchSize);
+      try {
+        return JSONUtils.INSTANCE.toJSON(configObj, true);
+      } catch (JsonProcessingException e) {
+        LOG.error("Unable to convert object to JSON: " + configObj, e);
+        return config;
+      }
+    }
+
+    @Override
+    public void initialize(Context context) {
+
+    }
+
+    @Override
+    public boolean isInitialized() {
+      return true;
+    }
+  }
+
+  @Stellar(
+           namespace = "INDEXING"
+          ,name = "SET_INDEX"
+          ,description = "Set the index for the sensor"
+          ,params = {"sensorConfig - Sensor config to add transformation to."
+                    ,"sensor - sensor name"
+                    }
+          ,returns = "The String representation of the config in zookeeper"
+          )
+  public static class SetIndex implements StellarFunction{
+
+    @Override
+    public Object apply(List<Object> args, Context context) throws ParseException {
+      int i = 0;
+      String config = (String) args.get(i++);
+      Map<String, Object> configObj;
+      if(config == null || config.isEmpty()) {
+        throw new IllegalStateException("Invalid config: " + config);
+      }
+      else {
+        configObj = (Map<String, Object>) INDEXING.deserialize(config);
+      }
+      String sensorName = ConversionUtils.convert(args.get(i++), String.class);
+      if(sensorName == null) {
+        throw new IllegalStateException("Invalid sensor name: " + config);
+      }
+      configObj = IndexingConfigurations.setIndex(configObj, sensorName);
+      try {
+        return JSONUtils.INSTANCE.toJSON(configObj, true);
+      } catch (JsonProcessingException e) {
+        LOG.error("Unable to convert object to JSON: " + configObj, e);
+        return config;
+      }
+    }
+
+    @Override
+    public void initialize(Context context) {
+
+    }
+
+    @Override
+    public boolean isInitialized() {
+      return true;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
index 8d202c6..794f208 100644
--- a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
+++ b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
@@ -121,8 +121,6 @@ public class ConfigurationFunctionsTest {
 
   /**
     {
-      "index" : "brop",
-      "batchSize" : 0,
       "enrichment" : {
         "fieldMap" : { },
         "fieldToTypeMap" : { },
@@ -207,6 +205,37 @@ public class ConfigurationFunctionsTest {
   }
 
   @Test
+  public void testIndexingPut() throws InterruptedException {
+    String brop= (String) run("CONFIG_GET('INDEXING', 'testIndexingPut')", new HashMap<>(), context);
+    run("CONFIG_PUT('INDEXING', config, 'testIndexingPut')", ImmutableMap.of("config", brop), context);
+    boolean foundMatch = false;
+    for(int i = 0;i < 10 && !foundMatch;++i) {
+      String bropNew = (String) run("CONFIG_GET('INDEXING', 'testIndexingPut', false)", new HashMap<>(), context);
+      foundMatch =  brop.equals(bropNew);
+      if(foundMatch) {
+        break;
+      }
+      Thread.sleep(2000);
+    }
+    Assert.assertTrue(foundMatch);
+  }
+
+  @Test(expected= ParseException.class)
+  public void testIndexingPutBad() throws InterruptedException {
+    {
+      {
+        UnitTestHelper.setLog4jLevel(ConfigurationFunctions.class, Level.FATAL);
+        try {
+          run("CONFIG_PUT('INDEXING', config, 'brop')", ImmutableMap.of("config", "foo bar"), context);
+        } catch(ParseException e) {
+          UnitTestHelper.setLog4jLevel(ConfigurationFunctions.class, Level.ERROR);
+          throw e;
+        }
+      }
+    }
+  }
+
+  @Test
   public void testEnrichmentPut() throws InterruptedException {
     String brop= (String) run("CONFIG_GET('ENRICHMENT', 'testEnrichmentPut')", new HashMap<>(), context);
     run("CONFIG_PUT('ENRICHMENT', config, 'testEnrichmentPut')", ImmutableMap.of("config", brop), context);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-management/src/test/java/org/apache/metron/management/EnrichmentConfigFunctionsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/test/java/org/apache/metron/management/EnrichmentConfigFunctionsTest.java b/metron-platform/metron-management/src/test/java/org/apache/metron/management/EnrichmentConfigFunctionsTest.java
index 21eba0e..7d8e20a 100644
--- a/metron-platform/metron-management/src/test/java/org/apache/metron/management/EnrichmentConfigFunctionsTest.java
+++ b/metron-platform/metron-management/src/test/java/org/apache/metron/management/EnrichmentConfigFunctionsTest.java
@@ -55,8 +55,6 @@ public class EnrichmentConfigFunctionsTest {
 
   public static String emptyTransformationsConfig() {
     SensorEnrichmentConfig config = new SensorEnrichmentConfig();
-    config.setIndex("dummy");
-    config.setBatchSize(5);
     try {
       return JSONUtils.INSTANCE.toJSON(config, true);
     } catch (JsonProcessingException e) {
@@ -346,39 +344,5 @@ public class EnrichmentConfigFunctionsTest {
     Assert.assertEquals(testPrintEmptyExpected, out);
   }
 
-  @Test
-  public void testSetBatch() {
-    String out = (String) run("ENRICHMENT_SET_BATCH(config, 10)"
-                             , toMap("config", configStr)
-    );
-    SensorEnrichmentConfig config = (SensorEnrichmentConfig)ENRICHMENT.deserialize(out);
-    Assert.assertEquals(config.getBatchSize(), 10);
-  }
-
-  @Test(expected=ParseException.class)
-  public void testSetBatchBad() {
-    String out = (String) run("ENRICHMENT_SET_BATCH(config, 10)"
-                             , new HashMap<>()
-    );
-    SensorEnrichmentConfig config = (SensorEnrichmentConfig)ENRICHMENT.deserialize(out);
-    Assert.assertEquals(config.getBatchSize(), 10);
-  }
 
-  @Test
-  public void testSetIndex() {
-    String out = (String) run("ENRICHMENT_SET_INDEX(config, 'foo')"
-            , toMap("config", configStr)
-    );
-    SensorEnrichmentConfig config = (SensorEnrichmentConfig)ENRICHMENT.deserialize(out);
-    Assert.assertEquals("foo", config.getIndex());
-  }
-
-  @Test(expected= ParseException.class)
-  public void testSetIndexBad() {
-    String out = (String) run("ENRICHMENT_SET_INDEX(config, NULL)"
-            , new HashMap<>()
-    );
-    SensorEnrichmentConfig config = (SensorEnrichmentConfig)ENRICHMENT.deserialize(out);
-    Assert.assertNotNull(config.getIndex());
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/368e7ad6/metron-platform/metron-management/src/test/java/org/apache/metron/management/IndexingConfigFunctionsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/test/java/org/apache/metron/management/IndexingConfigFunctionsTest.java b/metron-platform/metron-management/src/test/java/org/apache/metron/management/IndexingConfigFunctionsTest.java
new file mode 100644
index 0000000..18132ed
--- /dev/null
+++ b/metron-platform/metron-management/src/test/java/org/apache/metron/management/IndexingConfigFunctionsTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.management;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.metron.common.configuration.IndexingConfigurations;
+import org.apache.metron.common.dsl.Context;
+import org.apache.metron.common.dsl.ParseException;
+import org.apache.metron.common.dsl.StellarFunctions;
+import org.apache.metron.common.stellar.StellarProcessor;
+import org.apache.metron.common.stellar.shell.StellarExecutor;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.metron.common.configuration.ConfigurationType.INDEXING;
+import static org.apache.metron.management.EnrichmentConfigFunctionsTest.toMap;
+
+public class IndexingConfigFunctionsTest {
+
+  Map<String, StellarExecutor.VariableResult> variables;
+  Context context = null;
+
+  private Object run(String rule, Map<String, Object> variables) {
+    StellarProcessor processor = new StellarProcessor();
+    return processor.parse(rule, x -> variables.get(x), StellarFunctions.FUNCTION_RESOLVER(), context);
+  }
+
+  @Before
+  public void setup() {
+    variables = ImmutableMap.of(
+            "upper", new StellarExecutor.VariableResult("TO_UPPER('foo')", "FOO"),
+            "lower", new StellarExecutor.VariableResult("TO_LOWER('FOO')", "foo")
+    );
+
+    context = new Context.Builder()
+            .with(StellarExecutor.SHELL_VARIABLES, () -> variables)
+            .build();
+  }
+
+  @Test
+  public void testSetBatch() {
+    String out = (String) run("INDEXING_SET_BATCH(config, 10)"
+                             , toMap("config", "{}")
+    );
+    Map<String, Object> config = (Map<String, Object>)INDEXING.deserialize(out);
+    Assert.assertEquals(IndexingConfigurations.getBatchSize(config), 10);
+  }
+
+  @Test(expected=ParseException.class)
+  public void testSetBatchBad() {
+    run("INDEXING_SET_BATCH(config, 10)"
+                             , new HashMap<>()
+    );
+  }
+
+  @Test
+  public void testSetIndex() {
+    String out = (String) run("INDEXING_SET_INDEX(config, 'foo')"
+            , toMap("config", "{}")
+    );
+    Map<String, Object> config = (Map<String, Object>)INDEXING.deserialize(out);
+    Assert.assertEquals("foo", IndexingConfigurations.getIndex(config, null));
+  }
+
+  @Test(expected= ParseException.class)
+  public void testSetIndexBad() {
+    run("INDEXING_SET_INDEX(config, NULL)"
+            , new HashMap<>()
+    );
+  }
+}


Mime
View raw message