ambari-commits mailing list archives

From: oleew...@apache.org
Subject: [47/61] [abbrv] [partial] ambari git commit: AMBARI-15679. Initial commit for LogSearch service definition (oleewre)
Date: Sat, 09 Apr 2016 10:15:43 GMT
http://git-wip-us.apache.org/repos/asf/ambari/blob/f7294694/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
new file mode 100644
index 0000000..dd67d07
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.output;
+
+import java.lang.reflect.Type;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.ConfigBlock;
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.MetricCount;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.log4j.Logger;
+
+import com.google.gson.reflect.TypeToken;
+
+public abstract class Output extends ConfigBlock {
+  private static final Logger logger = Logger.getLogger(Output.class);
+
+  String destination = null;
+
+  Type jsonType = new TypeToken<Map<String, String>>() {
+  }.getType();
+
+  public MetricCount writeBytesMetric = new MetricCount();
+
+  @Override
+  public String getShortDescription() {
+    // Subclasses return a meaningful description; the base class has none
+    return null;
+  }
+
+  @Override
+  public String getNameForThread() {
+    if (destination != null) {
+      return destination;
+    }
+    return super.getNameForThread();
+  }
+
+  public void write(String block, InputMarker inputMarker) throws Exception {
+    // No-op. Subclasses should override this.
+  }
+
+  /**
+   * Serializes the json object and delegates to {@link #write(String, InputMarker)}.
+   *
+   * @param jsonObj the log entry as key/value pairs
+   * @param inputMarker marker identifying the source input and position
+   * @throws Exception if the underlying writer fails
+   */
+  public void write(Map<String, Object> jsonObj, InputMarker inputMarker)
+    throws Exception {
+    write(LogFeederUtil.getGson().toJson(jsonObj), inputMarker);
+  }
+
+  boolean isClosed = false;
+
+  /**
+   * Override this method to clean up resources.
+   */
+  public void close() {
+    logger.info("Calling base close(). " + getShortDescription());
+    isClosed = true;
+  }
+
+  /**
+   * Checked during shutdown to decide whether this output has finished.
+   *
+   * @return true if close() has been called on this output
+   */
+  public boolean isClosed() {
+    return isClosed;
+  }
+
+  public long getPendingCount() {
+    return 0;
+  }
+
+  public String getDestination() {
+    return destination;
+  }
+
+  public void setDestination(String destination) {
+    this.destination = destination;
+  }
+
+  @Override
+  public void addMetricsContainers(List<MetricCount> metricsList) {
+    super.addMetricsContainers(metricsList);
+    metricsList.add(writeBytesMetric);
+  }
+
+  @Override
+  public synchronized void logStat() {
+    super.logStat();
+
+    //Printing stat for writeBytesMetric
+    logStatForMetric(writeBytesMetric, "Stat: Bytes Written");
+
+  }
+
+}
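
For context, here is a minimal sketch of how a concrete output plugs into this base class. The class name OutputStdout and its behavior are illustrative only and not part of this commit; statMetric is inherited from ConfigBlock, as the other outputs in this patch show.

    package org.apache.ambari.logfeeder.output;

    import org.apache.ambari.logfeeder.input.InputMarker;

    // Hypothetical output that prints every log block to stdout.
    public class OutputStdout extends Output {

      @Override
      public void write(String block, InputMarker inputMarker) throws Exception {
        statMetric.count++;                        // events written (from ConfigBlock)
        writeBytesMetric.count += block.length();  // bytes written
        System.out.println(block);
      }

      @Override
      public String getShortDescription() {
        return "output:destination=stdout";
      }
    }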

http://git-wip-us.apache.org/repos/asf/ambari/blob/f7294694/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java
new file mode 100644
index 0000000..8df1d29
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.output;
+
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.input.InputMarker;
+
+/**
+ * This contains the output json object and InputMarker.
+ */
+public class OutputData {
+  Map<String, Object> jsonObj;
+  InputMarker inputMarker;
+
+  /**
+   * @param jsonObj the parsed log entry as key/value pairs
+   * @param inputMarker marker identifying the source input and position
+   */
+  public OutputData(Map<String, Object> jsonObj, InputMarker inputMarker) {
+    this.jsonObj = jsonObj;
+    this.inputMarker = inputMarker;
+  }
+
+  @Override
+  public String toString() {
+    return "OutputData [jsonObj=" + jsonObj + ", inputMarker="
+      + inputMarker + "]";
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/f7294694/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
new file mode 100644
index 0000000..b6e36d6
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.output;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.PrintWriter;
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+import org.apache.log4j.Logger;
+
+public class OutputFile extends Output {
+  private static final Logger logger = Logger.getLogger(OutputFile.class);
+
+  PrintWriter outWriter = null;
+  String filePath = null;
+  String codec;
+
+  @Override
+  public void init() throws Exception {
+    super.init();
+
+    filePath = getStringValue("path");
+    if (filePath == null || filePath.isEmpty()) {
+      logger.error("Filepath config property <path> is not set in config file.");
+      return;
+    }
+    codec = getStringValue("codec");
+    if (codec == null || codec.trim().isEmpty()) {
+      codec = "json";
+    } else {
+      if (codec.trim().equalsIgnoreCase("csv")) {
+        codec = "csv";
+      } else if (codec.trim().equalsIgnoreCase("json")) {
+        codec = "csv";
+      } else {
+        logger.error("Unsupported codec type. codec=" + codec
+          + ", will use json");
+        codec = "json";
+      }
+    }
+    logger.info("Out filePath=" + filePath + ", codec=" + codec);
+    File outFile = new File(filePath);
+    if (outFile.getParentFile() != null) {
+      File parentDir = outFile.getParentFile();
+      if (!parentDir.isDirectory()) {
+        parentDir.mkdirs();
+      }
+    }
+
+    outWriter = new PrintWriter(new BufferedWriter(new FileWriter(outFile,
+      true)));
+
+    logger.info("init() is successfull. filePath="
+      + outFile.getAbsolutePath());
+  }
+
+  @Override
+  public void close() {
+    logger.info("Closing file." + getShortDescription());
+    if (outWriter != null) {
+      try {
+        outWriter.close();
+      } catch (Throwable t) {
+        // Ignore this exception
+      }
+    }
+    isClosed = true;
+  }
+
+  @Override
+  public void write(Map<String, Object> jsonObj, InputMarker inputMarker)
+    throws Exception {
+    String outStr = null;
+    if (codec.equals("csv")) {
+      // Write the field values of the json object as one CSV record
+      if (outWriter != null) {
+        CSVPrinter csvPrinter = new CSVPrinter(outWriter, CSVFormat.RFC4180);
+        csvPrinter.printRecord(jsonObj.values());
+        csvPrinter.flush();
+        statMetric.count++;
+      }
+    } else {
+      outStr = LogFeederUtil.getGson().toJson(jsonObj);
+    }
+    if (outWriter != null && outStr != null) {
+      statMetric.count++;
+
+      outWriter.println(outStr);
+      outWriter.flush();
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.ambari.logfeeder.output.Output#write()
+   */
+  @Override
+  public synchronized void write(String block, InputMarker inputMarker) throws Exception {
+    if (outWriter != null && block != null) {
+      statMetric.count++;
+
+      outWriter.println(block);
+      outWriter.flush();
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.ambari.logfeeder.ConfigBlock#getShortDescription()
+   */
+  @Override
+  public String getShortDescription() {
+    return "output:destination=file,path=" + filePath;
+  }
+
+}
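
Based on the keys read in init() above, a matching output section in the LogFeeder configuration would presumably look like the fragment below. Only the "path" and "codec" keys are taken from this file; the enclosing "output" array and the "destination" key are assumptions about the overall config layout.

    {
      "output": [
        {
          "destination": "file",
          "path": "/tmp/logfeeder-output.log",
          "codec": "json"
        }
      ]
    }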

http://git-wip-us.apache.org/repos/asf/ambari/blob/f7294694/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
new file mode 100644
index 0000000..c594dd4
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.output;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedTransferQueue;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+public class OutputKafka extends Output {
+  private static final Logger logger = Logger.getLogger(OutputKafka.class);
+
+  String brokerList = null;
+  String topic = null;
+  boolean isAsync = true;
+  long messageCount = 0;
+  int batchSize = 5000;
+  int lingerMS = 1000;
+
+  private KafkaProducer<String, String> producer = null;
+  BlockingQueue<KafkaCallBack> failedMessages = new LinkedTransferQueue<KafkaCallBack>();
+
+  // Let's start with the assumption Kafka is down
+  boolean isKafkaBrokerUp = false;
+
+  static final int FAILED_RETRY_INTERVAL = 30;
+  static final int CATCHUP_RETRY_INTERVAL = 5;
+
+  @Override
+  public void init() throws Exception {
+    super.init();
+    statMetric.metricsName = "output.kafka.write_logs";
+    writeBytesMetric.metricsName = "output.kafka.write_bytes";
+
+    brokerList = getStringValue("broker_list");
+    topic = getStringValue("topic");
+    isAsync = getBooleanValue("is_async", true);
+    batchSize = getIntValue("batch_size", batchSize);
+    lingerMS = getIntValue("linger_ms", lingerMS);
+
+    Map<String, Object> kafkaCustomProperties = new HashMap<String, Object>();
+    // Get all kafka custom properties
+    for (String key : configs.keySet()) {
+      if (key.startsWith("kafka.")) {
+        Object value = configs.get(key);
+        if (value == null || value.toString().length() == 0) {
+          continue;
+        }
+        String kafkaKey = key.substring("kafka.".length());
+        kafkaCustomProperties.put(kafkaKey, value);
+      }
+    }
+
+    if (StringUtils.isEmpty(brokerList)) {
+      throw new Exception(
+        "For Kafka output, the broker_list property is required");
+    }
+
+    if (StringUtils.isEmpty(topic)) {
+      throw new Exception("For Kafka output, the topic property is required");
+    }
+
+    Properties props = new Properties();
+    // 0.9.0
+    props.put("bootstrap.servers", brokerList);
+    props.put("client.id", "logfeeder_producer");
+    props.put("key.serializer", StringSerializer.class.getName());
+    props.put("value.serializer", StringSerializer.class.getName());
+    props.put("compression.type", "snappy");
+    // props.put("retries", "3");
+    props.put("batch.size", batchSize);
+    props.put("linger.ms", lingerMS);
+
+    for (String kafkaKey : kafkaCustomProperties.keySet()) {
+      logger.info("Adding custom Kafka property. " + kafkaKey + "="
+        + kafkaCustomProperties.get(kafkaKey));
+      props.put(kafkaKey, kafkaCustomProperties.get(kafkaKey));
+    }
+
+    // props.put("metadata.broker.list", brokerList);
+
+    producer = new KafkaProducer<String, String>(props);
+    Thread retryThread = new Thread("kafka-writer-retry,topic=" + topic) {
+      @Override
+      public void run() {
+        KafkaCallBack kafkaCallBack = null;
+        logger.info("Started thread to monitor failed messsages. "
+          + getShortDescription());
+        while (true) {
+          try {
+            if (kafkaCallBack == null) {
+              kafkaCallBack = failedMessages.take();
+            }
+            if (publishMessage(kafkaCallBack.message,
+              kafkaCallBack.inputMarker)) {
+              // logger.info("Sent message. count="
+              // + kafkaCallBack.thisMessageNumber);
+              kafkaCallBack = null;
+            } else {
+              // Wait for some time before retrying
+              logger.error("Kafka is down. messageNumber="
+                + kafkaCallBack.thisMessageNumber
+                + ". Going to sleep for "
+                + FAILED_RETRY_INTERVAL + " seconds");
+              Thread.sleep(FAILED_RETRY_INTERVAL * 1000);
+            }
+
+          } catch (Throwable t) {
+            final String LOG_MESSAGE_KEY = this.getClass()
+              .getSimpleName() + "_KAFKA_RETRY_WRITE_ERROR";
+            LogFeederUtil.logErrorMessageByInterval(
+              LOG_MESSAGE_KEY,
+              "Error sending message to Kafka during retry. message="
+                + (kafkaCallBack == null ? null
+                : kafkaCallBack.message), t,
+              logger, Level.ERROR);
+          }
+        }
+
+      }
+    };
+    retryThread.setDaemon(true);
+    retryThread.start();
+  }
+
+  @Override
+  public void setDrain(boolean drain) {
+    super.setDrain(drain);
+  }
+
+  /**
+   * Flush document buffer
+   */
+  public void flush() {
+    logger.info("Flush called...");
+    setDrain(true);
+  }
+
+  @Override
+  public void close() {
+    logger.info("Closing Kafka client...");
+    flush();
+    if (producer != null) {
+      try {
+        producer.close();
+      } catch (Throwable t) {
+        logger.error("Error closing Kafka topic. topic=" + topic);
+      }
+    }
+    logger.info("Closed Kafka client");
+    super.close();
+  }
+
+  @Override
+  public synchronized void write(String block, InputMarker inputMarker) throws Exception {
+    while (!isDrain() && !inputMarker.input.isDrain()) {
+      try {
+        if (failedMessages.size() == 0) {
+          if (publishMessage(block, inputMarker)) {
+            break;
+          }
+        }
+        if (isDrain() || inputMarker.input.isDrain()) {
+          break;
+        }
+        if (!isKafkaBrokerUp) {
+          logger.error("Kafka is down. Going to sleep for "
+            + FAILED_RETRY_INTERVAL + " seconds");
+          Thread.sleep(FAILED_RETRY_INTERVAL * 1000);
+
+        } else {
+          logger.warn("Kafka is still catching up from previous failed messages. outstanding messages="
+            + failedMessages.size()
+            + " Going to sleep for "
+            + CATCHUP_RETRY_INTERVAL + " seconds");
+          Thread.sleep(CATCHUP_RETRY_INTERVAL * 1000);
+        }
+      } catch (Throwable t) {
+        // Exit the write loop on any error; the block is dropped
+        break;
+      }
+      }
+    }
+  }
+
+  private boolean publishMessage(String block, InputMarker inputMarker) {
+    if (isAsync && isKafkaBrokerUp) { // Send asynchronously
+      producer.send(new ProducerRecord<String, String>(topic, block),
+        new KafkaCallBack(this, block, inputMarker, ++messageCount));
+      return true;
+    } else { // Send synchronously
+      try {
+        // Not using key. Let it round robin
+        RecordMetadata metadata = producer.send(
+          new ProducerRecord<String, String>(topic, block)).get();
+        if (metadata != null) {
+          statMetric.count++;
+          writeBytesMetric.count += block.length();
+        }
+        if (!isKafkaBrokerUp) {
+          logger.info("Started writing to kafka. "
+            + getShortDescription());
+          isKafkaBrokerUp = true;
+        }
+        return true;
+      } catch (InterruptedException e) {
+        isKafkaBrokerUp = false;
+        final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
+          + "_KAFKA_INTERRUPT";
+        LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
+          "InterruptedException-Error sending message to Kafka",
+          e, logger, Level.ERROR);
+      } catch (ExecutionException e) {
+        isKafkaBrokerUp = false;
+        final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
+          + "_KAFKA_EXECUTION";
+        LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
+          "ExecutionException-Error sending message to Kafka", e,
+          logger, Level.ERROR);
+      } catch (Throwable t) {
+        isKafkaBrokerUp = false;
+        final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
+          + "_KAFKA_WRITE_ERROR";
+        LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
+          "GenericException-Error sending message to Kafka", t,
+          logger, Level.ERROR);
+      }
+    }
+    return false;
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.ambari.logfeeder.ConfigBlock#getShortDescription()
+   */
+  @Override
+  public String getShortDescription() {
+    return "output:destination=kafka,topic=" + topic;
+  }
+
+}
+
+class KafkaCallBack implements Callback {
+  private static final Logger logger = Logger.getLogger(KafkaCallBack.class);
+
+  long thisMessageNumber;
+  OutputKafka output = null;
+  String message;
+  InputMarker inputMarker;
+
+  public KafkaCallBack(OutputKafka output, String message, InputMarker inputMarker,
+                       long messageCount) {
+    this.thisMessageNumber = messageCount;
+    this.output = output;
+    this.inputMarker = inputMarker;
+    this.message = message;
+  }
+
+  @Override
+  public void onCompletion(RecordMetadata metadata, Exception exception) {
+    if (metadata != null) {
+      if (!output.isKafkaBrokerUp) {
+        logger.info("Started writing to kafka. "
+          + output.getShortDescription());
+        output.isKafkaBrokerUp = true;
+      }
+      output.incrementStat(1);
+      output.writeBytesMetric.count += message.length();
+
+      // metadata.partition();
+      // metadata.offset();
+    } else {
+      output.isKafkaBrokerUp = false;
+      final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
+        + "_KAFKA_ASYNC_ERROR";
+      LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
+        "Error sending message to Kafka. Async Callback",
+        exception, logger, Level.ERROR);
+
+      output.failedMessages.add(this);
+    }
+  }
+}
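
The "kafka." prefix handling in init() lets a config block pass arbitrary producer settings straight through to the KafkaProducer. A stand-alone sketch of that mapping; the config keys and values here are invented for illustration:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    public class KafkaPropertyMappingDemo {
      public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<String, Object>();
        configs.put("kafka.acks", "all");   // forwarded to the producer as acks=all
        configs.put("kafka.retries", "3");  // forwarded as retries=3
        configs.put("topic", "logs");       // no "kafka." prefix, so not forwarded

        Properties props = new Properties();
        for (Map.Entry<String, Object> entry : configs.entrySet()) {
          String key = entry.getKey();
          Object value = entry.getValue();
          if (key.startsWith("kafka.") && value != null
              && value.toString().length() > 0) {
            props.put(key.substring("kafka.".length()), value);
          }
        }
        System.out.println(props); // e.g. {retries=3, acks=all}
      }
    }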

http://git-wip-us.apache.org/repos/asf/ambari/blob/f7294694/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
new file mode 100644
index 0000000..7cd911d
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
@@ -0,0 +1,475 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.output;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
+import org.apache.solr.client.solrj.response.SolrPingResponse;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.common.SolrInputDocument;
+
+public class OutputSolr extends Output {
+  private static final Logger logger = Logger.getLogger(OutputSolr.class);
+
+  private static final String ROUTER_FIELD = "_router_field_";
+
+  String solrUrl = null;
+  String zkHosts = null;
+  String collection = null;
+  String splitMode = "none";
+  int splitInterval = 0;
+  int numberOfShards = 1;
+  boolean isComputeCurrentCollection = false;
+
+  int maxBufferSize = 5000;
+  int maxIntervalMS = 3000;
+  int workers = 1;
+
+  BlockingQueue<OutputData> outgoingBuffer = null;
+  List<SolrWorkerThread> writerThreadList = new ArrayList<SolrWorkerThread>();
+  private static final int RETRY_INTERVAL = 30;
+
+  int lastSlotByMin = -1;
+
+  @Override
+  public void init() throws Exception {
+    super.init();
+    statMetric.metricsName = "output.solr.write_logs";
+    writeBytesMetric.metricsName = "output.solr.write_bytes";
+
+    solrUrl = getStringValue("url");
+    zkHosts = getStringValue("zk_hosts");
+    splitMode = getStringValue("splits_interval_mins", splitMode);
+    if (!splitMode.equalsIgnoreCase("none")) {
+      splitInterval = getIntValue("split_interval_mins", 30);
+    }
+    numberOfShards = getIntValue("number_of_shards", numberOfShards);
+
+    maxBufferSize = getIntValue("flush_size", maxBufferSize);
+    if (maxBufferSize < 1) {
+      logger.warn("maxBufferSize is less than 1. Making it 1");
+      maxBufferSize = 1;
+    }
+    maxIntervalMS = getIntValue("idle_flush_time_ms", maxIntervalMS);
+    workers = getIntValue("workers", workers);
+
+    logger.info("Config: Number of workers=" + workers + ", splitMode="
+        + splitMode + ", splitInterval=" + splitInterval
+        + ", numberOfShards=" + numberOfShards + ". "
+        + getShortDescription());
+
+    if (StringUtils.isEmpty(solrUrl) && StringUtils.isEmpty(zkHosts)) {
+      throw new Exception(
+          "For Solr output, either the url or the zk_hosts property needs to be set");
+    }
+
+    int bufferSize = maxBufferSize * (workers + 3);
+    logger.info("Creating blocking queue with bufferSize=" + bufferSize);
+    // outgoingBuffer = new ArrayBlockingQueue<OutputData>(bufferSize);
+    outgoingBuffer = new LinkedBlockingQueue<OutputData>(bufferSize);
+
+    for (int count = 0; count < workers; count++) {
+      SolrClient solrClient = null;
+      CloudSolrClient solrCloudClient = null;
+      if (zkHosts != null) {
+        logger.info("Using ZooKeeper. zkHosts=" + zkHosts);
+        collection = getStringValue("collection");
+        if (StringUtils.isEmpty(collection)) {
+          throw new Exception(
+              "For Solr Cloud, the collection property is mandatory");
+        }
+        logger.info("Using collection=" + collection);
+        solrCloudClient = new CloudSolrClient(zkHosts);
+        solrCloudClient.setDefaultCollection(collection);
+        solrClient = solrCloudClient;
+        isComputeCurrentCollection = !splitMode.equalsIgnoreCase("none");
+      } else {
+        String[] solrUrls = StringUtils.split(solrUrl, ",");
+        if (solrUrls.length == 1) {
+          logger.info("Using SolrURL=" + solrUrl);
+          solrClient = new HttpSolrClient(solrUrl);
+        } else {
+          logger.info("Using load balance solr client. solrUrls="
+              + solrUrl);
+          logger.info("Initial URL for LB solr=" + solrUrls[0]);
+          @SuppressWarnings("resource")
+          LBHttpSolrClient lbSolrClient = new LBHttpSolrClient(
+              solrUrls[0]);
+          for (int i = 1; i < solrUrls.length; i++) {
+            logger.info("Adding URL for LB solr=" + solrUrls[i]);
+            lbSolrClient.addSolrServer(solrUrls[i]);
+          }
+          solrClient = lbSolrClient;
+        }
+      }
+      try {
+        logger.info("Pinging Solr server. zkHosts=" + zkHosts
+            + ", urls=" + solrUrl);
+        SolrPingResponse response = solrClient.ping();
+        if (response.getStatus() == 0) {
+          logger.info("Ping to Solr server is successful for writer="
+              + count);
+        } else {
+          logger.warn("Ping to Solr server failed. It would check again. writer="
+              + count
+              + ", solrUrl="
+              + solrUrl
+              + ", zkHosts="
+              + zkHosts
+              + ", collection="
+              + collection
+              + ", response=" + response);
+        }
+      } catch (Throwable t) {
+        logger.warn(
+            "Ping to Solr server failed. Will check again. writer="
+                + count + ", solrUrl=" + solrUrl + ", zkHosts="
+                + zkHosts + ", collection=" + collection, t);
+      }
+
+      // Let's start the thread
+      SolrWorkerThread solrWriterThread = new SolrWorkerThread(solrClient);
+      solrWriterThread.setName(getNameForThread() + "," + collection
+          + ",writer=" + count);
+      solrWriterThread.setDaemon(true);
+      solrWriterThread.start();
+      writerThreadList.add(solrWriterThread);
+    }
+  }
+
+  @Override
+  public void setDrain(boolean drain) {
+    super.setDrain(drain);
+  }
+
+  /**
+   * Flush document buffer
+   */
+  public void flush() {
+    logger.info("Flush called...");
+    setDrain(true);
+
+    int wrapUpTimeSecs = 30;
+    // Give wrapUpTimeSecs seconds to wrap up
+    boolean isPending = false;
+    for (int i = 0; i < wrapUpTimeSecs; i++) {
+      for (SolrWorkerThread solrWorkerThread : writerThreadList) {
+        if (solrWorkerThread.isDone()) {
+          try {
+            solrWorkerThread.interrupt();
+          } catch (Throwable t) {
+            // ignore
+          }
+        } else {
+          isPending = true;
+        }
+      }
+      if (!isPending) {
+        // All worker threads have drained their buffers
+        break;
+      }
+      try {
+        logger.info("Will give " + (wrapUpTimeSecs - i)
+            + " seconds to wrap up");
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        // ignore
+      }
+      isPending = false;
+    }
+  }
+
+  @Override
+  public long getPendingCount() {
+    long totalCount = 0;
+    for (SolrWorkerThread solrWorkerThread : writerThreadList) {
+      totalCount += solrWorkerThread.localBuffer.size();
+    }
+    return totalCount;
+  }
+
+  @Override
+  public void close() {
+    logger.info("Closing Solr client...");
+    flush();
+
+    logger.info("Closed Solr client");
+    super.close();
+  }
+
+  @Override
+  public void write(Map<String, Object> jsonObj, InputMarker inputMarker)
+      throws Exception {
+    try {
+      outgoingBuffer.put(new OutputData(jsonObj, inputMarker));
+    } catch (InterruptedException e) {
+      // ignore
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.ambari.logfeeder.ConfigBlock#getShortDescription()
+   */
+  @Override
+  public String getShortDescription() {
+    return "output:destination=solr,collection=" + collection;
+  }
+
+  class SolrWorkerThread extends Thread {
+    SolrClient solrClient = null;
+    Collection<SolrInputDocument> localBuffer = new ArrayList<SolrInputDocument>();
+    long localBufferBytesSize = 0;
+    Map<String, InputMarker> latestInputMarkerList = new HashMap<String, InputMarker>();
+
+    public SolrWorkerThread(SolrClient solrClient) {
+      this.solrClient = solrClient;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.lang.Runnable#run()
+     */
+    @Override
+    public void run() {
+      logger.info("SolrWriter thread started");
+      long lastDispatchTime = System.currentTimeMillis();
+
+      //long totalWaitTimeMS = 0;
+      while (true) {
+        long currTimeMS = System.currentTimeMillis();
+        OutputData outputData = null;
+        try {
+          long nextDispatchDuration = maxIntervalMS
+              - (currTimeMS - lastDispatchTime);
+          outputData = outgoingBuffer.poll();
+          if (outputData == null && !isDrain()
+              && nextDispatchDuration > 0) {
+            outputData = outgoingBuffer.poll(nextDispatchDuration,
+                TimeUnit.MILLISECONDS);
+//            long diffTimeMS = System.currentTimeMillis()
+//                - currTimeMS;
+            // logger.info("Waited for " + diffTimeMS +
+            // " ms, planned for "
+            // + nextDispatchDuration + " ms, localBuffer.size="
+            // + localBuffer.size() + ", timedOut="
+            // + (outputData == null ? "true" : "false"));
+          }
+
+          if (isDrain() && outputData == null
+              && outgoingBuffer.size() == 0) {
+            break;
+          }
+          if (outputData != null) {
+            if (outputData.jsonObj.get("id") == null) {
+              outputData.jsonObj.put("id", UUID.randomUUID()
+                  .toString());
+            }
+            SolrInputDocument document = new SolrInputDocument();
+            for (String name : outputData.jsonObj.keySet()) {
+              Object obj = outputData.jsonObj.get(name);
+              document.addField(name, obj);
+              try {
+                localBufferBytesSize += obj.toString().length();
+              } catch (Throwable t) {
+                final String LOG_MESSAGE_KEY = this.getClass()
+                    .getSimpleName() + "_BYTE_COUNT_ERROR";
+                LogFeederUtil.logErrorMessageByInterval(
+                    LOG_MESSAGE_KEY,
+                    "Error calculating byte size. object="
+                        + obj, t, logger, Level.ERROR);
+
+              }
+            }
+            latestInputMarkerList.put(
+                outputData.inputMarker.base64FileKey,
+                outputData.inputMarker);
+            localBuffer.add(document);
+          }
+
+          if (localBuffer.size() > 0
+              && ((outputData == null && isDrain()) || (nextDispatchDuration <= 0 || localBuffer
+                  .size() >= maxBufferSize))) {
+            try {
+              if (isComputeCurrentCollection) {
+                // Compute the current router value
+
+                int weekDay = Calendar.getInstance().get(
+                    Calendar.DAY_OF_WEEK);
+                int currHour = Calendar.getInstance().get(
+                    Calendar.HOUR_OF_DAY);
+                int currMin = Calendar.getInstance().get(
+                    Calendar.MINUTE);
+
+                int minOfWeek = (weekDay - 1) * 24 * 60
+                    + currHour * 60 + currMin;
+                int slotByMin = minOfWeek / splitInterval
+                    % numberOfShards;
+
+                String shard = "shard" + slotByMin;
+
+                if (lastSlotByMin != slotByMin) {
+                  logger.info("Switching to shard " + shard
+                      + ", output="
+                      + getShortDescription());
+                  lastSlotByMin = slotByMin;
+                }
+
+                for (SolrInputDocument solrInputDocument : localBuffer) {
+                  solrInputDocument.addField(ROUTER_FIELD,
+                      shard);
+                }
+              }
+
+//              long beginTime = System.currentTimeMillis();
+              UpdateResponse response = solrClient
+                  .add(localBuffer);
+//              long endTime = System.currentTimeMillis();
+//              logger.info("Adding to Solr. Document count="
+//                  + localBuffer.size() + ". Took "
+//                  + (endTime - beginTime) + " ms");
+
+              if (response.getStatus() != 0) {
+                final String LOG_MESSAGE_KEY = this.getClass()
+                    .getSimpleName() + "_SOLR_UPDATE_ERROR";
+                LogFeederUtil
+                    .logErrorMessageByInterval(
+                        LOG_MESSAGE_KEY,
+                        "Error writing to Solr. response="
+                            + response.toString()
+                            + ", log="
+                            + (outputData == null ? null
+                                : outputData
+                                    .toString()),
+                        null, logger, Level.ERROR);
+              }
+              statMetric.count += localBuffer.size();
+              writeBytesMetric.count += localBufferBytesSize;
+              for (InputMarker inputMarker : latestInputMarkerList
+                  .values()) {
+                inputMarker.input.checkIn(inputMarker);
+              }
+
+              resetLocalBuffer();
+              lastDispatchTime = System.currentTimeMillis();
+            } catch (IOException ioException) {
+              // Transient error; block until Solr is reachable again
+              while (!isDrain()) {
+                try {
+                  logger.warn("Solr is down. Going to sleep for "
+                      + RETRY_INTERVAL
+                      + " seconds. output="
+                      + getShortDescription());
+                  Thread.sleep(RETRY_INTERVAL * 1000);
+                } catch (Throwable t) {
+                  // ignore
+                  break;
+                }
+                if (isDrain()) {
+                  break;
+                }
+                try {
+                  SolrPingResponse pingResponse = solrClient
+                      .ping();
+                  if (pingResponse.getStatus() == 0) {
+                    logger.info("Solr seems to be up now. Resuming... output="
+                        + getShortDescription());
+                    break;
+                  }
+                } catch (Throwable t) {
+                  // Ignore
+                }
+              }
+            } catch (Throwable serverException) {
+              // Clear the buffer
+              resetLocalBuffer();
+              final String LOG_MESSAGE_KEY = this.getClass()
+                  .getSimpleName() + "_SOLR_UPDATE_EXCEPTION";
+              LogFeederUtil.logErrorMessageByInterval(
+                  LOG_MESSAGE_KEY,
+                  "Error sending log message to server. "
+                      + (outputData == null ? null
+                          : outputData.toString()),
+                  serverException, logger, Level.ERROR);
+            }
+          }
+        } catch (InterruptedException e) {
+          // Handle thread exiting
+        } catch (Throwable t) {
+          final String LOG_MESSAGE_KEY = this.getClass()
+              .getSimpleName() + "_SOLR_MAINLOOP_EXCEPTION";
+          LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
+              "Caught exception in main loop. " + outputData, t,
+              logger, Level.ERROR);
+        }
+      }
+
+      if (solrClient != null) {
+        try {
+          solrClient.close();
+        } catch (IOException e) {
+          // Ignore
+        }
+      }
+
+      resetLocalBuffer();
+      logger.info("Exiting Solr writer thread. output="
+          + getShortDescription());
+    }
+
+    public boolean isDone() {
+      return localBuffer.size() == 0;
+    }
+
+    public void resetLocalBuffer() {
+      localBuffer.clear();
+      localBufferBytesSize = 0;
+      latestInputMarkerList.clear();
+    }
+  }
+}
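
The router-field computation in SolrWorkerThread.run() maps wall-clock time onto a rotating shard. A worked example, with the formula taken verbatim from the loop above; the weekday, time, and shard settings are only an illustration:

    public class ShardSlotDemo {
      public static void main(String[] args) {
        // Wednesday (Calendar.DAY_OF_WEEK == 4), 10:17,
        // with split_interval_mins = 30 and number_of_shards = 10
        int weekDay = 4, currHour = 10, currMin = 17;
        int splitInterval = 30, numberOfShards = 10;

        int minOfWeek = (weekDay - 1) * 24 * 60 + currHour * 60 + currMin; // 4937
        int slotByMin = minOfWeek / splitInterval % numberOfShards;        // 164 % 10 == 4
        System.out.println("shard" + slotByMin);                           // prints shard4
      }
    }

So every split interval the writer advances to the next slot, wrapping around after number_of_shards slots; each shard therefore holds a repeating time window of the week.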

http://git-wip-us.apache.org/repos/asf/ambari/blob/f7294694/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java
new file mode 100644
index 0000000..4265dc6
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.util;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.logconfig.LogFeederConstants;
+import org.apache.log4j.Logger;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.SolrRequest.METHOD;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+
+public class SolrUtil {
+
+  private static final Logger logger = Logger.getLogger(SolrUtil.class);
+
+  // volatile is required for the double-checked locking in getInstance()
+  private static volatile SolrUtil instance = null;
+  SolrClient solrClient = null;
+  CloudSolrClient solrCloudClient = null;
+
+  boolean isSolrCloud = true;
+  String solrDetail = "";
+  String collectionName = null;
+
+  private SolrUtil() throws Exception {
+    String url = LogFeederUtil.getStringProperty("logfeeder.solr.url");
+    String zkHosts = LogFeederUtil.getStringProperty("logfeeder.solr.zkhosts");
+    String collection = LogFeederUtil.getStringProperty("logfeeder.solr.core.history", "history");
+    connectToSolr(url, zkHosts, collection);
+  }
+
+  public static SolrUtil getInstance() {
+    if (instance == null) {
+      synchronized (SolrUtil.class) {
+        if (instance == null) {
+          try {
+            instance = new SolrUtil();
+          } catch (Exception e) {
+            logger.error(e);
+          }
+        }
+      }
+    }
+    return instance;
+  }
+
+  public SolrClient connectToSolr(String url, String zkHosts,
+                                  String collection) throws Exception {
+    this.collectionName = collection;
+    solrDetail = "zkHosts=" + zkHosts + ", collection=" + collection
+      + ", url=" + url;
+
+    logger.info("connectToSolr() " + solrDetail);
+    if (collection == null || collection.isEmpty()) {
+      throw new Exception("For solr, collection name is mandatory. "
+        + solrDetail);
+    }
+    if (zkHosts != null && !zkHosts.isEmpty()) {
+      solrDetail = "zkHosts=" + zkHosts + ", collection=" + collection;
+      logger.info("Using zookeepr. " + solrDetail);
+      solrClouldClient = new CloudSolrClient(zkHosts);
+      solrClouldClient.setDefaultCollection(collection);
+      solrClient = solrClouldClient;
+      int waitDurationMS = 3 * 60 * 1000;
+      checkSolrStatus(waitDurationMS);
+    } else {
+      if (url == null || url.trim().isEmpty()) {
+        throw new Exception("Both zkHosts and URL are empty. zkHosts="
+          + zkHosts + ", collection=" + collection + ", url="
+          + url);
+      }
+      solrDetail = "collection=" + collection + ", url=" + url;
+      String collectionURL = url + "/" + collection;
+      logger.info("Connecting to  solr : " + collectionURL);
+      solrClient = new HttpSolrClient(collectionURL);
+
+    }
+    return solrClient;
+  }
+
+  /**
+   * Polls Solr until it responds or the wait duration elapses.
+   *
+   * @param waitDurationMS maximum time to wait, in milliseconds
+   * @return true if Solr answered a collection-list request within the wait duration
+   */
+  public boolean checkSolrStatus(int waitDurationMS) {
+    boolean status = false;
+    try {
+      long beginTimeMS = System.currentTimeMillis();
+      long waitIntervalMS = 2000;
+      int pingCount = 0;
+      while (true) {
+        pingCount++;
+        CollectionAdminResponse response = null;
+        try {
+          CollectionAdminRequest.List colListReq = new CollectionAdminRequest.List();
+          response = colListReq.process(solrClient);
+        } catch (Exception ex) {
+          logger.error("Con't connect to Solr. solrDetail=" + solrDetail, ex);
+        }
+        if (response != null && response.getStatus() == 0) {
+          logger.info("Solr getCollections() is success. solr=" + solrDetail);
+          status = true;
+          break;
+        }
+        if (System.currentTimeMillis() - beginTimeMS > waitDurationMS) {
+          logger.error("Solr is not reachable even after "
+            + (System.currentTimeMillis() - beginTimeMS)
+            + " ms. If you are using alias, then you might have to restart LogSearch after Solr is up and running. solr="
+            + solrDetail + ", response=" + response);
+          break;
+        } else {
+          logger.warn("Solr is not not reachable yet. getCollections() attempt count=" + pingCount
+            + ". Will sleep for " + waitIntervalMS + " ms and try again." + " solr=" + solrDetail
+            + ", response=" + response);
+
+        }
+        Thread.sleep(waitIntervalMS);
+      }
+    } catch (Throwable t) {
+      logger.error("Seems Solr is not up. solrDetail=" + solrDetail);
+    }
+    return status;
+  }
+
+  /**
+   * Runs the given query against the configured collection.
+   *
+   * @param solrQuery query to execute
+   * @return the query response, or null if no Solr client has been created
+   * @throws SolrServerException
+   * @throws IOException
+   * @throws SolrException
+   */
+  public QueryResponse process(SolrQuery solrQuery) throws SolrServerException, IOException, SolrException {
+    if (solrClient != null) {
+      QueryResponse queryResponse = solrClient.query(solrQuery, METHOD.POST);
+      return queryResponse;
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * @return the stored LogFeeder config document as a map, or an empty map if none exists
+   */
+  public HashMap<String, Object> getConfigDoc() {
+    HashMap<String, Object> configMap = new HashMap<String, Object>();
+    SolrQuery solrQuery = new SolrQuery();
+    solrQuery.setQuery("*:*");
+    String fq = LogFeederConstants.ROW_TYPE + ":" + LogFeederConstants.NAME;
+    solrQuery.setFilterQueries(fq);
+    try {
+      QueryResponse response = SolrUtil.getInstance().process(solrQuery);
+      SolrDocumentList documentList = response.getResults();
+      if (documentList != null && documentList.size() > 0) {
+        SolrDocument configDoc = documentList.get(0);
+        String configJson = LogFeederUtil.getGson().toJson(configDoc);
+        configMap = (HashMap<String, Object>) LogFeederUtil.toJSONObject(configJson);
+      }
+    } catch (SolrException | SolrServerException | IOException e) {
+      logger.error(e);
+    }
+    return configMap;
+  }
+
+  /**
+   * Adds the document to the collection and commits immediately.
+   *
+   * @param solrInputDocument document to index
+   * @throws SolrServerException
+   * @throws IOException
+   */
+  public void addDoc(SolrInputDocument solrInputDocument) throws SolrServerException, IOException {
+    solrClient.add(solrInputDocument);
+    solrClient.commit();
+  }
+
+}
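
A usage sketch for the singleton above; it assumes the logfeeder.solr.* properties have already been loaded so that getInstance() can connect, and the query itself is only an example:

    import org.apache.ambari.logfeeder.util.SolrUtil;
    import org.apache.solr.client.solrj.SolrQuery;
    import org.apache.solr.client.solrj.response.QueryResponse;

    public class SolrUtilDemo {
      public static void main(String[] args) throws Exception {
        SolrQuery query = new SolrQuery();
        query.setQuery("*:*");
        query.setRows(10);

        // getInstance() returns null if the connection could not be set up
        SolrUtil solrUtil = SolrUtil.getInstance();
        if (solrUtil != null) {
          QueryResponse response = solrUtil.process(query);
          if (response != null) {
            System.out.println("Found " + response.getResults().getNumFound() + " docs");
          }
        }
      }
    }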

http://git-wip-us.apache.org/repos/asf/ambari/blob/f7294694/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/view/VLogfeederFilter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/view/VLogfeederFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/view/VLogfeederFilter.java
new file mode 100644
index 0000000..f030040
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/view/VLogfeederFilter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.view;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class VLogfeederFilter {
+
+  private String label;
+  private List<String> hosts;
+  private List<String> defaultLevels;
+  private List<String> overrideLevels;
+  private String expiryTime;
+
+  public VLogfeederFilter() {
+    hosts = new ArrayList<String>();
+    defaultLevels = new ArrayList<String>();
+    overrideLevels = new ArrayList<String>();
+  }
+
+  public String getLabel() {
+    return label;
+  }
+
+  public void setLabel(String label) {
+    this.label = label;
+  }
+
+  public List<String> getHosts() {
+    return hosts;
+  }
+
+  public void setHosts(List<String> hosts) {
+    this.hosts = hosts;
+  }
+
+  public List<String> getDefaultLevels() {
+    return defaultLevels;
+  }
+
+  public void setDefaultLevels(List<String> defaultLevels) {
+    this.defaultLevels = defaultLevels;
+  }
+
+  public List<String> getOverrideLevels() {
+    return overrideLevels;
+  }
+
+  public void setOverrideLevels(List<String> overrideLevels) {
+    this.overrideLevels = overrideLevels;
+  }
+
+  public String getExpiryTime() {
+    return expiryTime;
+  }
+
+  public void setExpiryTime(String expiryTime) {
+    this.expiryTime = expiryTime;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/f7294694/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/view/VLogfeederFilterWrapper.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/view/VLogfeederFilterWrapper.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/view/VLogfeederFilterWrapper.java
new file mode 100644
index 0000000..4ddef3f
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/view/VLogfeederFilterWrapper.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.view;
+
+import java.util.HashMap;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class VLogfeederFilterWrapper {
+
+  private HashMap<String, VLogfeederFilter> filter;
+  private String id;
+
+  public HashMap<String, VLogfeederFilter> getFilter() {
+    return filter;
+  }
+
+  public void setFilter(HashMap<String, VLogfeederFilter> filter) {
+    this.filter = filter;
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public void setId(String id) {
+    this.id = id;
+  }
+}
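
Given the Jackson and JAXB annotations on these two beans, the serialized filter document should look roughly like the fragment below. The field names come from the classes above; the concrete values are invented, and NON_NULL inclusion means unset fields such as expiryTime are simply omitted:

    {
      "id": "filter-1",
      "filter": {
        "ambari_agent": {
          "label": "ambari_agent",
          "hosts": ["host1.example.com"],
          "defaultLevels": ["FATAL", "ERROR", "WARN"],
          "overrideLevels": []
        }
      }
    }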

http://git-wip-us.apache.org/repos/asf/ambari/blob/f7294694/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
new file mode 100644
index 0000000..956af16
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.jackson.map.AnnotationIntrospector;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+public abstract class AbstractTimelineMetricsSink {
+  public static final String TAGS_FOR_PREFIX_PROPERTY_PREFIX = "tagsForPrefix.";
+  public static final String MAX_METRIC_ROW_CACHE_SIZE = "maxRowCacheSize";
+  public static final String METRICS_SEND_INTERVAL = "sendInterval";
+  public static final String METRICS_POST_TIMEOUT_SECONDS = "timeout";
+  public static final String COLLECTOR_HOST_PROPERTY = "collector";
+  public static final String COLLECTOR_PORT_PROPERTY = "port";
+  public static final int DEFAULT_POST_TIMEOUT_SECONDS = 10;
+
+  protected final Log LOG;
+
+  protected static ObjectMapper mapper;
+
+  static {
+    mapper = new ObjectMapper();
+    AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
+    mapper.setAnnotationIntrospector(introspector);
+    mapper.getSerializationConfig()
+        .setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
+  }
+
+  public AbstractTimelineMetricsSink() {
+    LOG = LogFactory.getLog(this.getClass());
+  }
+
+  protected void emitMetrics(TimelineMetrics metrics) {
+    String connectUrl = getCollectorUri();
+    int timeout = getTimeoutSeconds() * 1000;
+    try {
+      String jsonData = mapper.writeValueAsString(metrics);
+      LOG.info("Posting JSON=" + jsonData);
+      
+      HttpURLConnection connection =
+        (HttpURLConnection) new URL(connectUrl).openConnection();
+
+      connection.setRequestMethod("POST");
+      connection.setRequestProperty("Content-Type", "application/json");
+      connection.setConnectTimeout(timeout);
+      connection.setReadTimeout(timeout);
+      connection.setDoOutput(true);
+
+      if (jsonData != null) {
+        try (OutputStream os = connection.getOutputStream()) {
+          os.write(jsonData.getBytes("UTF-8"));
+        }
+      }
+
+      int statusCode = connection.getResponseCode();
+
+      if (statusCode != 200) {
+        LOG.info("Unable to POST metrics to collector, " + connectUrl + ", " +
+          "statusCode = " + statusCode);
+      } else {
+        LOG.debug("Metrics posted to Collector " + connectUrl);
+      }
+    } catch (IOException e) {
+      throw new UnableToConnectException(e).setConnectUrl(connectUrl);
+    }
+  }
+
+  abstract protected String getCollectorUri();
+
+  abstract protected int getTimeoutSeconds();
+}

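To make the abstract contract concrete, here is a minimal sketch of a subclass
(not part of the commit; the collector host, port, and metric names are
invented placeholders). Note that emitMetrics() performs a real HTTP POST and
throws UnableToConnectException if the collector cannot be reached:

    import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
    import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
    import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;

    public class DemoMetricsSink extends AbstractTimelineMetricsSink {
      @Override
      protected String getCollectorUri() {
        // Hypothetical endpoint; the real URI is supplied by the concrete sink.
        return "http://metrics-collector.example.com:6188/ws/v1/timeline/metrics";
      }

      @Override
      protected int getTimeoutSeconds() {
        return DEFAULT_POST_TIMEOUT_SECONDS;
      }

      public static void main(String[] args) {
        TimelineMetric metric = new TimelineMetric();
        metric.setMetricName("logfeeder.demo.count"); // illustrative name
        metric.setAppId("logfeeder");
        metric.setHostName("host1.example.com");
        long now = System.currentTimeMillis();
        metric.setStartTime(now);
        metric.setTimestamp(now);
        metric.getMetricValues().put(now, 1.0);

        TimelineMetrics metrics = new TimelineMetrics();
        metrics.getMetrics().add(metric);

        // Serializes the payload to JSON and POSTs it to getCollectorUri().
        new DemoMetricsSink().emitMetrics(metrics);
      }
    }
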
http://git-wip-us.apache.org/repos/asf/ambari/blob/f7294694/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/Precision.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/Precision.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/Precision.java
new file mode 100644
index 0000000..31044cc
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/Precision.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+/**
+ * Used to determine the metrics aggregate table.
+ *
+ * @see org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.TimelineWebServices#getTimelineMetric
+ */
+public enum Precision {
+  SECONDS,
+  MINUTES,
+  HOURS,
+  DAYS;
+
+  public static class PrecisionFormatException extends IllegalArgumentException {
+    public PrecisionFormatException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  public static Precision getPrecision(String precision) throws PrecisionFormatException {
+    if (precision == null) {
+      return null;
+    }
+    try {
+      return Precision.valueOf(precision.toUpperCase());
+    } catch (IllegalArgumentException e) {
+      throw new PrecisionFormatException("precision should be seconds, " +
+        "minutes, hours or days", e);
+    }
+  }
+
+  public static Precision getPrecision(long startTime, long endTime) {
+    final long HOUR = 3600000; // 1 hour in milliseconds
+    final long DAY = 86400000; // 1 day in milliseconds
+    long timeRange = endTime - startTime;
+    if (timeRange > 30 * DAY) {
+      return Precision.DAYS;
+    } else if (timeRange > DAY) {
+      return Precision.HOURS;
+    } else if (timeRange > 2 * HOUR) {
+      return Precision.MINUTES;
+    } else {
+      return Precision.SECONDS;
+    }
+  }
+
+  public static Precision getHigherPrecision(Precision precision) {
+    if (precision == null) {
+      return null;
+    }
+
+    if (precision.equals(Precision.SECONDS)) {
+      return Precision.MINUTES;
+    } else if (precision.equals(Precision.MINUTES)) {
+      return Precision.HOURS;
+    } else if (precision.equals(Precision.HOURS)) {
+      return Precision.DAYS;
+    } else {
+      return null;
+    }
+  }
+}

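A quick sketch (illustrative only) of the thresholds in getPrecision(long, long):
spans over 30 days map to DAYS, over one day to HOURS, over two hours to
MINUTES, and anything shorter to SECONDS:

    import org.apache.hadoop.metrics2.sink.timeline.Precision;

    public class PrecisionDemo {
      public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long HOUR = 3600000L;
        long DAY = 86400000L;

        System.out.println(Precision.getPrecision(now - HOUR, now));      // SECONDS
        System.out.println(Precision.getPrecision(now - 12 * HOUR, now)); // MINUTES
        System.out.println(Precision.getPrecision(now - 7 * DAY, now));   // HOURS
        System.out.println(Precision.getPrecision(now - 60 * DAY, now));  // DAYS

        // Parsing is case-insensitive; anything else raises
        // PrecisionFormatException.
        System.out.println(Precision.getPrecision("minutes")); // MINUTES
      }
    }
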
http://git-wip-us.apache.org/repos/asf/ambari/blob/f7294694/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/PrecisionLimitExceededException.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/PrecisionLimitExceededException.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/PrecisionLimitExceededException.java
new file mode 100644
index 0000000..962a071
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/PrecisionLimitExceededException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+public class PrecisionLimitExceededException extends IllegalArgumentException {
+
+  private static final long serialVersionUID = 1L;
+
+  public PrecisionLimitExceededException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public PrecisionLimitExceededException(String message) {
+    super(message);
+  }
+
+  public PrecisionLimitExceededException(Throwable cause) {
+    super(cause);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/f7294694/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/SingleValuedTimelineMetric.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/SingleValuedTimelineMetric.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/SingleValuedTimelineMetric.java
new file mode 100644
index 0000000..8ecca54
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/SingleValuedTimelineMetric.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+/**
+ * This class avoids creating a TreeMap for every instantiation of a metric
+ * read from the store. The methods are meant to provide interoperability
+ * with {@link TimelineMetric}.
+ */
+public class SingleValuedTimelineMetric {
+  private Long timestamp;
+  private Double value;
+  private String metricName;
+  private String appId;
+  private String instanceId;
+  private String hostName;
+  private Long startTime;
+  private String type;
+
+  public void setSingleTimeseriesValue(Long timestamp, Double value) {
+    this.timestamp = timestamp;
+    this.value = value;
+  }
+
+  public SingleValuedTimelineMetric(String metricName, String appId,
+                                    String instanceId, String hostName,
+                                    long timestamp, long startTime, String type) {
+    this.metricName = metricName;
+    this.appId = appId;
+    this.instanceId = instanceId;
+    this.hostName = hostName;
+    this.timestamp = timestamp;
+    this.startTime = startTime;
+    this.type = type;
+  }
+
+  public Long getTimestamp() {
+    return timestamp;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  public Double getValue() {
+    return value;
+  }
+
+  public String getMetricName() {
+    return metricName;
+  }
+
+  public String getAppId() {
+    return appId;
+  }
+
+  public String getInstanceId() {
+    return instanceId;
+  }
+
+  public String getHostName() {
+    return hostName;
+  }
+
+  public boolean equalsExceptTime(TimelineMetric metric) {
+    if (!metricName.equals(metric.getMetricName())) return false;
+    if (hostName != null ? !hostName.equals(metric.getHostName()) : metric.getHostName() != null)
+      return false;
+    if (appId != null ? !appId.equals(metric.getAppId()) : metric.getAppId() != null)
+      return false;
+    if (instanceId != null ? !instanceId.equals(metric.getInstanceId()) : metric.getInstanceId() != null) return false;
+
+    return true;
+  }
+
+  public TimelineMetric getTimelineMetric() {
+    TimelineMetric metric = new TimelineMetric();
+    metric.setMetricName(this.metricName);
+    metric.setAppId(this.appId);
+    metric.setHostName(this.hostName);
+    metric.setType(this.type);
+    metric.setInstanceId(this.instanceId);
+    metric.setStartTime(this.startTime);
+    metric.setTimestamp(this.timestamp);
+    metric.getMetricValues().put(timestamp, value);
+    return metric;
+  }
+}

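A short sketch (not part of the commit; names and values invented) of the
intended usage: the single value is held in plain fields, and the TreeMap is
only allocated when getTimelineMetric() converts to the full representation:

    import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
    import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;

    public class SingleValuedTimelineMetricDemo {
      public static void main(String[] args) {
        long now = System.currentTimeMillis();

        // Plain fields only; no TreeMap is allocated at this point.
        SingleValuedTimelineMetric point = new SingleValuedTimelineMetric(
            "logfeeder.demo.gauge", "logfeeder", null /* instanceId */,
            "host1.example.com", now, now, "Long");
        point.setSingleTimeseriesValue(now, 42.0);

        // Conversion builds the TreeMap-backed TimelineMetric on demand.
        TimelineMetric full = point.getTimelineMetric();
        System.out.println(full.getMetricValues());       // {<now>=42.0}
        System.out.println(point.equalsExceptTime(full)); // true
      }
    }
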
http://git-wip-us.apache.org/repos/asf/ambari/blob/f7294694/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
new file mode 100644
index 0000000..0e74f2d
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.codehaus.jackson.map.annotate.JsonDeserialize;
+
+@XmlRootElement(name = "metric")
+@XmlAccessorType(XmlAccessType.NONE)
+public class TimelineMetric implements Comparable<TimelineMetric> {
+
+  private String metricName;
+  private String appId;
+  private String instanceId;
+  private String hostName;
+  private long timestamp;
+  private long startTime;
+  private String type;
+  private TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
+
+  // default constructor
+  public TimelineMetric() {
+  }
+
+  // copy constructor
+  public TimelineMetric(TimelineMetric metric) {
+    setMetricName(metric.getMetricName());
+    setType(metric.getType());
+    setTimestamp(metric.getTimestamp());
+    setAppId(metric.getAppId());
+    setInstanceId(metric.getInstanceId());
+    setHostName(metric.getHostName());
+    setStartTime(metric.getStartTime());
+    setMetricValues(new TreeMap<Long, Double>(metric.getMetricValues()));
+  }
+
+  @XmlElement(name = "metricname")
+  public String getMetricName() {
+    return metricName;
+  }
+
+  public void setMetricName(String metricName) {
+    this.metricName = metricName;
+  }
+
+  @XmlElement(name = "appid")
+  public String getAppId() {
+    return appId;
+  }
+
+  public void setAppId(String appId) {
+    this.appId = appId;
+  }
+
+  @XmlElement(name = "instanceid")
+  public String getInstanceId() {
+    return instanceId;
+  }
+
+  public void setInstanceId(String instanceId) {
+    this.instanceId = instanceId;
+  }
+
+  @XmlElement(name = "hostname")
+  public String getHostName() {
+    return hostName;
+  }
+
+  public void setHostName(String hostName) {
+    this.hostName = hostName;
+  }
+
+  @XmlElement(name = "timestamp")
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public void setTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+  }
+
+  @XmlElement(name = "starttime")
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  @XmlElement(name = "type")
+  public String getType() {
+    return type;
+  }
+
+  public void setType(String type) {
+    this.type = type;
+  }
+
+  @XmlElement(name = "metrics")
+  public TreeMap<Long, Double> getMetricValues() {
+    return metricValues;
+  }
+
+  public void setMetricValues(TreeMap<Long, Double> metricValues) {
+    this.metricValues = metricValues;
+  }
+
+  public void addMetricValues(Map<Long, Double> metricValues) {
+    this.metricValues.putAll(metricValues);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    TimelineMetric metric = (TimelineMetric) o;
+
+    if (!metricName.equals(metric.metricName)) return false;
+    if (hostName != null ? !hostName.equals(metric.hostName) : metric.hostName != null)
+      return false;
+    if (appId != null ? !appId.equals(metric.appId) : metric.appId != null)
+      return false;
+    if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null)
+      return false;
+    if (timestamp != metric.timestamp) return false;
+    if (startTime != metric.startTime) return false;
+
+    return true;
+  }
+
+  public boolean equalsExceptTime(TimelineMetric metric) {
+    if (!metricName.equals(metric.metricName)) return false;
+    if (hostName != null ? !hostName.equals(metric.hostName) : metric.hostName != null)
+      return false;
+    if (appId != null ? !appId.equals(metric.appId) : metric.appId != null)
+      return false;
+    if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null)
+      return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = metricName.hashCode();
+    result = 31 * result + (appId != null ? appId.hashCode() : 0);
+    result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
+    result = 31 * result + (hostName != null ? hostName.hashCode() : 0);
+    result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+    return result;
+  }
+
+  @Override
+  public int compareTo(TimelineMetric other) {
+    if (timestamp > other.timestamp) {
+      return -1;
+    } else if (timestamp < other.timestamp) {
+      return 1;
+    } else {
+      return metricName.compareTo(other.metricName);
+    }
+  }
+}

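Two contracts above are easy to miss: compareTo() orders metrics newest-first
by timestamp, breaking ties on metric name, while equals() ignores the values
map and the type. A tiny sketch (metric names and timestamps invented):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;

    public class TimelineMetricOrderingDemo {
      public static void main(String[] args) {
        TimelineMetric older = new TimelineMetric();
        older.setMetricName("cpu");
        older.setTimestamp(1000L);

        TimelineMetric newer = new TimelineMetric();
        newer.setMetricName("mem");
        newer.setTimestamp(2000L);

        List<TimelineMetric> metrics = new ArrayList<TimelineMetric>();
        metrics.add(older);
        metrics.add(newer);

        // Newest timestamp sorts first.
        Collections.sort(metrics);
        System.out.println(metrics.get(0).getMetricName()); // mem
      }
    }
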
http://git-wip-us.apache.org/repos/asf/ambari/blob/f7294694/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
new file mode 100644
index 0000000..11ca665
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * The class that hosts a list of timeline metrics.
+ */
+@XmlRootElement(name = "metrics")
+@XmlAccessorType(XmlAccessType.NONE)
+public class TimelineMetrics {
+
+  private List<TimelineMetric> allMetrics = new ArrayList<TimelineMetric>();
+
+  public TimelineMetrics() {}
+
+  @XmlElement(name = "metrics")
+  public List<TimelineMetric> getMetrics() {
+    return allMetrics;
+  }
+
+  public void setMetrics(List<TimelineMetric> allMetrics) {
+    this.allMetrics = allMetrics;
+  }
+
+  private boolean isEqualTimelineMetrics(TimelineMetric metric1,
+                                         TimelineMetric metric2) {
+
+    if (!metric1.getMetricName().equals(metric2.getMetricName())) {
+      return false;
+    }
+
+    // Every non-null field must match; fail fast on the first mismatch so
+    // a later comparison cannot mask an earlier failed one.
+    if (metric1.getHostName() != null
+        && !metric1.getHostName().equals(metric2.getHostName())) {
+      return false;
+    }
+
+    if (metric1.getAppId() != null
+        && !metric1.getAppId().equals(metric2.getAppId())) {
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Merge with existing TimelineMetric if everything except startTime is
+   * the same.
+   * @param metric {@link TimelineMetric}
+   */
+  public void addOrMergeTimelineMetric(TimelineMetric metric) {
+    TimelineMetric metricToMerge = null;
+
+    if (!allMetrics.isEmpty()) {
+      for (TimelineMetric timelineMetric : allMetrics) {
+        if (timelineMetric.equalsExceptTime(metric)) {
+          metricToMerge = timelineMetric;
+          break;
+        }
+      }
+    }
+
+    if (metricToMerge != null) {
+      metricToMerge.addMetricValues(metric.getMetricValues());
+      if (metricToMerge.getTimestamp() > metric.getTimestamp()) {
+        metricToMerge.setTimestamp(metric.getTimestamp());
+      }
+      if (metricToMerge.getStartTime() > metric.getStartTime()) {
+        metricToMerge.setStartTime(metric.getStartTime());
+      }
+    } else {
+      allMetrics.add(metric);
+    }
+  }
+
+  // Optimization that avoids creating a TreeMap for every single-valued metric.
+  public void addOrMergeTimelineMetric(SingleValuedTimelineMetric metric) {
+    TimelineMetric metricToMerge = null;
+
+    if (!allMetrics.isEmpty()) {
+      for (TimelineMetric timelineMetric : allMetrics) {
+        if (metric.equalsExceptTime(timelineMetric)) {
+          metricToMerge = timelineMetric;
+          break;
+        }
+      }
+    }
+
+    if (metricToMerge != null) {
+      metricToMerge.getMetricValues().put(metric.getTimestamp(), metric.getValue());
+      if (metricToMerge.getTimestamp() > metric.getTimestamp()) {
+        metricToMerge.setTimestamp(metric.getTimestamp());
+      }
+      if (metricToMerge.getStartTime() > metric.getStartTime()) {
+        metricToMerge.setStartTime(metric.getStartTime());
+      }
+    } else {
+      allMetrics.add(metric.getTimelineMetric());
+    }
+  }
+}

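addOrMergeTimelineMetric() keys on metric identity (name, host, app, instance)
via equalsExceptTime() and keeps the earliest start time and timestamp. A
sketch with invented values:

    import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
    import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;

    public class TimelineMetricsMergeDemo {
      public static void main(String[] args) {
        TimelineMetric first = new TimelineMetric();
        first.setMetricName("cpu");
        first.setAppId("logfeeder");
        first.setStartTime(1000L);
        first.setTimestamp(1000L);
        first.getMetricValues().put(1000L, 0.5);

        TimelineMetric second = new TimelineMetric();
        second.setMetricName("cpu");
        second.setAppId("logfeeder");
        second.setStartTime(2000L);
        second.setTimestamp(2000L);
        second.getMetricValues().put(2000L, 0.7);

        TimelineMetrics metrics = new TimelineMetrics();
        metrics.addOrMergeTimelineMetric(first);
        metrics.addOrMergeTimelineMetric(second);

        // Identity matches, so the values merge into a single metric that
        // retains the earliest start time.
        System.out.println(metrics.getMetrics().size());                   // 1
        System.out.println(metrics.getMetrics().get(0).getMetricValues()); // {1000=0.5, 2000=0.7}
      }
    }
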
http://git-wip-us.apache.org/repos/asf/ambari/blob/f7294694/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/UnableToConnectException.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/UnableToConnectException.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/UnableToConnectException.java
new file mode 100644
index 0000000..797924f
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/UnableToConnectException.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+public class UnableToConnectException extends RuntimeException {
+
+  private static final long serialVersionUID = 1L;
+
+  private String connectUrl;
+
+  public UnableToConnectException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public UnableToConnectException(String message) {
+    super(message);
+  }
+
+  public UnableToConnectException(Throwable cause) {
+    super(cause);
+  }
+
+  public UnableToConnectException setConnectUrl(String connectUrl) {
+    this.connectUrl = connectUrl;
+    return this;
+  }
+
+  public String getConnectUrl() {
+    return connectUrl;
+  }
+}

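The fluent setConnectUrl() matches how AbstractTimelineMetricsSink.emitMetrics()
raises the exception; a caller can recover the failing URL like this (sketch,
URL invented):

    import java.io.IOException;

    import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;

    public class UnableToConnectDemo {
      public static void main(String[] args) {
        try {
          // Mirrors the pattern used in emitMetrics().
          throw new UnableToConnectException(new IOException("refused"))
              .setConnectUrl("http://metrics-collector.example.com:6188");
        } catch (UnableToConnectException e) {
          System.out.println("Could not reach " + e.getConnectUrl());
        }
      }
    }
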
http://git-wip-us.apache.org/repos/asf/ambari/blob/f7294694/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
new file mode 100644
index 0000000..a331c77
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline.cache;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class TimelineMetricsCache {
+
+  private final TimelineMetricHolder timelineMetricCache = new TimelineMetricHolder();
+  private static final Log LOG = LogFactory.getLog(TimelineMetricsCache.class);
+  public static final int MAX_RECS_PER_NAME_DEFAULT = 10000;
+  public static final int MAX_EVICTION_TIME_MILLIS = 59000; // ~ 1 min
+  private final int maxRecsPerName;
+  private final int maxEvictionTimeInMillis;
+  private final Map<String, Double> counterMetricLastValue = new HashMap<String, Double>();
+
+  public TimelineMetricsCache(int maxRecsPerName, int maxEvictionTimeInMillis) {
+    this.maxRecsPerName = maxRecsPerName;
+    this.maxEvictionTimeInMillis = maxEvictionTimeInMillis;
+  }
+
+  class TimelineMetricWrapper {
+    private long timeDiff = -1;
+    private long oldestTimestamp = -1;
+    private TimelineMetric timelineMetric;
+
+    TimelineMetricWrapper(TimelineMetric timelineMetric) {
+      this.timelineMetric = timelineMetric;
+      this.oldestTimestamp = timelineMetric.getStartTime();
+    }
+
+    private void updateTimeDiff(long timestamp) {
+      if (oldestTimestamp != -1 && timestamp > oldestTimestamp) {
+        timeDiff = timestamp - oldestTimestamp;
+      } else {
+        oldestTimestamp = timestamp;
+      }
+    }
+
+    public void putMetric(TimelineMetric metric) {
+      this.timelineMetric.addMetricValues(metric.getMetricValues());
+      updateTimeDiff(metric.getStartTime());
+    }
+
+    public long getTimeDiff() {
+      return timeDiff;
+    }
+
+    public TimelineMetric getTimelineMetric() {
+      return timelineMetric;
+    }
+  }
+
+  // TODO: Change to ConcurrentHashMap with weighted eviction
+  class TimelineMetricHolder extends LinkedHashMap<String, TimelineMetricWrapper> {
+    private static final long serialVersionUID = 1L;
+    private boolean gotOverflow = false;
+    // Tracks the last buffered timestamp per metric, to avoid duplicating a
+    // value at the end of one buffer segment and the start of the next
+    private Map<String, Long> endOfBufferTimestamps = new HashMap<String, Long>();
+
+    @Override
+    protected boolean removeEldestEntry(Map.Entry<String, TimelineMetricWrapper> eldest) {
+      boolean overflow = size() > maxRecsPerName;
+      if (overflow && !gotOverflow) {
+        LOG.warn("Metrics cache overflow at " + size() + " for " + eldest);
+        gotOverflow = true;
+      }
+      return overflow;
+    }
+
+    public TimelineMetric evict(String metricName) {
+      TimelineMetricWrapper metricWrapper = this.get(metricName);
+
+      if (metricWrapper == null
+        || metricWrapper.getTimeDiff() < getMaxEvictionTimeInMillis()) {
+        return null;
+      }
+
+      TimelineMetric timelineMetric = metricWrapper.getTimelineMetric();
+      this.remove(metricName);
+
+      return timelineMetric;
+    }
+
+    public void put(String metricName, TimelineMetric timelineMetric) {
+      if (isDuplicate(timelineMetric)) {
+        return;
+      }
+      TimelineMetricWrapper metric = this.get(metricName);
+      if (metric == null) {
+        this.put(metricName, new TimelineMetricWrapper(timelineMetric));
+      } else {
+        metric.putMetric(timelineMetric);
+      }
+      // Buffer last ts value
+      endOfBufferTimestamps.put(metricName, timelineMetric.getStartTime());
+    }
+
+    /**
+     * Test whether the last buffered timestamp is the same as the newly
+     * received one.
+     * @param timelineMetric the incoming {@link TimelineMetric}
+     * @return true if the metric repeats the last buffered timestamp
+     */
+    private boolean isDuplicate(TimelineMetric timelineMetric) {
+      return endOfBufferTimestamps.containsKey(timelineMetric.getMetricName())
+        && endOfBufferTimestamps.get(timelineMetric.getMetricName()).equals(timelineMetric.getStartTime());
+    }
+  }
+
+  public TimelineMetric getTimelineMetric(String metricName) {
+    if (timelineMetricCache.containsKey(metricName)) {
+      return timelineMetricCache.evict(metricName);
+    }
+
+    return null;
+  }
+
+  /**
+   * Getter used by tests to verify eviction behavior.
+   * @return the maximum eviction time in milliseconds
+   */
+  public int getMaxEvictionTimeInMillis() {
+    return maxEvictionTimeInMillis;
+  }
+
+  public void putTimelineMetric(TimelineMetric timelineMetric) {
+    timelineMetricCache.put(timelineMetric.getMetricName(), timelineMetric);
+  }
+
+  private void transformMetricValuesToDerivative(TimelineMetric timelineMetric) {
+    String metricName = timelineMetric.getMetricName();
+    double firstValue = timelineMetric.getMetricValues().size() > 0
+        ? timelineMetric.getMetricValues().entrySet().iterator().next().getValue() : 0;
+    Double value = counterMetricLastValue.get(metricName);
+    double previousValue = value != null ? value : firstValue;
+    Map<Long, Double> metricValues = timelineMetric.getMetricValues();
+    TreeMap<Long, Double> newMetricValues = new TreeMap<Long, Double>();
+    for (Map.Entry<Long, Double> entry : metricValues.entrySet()) {
+      newMetricValues.put(entry.getKey(), entry.getValue() - previousValue);
+      previousValue = entry.getValue();
+    }
+    timelineMetric.setMetricValues(newMetricValues);
+    counterMetricLastValue.put(metricName, previousValue);
+  }
+
+  public void putTimelineMetric(TimelineMetric timelineMetric, boolean isCounter) {
+    if (isCounter) {
+      transformMetricValuesToDerivative(timelineMetric);
+    }
+    putTimelineMetric(timelineMetric);
+  }
+}

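The cache only releases a metric once the span of its buffered values reaches
the eviction window, and counter metrics are rewritten as deltas on the way in.
A sketch of the eviction behavior (window and values invented):

    import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
    import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;

    public class TimelineMetricsCacheDemo {
      public static void main(String[] args) {
        // Evict once a metric's buffered span reaches 1000 ms.
        TimelineMetricsCache cache = new TimelineMetricsCache(
            TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT, 1000);

        TimelineMetric m1 = new TimelineMetric();
        m1.setMetricName("bytes.written");
        m1.setStartTime(0L);
        m1.getMetricValues().put(0L, 10.0);
        cache.putTimelineMetric(m1);

        // Only one data point buffered, so nothing is old enough to evict.
        System.out.println(cache.getTimelineMetric("bytes.written")); // null

        TimelineMetric m2 = new TimelineMetric();
        m2.setMetricName("bytes.written");
        m2.setStartTime(2000L);
        m2.getMetricValues().put(2000L, 25.0);
        cache.putTimelineMetric(m2);

        // Buffered span is now 2000 ms >= the window, so the merged metric
        // is returned and removed from the cache.
        TimelineMetric merged = cache.getTimelineMetric("bytes.written");
        System.out.println(merged.getMetricValues()); // {0=10.0, 2000=25.0}
      }
    }
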
