ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mgerg...@apache.org
Subject [2/2] ambari git commit: AMBARI-21507 Log Search Solr output properties should be provided by the Config API (mgergely)
Date Thu, 27 Jul 2017 14:53:45 GMT
AMBARI-21507 Log Search Solr output properties should be provided by the Config API (mgergely)

Change-Id: I32ec1afa8549b7e065fa904f2de2db0b255f690f


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/dc85e67d
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/dc85e67d
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/dc85e67d

Branch: refs/heads/trunk
Commit: dc85e67d7d1f1287398824541c99b7f3872796a0
Parents: e3a50d9
Author: Miklos Gergely <mgergely@hortonworks.com>
Authored: Thu Jul 27 16:53:34 2017 +0200
Committer: Miklos Gergely <mgergely@hortonworks.com>
Committed: Thu Jul 27 16:53:34 2017 +0200

----------------------------------------------------------------------
 .../logsearch/config/api/LogSearchConfig.java   |  41 +++-
 .../config/api/OutputConfigMonitor.java         |  44 +++++
 .../model/outputconfig/OutputProperties.java    |  23 +++
 .../outputconfig/OutputSolrProperties.java      |  26 +++
 .../config/api/LogSearchConfigClass1.java       |  19 +-
 .../config/api/LogSearchConfigClass2.java       |  19 +-
 .../config/zookeeper/LogSearchConfigZK.java     |  87 +++++++--
 .../impl/OutputSolrPropertiesImpl.java          |  46 +++++
 .../org/apache/ambari/logfeeder/LogFeeder.java  |   5 +-
 .../ambari/logfeeder/common/ConfigHandler.java  |  11 +-
 .../logfeeder/common/LogEntryParseTester.java   |   2 +-
 .../logfeeder/input/InputConfigUploader.java    |   2 +-
 .../ambari/logfeeder/input/InputSimulate.java   |   1 +
 .../apache/ambari/logfeeder/output/Output.java  |  36 +++-
 .../ambari/logfeeder/output/OutputManager.java  |  11 ++
 .../ambari/logfeeder/output/OutputSolr.java     | 187 +++++++++++--------
 .../ambari/logfeeder/output/OutputSolrTest.java |  29 ++-
 .../logsearch/conf/SolrAuditLogPropsConfig.java |   5 +
 .../conf/SolrEventHistoryPropsConfig.java       |   5 +
 .../ambari/logsearch/conf/SolrPropsConfig.java  |   2 +
 .../conf/SolrServiceLogPropsConfig.java         |   5 +
 .../configurer/LogSearchConfigConfigurer.java   |   3 +
 .../configurer/SolrCollectionConfigurer.java    |   5 +-
 .../ambari/logsearch/dao/AuditSolrDao.java      |   1 +
 .../logsearch/dao/ServiceLogsSolrDao.java       |   1 +
 .../ambari/logsearch/dao/SolrDaoBase.java       |  13 +-
 .../handler/CreateCollectionHandler.java        |  12 +-
 .../logsearch/manager/ShipperConfigManager.java |  10 +-
 .../logfeeder/shipper-conf/output.config.json   |  10 +-
 .../server/upgrade/UpgradeCatalog300.java       |  42 +++--
 .../0.5.0/properties/output.config.json.j2      |   8 +-
 .../LOGSEARCH/0.5.0/service_advisor.py          |  33 ++--
 .../server/upgrade/UpgradeCatalog300Test.java   |  73 ++++++--
 33 files changed, 625 insertions(+), 192 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
index 6c5cefd..76be392 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/LogSearchConfig.java
@@ -25,6 +25,7 @@ import java.util.Map;
 
 import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
 import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap;
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
 
 /**
@@ -57,14 +58,23 @@ public interface LogSearchConfig extends Closeable {
   List<String> getServices(String clusterName);
 
   /**
-   * Checks if input configuration exists.
+   * Checks if input configuration exists. Will be used only in LOGFEEDER mode.
+   * 
+   * @param serviceName The name of the service looked for.
+   * @return If input configuration exists for the service.
+   * @throws Exception
+   */
+  boolean inputConfigExistsLogFeeder(String serviceName) throws Exception;
+
+  /**
+   * Checks if input configuration exists. Will be used only in SERVER mode.
    * 
    * @param clusterName The name of the cluster where the service is looked for.
    * @param serviceName The name of the service looked for.
    * @return If input configuration exists for the service.
    * @throws Exception
    */
-  boolean inputConfigExists(String clusterName, String serviceName) throws Exception;
+  boolean inputConfigExistsServer(String clusterName, String serviceName) throws Exception;
 
   /**
    * Returns the global configurations of a cluster. Will be used only in SERVER mode.
@@ -140,4 +150,31 @@ public interface LogSearchConfig extends Closeable {
    */
   void monitorInputConfigChanges(InputConfigMonitor inputConfigMonitor, LogLevelFilterMonitor logLevelFilterMonitor,
       String clusterName) throws Exception;
+
+  /**
+   * Saves the properties of an Output Solr. Will be used only in SERVER mode.
+   * 
+   * @param type The type of the Output Solr.
+   * @param outputSolrProperties The properties of the Output Solr.
+   * @throws Exception
+   */
+  void saveOutputSolrProperties(String type, OutputSolrProperties outputSolrProperties) throws Exception;
+
+  /**
+   * Get the properties of an Output Solr. Will be used only in LOGFEEDER mode.
+   * 
+   * @param type The type of the Output Solr.
+   * @return The properties of the Output Solr, or null if it doesn't exist.
+   * @throws Exception
+   */
+  OutputSolrProperties getOutputSolrProperties(String type) throws Exception;
+
+  /**
+   * Saves the properties of an Output Solr. Will be used only in LOGFEEDER mode.
+   * 
+   * @param type The type of the Output Solr.
+   * @param outputConfigMonitors The monitors which want to watch the output config changes.
+   * @throws Exception
+   */
+  void monitorOutputProperties(List<? extends OutputConfigMonitor> outputConfigMonitors) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/OutputConfigMonitor.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/OutputConfigMonitor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/OutputConfigMonitor.java
new file mode 100644
index 0000000..c54626d
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/OutputConfigMonitor.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api;
+
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputProperties;
+
+/**
+ * Monitors output configuration changes.
+ */
+public interface OutputConfigMonitor {
+  /** 
+   * @return The destination of the output.
+   */
+  String getDestination();
+
+  /**
+   * @return The type of the output logs.
+   */
+  String getOutputType();
+
+  /**
+   * Will be called whenever there is a change in the configuration of the output.
+   * 
+   * @param outputProperties The modified properties of the output.
+   */
+  void outputConfigChanged(OutputProperties outputProperties);
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/outputconfig/OutputProperties.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/outputconfig/OutputProperties.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/outputconfig/OutputProperties.java
new file mode 100644
index 0000000..affd5b9
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/outputconfig/OutputProperties.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api.model.outputconfig;
+
+public interface OutputProperties {
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/outputconfig/OutputSolrProperties.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/outputconfig/OutputSolrProperties.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/outputconfig/OutputSolrProperties.java
new file mode 100644
index 0000000..586e785
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/outputconfig/OutputSolrProperties.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.api.model.outputconfig;
+
+public interface OutputSolrProperties extends OutputProperties {
+  String getCollection();
+
+  String getSplitIntervalMins();
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
index 28844d5..e308346 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass1.java
@@ -26,6 +26,7 @@ import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
 import org.apache.ambari.logsearch.config.api.LogSearchConfig;
 import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
 import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap;
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
 
 public class LogSearchConfigClass1 implements LogSearchConfig {
@@ -33,7 +34,12 @@ public class LogSearchConfigClass1 implements LogSearchConfig {
   public void init(Component component, Map<String, String> properties, String clusterName) {}
 
   @Override
-  public boolean inputConfigExists(String clusterName, String serviceName) throws Exception {
+  public boolean inputConfigExistsLogFeeder(String serviceName) throws Exception {
+    return false;
+  }
+
+  @Override
+  public boolean inputConfigExistsServer(String clusterName, String serviceName) throws Exception {
     return false;
   }
 
@@ -74,5 +80,16 @@ public class LogSearchConfigClass1 implements LogSearchConfig {
   }
 
   @Override
+  public void saveOutputSolrProperties(String type, OutputSolrProperties outputSolrProperties) throws Exception {}
+
+  @Override
+  public OutputSolrProperties getOutputSolrProperties(String type) {
+    return null;
+  }
+
+  @Override
+  public void monitorOutputProperties(List<? extends OutputConfigMonitor> outputConfigMonitors) throws Exception {}
+
+  @Override
   public void close() {}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
index 5934fa6..b64dae8 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/test/java/org/apache/ambari/logsearch/config/api/LogSearchConfigClass2.java
@@ -26,6 +26,7 @@ import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
 import org.apache.ambari.logsearch.config.api.LogSearchConfig;
 import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
 import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap;
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
 
 public class LogSearchConfigClass2 implements LogSearchConfig {
@@ -33,7 +34,12 @@ public class LogSearchConfigClass2 implements LogSearchConfig {
   public void init(Component component, Map<String, String> properties, String clusterName) {}
 
   @Override
-  public boolean inputConfigExists(String clusterName, String serviceName) throws Exception {
+  public boolean inputConfigExistsLogFeeder(String serviceName) throws Exception {
+    return false;
+  }
+
+  @Override
+  public boolean inputConfigExistsServer(String clusterName, String serviceName) throws Exception {
     return false;
   }
 
@@ -74,5 +80,16 @@ public class LogSearchConfigClass2 implements LogSearchConfig {
   }
 
   @Override
+  public void saveOutputSolrProperties(String type, OutputSolrProperties outputSolrProperties) throws Exception {}
+
+  @Override
+  public OutputSolrProperties getOutputSolrProperties(String type) {
+    return null;
+  }
+
+  @Override
+  public void monitorOutputProperties(List<? extends OutputConfigMonitor> outputConfigMonitors) throws Exception {}
+
+  @Override
   public void close() {}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
index fdd8ed6..387d0c6 100644
--- a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogSearchConfigZK.java
@@ -27,12 +27,15 @@ import java.util.TreeMap;
 
 import org.apache.ambari.logsearch.config.api.LogSearchConfig;
 import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
+import org.apache.ambari.logsearch.config.api.OutputConfigMonitor;
 import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilter;
 import org.apache.ambari.logsearch.config.api.model.loglevelfilter.LogLevelFilterMap;
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
 import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputAdapter;
 import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputConfigGson;
 import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputConfigImpl;
+import org.apache.ambari.logsearch.config.zookeeper.model.outputconfig.impl.OutputSolrPropertiesImpl;
 import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
 import org.apache.ambari.logsearch.config.api.LogLevelFilterMonitor;
 import org.apache.commons.collections.MapUtils;
@@ -98,9 +101,12 @@ public class LogSearchConfigZK implements LogSearchConfig {
 
   private Map<String, String> properties;
   private CuratorFramework client;
-  private TreeCache cache;
   private Gson gson;
 
+  private TreeCache serverCache;
+  private TreeCache logFeederClusterCache;
+  private TreeCache outputCache;
+
   @Override
   public void init(Component component, Map<String, String> properties, String clusterName) throws Exception {
     this.properties = properties;
@@ -115,28 +121,39 @@ public class LogSearchConfigZK implements LogSearchConfig {
         .build();
     client.start();
 
+    outputCache = new TreeCache(client, "/output");
+    outputCache.start();
 
     if (component == Component.SERVER) {
       if (client.checkExists().forPath("/") == null) {
         client.create().creatingParentContainersIfNeeded().forPath("/");
       }
-      cache = new TreeCache(client, "/");
-      cache.start();
+      if (client.checkExists().forPath("/output") == null) {
+        client.create().creatingParentContainersIfNeeded().forPath("/output");
+      }
+      serverCache = new TreeCache(client, "/");
+      serverCache.start();
     } else {
       while (client.checkExists().forPath("/") == null) {
         LOG.info("Root node is not present yet, going to sleep for " + WAIT_FOR_ROOT_SLEEP_SECONDS + " seconds");
         Thread.sleep(WAIT_FOR_ROOT_SLEEP_SECONDS * 1000);
       }
-      cache = new TreeCache(client, String.format("/%s", clusterName));
+      logFeederClusterCache = new TreeCache(client, String.format("/%s", clusterName));
     }
     
     gson = new GsonBuilder().setDateFormat(DATE_FORMAT).create();
   }
 
   @Override
-  public boolean inputConfigExists(String clusterName, String serviceName) throws Exception {
+  public boolean inputConfigExistsLogFeeder(String serviceName) throws Exception {
+    String nodePath = String.format("/input/%s", serviceName);
+    return logFeederClusterCache.getCurrentData(nodePath) != null;
+  }
+
+  @Override
+  public boolean inputConfigExistsServer(String clusterName, String serviceName) throws Exception {
     String nodePath = String.format("/%s/input/%s", clusterName, serviceName);
-    return cache.getCurrentData(nodePath) != null;
+    return serverCache.getCurrentData(nodePath) != null;
   }
 
   @Override
@@ -261,8 +278,8 @@ public class LogSearchConfigZK implements LogSearchConfig {
         }
       }
     };
-    cache.getListenable().addListener(listener);
-    cache.start();
+    logFeederClusterCache.getListenable().addListener(listener);
+    logFeederClusterCache.start();
   }
 
   private void createGlobalConfigNode(JsonArray globalConfigNode, String clusterName) {
@@ -270,7 +287,7 @@ public class LogSearchConfigZK implements LogSearchConfig {
     String data = InputConfigGson.gson.toJson(globalConfigNode);
     
     try {
-      if (cache.getCurrentData(globalConfigNodePath) != null) {
+      if (logFeederClusterCache.getCurrentData(globalConfigNodePath) != null) {
         client.setData().forPath(globalConfigNodePath, data.getBytes());
       } else {
         client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(globalConfigNodePath, data.getBytes());
@@ -283,14 +300,14 @@ public class LogSearchConfigZK implements LogSearchConfig {
   @Override
   public List<String> getServices(String clusterName) {
     String parentPath = String.format("/%s/input", clusterName);
-    Map<String, ChildData> serviceNodes = cache.getCurrentChildren(parentPath);
+    Map<String, ChildData> serviceNodes = serverCache.getCurrentChildren(parentPath);
     return new ArrayList<String>(serviceNodes.keySet());
   }
 
   @Override
   public String getGlobalConfigs(String clusterName) {
     String globalConfigNodePath = String.format("/%s/global", clusterName);
-    return new String(cache.getCurrentData(globalConfigNodePath).getData());
+    return new String(serverCache.getCurrentData(globalConfigNodePath).getData());
   }
 
   @Override
@@ -299,7 +316,7 @@ public class LogSearchConfigZK implements LogSearchConfig {
     JsonArray globalConfigs = (JsonArray) new JsonParser().parse(globalConfigData);
     InputAdapter.setGlobalConfigs(globalConfigs);
     
-    ChildData childData = cache.getCurrentData(String.format("/%s/input/%s", clusterName, serviceName));
+    ChildData childData = serverCache.getCurrentData(String.format("/%s/input/%s", clusterName, serviceName));
     return childData == null ? null : InputConfigGson.gson.fromJson(new String(childData.getData()), InputConfigImpl.class);
   }
 
@@ -320,7 +337,7 @@ public class LogSearchConfigZK implements LogSearchConfig {
     for (Map.Entry<String, LogLevelFilter> e : filters.getFilter().entrySet()) {
       String nodePath = String.format("/%s/loglevelfilter/%s", clusterName, e.getKey());
       String logLevelFilterJson = gson.toJson(e.getValue());
-      String currentLogLevelFilterJson = new String(cache.getCurrentData(nodePath).getData());
+      String currentLogLevelFilterJson = new String(serverCache.getCurrentData(nodePath).getData());
       if (!logLevelFilterJson.equals(currentLogLevelFilterJson)) {
         client.setData().forPath(nodePath, logLevelFilterJson.getBytes());
         LOG.info("Set log level filter for the log " + e.getKey() + " for cluster " + clusterName);
@@ -331,7 +348,7 @@ public class LogSearchConfigZK implements LogSearchConfig {
   @Override
   public LogLevelFilterMap getLogLevelFilters(String clusterName) {
     String parentPath = String.format("/%s/loglevelfilter", clusterName);
-    Map<String, ChildData> logLevelFilterNodes = cache.getCurrentChildren(parentPath);
+    Map<String, ChildData> logLevelFilterNodes = serverCache.getCurrentChildren(parentPath);
     TreeMap<String, LogLevelFilter> filters = new TreeMap<>();
     for (Map.Entry<String, ChildData> e : logLevelFilterNodes.entrySet()) {
       LogLevelFilter logLevelFilter = gson.fromJson(new String(e.getValue().getData()), LogLevelFilter.class);
@@ -387,6 +404,48 @@ public class LogSearchConfigZK implements LogSearchConfig {
   }
 
   @Override
+  public void saveOutputSolrProperties(String type, OutputSolrProperties outputSolrProperties) throws Exception {
+    String nodePath = String.format("/output/solr/%s", type);
+    String data = gson.toJson(outputSolrProperties);
+    if (outputCache.getCurrentData(nodePath) == null) {
+      client.create().creatingParentContainersIfNeeded().withACL(getAcls()).forPath(nodePath, data.getBytes());
+    } else {
+      client.setData().forPath(nodePath, data.getBytes());
+    }
+  }
+
+  @Override
+  public OutputSolrProperties getOutputSolrProperties(String type) throws Exception {
+    String nodePath = String.format("/output/solr/%s", type);
+    ChildData currentData = outputCache.getCurrentData(nodePath);
+    return currentData == null ?
+        null :
+        gson.fromJson(new String(currentData.getData()), OutputSolrPropertiesImpl.class);
+  }
+
+  @Override
+  public void monitorOutputProperties(final List<? extends OutputConfigMonitor> outputConfigMonitors) throws Exception {
+    TreeCacheListener listener = new TreeCacheListener() {
+      public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
+        if (event.getType() != Type.NODE_UPDATED) {
+          return;
+        }
+        
+        LOG.info("Output config updated: " + event.getData().getPath());
+        for (OutputConfigMonitor monitor : outputConfigMonitors) {
+          String monitorPath = String.format("/output/%s/%s", monitor.getDestination(), monitor.getOutputType());
+          if (monitorPath.equals(event.getData().getPath())) {
+            String nodeData = new String(event.getData().getData());
+            OutputSolrProperties outputSolrProperties = gson.fromJson(nodeData, OutputSolrPropertiesImpl.class);
+            monitor.outputConfigChanged(outputSolrProperties);
+          }
+        }
+      }
+    };
+    outputCache.getListenable().addListener(listener);
+  }
+
+  @Override
   public void close() {
     LOG.info("Closing ZooKeeper Connection");
     client.close();

http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/outputconfig/impl/OutputSolrPropertiesImpl.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/outputconfig/impl/OutputSolrPropertiesImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/outputconfig/impl/OutputSolrPropertiesImpl.java
new file mode 100644
index 0000000..4b9f54c
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/outputconfig/impl/OutputSolrPropertiesImpl.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logsearch.config.zookeeper.model.outputconfig.impl;
+
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties;
+
+import com.google.gson.annotations.SerializedName;
+
+public class OutputSolrPropertiesImpl implements OutputSolrProperties {
+  private final String collection;
+
+  @SerializedName("split_interval_mins")
+  private final String splitIntervalMins;
+
+  public OutputSolrPropertiesImpl(String collection, String splitIntervalMins) {
+    this.collection = collection;
+    this.splitIntervalMins = splitIntervalMins;
+  }
+
+  @Override
+  public String getCollection() {
+    return collection;
+  }
+
+  @Override
+  public String getSplitIntervalMins() {
+    return splitIntervalMins;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
index 59c2a22..ba3412b 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
@@ -53,7 +53,7 @@ public class LogFeeder {
 
   private final LogFeederCommandLine cli;
   
-  private ConfigHandler configHandler = new ConfigHandler();
+  private ConfigHandler configHandler;
   private LogSearchConfig config;
   
   private MetricsManager metricsManager = new MetricsManager();
@@ -78,11 +78,12 @@ public class LogFeeder {
   private void init() throws Throwable {
     long startTime = System.currentTimeMillis();
 
-    configHandler.init();
     SSLUtil.ensureStorePasswords();
     
     config = LogSearchConfigFactory.createLogSearchConfig(Component.LOGFEEDER, Maps.fromProperties(LogFeederUtil.getProperties()),
         LogFeederUtil.getClusterName(), LogSearchConfigZK.class);
+    configHandler = new ConfigHandler(config);
+    configHandler.init();
     LogLevelFilterHandler.init(config);
     InputConfigUploader.load(config);
     config.monitorInputConfigChanges(configHandler, new LogLevelFilterHandler(), LogFeederUtil.getClusterName());

http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
index 5bf074c..30b61a1 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
@@ -48,6 +48,7 @@ import org.apache.commons.lang.BooleanUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.ambari.logfeeder.util.AliasUtil.AliasType;
 import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
+import org.apache.ambari.logsearch.config.api.LogSearchConfig;
 import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterGrokDescriptor;
@@ -85,6 +86,8 @@ public class ConfigHandler implements InputConfigMonitor {
   )
   private static final String SIMULATE_INPUT_NUMBER_PROPERTY = "logfeeder.simulate.input_number";
 
+  private final LogSearchConfig logSearchConfig;
+  
   private final OutputManager outputManager = new OutputManager();
   private final InputManager inputManager = new InputManager();
 
@@ -97,7 +100,9 @@ public class ConfigHandler implements InputConfigMonitor {
   
   private boolean simulateMode = false;
   
-  public ConfigHandler() {}
+  public ConfigHandler(LogSearchConfig logSearchConfig) {
+    this.logSearchConfig = logSearchConfig;
+  }
   
   public void init() throws Exception {
     loadConfigFiles();
@@ -106,6 +111,8 @@ public class ConfigHandler implements InputConfigMonitor {
     
     inputManager.init();
     outputManager.init();
+    
+    logSearchConfig.monitorOutputProperties(outputManager.getOutputsToMonitor());
   }
   
   private void loadConfigFiles() throws Exception {
@@ -271,6 +278,7 @@ public class ConfigHandler implements InputConfigMonitor {
       }
       output.setDestination(value);
       output.loadConfig(map);
+      output.setLogSearchConfig(logSearchConfig);
 
       // We will only check for is_enabled out here. Down below we will check whether this output is enabled for the input
       if (output.isEnabled()) {
@@ -387,6 +395,7 @@ public class ConfigHandler implements InputConfigMonitor {
     
     // In case of simulation copies of the output are added for each simulation instance, these must be added to the manager
     for (Output output : InputSimulate.getSimulateOutputs()) {
+      output.setLogSearchConfig(logSearchConfig);
       outputManager.add(output);
       usedOutputSet.add(output);
     }

http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java
index 5356159..ec29f69 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java
@@ -73,7 +73,7 @@ public class LogEntryParseTester {
 
   public Map<String, Object> parse() throws Exception {
     InputConfig inputConfig = getInputConfig();
-    ConfigHandler configHandler = new ConfigHandler();
+    ConfigHandler configHandler = new ConfigHandler(null);
     Input input = configHandler.getTestInput(inputConfig, logId);
     final Map<String, Object> result = new HashMap<>();
     input.getFirstFilter().init();

http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
index 09fc3f5..10642d1 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
@@ -84,7 +84,7 @@ public class InputConfigUploader extends Thread {
             String serviceName = m.group(1);
             String inputConfig = Files.toString(inputConfigFile, Charset.defaultCharset());
             
-            if (!config.inputConfigExists(LogFeederUtil.getClusterName(), serviceName)) {
+            if (!config.inputConfigExistsLogFeeder(serviceName)) {
               config.createInputConfig(LogFeederUtil.getClusterName(), serviceName, inputConfig);
             }
             filesHandled.add(inputConfigFile.getAbsolutePath());

http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
index f1002ae..7c487ba 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
@@ -163,6 +163,7 @@ public class InputSimulate extends Input {
       Class<? extends Output> clazz = output.getClass();
       Output outputCopy = clazz.newInstance();
       outputCopy.loadConfig(output.getConfigs());
+      outputCopy.setDestination(output.getDestination());
       simulateOutputs.add(outputCopy);
       super.addOutput(outputCopy);
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
index 65b9e19..b370e58 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
@@ -28,8 +28,11 @@ import org.apache.ambari.logfeeder.common.ConfigBlock;
 import org.apache.ambari.logfeeder.input.InputMarker;
 import org.apache.ambari.logfeeder.metrics.MetricData;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.LogSearchConfig;
+import org.apache.ambari.logsearch.config.api.OutputConfigMonitor;
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputProperties;
 
-public abstract class Output extends ConfigBlock {
+public abstract class Output extends ConfigBlock implements OutputConfigMonitor {
   private String destination = null;
 
   protected MetricData writeBytesMetric = new MetricData(getWriteBytesMetricName(), false);
@@ -37,6 +40,20 @@ public abstract class Output extends ConfigBlock {
     return null;
   }
 
+  public boolean monitorConfigChanges() {
+    return false;
+  };
+  
+  @Override
+  public String getOutputType() {
+    throw new IllegalStateException("This method should be overriden if the Output wants to monitor the configuration");
+  }
+  
+  @Override
+  public void outputConfigChanged(OutputProperties outputProperties) {
+    throw new IllegalStateException("This method should be overriden if the Output wants to monitor the configuration");
+  };
+
   @Override
   public String getShortDescription() {
     return null;
@@ -50,14 +67,11 @@ public abstract class Output extends ConfigBlock {
     return super.getNameForThread();
   }
 
-  public abstract void write(String block, InputMarker inputMarker)
-      throws Exception;
+  public abstract void write(String block, InputMarker inputMarker) throws Exception;
   
-  public abstract void copyFile(File inputFile, InputMarker inputMarker)
-      throws UnsupportedOperationException;
+  public abstract void copyFile(File inputFile, InputMarker inputMarker) throws UnsupportedOperationException;
 
-  public void write(Map<String, Object> jsonObj, InputMarker inputMarker)
-    throws Exception {
+  public void write(Map<String, Object> jsonObj, InputMarker inputMarker) throws Exception {
     write(LogFeederUtil.getGson().toJson(jsonObj), inputMarker);
   }
 
@@ -90,6 +104,12 @@ public abstract class Output extends ConfigBlock {
     this.destination = destination;
   }
 
+  protected LogSearchConfig logSearchConfig;
+
+  public void setLogSearchConfig(LogSearchConfig logSearchConfig) {
+    this.logSearchConfig = logSearchConfig;
+  }
+
   @Override
   public void addMetricsContainers(List<MetricData> metricsList) {
     super.addMetricsContainers(metricsList);
@@ -99,7 +119,6 @@ public abstract class Output extends ConfigBlock {
   @Override
   public synchronized void logStat() {
     super.logStat();
-
     logStatForMetric(writeBytesMetric, "Stat: Bytes Written");
   }
   
@@ -115,5 +134,4 @@ public abstract class Output extends ConfigBlock {
       }
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
index 4d6c43b..48716fa 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
@@ -33,6 +33,7 @@ import org.apache.ambari.logfeeder.loglevelfilter.FilterLogData;
 import org.apache.ambari.logfeeder.metrics.MetricData;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.ambari.logfeeder.util.MurmurHash;
+import org.apache.ambari.logsearch.config.api.OutputConfigMonitor;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -56,6 +57,16 @@ public class OutputManager {
     return outputs;
   }
 
+  public List<? extends OutputConfigMonitor> getOutputsToMonitor() {
+    List<Output> outputsToMonitor = new ArrayList<>();
+    for (Output output : outputs) {
+      if (output.monitorConfigChanges()) {
+        outputsToMonitor.add(output);
+      }
+    }
+    return outputsToMonitor;
+  }
+
   public void add(Output output) {
     this.outputs.add(output);
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
index 162a7f8..596e022 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
@@ -25,9 +25,11 @@ import java.net.MalformedURLException;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -37,6 +39,8 @@ import org.apache.ambari.logfeeder.input.InputMarker;
 import org.apache.ambari.logfeeder.util.DateUtil;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputProperties;
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -44,18 +48,23 @@ import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.impl.HttpClientUtil;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.impl.Krb5HttpClientConfigurer;
-import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
 import org.apache.solr.client.solrj.response.SolrPingResponse;
 import org.apache.solr.client.solrj.response.UpdateResponse;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.CollectionStateWatcher;
+import org.apache.solr.common.cloud.DocCollection;
 
 import static org.apache.ambari.logfeeder.util.LogFeederUtil.LOGFEEDER_PROPERTIES_FILE;
 
-public class OutputSolr extends Output {
+public class OutputSolr extends Output implements CollectionStateWatcher {
 
+  private static final Logger LOG = Logger.getLogger(OutputSolr.class);
+
+  private static final int OUTPUT_PROPERTIES_WAIT_MS = 10000;
+  private static final int SHARDS_WAIT_MS = 10000;
+  
   private static final String DEFAULT_SOLR_JAAS_FILE = "/etc/security/keytabs/logsearch_solr.service.keytab";
   @LogSearchPropertyDescription(
     name = "logfeeder.solr.jaas.file",
@@ -66,8 +75,6 @@ public class OutputSolr extends Output {
   )
   private static final String SOLR_JAAS_FILE_PROPERTY = "logfeeder.solr.jaas.file";
 
-  private static final Logger LOG = Logger.getLogger(OutputSolr.class);
-
   private static final boolean DEFAULT_SOLR_KERBEROS_ENABLE = false;
   @LogSearchPropertyDescription(
     name = "logfeeder.solr.kerberos.enable",
@@ -80,17 +87,17 @@ public class OutputSolr extends Output {
 
   private static final int DEFAULT_MAX_BUFFER_SIZE = 5000;
   private static final int DEFAULT_MAX_INTERVAL_MS = 3000;
-  private static final int DEFAULT_NUMBER_OF_SHARDS = 1;
-  private static final int DEFAULT_SPLIT_INTERVAL = 30;
   private static final int DEFAULT_NUMBER_OF_WORKERS = 1;
   private static final boolean DEFAULT_SKIP_LOGTIME = false;
 
   private static final int RETRY_INTERVAL = 30;
 
+  private String type;
   private String collection;
   private String splitMode;
   private int splitInterval;
-  private int numberOfShards;
+  private List<String> shards;
+  private String zkConnectString;
   private int maxIntervalMS;
   private int workers;
   private int maxBufferSize;
@@ -98,10 +105,22 @@ public class OutputSolr extends Output {
   private int lastSlotByMin = -1;
   private boolean skipLogtime = false;
 
+  private final Object propertiesLock = new Object();
+
   private BlockingQueue<OutputData> outgoingBuffer = null;
   private List<SolrWorkerThread> workerThreadList = new ArrayList<>();
 
   @Override
+  public boolean monitorConfigChanges() {
+    return true;
+  };
+
+  @Override
+  public String getOutputType() {
+    return type;
+  }
+
+  @Override
   protected String getStatMetricName() {
     return "output.solr.write_logs";
   }
@@ -110,24 +129,34 @@ public class OutputSolr extends Output {
   protected String getWriteBytesMetricName() {
     return "output.solr.write_bytes";
   }
-
+  
   @Override
   public void init() throws Exception {
     super.init();
     initParams();
     setupSecurity();
     createOutgoingBuffer();
+    createSolrStateWatcher();
     createSolrWorkers();
   }
 
   private void initParams() throws Exception {
-    splitMode = getStringValue("splits_interval_mins", "none");
-    if (!splitMode.equalsIgnoreCase("none")) {
-      splitInterval = getIntValue("split_interval_mins", DEFAULT_SPLIT_INTERVAL);
+    type = getStringValue("type");
+    while (true) {
+      OutputSolrProperties outputSolrProperties = logSearchConfig.getOutputSolrProperties(type);
+      if (outputSolrProperties == null) {
+        LOG.info("Output solr properties for type " + type + " is not available yet.");
+        try { Thread.sleep(OUTPUT_PROPERTIES_WAIT_MS); } catch (Exception e) { LOG.warn(e); }
+      } else {
+        initPropertiesFromLogSearchConfig(outputSolrProperties, true);
+        break;
+      }
+    }
+
+    zkConnectString = getStringValue("zk_connect_string");
+    if (StringUtils.isEmpty(zkConnectString)) {
+      throw new Exception("For solr output the zk_connect_string property need to be set");
     }
-    isComputeCurrentCollection = !splitMode.equalsIgnoreCase("none");
-    
-    numberOfShards = getIntValue("number_of_shards", DEFAULT_NUMBER_OF_SHARDS);
 
     skipLogtime = getBooleanValue("skip_logtime", DEFAULT_SKIP_LOGTIME);
 
@@ -140,22 +169,39 @@ public class OutputSolr extends Output {
       maxBufferSize = 1;
     }
 
-    collection = getStringValue("collection");
-    if (StringUtils.isEmpty(collection)) {
-      throw new Exception("Collection property is mandatory");
-    }
+    LOG.info(String.format("Config: Number of workers=%d, splitMode=%s, splitInterval=%d."
+        + getShortDescription(), workers, splitMode, splitInterval));
+  }
 
-    LOG.info(String.format("Config: Number of workers=%d, splitMode=%s, splitInterval=%d, numberOfShards=%d. "
-        + getShortDescription(), workers, splitMode, splitInterval, numberOfShards));
+  @Override
+  public void outputConfigChanged(OutputProperties outputProperties) {
+    initPropertiesFromLogSearchConfig((OutputSolrProperties)outputProperties, false);
   }
 
+  private void initPropertiesFromLogSearchConfig(OutputSolrProperties outputSolrProperties, boolean init) {
+    synchronized (propertiesLock) {
+      splitMode = outputSolrProperties.getSplitIntervalMins();
+      if (!splitMode.equalsIgnoreCase("none")) {
+        splitInterval = Integer.parseInt(splitMode);
+      }
+      isComputeCurrentCollection = !splitMode.equalsIgnoreCase("none");
+
+      // collection can not be overwritten after initialization
+      if (init) {
+        collection = outputSolrProperties.getCollection();
+        if (StringUtils.isEmpty(collection)) {
+          throw new IllegalStateException("Collection property is mandatory");
+        }
+      }
+    }
+  }
 
   private void setupSecurity() {
     String jaasFile = LogFeederUtil.getStringProperty(SOLR_JAAS_FILE_PROPERTY, DEFAULT_SOLR_JAAS_FILE);
     boolean securityEnabled = LogFeederUtil.getBooleanProperty(SOLR_KERBEROS_ENABLE_PROPERTY, DEFAULT_SOLR_KERBEROS_ENABLE);
     if (securityEnabled) {
       System.setProperty("java.security.auth.login.config", jaasFile);
-      HttpClientUtil.setConfigurer(new Krb5HttpClientConfigurer());
+      HttpClientUtil.addConfigurer(new Krb5HttpClientConfigurer());
       LOG.info("setupSecurity() called for kerberos configuration, jaas file: " + jaasFile);
     }
   }
@@ -166,81 +212,70 @@ public class OutputSolr extends Output {
     outgoingBuffer = new LinkedBlockingQueue<OutputData>(bufferSize);
   }
 
-  private void createSolrWorkers() throws Exception, MalformedURLException {
-    String solrUrl = getStringValue("url");
-    String zkConnectString = getStringValue("zk_connect_string");
-    if (StringUtils.isEmpty(solrUrl) && StringUtils.isEmpty(zkConnectString)) {
-      throw new Exception("For solr output, either url or zk_connect_string property need to be set");
+  private void createSolrStateWatcher() throws Exception {
+    if ("none".equals(splitMode)) {
+      return;
+    }
+    
+    CloudSolrClient stateWatcherClient = createSolrClient();
+    stateWatcherClient.registerCollectionStateWatcher(collection, this);
+    while (true) {
+      if (shards == null) {
+        LOG.info("Shards are not available yet, waiting ...");
+        try { Thread.sleep(SHARDS_WAIT_MS); } catch (Exception e) { LOG.warn(e); }
+      } else {
+        break;
+      }
+    }
+  }
+
+  @Override
+  public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
+    synchronized (propertiesLock) {
+      shards = new ArrayList<>(collectionState.getSlicesMap().keySet());
+      Collections.sort(shards);
     }
+    return false;
+  }
 
+  private void createSolrWorkers() throws Exception, MalformedURLException {
     for (int count = 0; count < workers; count++) {
-      SolrClient solrClient = getSolrClient(solrUrl, zkConnectString, count);
+      CloudSolrClient solrClient = getSolrClient(count);
       createSolrWorkerThread(count, solrClient);
     }
   }
 
-  SolrClient getSolrClient(String solrUrl, String zkConnectString, int count) throws Exception, MalformedURLException {
-    SolrClient solrClient = createSolrClient(solrUrl, zkConnectString);
-    pingSolr(solrUrl, zkConnectString, count, solrClient);
-
-    return solrClient;
-  }
+  CloudSolrClient getSolrClient(int count) throws Exception, MalformedURLException {
+    CloudSolrClient solrClient = createSolrClient();
+    pingSolr(count, solrClient);
 
-  private SolrClient createSolrClient(String solrUrl, String zkConnectString) throws Exception, MalformedURLException {
-    SolrClient solrClient;
-    if (zkConnectString != null) {
-      solrClient = createCloudSolrClient(zkConnectString);
-    } else {
-      solrClient = createHttpSolarClient(solrUrl);
-    }
     return solrClient;
   }
 
-  private SolrClient createCloudSolrClient(String zkConnectString) throws Exception {
+  private CloudSolrClient createSolrClient() throws Exception {
     LOG.info("Using zookeepr. zkConnectString=" + zkConnectString);
-    collection = getStringValue("collection");
-    if (StringUtils.isEmpty(collection)) {
-      throw new Exception("For solr cloud property collection is mandatory");
-    }
     LOG.info("Using collection=" + collection);
 
-    CloudSolrClient solrClient = new CloudSolrClient(zkConnectString);
+    CloudSolrClient solrClient = new CloudSolrClient.Builder().withZkHost(zkConnectString).build();
     solrClient.setDefaultCollection(collection);
     return solrClient;
   }
 
-  private SolrClient createHttpSolarClient(String solrUrl) throws MalformedURLException {
-    String[] solrUrls = StringUtils.split(solrUrl, ",");
-    if (solrUrls.length == 1) {
-      LOG.info("Using SolrURL=" + solrUrl);
-      return new HttpSolrClient(solrUrl + "/" + collection);
-    } else {
-      LOG.info("Using load balance solr client. solrUrls=" + solrUrl);
-      LOG.info("Initial URL for LB solr=" + solrUrls[0] + "/" + collection);
-      LBHttpSolrClient lbSolrClient = new LBHttpSolrClient(solrUrls[0] + "/" + collection);
-      for (int i = 1; i < solrUrls.length; i++) {
-        LOG.info("Adding URL for LB solr=" + solrUrls[i] + "/" + collection);
-        lbSolrClient.addSolrServer(solrUrls[i] + "/" + collection);
-      }
-      return lbSolrClient;
-    }
-  }
-
-  private void pingSolr(String solrUrl, String zkConnectString, int count, SolrClient solrClient) {
+  private void pingSolr(int count, CloudSolrClient solrClient) {
     try {
-      LOG.info("Pinging Solr server. zkConnectString=" + zkConnectString + ", urls=" + solrUrl);
+      LOG.info("Pinging Solr server. zkConnectString=" + zkConnectString);
       SolrPingResponse response = solrClient.ping();
       if (response.getStatus() == 0) {
         LOG.info("Ping to Solr server is successful for worker=" + count);
       } else {
         LOG.warn(
-            String.format("Ping to Solr server failed. It would check again. worker=%d, solrUrl=%s, zkConnectString=%s, " +
-                "collection=%s, response=%s", count, solrUrl, zkConnectString, collection, response));
+            String.format("Ping to Solr server failed. It would check again. worker=%d, zkConnectString=%s, collection=%s, " +
+                "response=%s", count, zkConnectString, collection, response));
       }
     } catch (Throwable t) {
       LOG.warn(String.format(
-          "Ping to Solr server failed. It would check again. worker=%d, " + "solrUrl=%s, zkConnectString=%s, collection=%s",
-          count, solrUrl, zkConnectString, collection), t);
+          "Ping to Solr server failed. It would check again. worker=%d, zkConnectString=%s, collection=%s", count,
+          zkConnectString, collection), t);
     }
   }
 
@@ -402,9 +437,11 @@ public class OutputSolr extends Output {
       boolean result = false;
       while (!isDrain()) {
         try {
-          if (isComputeCurrentCollection) {
-            // Compute the current router value
-            addRouterField();
+          synchronized (propertiesLock) {
+            if (isComputeCurrentCollection) {
+              // Compute the current router value
+              addRouterField();
+            }
           }
           addToSolr(outputData);
           resetLocalBuffer();
@@ -468,9 +505,9 @@ public class OutputSolr extends Output {
       int currMin = cal.get(Calendar.MINUTE);
 
       int minOfWeek = (weekDay - 1) * 24 * 60 + currHour * 60 + currMin;
-      int slotByMin = minOfWeek / splitInterval % numberOfShards;
+      int slotByMin = minOfWeek / splitInterval % shards.size();
 
-      String shard = "shard" + slotByMin;
+      String shard = shards.get(slotByMin);
 
       if (lastSlotByMin != slotByMin) {
         LOG.info("Switching to shard " + shard + ", output=" + getShortDescription());

http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
index 8985110..ce040f9 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
@@ -28,8 +28,10 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.ambari.logfeeder.input.Input;
 import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logsearch.config.api.LogSearchConfig;
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties;
+import org.apache.ambari.logsearch.config.zookeeper.model.outputconfig.impl.OutputSolrPropertiesImpl;
 import org.apache.log4j.Logger;
-import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.response.UpdateResponse;
 import org.apache.solr.common.SolrInputDocument;
@@ -48,6 +50,7 @@ public class OutputSolrTest {
   private static final Logger LOG = Logger.getLogger(OutputSolrTest.class);
 
   private OutputSolr outputSolr;
+  private LogSearchConfig logSearchConfigMock;
   private Map<Integer, SolrInputDocument> receivedDocs = new ConcurrentHashMap<>();
 
   @Rule
@@ -56,8 +59,9 @@ public class OutputSolrTest {
   @Before
   public void init() throws Exception {
     outputSolr = new OutputSolr() {
+      @SuppressWarnings("deprecation")
       @Override
-      SolrClient getSolrClient(String solrUrl, String zkConnectString, int count) throws Exception, MalformedURLException {
+      CloudSolrClient getSolrClient(int count) throws Exception, MalformedURLException {
         return new CloudSolrClient(null) {
           private static final long serialVersionUID = 1L;
 
@@ -74,6 +78,13 @@ public class OutputSolrTest {
         };
       }
     };
+    
+    OutputSolrProperties outputSolrProperties = new OutputSolrPropertiesImpl("hadoop_logs", "none");
+    logSearchConfigMock = EasyMock.createNiceMock(LogSearchConfig.class);
+    EasyMock.expect(logSearchConfigMock.getOutputSolrProperties("service")).andReturn(outputSolrProperties);
+    EasyMock.replay(logSearchConfigMock);
+    
+    outputSolr.setLogSearchConfig(logSearchConfigMock);
   }
 
   @Test
@@ -81,9 +92,9 @@ public class OutputSolrTest {
     LOG.info("testOutputToSolr_uploadData()");
 
     Map<String, Object> config = new HashMap<String, Object>();
-    config.put("url", "some url");
+    config.put("zk_connect_string", "some zk_connect_string");
     config.put("workers", "3");
-    config.put("collection", "some collection");
+    config.put("type", "service");
 
     outputSolr.loadConfig(config);
     outputSolr.init();
@@ -138,22 +149,21 @@ public class OutputSolrTest {
         assertNotNull("No received document field found for id: " + id + ", fieldName: " + fieldName, receivedValue);
         assertNotNull("No expected document field found for id: " + id + ", fieldName: " + fieldName, expectedValue);
 
-        assertEquals("Field value not matching for id: " + id + ", fieldName: " + fieldName, receivedValue,
-            expectedValue);
+        assertEquals("Field value not matching for id: " + id + ", fieldName: " + fieldName, receivedValue, expectedValue);
       }
     }
   }
 
   @Test
-  public void testOutputToSolr_noUrlOrZkConnectString() throws Exception {
+  public void testOutputToSolr_noZkConnectString() throws Exception {
     LOG.info("testOutputToSolr_noUrlOrZkConnectString()");
 
     expectedException.expect(Exception.class);
-    expectedException.expectMessage("For solr output, either url or zk_connect_string property need to be set");
+    expectedException.expectMessage("For solr output the zk_connect_string property need to be set");
 
     Map<String, Object> config = new HashMap<String, Object>();
     config.put("workers", "3");
-    config.put("collection", "some collection");
+    config.put("type", "service");
 
     outputSolr.loadConfig(config);
     outputSolr.init();
@@ -162,5 +172,6 @@ public class OutputSolrTest {
   @After
   public void cleanUp() {
     receivedDocs.clear();
+    EasyMock.verify(logSearchConfigMock);
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrAuditLogPropsConfig.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrAuditLogPropsConfig.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrAuditLogPropsConfig.java
index c569a27..4a44e60 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrAuditLogPropsConfig.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrAuditLogPropsConfig.java
@@ -241,4 +241,9 @@ public class SolrAuditLogPropsConfig implements SolrPropsConfig {
   public void setAliasNameIn(String aliasNameIn) {
     this.aliasNameIn = aliasNameIn;
   }
+
+  @Override
+  public String getLogType() {
+    return "audit";
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrEventHistoryPropsConfig.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrEventHistoryPropsConfig.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrEventHistoryPropsConfig.java
index 975e6a7..822cea4 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrEventHistoryPropsConfig.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrEventHistoryPropsConfig.java
@@ -145,4 +145,9 @@ public class SolrEventHistoryPropsConfig extends SolrConnectionPropsConfig {
   void setPopulateIntervalMins(Integer populateIntervalMins) {
     this.populateIntervalMins = populateIntervalMins;
   }
+
+  @Override
+  public String getLogType() {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrPropsConfig.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrPropsConfig.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrPropsConfig.java
index ceddf7e..cd0a1c2 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrPropsConfig.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrPropsConfig.java
@@ -58,4 +58,6 @@ public interface SolrPropsConfig {
   String getConfigSetFolder();
 
   void setConfigSetFolder(String configSetFolder);
+  
+  String getLogType();
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrServiceLogPropsConfig.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrServiceLogPropsConfig.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrServiceLogPropsConfig.java
index e5039d5..6a0e6b1 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrServiceLogPropsConfig.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/conf/SolrServiceLogPropsConfig.java
@@ -126,4 +126,9 @@ public class SolrServiceLogPropsConfig extends SolrConnectionPropsConfig {
   public void setReplicationFactor(Integer replicationFactor) {
     this.replicationFactor = replicationFactor;
   }
+
+  @Override
+  public String getLogType() {
+    return "service";
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/LogSearchConfigConfigurer.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/LogSearchConfigConfigurer.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/LogSearchConfigConfigurer.java
index c34dce6..3f6df75 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/LogSearchConfigConfigurer.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/LogSearchConfigConfigurer.java
@@ -19,6 +19,7 @@
 
 package org.apache.ambari.logsearch.configurer;
 
+import javax.annotation.PostConstruct;
 import javax.inject.Inject;
 import javax.inject.Named;
 
@@ -45,6 +46,8 @@ public class LogSearchConfigConfigurer implements Configurer {
   @Inject
   private LogSearchConfigState logSearchConfigState;
   
+  @PostConstruct
+  @Override
   public void start() {
     Thread setupThread = new Thread("setup_logsearch_config") {
       @Override

http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/SolrCollectionConfigurer.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/SolrCollectionConfigurer.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/SolrCollectionConfigurer.java
index f2d022e..225f5a3 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/SolrCollectionConfigurer.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/configurer/SolrCollectionConfigurer.java
@@ -56,7 +56,7 @@ public class SolrCollectionConfigurer implements Configurer {
   private final SolrDaoBase solrDaoBase;
   private final boolean hasEnumConfig; // enumConfig.xml for solr collection
 
-  public SolrCollectionConfigurer(final SolrDaoBase solrDaoBase, final boolean hasEnumConfig) {
+  public SolrCollectionConfigurer(SolrDaoBase solrDaoBase, boolean hasEnumConfig) {
     this.solrDaoBase = solrDaoBase;
     this.hasEnumConfig = hasEnumConfig;
   }
@@ -215,7 +215,8 @@ public class SolrCollectionConfigurer implements Configurer {
     return status;
   }
 
-  private void createCollectionsIfNeeded(CloudSolrClient solrClient, SolrCollectionState state, SolrPropsConfig solrPropsConfig, boolean reloadCollectionNeeded) {
+  private void createCollectionsIfNeeded(CloudSolrClient solrClient, SolrCollectionState state, SolrPropsConfig solrPropsConfig,
+      boolean reloadCollectionNeeded) {
     try {
       List<String> allCollectionList = new ListCollectionHandler().handle(solrClient, null);
       boolean collectionCreated = new CreateCollectionHandler(allCollectionList).handle(solrClient, solrPropsConfig);

http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/AuditSolrDao.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/AuditSolrDao.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/AuditSolrDao.java
index 3eea08f..4142176 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/AuditSolrDao.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/AuditSolrDao.java
@@ -69,6 +69,7 @@ public class AuditSolrDao extends SolrDaoBase {
     String rangerAuditCollection = solrAuditLogPropsConfig.getRangerCollection();
 
     try {
+      waitForLogSearchConfig();
       new SolrCollectionConfigurer(this, true).start();
       boolean createAlias = (aliasNameIn != null && StringUtils.isNotBlank(rangerAuditCollection));
       if (createAlias) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/ServiceLogsSolrDao.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/ServiceLogsSolrDao.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/ServiceLogsSolrDao.java
index 308ef1f..0752ac0 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/ServiceLogsSolrDao.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/ServiceLogsSolrDao.java
@@ -65,6 +65,7 @@ public class ServiceLogsSolrDao extends SolrDaoBase {
   public void postConstructor() {
     LOG.info("postConstructor() called.");
     try {
+      waitForLogSearchConfig();
       new SolrCollectionConfigurer(this, true).start();
     } catch (Exception e) {
       LOG.error("error while connecting to Solr for service logs : solrUrl=" + solrServiceLogPropsConfig.getSolrUrl()

http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/SolrDaoBase.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/SolrDaoBase.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/SolrDaoBase.java
index b30b6ef..15f59e4 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/SolrDaoBase.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/dao/SolrDaoBase.java
@@ -24,6 +24,7 @@ import org.apache.ambari.logsearch.common.LogType;
 import org.apache.ambari.logsearch.common.MessageEnums;
 import org.apache.ambari.logsearch.conf.SolrKerberosConfig;
 import org.apache.ambari.logsearch.conf.SolrPropsConfig;
+import org.apache.ambari.logsearch.conf.global.LogSearchConfigState;
 import org.apache.ambari.logsearch.conf.global.SolrCollectionState;
 import org.apache.ambari.logsearch.util.RESTErrorUtil;
 import org.apache.ambari.logsearch.util.SolrUtil;
@@ -53,11 +54,21 @@ public abstract class SolrDaoBase {
 
   @Inject
   private SolrKerberosConfig solrKerberosConfig;
-  
+
+  @Inject
+  protected LogSearchConfigState logSearchConfigState;
+
   protected SolrDaoBase(LogType logType) {
     this.logType = logType;
   }
 
+  protected void waitForLogSearchConfig() {
+    while (!logSearchConfigState.isLogSearchConfigAvailable()) {
+      LOG.info("Log Search config not available yet, waiting...");
+      try { Thread.sleep(1000); } catch (Exception e) { LOG.warn("Exception during waiting for Log Search Config", e); }
+    }
+  }
+
   public QueryResponse process(SolrQuery solrQuery, String event) {
     SolrUtil.removeDoubleOrTripleEscapeFromFilters(solrQuery);
     LOG.info("Solr query will be processed: " + solrQuery);

http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/handler/CreateCollectionHandler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/handler/CreateCollectionHandler.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/handler/CreateCollectionHandler.java
index 752a1e1..b6e9def 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/handler/CreateCollectionHandler.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/handler/CreateCollectionHandler.java
@@ -19,6 +19,9 @@
 package org.apache.ambari.logsearch.handler;
 
 import org.apache.ambari.logsearch.conf.SolrPropsConfig;
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties;
+import org.apache.ambari.logsearch.config.zookeeper.model.outputconfig.impl.OutputSolrPropertiesImpl;
+import org.apache.ambari.logsearch.configurer.LogSearchConfigConfigurer;
 import org.apache.commons.lang.StringUtils;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.methods.HttpGet;
@@ -50,7 +53,7 @@ public class CreateCollectionHandler implements SolrZkRequestHandler<Boolean> {
   private static final String MODIFY_COLLECTION_QUERY = "/admin/collections?action=MODIFYCOLLECTION&collection=%s&%s=%d";
   private static final String MAX_SHARDS_PER_NODE = "maxShardsPerNode";
 
-  private List<String> allCollectionList;
+  private final List<String> allCollectionList;
 
   public CreateCollectionHandler(List<String> allCollectionList) {
     this.allCollectionList = allCollectionList;
@@ -58,12 +61,19 @@ public class CreateCollectionHandler implements SolrZkRequestHandler<Boolean> {
 
   @Override
   public Boolean handle(CloudSolrClient solrClient, SolrPropsConfig solrPropsConfig) throws Exception {
+    if (solrPropsConfig.getLogType() != null) {
+      OutputSolrProperties outputSolrProperties = new OutputSolrPropertiesImpl(solrPropsConfig.getCollection(),
+          solrPropsConfig.getSplitInterval());
+      LogSearchConfigConfigurer.getConfig().saveOutputSolrProperties(solrPropsConfig.getLogType(), outputSolrProperties);
+    }
+
     boolean result;
     if (solrPropsConfig.getSplitInterval().equalsIgnoreCase("none")) {
       result = createCollection(solrClient, solrPropsConfig, this.allCollectionList);
     } else {
       result = setupCollectionsWithImplicitRouting(solrClient, solrPropsConfig, this.allCollectionList);
     }
+
     return result;
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/manager/ShipperConfigManager.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/manager/ShipperConfigManager.java b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/manager/ShipperConfigManager.java
index 2c143c0..a1181b4 100644
--- a/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/manager/ShipperConfigManager.java
+++ b/ambari-logsearch/ambari-logsearch-server/src/main/java/org/apache/ambari/logsearch/manager/ShipperConfigManager.java
@@ -33,7 +33,6 @@ import org.apache.log4j.Logger;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 
-import javax.annotation.PostConstruct;
 import javax.inject.Inject;
 import javax.inject.Named;
 import javax.validation.ConstraintViolation;
@@ -49,11 +48,6 @@ public class ShipperConfigManager extends JsonManagerBase {
 
   @Inject
   private LogSearchConfigConfigurer logSearchConfigConfigurer;
-
-  @PostConstruct
-  private void postConstructor() {
-    logSearchConfigConfigurer.start();
-  }
   
   public List<String> getServices(String clusterName) {
     return LogSearchConfigConfigurer.getConfig().getServices(clusterName);
@@ -66,7 +60,7 @@ public class ShipperConfigManager extends JsonManagerBase {
 
   public Response createInputConfig(String clusterName, String serviceName, LSServerInputConfig inputConfig) {
     try {
-      if (LogSearchConfigConfigurer.getConfig().inputConfigExists(clusterName, serviceName)) {
+      if (LogSearchConfigConfigurer.getConfig().inputConfigExistsServer(clusterName, serviceName)) {
         return Response.serverError()
             .type(MediaType.APPLICATION_JSON)
             .entity(ImmutableMap.of("errorMessage", "Input config already exists for service " + serviceName))
@@ -83,7 +77,7 @@ public class ShipperConfigManager extends JsonManagerBase {
 
   public Response setInputConfig(String clusterName, String serviceName, LSServerInputConfig inputConfig) {
     try {
-      if (!LogSearchConfigConfigurer.getConfig().inputConfigExists(clusterName, serviceName)) {
+      if (!LogSearchConfigConfigurer.getConfig().inputConfigExistsServer(clusterName, serviceName)) {
         return Response.serverError()
             .type(MediaType.APPLICATION_JSON)
             .entity(ImmutableMap.of("errorMessage", "Input config doesn't exist for service " + serviceName))

http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/output.config.json
----------------------------------------------------------------------
diff --git a/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/output.config.json b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/output.config.json
index 55fd36c..f41e981 100644
--- a/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/output.config.json
+++ b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/output.config.json
@@ -5,9 +5,7 @@
       "comment": "Output to solr for service logs",
       "destination": "solr",
       "zk_connect_string": "localhost:9983",
-      "collection": "hadoop_logs",
-      "number_of_shards": "3",
-      "splits_interval_mins": "2",
+      "type": "service",
       "skip_logtime": "true",
       "conditions": {
         "fields": {
@@ -22,9 +20,7 @@
       "is_enabled": "true",
       "destination": "solr",
       "zk_connect_string": "localhost:9983",
-      "collection": "audit_logs",
-      "number_of_shards": "3",
-      "splits_interval_mins": "2",
+      "type": "audit",
       "skip_logtime": "true",
       "conditions": {
         "fields": {
@@ -35,4 +31,4 @@
       }
     }
   ]
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog300.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog300.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog300.java
index b4502d6..6caa770 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog300.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog300.java
@@ -352,41 +352,59 @@ public class UpgradeCatalog300 extends AbstractUpgradeCatalog {
             updateConfigurationPropertiesForCluster(cluster, "logfeeder-properties", newProperties, true, true);
           }
 
-          Config logfeederLog4jProperties = cluster.getDesiredConfigByType("logfeeder-log4j");
-          if (logfeederLog4jProperties != null) {
-            String content = logfeederLog4jProperties.getProperties().get("content");
+          Config logFeederLog4jProperties = cluster.getDesiredConfigByType("logfeeder-log4j");
+          if (logFeederLog4jProperties != null) {
+            String content = logFeederLog4jProperties.getProperties().get("content");
             if (content.contains("<!DOCTYPE log4j:configuration SYSTEM \"log4j.dtd\">")) {
               content = content.replace("<!DOCTYPE log4j:configuration SYSTEM \"log4j.dtd\">", "<!DOCTYPE log4j:configuration SYSTEM \"http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/xml/doc-files/log4j.dtd\">");
               updateConfigurationPropertiesForCluster(cluster, "logfeeder-log4j", Collections.singletonMap("content", content), true, true);
             }
           }
 
-          Config logsearchLog4jProperties = cluster.getDesiredConfigByType("logsearch-log4j");
-          if (logsearchLog4jProperties != null) {
-            String content = logsearchLog4jProperties.getProperties().get("content");
+          Config logSearchLog4jProperties = cluster.getDesiredConfigByType("logsearch-log4j");
+          if (logSearchLog4jProperties != null) {
+            String content = logSearchLog4jProperties.getProperties().get("content");
             if (content.contains("<!DOCTYPE log4j:configuration SYSTEM \"log4j.dtd\">")) {
               content = content.replace("<!DOCTYPE log4j:configuration SYSTEM \"log4j.dtd\">", "<!DOCTYPE log4j:configuration SYSTEM \"http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/xml/doc-files/log4j.dtd\">");
               updateConfigurationPropertiesForCluster(cluster, "logsearch-log4j", Collections.singletonMap("content", content), true, true);
             }
           }
 
-          Config logsearchServiceLogsConfig = cluster.getDesiredConfigByType("logsearch-service_logs-solrconfig");
-          if (logsearchServiceLogsConfig != null) {
-            String content = logsearchServiceLogsConfig.getProperties().get("content");
+          Config logSearchServiceLogsConfig = cluster.getDesiredConfigByType("logsearch-service_logs-solrconfig");
+          if (logSearchServiceLogsConfig != null) {
+            String content = logSearchServiceLogsConfig.getProperties().get("content");
             if (content.contains("class=\"solr.admin.AdminHandlers\"")) {
               content = content.replaceAll("(?s)<requestHandler name=\"/admin/\".*?class=\"solr.admin.AdminHandlers\" />", "");
               updateConfigurationPropertiesForCluster(cluster, "logsearch-service_logs-solrconfig", Collections.singletonMap("content", content), true, true);
             }
           }
 
-          Config logsearchAuditLogsConfig = cluster.getDesiredConfigByType("logsearch-audit_logs-solrconfig");
-          if (logsearchAuditLogsConfig != null) {
-            String content = logsearchAuditLogsConfig.getProperties().get("content");
+          Config logSearchAuditLogsConfig = cluster.getDesiredConfigByType("logsearch-audit_logs-solrconfig");
+          if (logSearchAuditLogsConfig != null) {
+            String content = logSearchAuditLogsConfig.getProperties().get("content");
             if (content.contains("class=\"solr.admin.AdminHandlers\"")) {
               content = content.replaceAll("(?s)<requestHandler name=\"/admin/\".*?class=\"solr.admin.AdminHandlers\" />", "");
               updateConfigurationPropertiesForCluster(cluster, "logsearch-audit_logs-solrconfig", Collections.singletonMap("content", content), true, true);
             }
           }
+          
+          Config logFeederOutputConfig = cluster.getDesiredConfigByType("logfeeder-output-config");
+          if (logFeederOutputConfig != null) {
+            String content = logFeederOutputConfig.getProperties().get("content");
+            content = content.replace(
+                "      \"collection\":\"{{logsearch_solr_collection_service_logs}}\",\n" +
+                "      \"number_of_shards\": \"{{logsearch_collection_service_logs_numshards}}\",\n" +
+                "      \"splits_interval_mins\": \"{{logsearch_service_logs_split_interval_mins}}\",\n",
+                "      \"type\": \"service\",\n");
+
+            content = content.replace(
+                "      \"collection\":\"{{logsearch_solr_collection_audit_logs}}\",\n" +
+                "      \"number_of_shards\": \"{{logsearch_collection_audit_logs_numshards}}\",\n" +
+                "      \"splits_interval_mins\": \"{{logsearch_audit_logs_split_interval_mins}}\",\n",
+                "      \"type\": \"audit\",\n");
+
+            updateConfigurationPropertiesForCluster(cluster, "logsearch-output-config", Collections.singletonMap("content", content), true, true);
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/ambari/blob/dc85e67d/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/properties/output.config.json.j2
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/properties/output.config.json.j2 b/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/properties/output.config.json.j2
index 214e5ba..0c599c9 100644
--- a/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/properties/output.config.json.j2
+++ b/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/properties/output.config.json.j2
@@ -22,9 +22,7 @@
       "is_enabled":"{{solr_service_logs_enable}}",
       "destination":"solr",
       "zk_connect_string":"{{logsearch_solr_zk_quorum}}{{logsearch_solr_zk_znode}}",
-      "collection":"{{logsearch_solr_collection_service_logs}}",
-      "number_of_shards": "{{logsearch_collection_service_logs_numshards}}",
-      "splits_interval_mins": "{{logsearch_service_logs_split_interval_mins}}",
+      "type": "service",
       "conditions":{
         "fields":{
           "rowtype":[
@@ -41,9 +39,7 @@
       "is_enabled":"{{solr_audit_logs_enable}}",
       "destination":"solr",
       "zk_connect_string":"{{logsearch_solr_zk_quorum}}{{logsearch_solr_zk_znode}}",
-      "collection":"{{logsearch_solr_collection_audit_logs}}",
-      "number_of_shards": "{{logsearch_collection_audit_logs_numshards}}",
-      "splits_interval_mins": "{{logsearch_audit_logs_split_interval_mins}}",
+      "type": "audit",
       "conditions":{
         "fields":{
           "rowtype":[


Mime
View raw message