metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ceste...@apache.org
Subject [3/3] metron git commit: METRON-1241: Enable the REST API to use a cache for the zookeeper config similar to the Bolts closes apache/incubator-metron#795
Date Fri, 20 Oct 2017 21:20:21 GMT
METRON-1241: Enable the REST API to use a cache for the zookeeper config similar to the Bolts. Closes apache/incubator-metron#795


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/cc111ec9
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/cc111ec9
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/cc111ec9

Branch: refs/heads/master
Commit: cc111ec984a78db43c4df222851f59280ff5eff9
Parents: aee0184
Author: cstella <cestella@gmail.com>
Authored: Fri Oct 20 17:20:06 2017 -0400
Committer: cstella <cestella@gmail.com>
Committed: Fri Oct 20 17:20:06 2017 -0400

----------------------------------------------------------------------
 .../profiler/bolt/ProfileBuilderBoltTest.java   |   2 +-
 .../profiler/bolt/ProfileSplitterBoltTest.java  |   2 +-
 .../metron/rest/config/ZookeeperConfig.java     |  11 +
 .../service/impl/GlobalConfigServiceImpl.java   |  33 ++-
 .../impl/SensorEnrichmentConfigServiceImpl.java |  43 ++-
 .../impl/SensorIndexingConfigServiceImpl.java   |  40 +--
 .../impl/SensorParserConfigServiceImpl.java     |  38 ++-
 .../apache/metron/rest/config/TestConfig.java   |  11 +
 .../GlobalConfigControllerIntegrationTest.java  |   6 +-
 ...richmentConfigControllerIntegrationTest.java |   6 +-
 ...IndexingConfigControllerIntegrationTest.java |   6 +-
 ...orParserConfigControllerIntegrationTest.java |  19 +-
 .../StormControllerIntegrationTest.java         |  12 +
 .../impl/GlobalConfigServiceImplTest.java       |  30 +-
 .../SensorEnrichmentConfigServiceImplTest.java  |  99 +++----
 .../SensorIndexingConfigServiceImplTest.java    | 100 +++----
 .../impl/SensorParserConfigServiceImplTest.java | 105 +++----
 metron-platform/metron-common/pom.xml           |  10 +-
 .../metron/common/bolt/ConfiguredBolt.java      |  54 ++--
 .../common/bolt/ConfiguredEnrichmentBolt.java   |  30 +-
 .../common/bolt/ConfiguredIndexingBolt.java     |  28 +-
 .../common/bolt/ConfiguredParserBolt.java       |  30 +-
 .../common/bolt/ConfiguredProfilerBolt.java     |  47 +--
 .../common/configuration/Configurations.java    |  27 +-
 .../configuration/ConfigurationsUtils.java      |  64 +++-
 .../configuration/EnrichmentConfigurations.java |  46 ++-
 .../configuration/IndexingConfigurations.java   |  38 ++-
 .../configuration/ParserConfigurations.java     |  22 +-
 .../configuration/profiler/ProfileResult.java   |   8 +
 .../profiler/ProfileResultExpressions.java      |   7 +
 .../profiler/ProfileTriageExpressions.java      |  23 ++
 .../configuration/profiler/ProfilerConfig.java  |   7 +
 .../profiler/ProfilerConfigurations.java        |  11 +-
 .../common/zookeeper/ConfigurationsCache.java   |  44 +++
 .../common/zookeeper/ZKConfigurationsCache.java | 179 +++++++++++
 .../configurations/ConfigurationsUpdater.java   | 152 ++++++++++
 .../configurations/EnrichmentUpdater.java       |  78 +++++
 .../configurations/IndexingUpdater.java         |  74 +++++
 .../zookeeper/configurations/ParserUpdater.java |  74 +++++
 .../configurations/ProfilerUpdater.java         |  96 ++++++
 .../zookeeper/configurations/Reloadable.java    |  27 ++
 .../metron-common/src/main/scripts/stellar      |   2 +-
 .../ZKConfigurationsCacheIntegrationTest.java   | 296 +++++++++++++++++++
 .../bolt/BulkMessageWriterBoltTest.java         |   6 +-
 .../enrichment/bolt/EnrichmentJoinBoltTest.java |   2 +-
 .../bolt/EnrichmentSplitterBoltTest.java        |   2 +-
 .../bolt/GenericEnrichmentBoltTest.java         |   2 +-
 .../metron/enrichment/bolt/JoinBoltTest.java    |   2 +-
 .../metron/enrichment/bolt/SplitBoltTest.java   |   2 +-
 .../bolt/ThreatIntelJoinBoltTest.java           |   2 +-
 .../bolt/ThreatIntelSplitterBoltTest.java       |   2 +-
 .../metron/integration/utils/TestUtils.java     |  22 ++
 .../metron/parsers/bolt/ParserBoltTest.java     | 176 +++++------
 metron-platform/metron-test-utilities/pom.xml   |  11 +-
 .../apache/metron/test/bolt/BaseBoltTest.java   |   3 +-
 metron-platform/metron-zookeeper/pom.xml        |  48 +++
 .../metron/zookeeper/SimpleEventListener.java   | 123 ++++++++
 .../org/apache/metron/zookeeper/ZKCache.java    | 196 ++++++++++++
 metron-platform/pom.xml                         |   1 +
 .../stellar-common/src/main/scripts/stellar     |   2 +-
 60 files changed, 2027 insertions(+), 612 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
index 62be86e..21d61ab 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
@@ -147,7 +147,7 @@ public class ProfileBuilderBoltTest extends BaseBoltTest {
 
     ProfileBuilderBolt bolt = new ProfileBuilderBolt("zookeeperURL");
     bolt.setCuratorFramework(client);
-    bolt.setTreeCache(cache);
+    bolt.setZKCache(cache);
     bolt.withPeriodDuration(10, TimeUnit.MINUTES);
     bolt.withProfileTimeToLive(30, TimeUnit.MINUTES);
 

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
index d51401f..beab8d5 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
@@ -140,7 +140,7 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
 
     ProfileSplitterBolt bolt = new ProfileSplitterBolt("zookeeperURL");
     bolt.setCuratorFramework(client);
-    bolt.setTreeCache(cache);
+    bolt.setZKCache(cache);
     bolt.getConfigurations().updateProfilerConfig(profilerConfig.getBytes("UTF-8"));
     bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
 

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/ZookeeperConfig.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/ZookeeperConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/ZookeeperConfig.java
index 1f72afb..6f4656e 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/ZookeeperConfig.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/ZookeeperConfig.java
@@ -24,6 +24,8 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
+import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
 import org.apache.metron.rest.MetronRestConstants;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -37,6 +39,15 @@ import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
 public class ZookeeperConfig {
 
   @Bean(initMethod = "start", destroyMethod="close")
+  public ConfigurationsCache cache(CuratorFramework client) {
+    return new ZKConfigurationsCache( client
+                                    , ZKConfigurationsCache.ConfiguredTypes.ENRICHMENT
+                                    , ZKConfigurationsCache.ConfiguredTypes.PARSER
+                                    , ZKConfigurationsCache.ConfiguredTypes.INDEXING
+                                    );
+  }
+
+  @Bean(initMethod = "start", destroyMethod="close")
   public CuratorFramework client(Environment environment) {
     int sleepTime = Integer.parseInt(environment.getProperty(MetronRestConstants.CURATOR_SLEEP_TIME));
     int maxRetries = Integer.parseInt(environment.getProperty(MetronRestConstants.CURATOR_MAX_RETRIES));

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java
index e80380b..ed67994 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java
@@ -17,27 +17,34 @@
  */
 package org.apache.metron.rest.service.impl;
 
-import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.metron.common.configuration.ConfigurationType;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
-import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.service.GlobalConfigService;
+import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
 import org.apache.zookeeper.KeeperException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import java.io.ByteArrayInputStream;
 import java.util.Map;
 
 @Service
 public class GlobalConfigServiceImpl implements GlobalConfigService {
     private CuratorFramework client;
 
+    private ConfigurationsCache cache;
+
     @Autowired
-    public GlobalConfigServiceImpl(CuratorFramework client) {
+    public GlobalConfigServiceImpl(CuratorFramework client, ConfigurationsCache cache) {
       this.client = client;
+      this.cache = cache;
+    }
+
+    public void setCache(ConfigurationsCache cache) {
+      this.cache = cache;
     }
 
     @Override
@@ -52,16 +59,14 @@ public class GlobalConfigServiceImpl implements GlobalConfigService {
 
     @Override
     public Map<String, Object> get() throws RestException {
-        Map<String, Object> globalConfig;
-        try {
-            byte[] globalConfigBytes = ConfigurationsUtils.readGlobalConfigBytesFromZookeeper(client);
-            globalConfig = JSONUtils.INSTANCE.load(new ByteArrayInputStream(globalConfigBytes), new TypeReference<Map<String, Object>>(){});
-        } catch (KeeperException.NoNodeException e) {
-            return null;
-        } catch (Exception e) {
-          throw new RestException(e);
-        }
-        return globalConfig;
+      Map<String, Object> globalConfig;
+      try {
+        EnrichmentConfigurations configs = cache.get( EnrichmentConfigurations.class);
+        globalConfig = configs.getGlobalConfig(false);
+      } catch (Exception e) {
+        throw new RestException(e.getMessage(), e);
+      }
+      return globalConfig;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java
index d4438a4..293b113 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java
@@ -22,9 +22,12 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.metron.common.aggregator.Aggregators;
 import org.apache.metron.common.configuration.ConfigurationType;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
 import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.service.SensorEnrichmentConfigService;
+import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
 import org.apache.zookeeper.KeeperException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -43,10 +46,13 @@ public class SensorEnrichmentConfigServiceImpl implements SensorEnrichmentConfig
 
     private CuratorFramework client;
 
+    private ConfigurationsCache cache;
+
     @Autowired
-    public SensorEnrichmentConfigServiceImpl(ObjectMapper objectMapper, CuratorFramework client) {
+    public SensorEnrichmentConfigServiceImpl(ObjectMapper objectMapper, CuratorFramework client, ConfigurationsCache cache) {
       this.objectMapper = objectMapper;
       this.client = client;
+      this.cache = cache;
     }
 
     @Override
@@ -61,38 +67,27 @@ public class SensorEnrichmentConfigServiceImpl implements SensorEnrichmentConfig
 
     @Override
     public SensorEnrichmentConfig findOne(String name) throws RestException {
-        SensorEnrichmentConfig sensorEnrichmentConfig;
-        try {
-            sensorEnrichmentConfig = ConfigurationsUtils.readSensorEnrichmentConfigFromZookeeper(name, client);
-        } catch (KeeperException.NoNodeException e) {
-          return null;
-        } catch (Exception e) {
-          throw new RestException(e);
-        }
-      return sensorEnrichmentConfig;
+      EnrichmentConfigurations configs = cache.get( EnrichmentConfigurations.class);
+      return configs.getSensorEnrichmentConfig(name);
     }
 
     @Override
     public Map<String, SensorEnrichmentConfig> getAll() throws RestException {
-        Map<String, SensorEnrichmentConfig> sensorEnrichmentConfigs = new HashMap<>();
-        List<String> sensorNames = getAllTypes();
-        for (String name : sensorNames) {
-            sensorEnrichmentConfigs.put(name, findOne(name));
+      Map<String, SensorEnrichmentConfig> sensorEnrichmentConfigs = new HashMap<>();
+      List<String> sensorNames = getAllTypes();
+      for (String name : sensorNames) {
+        SensorEnrichmentConfig config = findOne(name);
+        if(config != null) {
+          sensorEnrichmentConfigs.put(name, config);
         }
-        return sensorEnrichmentConfigs;
+      }
+      return sensorEnrichmentConfigs;
     }
 
     @Override
     public List<String> getAllTypes() throws RestException {
-        List<String> types;
-        try {
-            types = client.getChildren().forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot());
-        } catch (KeeperException.NoNodeException e) {
-            types = new ArrayList<>();
-        } catch (Exception e) {
-          throw new RestException(e);
-        }
-      return types;
+      EnrichmentConfigurations configs = cache.get( EnrichmentConfigurations.class);
+      return configs.getTypes();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImpl.java
index 9f984e0..5c73b26 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImpl.java
@@ -17,20 +17,19 @@
  */
 package org.apache.metron.rest.service.impl;
 
-import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.metron.common.configuration.ConfigurationType;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
-import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.configuration.IndexingConfigurations;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.service.SensorIndexingConfigService;
+import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
 import org.apache.zookeeper.KeeperException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import java.io.ByteArrayInputStream;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -42,10 +41,13 @@ public class SensorIndexingConfigServiceImpl implements SensorIndexingConfigServ
 
   private CuratorFramework client;
 
+  private ConfigurationsCache cache;
+
   @Autowired
-  public SensorIndexingConfigServiceImpl(ObjectMapper objectMapper, CuratorFramework client) {
+  public SensorIndexingConfigServiceImpl(ObjectMapper objectMapper, CuratorFramework client, ConfigurationsCache cache) {
     this.objectMapper = objectMapper;
     this.client = client;
+    this.cache = cache;
   }
 
   @Override
@@ -60,16 +62,8 @@ public class SensorIndexingConfigServiceImpl implements SensorIndexingConfigServ
 
   @Override
   public Map<String, Object> findOne(String name) throws RestException {
-    Map<String, Object> sensorIndexingConfig;
-    try {
-      byte[] sensorIndexingConfigBytes = ConfigurationsUtils.readSensorIndexingConfigBytesFromZookeeper(name, client);
-      sensorIndexingConfig = JSONUtils.INSTANCE.load(new ByteArrayInputStream(sensorIndexingConfigBytes), new TypeReference<Map<String, Object>>(){});
-    } catch (KeeperException.NoNodeException e) {
-      return null;
-    } catch (Exception e) {
-      throw new RestException(e);
-    }
-    return sensorIndexingConfig;
+    IndexingConfigurations configs = cache.get( IndexingConfigurations.class);
+    return configs.getSensorIndexingConfig(name, false);
   }
 
   @Override
@@ -77,22 +71,18 @@ public class SensorIndexingConfigServiceImpl implements SensorIndexingConfigServ
     Map<String, Map<String, Object>> sensorIndexingConfigs = new HashMap<>();
     List<String> sensorNames = getAllTypes();
     for (String name : sensorNames) {
-      sensorIndexingConfigs.put(name, findOne(name));
+      Map<String, Object> config = findOne(name);
+      if(config != null) {
+        sensorIndexingConfigs.put(name, config);
+      }
     }
     return sensorIndexingConfigs;
   }
 
   @Override
   public List<String> getAllTypes() throws RestException {
-    List<String> types;
-    try {
-        types = client.getChildren().forPath(ConfigurationType.INDEXING.getZookeeperRoot());
-    } catch (KeeperException.NoNodeException e) {
-        types = new ArrayList<>();
-    } catch (Exception e) {
-      throw new RestException(e);
-    }
-    return types;
+    IndexingConfigurations configs = cache.get( IndexingConfigurations.class);
+    return configs.getTypes();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
index f99b41c..7e70344 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java
@@ -29,13 +29,16 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.hadoop.fs.Path;
 import org.apache.metron.common.configuration.ConfigurationType;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
 import org.apache.metron.parsers.interfaces.MessageParser;
 import org.apache.metron.rest.MetronRestConstants;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.model.ParseMessageRequest;
 import org.apache.metron.rest.service.GrokService;
 import org.apache.metron.rest.service.SensorParserConfigService;
+import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
 import org.apache.zookeeper.KeeperException;
 import org.json.simple.JSONObject;
 import org.reflections.Reflections;
@@ -49,17 +52,21 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService
 
   private CuratorFramework client;
 
+  private ConfigurationsCache cache;
+
   private GrokService grokService;
 
+  private Map<String, String> availableParsers;
+
   @Autowired
   public SensorParserConfigServiceImpl(ObjectMapper objectMapper, CuratorFramework client,
-      GrokService grokService) {
+      GrokService grokService, ConfigurationsCache cache) {
     this.objectMapper = objectMapper;
     this.client = client;
     this.grokService = grokService;
+    this.cache = cache;
   }
 
-  private Map<String, String> availableParsers;
 
   @Override
   public SensorParserConfig save(SensorParserConfig sensorParserConfig) throws RestException {
@@ -74,15 +81,8 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService
 
   @Override
   public SensorParserConfig findOne(String name) throws RestException {
-    SensorParserConfig sensorParserConfig;
-    try {
-      sensorParserConfig = ConfigurationsUtils.readSensorParserConfigFromZookeeper(name, client);
-    } catch (KeeperException.NoNodeException e) {
-      return null;
-    } catch (Exception e) {
-      throw new RestException(e);
-    }
-    return sensorParserConfig;
+    ParserConfigurations configs = cache.get( ParserConfigurations.class);
+    return configs.getSensorParserConfig(name);
   }
 
   @Override
@@ -90,7 +90,10 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService
     List<SensorParserConfig> sensorParserConfigs = new ArrayList<>();
     List<String> sensorNames = getAllTypes();
     for (String name : sensorNames) {
-      sensorParserConfigs.add(findOne(name));
+      SensorParserConfig config = findOne(name);
+      if(config != null) {
+        sensorParserConfigs.add(config);
+      }
     }
     return sensorParserConfigs;
   }
@@ -109,15 +112,8 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService
 
   @Override
   public List<String> getAllTypes() throws RestException {
-    List<String> types;
-    try {
-      types = client.getChildren().forPath(ConfigurationType.PARSER.getZookeeperRoot());
-    } catch (KeeperException.NoNodeException e) {
-      types = new ArrayList<>();
-    } catch (Exception e) {
-      throw new RestException(e);
-    }
-    return types;
+    ParserConfigurations configs = cache.get( ParserConfigurations.class);
+    return configs.getTypes();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
index ea64fbe..1150189 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
@@ -36,6 +36,8 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
+import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
 import org.apache.metron.hbase.mock.MockHBaseTableProvider;
 import org.apache.metron.integration.ComponentRunner;
 import org.apache.metron.integration.UnableToStartException;
@@ -75,6 +77,15 @@ public class TestConfig {
     return new KafkaComponent().withTopologyProperties(zkProperties);
   }
 
+  @Bean(initMethod = "start", destroyMethod="close")
+  public ConfigurationsCache cache(CuratorFramework client) {
+    return new ZKConfigurationsCache( client
+                                    , ZKConfigurationsCache.ConfiguredTypes.ENRICHMENT
+                                    , ZKConfigurationsCache.ConfiguredTypes.PARSER
+                                    , ZKConfigurationsCache.ConfiguredTypes.INDEXING
+                                    );
+  }
+
   @Bean(destroyMethod = "stop")
   public ComponentRunner componentRunner(ZKServerComponent zkServerComponent, KafkaComponent kafkaWithZKComponent) {
     ComponentRunner runner = new ComponentRunner.Builder()

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/GlobalConfigControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/GlobalConfigControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/GlobalConfigControllerIntegrationTest.java
index f4e18ea..abb75b1 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/GlobalConfigControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/GlobalConfigControllerIntegrationTest.java
@@ -31,6 +31,7 @@ import org.springframework.test.web.servlet.MockMvc;
 import org.springframework.test.web.servlet.setup.MockMvcBuilders;
 import org.springframework.web.context.WebApplicationContext;
 
+import static org.apache.metron.integration.utils.TestUtils.assertEventually;
 import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
 import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf;
 import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic;
@@ -97,9 +98,10 @@ public class GlobalConfigControllerIntegrationTest {
                 .andExpect(status().isCreated())
                 .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")));
 
-        this.mockMvc.perform(post(globalConfigUrl).with(httpBasic(user,password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(globalJson))
+        assertEventually(() -> this.mockMvc.perform(post(globalConfigUrl).with(httpBasic(user,password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(globalJson))
                 .andExpect(status().isOk())
-                .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")));
+                .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+        );
 
         this.mockMvc.perform(get(globalConfigUrl).with(httpBasic(user,password)))
                 .andExpect(status().isOk());

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorEnrichmentConfigControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorEnrichmentConfigControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorEnrichmentConfigControllerIntegrationTest.java
index dd4eff7..15a2370 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorEnrichmentConfigControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorEnrichmentConfigControllerIntegrationTest.java
@@ -31,6 +31,7 @@ import org.springframework.test.web.servlet.MockMvc;
 import org.springframework.test.web.servlet.setup.MockMvcBuilders;
 import org.springframework.web.context.WebApplicationContext;
 
+import static org.apache.metron.integration.utils.TestUtils.assertEventually;
 import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
 import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf;
 import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic;
@@ -167,7 +168,7 @@ public class SensorEnrichmentConfigControllerIntegrationTest {
             .andExpect(jsonPath("$.threatIntel.triageConfig.riskLevelRules[0].score").value(10))
             .andExpect(jsonPath("$.threatIntel.triageConfig.aggregator").value("MAX"));
 
-    this.mockMvc.perform(post(sensorEnrichmentConfigUrl + "/broTest").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broJson))
+    assertEventually(() -> this.mockMvc.perform(post(sensorEnrichmentConfigUrl + "/broTest").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broJson))
             .andExpect(status().isOk())
             .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
             .andExpect(jsonPath("$.enrichment.fieldMap.geo[0]").value("ip_dst_addr"))
@@ -183,7 +184,8 @@ public class SensorEnrichmentConfigControllerIntegrationTest {
             .andExpect(jsonPath("$.threatIntel.fieldToTypeMap.ip_dst_addr[0]").value("malicious_ip"))
             .andExpect(jsonPath("$.threatIntel.triageConfig.riskLevelRules[0].rule").value("ip_src_addr == '10.122.196.204' or ip_dst_addr == '10.122.196.204'"))
             .andExpect(jsonPath("$.threatIntel.triageConfig.riskLevelRules[0].score").value(10))
-            .andExpect(jsonPath("$.threatIntel.triageConfig.aggregator").value("MAX"));
+            .andExpect(jsonPath("$.threatIntel.triageConfig.aggregator").value("MAX") )
+    );
 
     this.mockMvc.perform(get(sensorEnrichmentConfigUrl + "/broTest").with(httpBasic(user,password)))
             .andExpect(status().isOk())

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorIndexingConfigControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorIndexingConfigControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorIndexingConfigControllerIntegrationTest.java
index cebcde6..674c55a 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorIndexingConfigControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorIndexingConfigControllerIntegrationTest.java
@@ -31,6 +31,7 @@ import org.springframework.test.web.servlet.MockMvc;
 import org.springframework.test.web.servlet.setup.MockMvcBuilders;
 import org.springframework.web.context.WebApplicationContext;
 
+import static org.apache.metron.integration.utils.TestUtils.assertEventually;
 import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
 import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf;
 import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic;
@@ -103,11 +104,12 @@ public class SensorIndexingConfigControllerIntegrationTest {
             .andExpect(jsonPath("$.index").value("broTest"))
             .andExpect(jsonPath("$.batchSize").value(1));
 
-    this.mockMvc.perform(post(sensorIndexingConfigUrl + "/broTest").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broJson))
+    assertEventually(() -> this.mockMvc.perform(post(sensorIndexingConfigUrl + "/broTest").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broJson))
             .andExpect(status().isOk())
             .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
             .andExpect(jsonPath("$.index").value("broTest"))
-            .andExpect(jsonPath("$.batchSize").value(1));
+            .andExpect(jsonPath("$.batchSize").value(1))
+    );
 
     this.mockMvc.perform(get(sensorIndexingConfigUrl + "/broTest").with(httpBasic(user,password)))
             .andExpect(status().isOk())

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserConfigControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserConfigControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserConfigControllerIntegrationTest.java
index 6e2d788..d8aea72 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserConfigControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserConfigControllerIntegrationTest.java
@@ -38,7 +38,9 @@ import org.springframework.web.context.WebApplicationContext;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Method;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.metron.integration.utils.TestUtils.assertEventually;
 import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
 import static org.hamcrest.Matchers.hasSize;
 import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf;
@@ -198,16 +200,16 @@ public class SensorParserConfigControllerIntegrationTest {
     this.sensorParserConfigService.delete("broTest");
     this.sensorParserConfigService.delete("squidTest");
     Method[] method = SensorParserConfig.class.getMethods();
-    int numFields = 0;
+    final AtomicInteger numFields = new AtomicInteger(0);
     for(Method m : method) {
       if(m.getName().startsWith("set")) {
-        numFields++;
+        numFields.set(numFields.get() + 1);
       }
     }
     this.mockMvc.perform(post(sensorParserConfigUrl).with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(squidJson))
             .andExpect(status().isCreated())
             .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
-            .andExpect(jsonPath("$.*", hasSize(numFields)))
+            .andExpect(jsonPath("$.*", hasSize(numFields.get())))
             .andExpect(jsonPath("$.parserClassName").value("org.apache.metron.parsers.GrokParser"))
             .andExpect(jsonPath("$.sensorTopic").value("squidTest"))
             .andExpect(jsonPath("$.parserConfig.grokPath").value("target/patterns/squidTest"))
@@ -219,10 +221,10 @@ public class SensorParserConfigControllerIntegrationTest {
             .andExpect(jsonPath("$.fieldTransformations[0].config.full_hostname").value("URL_TO_HOST(url)"))
             .andExpect(jsonPath("$.fieldTransformations[0].config.domain_without_subdomains").value("DOMAIN_REMOVE_SUBDOMAINS(full_hostname)"));
 
-    this.mockMvc.perform(get(sensorParserConfigUrl + "/squidTest").with(httpBasic(user,password)))
+    assertEventually(() -> this.mockMvc.perform(get(sensorParserConfigUrl + "/squidTest").with(httpBasic(user,password)))
             .andExpect(status().isOk())
             .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
-            .andExpect(jsonPath("$.*", hasSize(numFields)))
+            .andExpect(jsonPath("$.*", hasSize(numFields.get())))
             .andExpect(jsonPath("$.parserClassName").value("org.apache.metron.parsers.GrokParser"))
             .andExpect(jsonPath("$.sensorTopic").value("squidTest"))
             .andExpect(jsonPath("$.parserConfig.grokPath").value("target/patterns/squidTest"))
@@ -232,7 +234,8 @@ public class SensorParserConfigControllerIntegrationTest {
             .andExpect(jsonPath("$.fieldTransformations[0].output[0]").value("full_hostname"))
             .andExpect(jsonPath("$.fieldTransformations[0].output[1]").value("domain_without_subdomains"))
             .andExpect(jsonPath("$.fieldTransformations[0].config.full_hostname").value("URL_TO_HOST(url)"))
-            .andExpect(jsonPath("$.fieldTransformations[0].config.domain_without_subdomains").value("DOMAIN_REMOVE_SUBDOMAINS(full_hostname)"));
+            .andExpect(jsonPath("$.fieldTransformations[0].config.domain_without_subdomains").value("DOMAIN_REMOVE_SUBDOMAINS(full_hostname)"))
+    );
 
     this.mockMvc.perform(get(sensorParserConfigUrl).with(httpBasic(user,password)))
             .andExpect(status().isOk())
@@ -251,7 +254,7 @@ public class SensorParserConfigControllerIntegrationTest {
     this.mockMvc.perform(post(sensorParserConfigUrl).with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broJson))
             .andExpect(status().isCreated())
             .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
-            .andExpect(jsonPath("$.*", hasSize(numFields)))
+            .andExpect(jsonPath("$.*", hasSize(numFields.get())))
             .andExpect(jsonPath("$.parserClassName").value("org.apache.metron.parsers.bro.BasicBroParser"))
             .andExpect(jsonPath("$.sensorTopic").value("broTest"))
             .andExpect(jsonPath("$.readMetadata").value("true"))
@@ -261,7 +264,7 @@ public class SensorParserConfigControllerIntegrationTest {
     this.mockMvc.perform(post(sensorParserConfigUrl).with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broJson))
             .andExpect(status().isOk())
             .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
-            .andExpect(jsonPath("$.*", hasSize(numFields)))
+            .andExpect(jsonPath("$.*", hasSize(numFields.get())))
             .andExpect(jsonPath("$.parserClassName").value("org.apache.metron.parsers.bro.BasicBroParser"))
             .andExpect(jsonPath("$.sensorTopic").value("broTest"))
             .andExpect(jsonPath("$.readMetadata").value("true"))

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java
index 5c6dd12..e3518ca 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java
@@ -18,9 +18,11 @@
 package org.apache.metron.rest.controller;
 
 import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.integration.utils.TestUtils;
 import org.apache.metron.rest.model.TopologyStatusCode;
 import org.apache.metron.rest.service.GlobalConfigService;
 import org.apache.metron.rest.service.SensorParserConfigService;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -169,6 +171,11 @@ public class StormControllerIntegrationTest {
             .andExpect(jsonPath("$.message").value(TopologyStatusCode.GLOBAL_CONFIG_MISSING.name()));
 
     globalConfigService.save(globalConfig);
+    {
+      final Map<String, Object> expectedGlobalConfig = globalConfig;
+      //we must wait for the saved config to find its way into the cache.
+      TestUtils.assertEventually(() -> Assert.assertEquals(expectedGlobalConfig, globalConfigService.get()));
+    }
 
     this.mockMvc.perform(get(stormUrl + "/parser/start/broTest").with(httpBasic(user,password)))
             .andExpect(status().isOk())
@@ -179,6 +186,11 @@ public class StormControllerIntegrationTest {
     sensorParserConfig.setParserClassName("org.apache.metron.parsers.bro.BasicBroParser");
     sensorParserConfig.setSensorTopic("broTest");
     sensorParserConfigService.save(sensorParserConfig);
+    {
+      final Map<String, Object> expectedGlobalConfig = globalConfig;
+      //we must wait for the saved config to find its way into the cache.
+      TestUtils.assertEventually(() -> Assert.assertEquals(expectedGlobalConfig, globalConfigService.get()));
+    }
 
     this.mockMvc.perform(get(stormUrl + "/parser/start/broTest").with(httpBasic(user,password)))
             .andExpect(status().isOk())

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImplTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImplTest.java
index 824fb4b..85a66b3 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImplTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImplTest.java
@@ -28,11 +28,15 @@ import static org.mockito.Mockito.when;
 
 import java.util.HashMap;
 import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.DeleteBuilder;
 import org.apache.curator.framework.api.GetDataBuilder;
 import org.apache.curator.framework.api.SetDataBuilder;
 import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.service.GlobalConfigService;
 import org.apache.zookeeper.KeeperException;
@@ -49,11 +53,13 @@ public class GlobalConfigServiceImplTest {
 
   CuratorFramework curatorFramework;
   GlobalConfigService globalConfigService;
+  ConfigurationsCache cache;
 
   @Before
   public void setUp() throws Exception {
     curatorFramework = mock(CuratorFramework.class);
-    globalConfigService = new GlobalConfigServiceImpl(curatorFramework);
+    cache = mock(ConfigurationsCache.class);
+    globalConfigService = new GlobalConfigServiceImpl(curatorFramework, cache);
   }
 
 
@@ -98,25 +104,19 @@ public class GlobalConfigServiceImplTest {
       put("k", "v");
     }};
 
-    GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
-    when(getDataBuilder.forPath(ConfigurationType.GLOBAL.getZookeeperRoot())).thenReturn(config.getBytes());
-
-    when(curatorFramework.getData()).thenReturn(getDataBuilder);
+    EnrichmentConfigurations configs = new EnrichmentConfigurations(){
+      @Override
+      public Map<String, Object> getConfigurations() {
+        return ImmutableMap.of(ConfigurationType.GLOBAL.getTypeName(), configMap);
+      }
+    };
+    when(cache.get( eq(EnrichmentConfigurations.class)))
+            .thenReturn(configs);
 
     assertEquals(configMap, globalConfigService.get());
   }
 
   @Test
-  public void getShouldReturnNullWhenNoNodeExceptionIsThrown() throws Exception {
-    GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
-    when(getDataBuilder.forPath(ConfigurationType.GLOBAL.getZookeeperRoot())).thenThrow(KeeperException.NoNodeException.class);
-
-    when(curatorFramework.getData()).thenReturn(getDataBuilder);
-
-    assertNull(globalConfigService.get());
-  }
-
-  @Test
   public void getShouldWrapNonNoNodeExceptionInRestException() throws Exception {
     exception.expect(RestException.class);
 

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImplTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImplTest.java
index c26a210..0a78f4a 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImplTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImplTest.java
@@ -18,6 +18,7 @@
 package org.apache.metron.rest.service.impl;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.DeleteBuilder;
@@ -25,9 +26,11 @@ import org.apache.curator.framework.api.GetChildrenBuilder;
 import org.apache.curator.framework.api.GetDataBuilder;
 import org.apache.curator.framework.api.SetDataBuilder;
 import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
 import org.apache.metron.common.configuration.enrichment.EnrichmentConfig;
 import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
 import org.apache.metron.common.configuration.enrichment.threatintel.ThreatIntelConfig;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.service.SensorEnrichmentConfigService;
 import org.apache.zookeeper.KeeperException;
@@ -40,6 +43,7 @@ import org.junit.rules.ExpectedException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -79,11 +83,14 @@ public class SensorEnrichmentConfigServiceImplTest {
   @Multiline
   public static String broJson;
 
+  ConfigurationsCache cache;
+
   @Before
   public void setUp() throws Exception {
     objectMapper = mock(ObjectMapper.class);
     curatorFramework = mock(CuratorFramework.class);
-    sensorEnrichmentConfigService = new SensorEnrichmentConfigServiceImpl(objectMapper, curatorFramework);
+    cache = mock(ConfigurationsCache.class);
+    sensorEnrichmentConfigService = new SensorEnrichmentConfigServiceImpl(objectMapper, curatorFramework, cache);
   }
 
 
@@ -125,84 +132,54 @@ public class SensorEnrichmentConfigServiceImplTest {
   public void findOneShouldProperlyReturnSensorEnrichmentConfig() throws Exception {
     final SensorEnrichmentConfig sensorEnrichmentConfig = getTestSensorEnrichmentConfig();
 
-    GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
-    when(getDataBuilder.forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot() + "/bro")).thenReturn(broJson.getBytes());
-    when(curatorFramework.getData()).thenReturn(getDataBuilder);
+    EnrichmentConfigurations configs = new EnrichmentConfigurations(){
+      @Override
+      public Map<String, Object> getConfigurations() {
+        return ImmutableMap.of(EnrichmentConfigurations.getKey("bro"), sensorEnrichmentConfig);
+      }
+    };
+    when(cache.get(eq(EnrichmentConfigurations.class)))
+            .thenReturn(configs);
 
+    //We only have bro, so we should expect it to be returned
     assertEquals(getTestSensorEnrichmentConfig(), sensorEnrichmentConfigService.findOne("bro"));
-  }
-
-  @Test
-  public void findOneShouldReturnNullWhenNoNodeExceptionIsThrown() throws Exception {
-    GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
-    when(getDataBuilder.forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot() + "/bro")).thenThrow(KeeperException.NoNodeException.class);
-
-    when(curatorFramework.getData()).thenReturn(getDataBuilder);
-
-    assertNull(sensorEnrichmentConfigService.findOne("bro"));
-  }
-
-  @Test
-  public void findOneShouldWrapNonNoNodeExceptionInRestException() throws Exception {
-    exception.expect(RestException.class);
-
-    GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
-    when(getDataBuilder.forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot() + "/bro")).thenThrow(Exception.class);
-
-    when(curatorFramework.getData()).thenReturn(getDataBuilder);
-
-    sensorEnrichmentConfigService.findOne("bro");
+    //and blah should be a miss.
+    assertNull(sensorEnrichmentConfigService.findOne("blah"));
   }
 
   @Test
   public void getAllTypesShouldProperlyReturnTypes() throws Exception {
-    GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
-    when(getChildrenBuilder.forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot()))
-            .thenReturn(new ArrayList() {{
-              add("bro");
-              add("squid");
-            }});
-    when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
+
+    EnrichmentConfigurations configs = new EnrichmentConfigurations(){
+      @Override
+      public Map<String, Object> getConfigurations() {
+        return ImmutableMap.of(EnrichmentConfigurations.getKey("bro"), new HashMap<>()
+                              ,EnrichmentConfigurations.getKey("squid"), new HashMap<>()
+                              );
+      }
+    };
+    when(cache.get(eq(EnrichmentConfigurations.class)))
+            .thenReturn(configs);
 
     assertEquals(new ArrayList() {{
       add("bro");
       add("squid");
     }}, sensorEnrichmentConfigService.getAllTypes());
-  }
 
-  @Test
-  public void getAllTypesShouldReturnNullWhenNoNodeExceptionIsThrown() throws Exception {
-    GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
-    when(getChildrenBuilder.forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot())).thenThrow(KeeperException.NoNodeException.class);
-    when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
-
-    assertEquals(new ArrayList<>(), sensorEnrichmentConfigService.getAllTypes());
   }
 
-  @Test
-  public void getAllTypesShouldWrapNonNoNodeExceptionInRestException() throws Exception {
-    exception.expect(RestException.class);
-
-    GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
-    when(getChildrenBuilder.forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot())).thenThrow(Exception.class);
-    when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
-
-    sensorEnrichmentConfigService.getAllTypes();
-  }
 
   @Test
   public void getAllShouldProperlyReturnSensorEnrichmentConfigs() throws Exception {
-    GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
-    when(getChildrenBuilder.forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot()))
-            .thenReturn(new ArrayList() {{
-              add("bro");
-            }});
-    when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
-
     final SensorEnrichmentConfig sensorEnrichmentConfig = getTestSensorEnrichmentConfig();
-    GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
-    when(getDataBuilder.forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot() + "/bro")).thenReturn(broJson.getBytes());
-    when(curatorFramework.getData()).thenReturn(getDataBuilder);
+    EnrichmentConfigurations configs = new EnrichmentConfigurations(){
+      @Override
+      public Map<String, Object> getConfigurations() {
+        return ImmutableMap.of(EnrichmentConfigurations.getKey("bro"), sensorEnrichmentConfig);
+      }
+    };
+    when(cache.get( eq(EnrichmentConfigurations.class)))
+            .thenReturn(configs);
 
     assertEquals(new HashMap() {{ put("bro", sensorEnrichmentConfig);}}, sensorEnrichmentConfigService.getAll());
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImplTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImplTest.java
index 43ca0f7..9641a52 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImplTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImplTest.java
@@ -17,7 +17,10 @@
  */
 package org.apache.metron.rest.service.impl;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.DeleteBuilder;
@@ -25,6 +28,9 @@ import org.apache.curator.framework.api.GetChildrenBuilder;
 import org.apache.curator.framework.api.GetDataBuilder;
 import org.apache.curator.framework.api.SetDataBuilder;
 import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.IndexingConfigurations;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.service.SensorIndexingConfigService;
 import org.apache.zookeeper.KeeperException;
@@ -36,6 +42,7 @@ import org.junit.rules.ExpectedException;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -55,6 +62,7 @@ public class SensorIndexingConfigServiceImplTest {
   ObjectMapper objectMapper;
   CuratorFramework curatorFramework;
   SensorIndexingConfigService sensorIndexingConfigService;
+  ConfigurationsCache cache;
 
   /**
    {
@@ -72,7 +80,8 @@ public class SensorIndexingConfigServiceImplTest {
   public void setUp() throws Exception {
     objectMapper = mock(ObjectMapper.class);
     curatorFramework = mock(CuratorFramework.class);
-    sensorIndexingConfigService = new SensorIndexingConfigServiceImpl(objectMapper, curatorFramework);
+    cache = mock(ConfigurationsCache.class);
+    sensorIndexingConfigService = new SensorIndexingConfigServiceImpl(objectMapper, curatorFramework, cache);
   }
 
 
@@ -114,44 +123,36 @@ public class SensorIndexingConfigServiceImplTest {
   public void findOneShouldProperlyReturnSensorEnrichmentConfig() throws Exception {
     final Map<String, Object> sensorIndexingConfig = getTestSensorIndexingConfig();
 
-    GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
-    when(getDataBuilder.forPath(ConfigurationType.INDEXING.getZookeeperRoot() + "/bro")).thenReturn(broJson.getBytes());
-    when(curatorFramework.getData()).thenReturn(getDataBuilder);
+    IndexingConfigurations configs = new IndexingConfigurations(){
+      @Override
+      public Map<String, Object> getConfigurations() {
+        return ImmutableMap.of(IndexingConfigurations.getKey("bro"), sensorIndexingConfig);
+      }
+    };
+    when(cache.get( eq(IndexingConfigurations.class)))
+            .thenReturn(configs);
 
+    //We only have bro, so we should expect it to be returned
     assertEquals(getTestSensorIndexingConfig(), sensorIndexingConfigService.findOne("bro"));
+    //and blah should be a miss.
+    assertNull(sensorIndexingConfigService.findOne("blah"));
   }
 
-  @Test
-  public void findOneShouldReturnNullWhenNoNodeExceptionIsThrown() throws Exception {
-    GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
-    when(getDataBuilder.forPath(ConfigurationType.INDEXING.getZookeeperRoot() + "/bro")).thenThrow(KeeperException.NoNodeException.class);
-
-    when(curatorFramework.getData()).thenReturn(getDataBuilder);
-
-    assertNull(sensorIndexingConfigService.findOne("bro"));
-  }
-
-  @Test
-  public void findOneShouldWrapNonNoNodeExceptionInRestException() throws Exception {
-    exception.expect(RestException.class);
 
-    GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
-    when(getDataBuilder.forPath(ConfigurationType.INDEXING.getZookeeperRoot() + "/bro")).thenThrow(Exception.class);
 
-    when(curatorFramework.getData()).thenReturn(getDataBuilder);
-
-    sensorIndexingConfigService.findOne("bro");
-  }
 
   @Test
   public void getAllTypesShouldProperlyReturnTypes() throws Exception {
-    GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
-    when(getChildrenBuilder.forPath(ConfigurationType.INDEXING.getZookeeperRoot()))
-            .thenReturn(new ArrayList() {{
-              add("bro");
-              add("squid");
-            }});
-    when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
+    IndexingConfigurations configs = new IndexingConfigurations(){
+      @Override
+      public Map<String, Object> getConfigurations() {
+        return ImmutableMap.of(IndexingConfigurations.getKey("bro"), new HashMap<>()
+                              ,IndexingConfigurations.getKey("squid"), new HashMap<>()
+                              );
+      }
+    };
+    when(cache.get(eq(IndexingConfigurations.class)))
+            .thenReturn(configs);
 
     assertEquals(new ArrayList() {{
       add("bro");
@@ -159,39 +160,18 @@ public class SensorIndexingConfigServiceImplTest {
     }}, sensorIndexingConfigService.getAllTypes());
   }
 
-  @Test
-  public void getAllTypesShouldReturnNullWhenNoNodeExceptionIsThrown() throws Exception {
-    GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
-    when(getChildrenBuilder.forPath(ConfigurationType.INDEXING.getZookeeperRoot())).thenThrow(KeeperException.NoNodeException.class);
-    when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
-
-    assertEquals(new ArrayList<>(), sensorIndexingConfigService.getAllTypes());
-  }
 
   @Test
-  public void getAllTypesShouldWrapNonNoNodeExceptionInRestException() throws Exception {
-    exception.expect(RestException.class);
-
-    GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
-    when(getChildrenBuilder.forPath(ConfigurationType.INDEXING.getZookeeperRoot())).thenThrow(Exception.class);
-    when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
-
-    sensorIndexingConfigService.getAllTypes();
-  }
-
-  @Test
-  public void getAllShouldProperlyReturnSensorEnrichmentConfigs() throws Exception {
+  public void getAllShouldProperlyReturnIndexingConfigs() throws Exception {
     final Map<String, Object> sensorIndexingConfig = getTestSensorIndexingConfig();
-
-    GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
-    when(getChildrenBuilder.forPath(ConfigurationType.INDEXING.getZookeeperRoot()))
-            .thenReturn(new ArrayList() {{
-              add("bro");
-            }});
-    when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
-    GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
-    when(getDataBuilder.forPath(ConfigurationType.INDEXING.getZookeeperRoot() + "/bro")).thenReturn(broJson.getBytes());
-    when(curatorFramework.getData()).thenReturn(getDataBuilder);
+    IndexingConfigurations configs = new IndexingConfigurations(){
+      @Override
+      public Map<String, Object> getConfigurations() {
+        return ImmutableMap.of(IndexingConfigurations.getKey("bro"), sensorIndexingConfig );
+      }
+    };
+    when(cache.get(eq(IndexingConfigurations.class)))
+            .thenReturn(configs);
 
     assertEquals(new HashMap() {{ put("bro", sensorIndexingConfig);}}, sensorIndexingConfigService.getAll());
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImplTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImplTest.java
index c96a796..7998c21 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImplTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImplTest.java
@@ -18,6 +18,7 @@
 package org.apache.metron.rest.service.impl;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
 import oi.thekraken.grok.api.Grok;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.curator.framework.CuratorFramework;
@@ -27,7 +28,9 @@ import org.apache.curator.framework.api.GetDataBuilder;
 import org.apache.curator.framework.api.SetDataBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.zookeeper.ConfigurationsCache;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.model.ParseMessageRequest;
 import org.apache.metron.rest.service.GrokService;
@@ -95,6 +98,8 @@ public class SensorParserConfigServiceImplTest {
 
   private String user = "user1";
 
+  ConfigurationsCache cache;
+
   @Before
   public void setUp() throws Exception {
     objectMapper = mock(ObjectMapper.class);
@@ -105,7 +110,8 @@ public class SensorParserConfigServiceImplTest {
     SecurityContextHolder.getContext().setAuthentication(authentication);
     when(environment.getProperty(GROK_TEMP_PATH_SPRING_PROPERTY)).thenReturn("./target");
     grokService = new GrokServiceImpl(environment, mock(Grok.class), new HdfsServiceImpl(new Configuration()));
-    sensorParserConfigService = new SensorParserConfigServiceImpl(objectMapper, curatorFramework, grokService);
+    cache = mock(ConfigurationsCache.class);
+    sensorParserConfigService = new SensorParserConfigServiceImpl(objectMapper, curatorFramework, grokService, cache);
   }
 
 
@@ -144,47 +150,36 @@ public class SensorParserConfigServiceImplTest {
   }
 
   @Test
-  public void findOneShouldProperlyReturnSensorEnrichmentConfig() throws Exception {
+  public void findOneShouldProperlyReturnSensorParserConfig() throws Exception {
     final SensorParserConfig sensorParserConfig = getTestBroSensorParserConfig();
 
-    GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
-    when(getDataBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot() + "/bro")).thenReturn(broJson.getBytes());
-    when(curatorFramework.getData()).thenReturn(getDataBuilder);
+    ParserConfigurations configs = new ParserConfigurations(){
+      @Override
+      public Map<String, Object> getConfigurations() {
+        return ImmutableMap.of(ParserConfigurations.getKey("bro"), sensorParserConfig);
+      }
+    };
+    when(cache.get(eq(ParserConfigurations.class)))
+            .thenReturn(configs);
 
+    //We only have bro, so we should expect it to be returned
     assertEquals(getTestBroSensorParserConfig(), sensorParserConfigService.findOne("bro"));
-  }
-
-  @Test
-  public void findOneShouldReturnNullWhenNoNodeExceptionIsThrown() throws Exception {
-    GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
-    when(getDataBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot() + "/bro")).thenThrow(KeeperException.NoNodeException.class);
-
-    when(curatorFramework.getData()).thenReturn(getDataBuilder);
-
-    assertNull(sensorParserConfigService.findOne("bro"));
-  }
-
-  @Test
-  public void findOneShouldWrapNonNoNodeExceptionInRestException() throws Exception {
-    exception.expect(RestException.class);
-
-    GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
-    when(getDataBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot() + "/bro")).thenThrow(Exception.class);
-
-    when(curatorFramework.getData()).thenReturn(getDataBuilder);
-
-    sensorParserConfigService.findOne("bro");
+    //and blah should be a miss.
+    assertNull(sensorParserConfigService.findOne("blah"));
   }
 
   @Test
   public void getAllTypesShouldProperlyReturnTypes() throws Exception {
-    GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
-    when(getChildrenBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot()))
-            .thenReturn(new ArrayList() {{
-              add("bro");
-              add("squid");
-            }});
-    when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
+    ParserConfigurations configs = new ParserConfigurations(){
+      @Override
+      public Map<String, Object> getConfigurations() {
+        return ImmutableMap.of(ParserConfigurations.getKey("bro"), new HashMap<>()
+                              ,ParserConfigurations.getKey("squid"), new HashMap<>()
+                              );
+      }
+    };
+    when(cache.get( eq(ParserConfigurations.class)))
+            .thenReturn(configs);
 
     assertEquals(new ArrayList() {{
       add("bro");
@@ -193,41 +188,19 @@ public class SensorParserConfigServiceImplTest {
   }
 
   @Test
-  public void getAllTypesShouldReturnEmptyListWhenNoNodeExceptionIsThrown() throws Exception {
-    GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
-    when(getChildrenBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot())).thenThrow(KeeperException.NoNodeException.class);
-    when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
-
-    assertEquals(new ArrayList<>(), sensorParserConfigService.getAllTypes());
-  }
-
-  @Test
-  public void getAllTypesShouldWrapNonNoNodeExceptionInRestException() throws Exception {
-    exception.expect(RestException.class);
-
-    GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
-    when(getChildrenBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot())).thenThrow(Exception.class);
-    when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
-
-    sensorParserConfigService.getAllTypes();
-  }
-
-  @Test
   public void getAllShouldProperlyReturnSensorParserConfigs() throws Exception {
-    GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
-    when(getChildrenBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot()))
-            .thenReturn(new ArrayList() {{
-              add("bro");
-              add("squid");
-            }});
-    when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
-
     final SensorParserConfig broSensorParserConfig = getTestBroSensorParserConfig();
     final SensorParserConfig squidSensorParserConfig = getTestSquidSensorParserConfig();
-    GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
-    when(getDataBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot() + "/bro")).thenReturn(broJson.getBytes());
-    when(getDataBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot() + "/squid")).thenReturn(squidJson.getBytes());
-    when(curatorFramework.getData()).thenReturn(getDataBuilder);
+    ParserConfigurations configs = new ParserConfigurations(){
+      @Override
+      public Map<String, Object> getConfigurations() {
+        return ImmutableMap.of(ParserConfigurations.getKey("bro"), broSensorParserConfig
+                              ,ParserConfigurations.getKey("squid"), squidSensorParserConfig
+                              );
+      }
+    };
+    when(cache.get( eq(ParserConfigurations.class)))
+            .thenReturn(configs);
 
     assertEquals(new ArrayList() {{
       add(getTestBroSensorParserConfig());

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/pom.xml b/metron-platform/metron-common/pom.xml
index 3054881..8734d63 100644
--- a/metron-platform/metron-common/pom.xml
+++ b/metron-platform/metron-common/pom.xml
@@ -54,6 +54,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>
+            <artifactId>metron-zookeeper</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
             <artifactId>metron-integration-test</artifactId>
             <version>${project.parent.version}</version>
             <scope>test</scope>
@@ -289,11 +294,6 @@
             <version>${global_jackson_version}</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-recipes</artifactId>
-            <version>${global_curator_version}</version>
-        </dependency>
-        <dependency>
             <groupId>org.apache.storm</groupId>
             <artifactId>flux-core</artifactId>
             <version>${global_flux_version}</version>

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
index a97091a..6f15746 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java
@@ -17,54 +17,58 @@
  */
 package org.apache.metron.common.bolt;
 
-import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.Map;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.cache.TreeCache;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.cache.TreeCacheListener;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.ConfigurationType;
 import org.apache.metron.common.configuration.Configurations;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.zookeeper.SimpleEventListener;
+import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
+import org.apache.metron.common.zookeeper.configurations.Reloadable;
+import org.apache.metron.zookeeper.ZKCache;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.base.BaseRichBolt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class ConfiguredBolt<CONFIG_T extends Configurations> extends BaseRichBolt {
+public abstract class ConfiguredBolt<CONFIG_T extends Configurations> extends BaseRichBolt implements Reloadable {
 
   private static final Logger LOG =  LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private String zookeeperUrl;
 
   protected CuratorFramework client;
-  protected TreeCache cache;
-  private final CONFIG_T configurations = defaultConfigurations();
+  protected ZKCache cache;
+  private final CONFIG_T configurations;
   public ConfiguredBolt(String zookeeperUrl) {
     this.zookeeperUrl = zookeeperUrl;
+    this.configurations = createUpdater().defaultConfigurations();
   }
 
   public void setCuratorFramework(CuratorFramework client) {
     this.client = client;
   }
 
-  public void setTreeCache(TreeCache cache) {
+  public void setZKCache(ZKCache cache) {
     this.cache = cache;
   }
 
+  @Override
   public void reloadCallback(String name, ConfigurationType type) {
   }
+
   public CONFIG_T getConfigurations() {
     return configurations;
   }
-  protected abstract CONFIG_T defaultConfigurations();
 
+  protected abstract ConfigurationsUpdater<CONFIG_T> createUpdater();
 
 
   @Override
@@ -85,30 +89,30 @@ public abstract class ConfiguredBolt<CONFIG_T extends Configurations> extends Ba
       //zookeeper.
       ConfigurationsUtils.setupStellarStatically(client);
       if (cache == null) {
-        cache = new TreeCache(client, Constants.ZOOKEEPER_TOPOLOGY_ROOT);
-        TreeCacheListener listener = new TreeCacheListener() {
-          @Override
-          public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
-            if (event.getType().equals(TreeCacheEvent.Type.NODE_ADDED) || event.getType().equals(TreeCacheEvent.Type.NODE_UPDATED)) {
-              String path = event.getData().getPath();
-              byte[] data = event.getData().getData();
-              updateConfig(path, data);
-            }
-          }
-        };
-        cache.getListenable().addListener(listener);
-        loadConfig();
+        ConfigurationsUpdater<CONFIG_T> updater = createUpdater();
+        SimpleEventListener listener = new SimpleEventListener.Builder()
+                                                              .with( updater::update
+                                                                   , TreeCacheEvent.Type.NODE_ADDED
+                                                                   , TreeCacheEvent.Type.NODE_UPDATED
+                                                                   )
+                                                              .with( updater::delete
+                                                                   , TreeCacheEvent.Type.NODE_REMOVED
+                                                                   )
+                                                              .build();
+        cache = new ZKCache.Builder()
+                           .withClient(client)
+                           .withListener(listener)
+                           .withRoot(Constants.ZOOKEEPER_TOPOLOGY_ROOT)
+                           .build();
+        updater.forceUpdate(client);
+        cache.start();
       }
-      cache.start();
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
       throw new RuntimeException(e);
     }
   }
 
-  abstract public void loadConfig();
-  abstract public void updateConfig(String path, byte[] data) throws IOException;
-
   @Override
   public void cleanup() {
     cache.close();

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
index 9c3ee97..54fd7e8 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java
@@ -22,6 +22,8 @@ import java.lang.invoke.MethodHandles;
 import org.apache.metron.common.configuration.ConfigurationType;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
 import org.apache.metron.common.configuration.EnrichmentConfigurations;
+import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
+import org.apache.metron.common.zookeeper.configurations.EnrichmentUpdater;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,31 +37,7 @@ public abstract class ConfiguredEnrichmentBolt extends ConfiguredBolt<Enrichment
   }
 
   @Override
-  protected EnrichmentConfigurations defaultConfigurations() {
-    return new EnrichmentConfigurations();
-  }
-
-  @Override
-  public void loadConfig() {
-    try {
-
-      ConfigurationsUtils.updateEnrichmentConfigsFromZookeeper(getConfigurations(), client);
-    } catch (Exception e) {
-      LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily...");
-    }
-  }
-
-  @Override
-  public void updateConfig(String path, byte[] data) throws IOException {
-    if (data.length != 0) {
-      String name = path.substring(path.lastIndexOf("/") + 1);
-      if (path.startsWith(ConfigurationType.ENRICHMENT.getZookeeperRoot())) {
-        getConfigurations().updateSensorEnrichmentConfig(name, data);
-        reloadCallback(name, ConfigurationType.ENRICHMENT);
-      } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) {
-        getConfigurations().updateGlobalConfig(data);
-        reloadCallback(name, ConfigurationType.GLOBAL);
-      }
-    }
+  protected ConfigurationsUpdater<EnrichmentConfigurations> createUpdater() {
+    return new EnrichmentUpdater(this, this::getConfigurations);
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java
index cddcada..09300e4 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java
@@ -22,6 +22,8 @@ import java.lang.invoke.MethodHandles;
 import org.apache.metron.common.configuration.ConfigurationType;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
 import org.apache.metron.common.configuration.IndexingConfigurations;
+import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
+import org.apache.metron.common.zookeeper.configurations.IndexingUpdater;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,30 +35,8 @@ public abstract class ConfiguredIndexingBolt extends ConfiguredBolt<IndexingConf
   }
 
   @Override
-  protected IndexingConfigurations defaultConfigurations() {
-    return new IndexingConfigurations();
+  protected ConfigurationsUpdater<IndexingConfigurations> createUpdater() {
+    return new IndexingUpdater(this, this::getConfigurations);
   }
 
-  @Override
-  public void loadConfig() {
-    try {
-      ConfigurationsUtils.updateSensorIndexingConfigsFromZookeeper(getConfigurations(), client);
-    } catch (Exception e) {
-      LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily...");
-    }
-  }
-
-  @Override
-  public void updateConfig(String path, byte[] data) throws IOException {
-    if (data.length != 0) {
-      String name = path.substring(path.lastIndexOf("/") + 1);
-      if (path.startsWith(ConfigurationType.INDEXING.getZookeeperRoot())) {
-        getConfigurations().updateSensorIndexingConfig(name, data);
-        reloadCallback(name, ConfigurationType.INDEXING);
-      } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) {
-        getConfigurations().updateGlobalConfig(data);
-        reloadCallback(name, ConfigurationType.GLOBAL);
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
index 99313fa..2f13658 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
@@ -23,6 +23,8 @@ import org.apache.metron.common.configuration.ConfigurationType;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
 import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
+import org.apache.metron.common.zookeeper.configurations.ParserUpdater;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,34 +43,14 @@ public abstract class ConfiguredParserBolt extends ConfiguredBolt<ParserConfigur
     return getConfigurations().getSensorParserConfig(sensorType);
   }
 
-  @Override
-  protected ParserConfigurations defaultConfigurations() {
-    return new ParserConfigurations();
-  }
-
   public String getSensorType() {
     return sensorType;
   }
-  @Override
-  public void loadConfig() {
-    try {
-      ConfigurationsUtils.updateParserConfigsFromZookeeper(getConfigurations(), client);
-    } catch (Exception e) {
-      LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily...");
-    }
-  }
+
 
   @Override
-  public void updateConfig(String path, byte[] data) throws IOException {
-    if (data.length != 0) {
-      String name = path.substring(path.lastIndexOf("/") + 1);
-      if (path.startsWith(ConfigurationType.PARSER.getZookeeperRoot())) {
-        getConfigurations().updateSensorParserConfig(name, data);
-        reloadCallback(name, ConfigurationType.PARSER);
-      } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) {
-        getConfigurations().updateGlobalConfig(data);
-        reloadCallback(name, ConfigurationType.GLOBAL);
-      }
-    }
+  protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+    return new ParserUpdater(this, this::getConfigurations);
   }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java
index 22ff3a9..90575d0 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java
@@ -17,16 +17,12 @@
  */
 package org.apache.metron.common.bolt;
 
-import static org.apache.metron.common.configuration.ConfigurationType.PROFILER;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
 import java.lang.invoke.MethodHandles;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.metron.common.configuration.ConfigurationType;
 import org.apache.metron.common.configuration.profiler.ProfilerConfig;
 import org.apache.metron.common.configuration.profiler.ProfilerConfigurations;
-import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
+import org.apache.metron.common.zookeeper.configurations.ProfilerUpdater;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,43 +42,8 @@ public abstract class ConfiguredProfilerBolt extends ConfiguredBolt<ProfilerConf
   }
 
   @Override
-  protected ProfilerConfigurations defaultConfigurations() {
-    return new ProfilerConfigurations();
+  protected ConfigurationsUpdater<ProfilerConfigurations> createUpdater() {
+    return new ProfilerUpdater(this, this::getConfigurations);
   }
 
-  @Override
-  public void loadConfig() {
-    try {
-      ProfilerConfig config = readFromZookeeper(client);
-      if(config != null) {
-        getConfigurations().updateProfilerConfig(config);
-      }
-
-    } catch (Exception e) {
-      LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily...");
-    }
-  }
-
-  private ProfilerConfig readFromZookeeper(CuratorFramework client) throws Exception {
-    byte[] raw = client.getData().forPath(PROFILER.getZookeeperRoot());
-    return JSONUtils.INSTANCE.load(new ByteArrayInputStream(raw), ProfilerConfig.class);
-  }
-
-  @Override
-  public void updateConfig(String path, byte[] data) throws IOException {
-    if (data.length != 0) {
-      String name = path.substring(path.lastIndexOf("/") + 1);
-
-      // update the profiler configuration from zookeeper
-      if (path.startsWith(ConfigurationType.PROFILER.getZookeeperRoot())) {
-        getConfigurations().updateProfilerConfig(data);
-        reloadCallback(name, ConfigurationType.PROFILER);
-
-      // update the global configuration from zookeeper
-      } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) {
-        getConfigurations().updateGlobalConfig(data);
-        reloadCallback(name, ConfigurationType.GLOBAL);
-      }
-    }
-  }
 }


Mime
View raw message