ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From oleew...@apache.org
Subject [5/6] ambari git commit: AMBARI-20378. Logfeeder: add de-duplication support (oleewere)
Date Wed, 12 Apr 2017 12:32:06 GMT
AMBARI-20378. Logfeeder: add de-duplication support (oleewere)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/9bc97c4b
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/9bc97c4b
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/9bc97c4b

Branch: refs/heads/branch-2.5
Commit: 9bc97c4b998a51db4d2ed9d986fe318194f8a964
Parents: a65c5a0
Author: oleewere <oleewere@gmail.com>
Authored: Fri Mar 10 14:45:13 2017 +0100
Committer: oleewere <oleewere@gmail.com>
Committed: Wed Apr 12 14:31:27 2017 +0200

----------------------------------------------------------------------
 .../logfeeder/common/LogFeederConstants.java    |   2 +
 .../ambari/logfeeder/filter/FilterJSON.java     |   2 +
 .../apache/ambari/logfeeder/input/Input.java    |  63 ++++++-
 .../ambari/logfeeder/input/cache/LRUCache.java  |  99 +++++++++++
 .../ambari/logfeeder/mapper/MapperDate.java     |   4 +
 .../logfeeder/output/OutputLineFilter.java      |  65 ++++++++
 .../ambari/logfeeder/output/OutputManager.java  |   8 +-
 .../ambari/logfeeder/filter/FilterJSONTest.java |   3 +
 .../logfeeder/input/cache/LRUCacheTest.java     | 123 ++++++++++++++
 .../ambari/logfeeder/mapper/MapperDateTest.java |   3 +
 .../logfeeder/output/OutputLineFilterTest.java  | 167 +++++++++++++++++++
 .../logfeeder/output/OutputManagerTest.java     |   3 +-
 .../test-config/logfeeder/logfeeder.properties  |   5 +
 .../shipper-conf/input.config-zookeeper.json    |   5 +-
 .../configuration/logfeeder-properties.xml      |  56 +++++++
 15 files changed, 603 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/9bc97c4b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
index d1e7fba..a7559aa 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
@@ -36,4 +36,6 @@ public class LogFeederConstants {
   // S3 Constants
   public static final String S3_PATH_START_WITH = "s3://";
   public static final String S3_PATH_SEPARATOR = "/";
+
+  public static final String IN_MEMORY_TIMESTAMP = "in_memory_timestamp";
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/9bc97c4b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
index ba63c61..35f692e 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
@@ -20,6 +20,7 @@ package org.apache.ambari.logfeeder.filter;
 
 import java.util.Map;
 
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
 import org.apache.ambari.logfeeder.common.LogfeederException;
 import org.apache.ambari.logfeeder.input.InputMarker;
 import org.apache.ambari.logfeeder.util.DateUtil;
@@ -48,6 +49,7 @@ public class FilterJSON extends Filter {
     if (timeStampStr != null && !timeStampStr.isEmpty()) {
       String logtime = DateUtil.getDate(timeStampStr);
       jsonMap.put("logtime", logtime);
+      jsonMap.put(LogFeederConstants.IN_MEMORY_TIMESTAMP, Long.parseLong(timeStampStr));
     }
     super.apply(jsonMap, inputMarker);
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/9bc97c4b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
index e13d9bd..9f54d8a 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
@@ -25,12 +25,14 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.ambari.logfeeder.input.cache.LRUCache;
 import org.apache.ambari.logfeeder.common.ConfigBlock;
 import org.apache.ambari.logfeeder.common.LogfeederException;
 import org.apache.ambari.logfeeder.filter.Filter;
 import org.apache.ambari.logfeeder.metrics.MetricData;
 import org.apache.ambari.logfeeder.output.Output;
 import org.apache.ambari.logfeeder.output.OutputManager;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.log4j.Logger;
 
 public abstract class Input extends ConfigBlock implements Runnable {
@@ -39,7 +41,18 @@ public abstract class Input extends ConfigBlock implements Runnable {
   private static final boolean DEFAULT_TAIL = true;
   private static final boolean DEFAULT_USE_EVENT_MD5 = false;
   private static final boolean DEFAULT_GEN_EVENT_MD5 = true;
-  
+  private static final boolean DEFAULT_CACHE_ENABLED = false;
+  private static final boolean DEFAULT_CACHE_DEDUP_LAST = false;
+  private static final int DEFAULT_CACHE_SIZE = 100;
+  private static final long DEFAULT_CACHE_DEDUP_INTERVAL = 1000;
+  private static final String DEFAULT_CACHE_KEY_FIELD = "log_message";
+
+  private static final String CACHE_ENABLED = "cache_enabled";
+  private static final String CACHE_KEY_FIELD = "cache_key_field";
+  private static final String CACHE_LAST_DEDUP_ENABLED = "cache_last_dedup_enabled";
+  private static final String CACHE_SIZE = "cache_size";
+  private static final String CACHE_DEDUP_INTERVAL = "cache_dedup_interval";
+
   protected InputManager inputManager;
   protected OutputManager outputManager;
   private List<Output> outputList = new ArrayList<Output>();
@@ -54,6 +67,9 @@ public abstract class Input extends ConfigBlock implements Runnable {
   private boolean useEventMD5;
   private boolean genEventMD5;
 
+  private LRUCache cache;
+  private String cacheKeyField;
+
   protected MetricData readBytesMetric = new MetricData(getReadBytesMetricName(), false);
   protected String getReadBytesMetricName() {
     return null;
@@ -107,6 +123,7 @@ public abstract class Input extends ConfigBlock implements Runnable {
   @Override
   public void init() throws Exception {
     super.init();
+    initCache();
     tail = getBooleanValue("tail", DEFAULT_TAIL);
     useEventMD5 = getBooleanValue("use_event_md5_as_id", DEFAULT_USE_EVENT_MD5);
     genEventMD5 = getBooleanValue("gen_event_md5", DEFAULT_GEN_EVENT_MD5);
@@ -114,6 +131,7 @@ public abstract class Input extends ConfigBlock implements Runnable {
     if (firstFilter != null) {
       firstFilter.init();
     }
+
   }
 
   boolean monitor() {
@@ -217,6 +235,33 @@ public abstract class Input extends ConfigBlock implements Runnable {
     isClosed = true;
   }
 
+  private void initCache() {
+    boolean cacheEnabled = getConfigValue(CACHE_ENABLED) != null
+      ? getBooleanValue(CACHE_ENABLED, DEFAULT_CACHE_ENABLED)
+      : LogFeederUtil.getBooleanProperty("logfeeder.cache.enabled", DEFAULT_CACHE_ENABLED);
+    if (cacheEnabled) {
+      String cacheKeyField = getConfigValue(CACHE_KEY_FIELD) != null
+        ? getStringValue(CACHE_KEY_FIELD)
+        : LogFeederUtil.getStringProperty("logfeeder.cache.key.field", DEFAULT_CACHE_KEY_FIELD);
+
+      setCacheKeyField(getStringValue(cacheKeyField));
+
+      boolean cacheLastDedupEnabled = getConfigValue(CACHE_LAST_DEDUP_ENABLED) != null
+        ? getBooleanValue(CACHE_LAST_DEDUP_ENABLED, DEFAULT_CACHE_DEDUP_LAST)
+        : LogFeederUtil.getBooleanProperty("logfeeder.cache.last.dedup.enabled", DEFAULT_CACHE_DEDUP_LAST);
+
+      int cacheSize = getConfigValue(CACHE_SIZE) != null
+        ? getIntValue(CACHE_SIZE, DEFAULT_CACHE_SIZE)
+        : LogFeederUtil.getIntProperty("logfeeder.cache.size", DEFAULT_CACHE_SIZE);
+
+      long cacheDedupInterval = getConfigValue(CACHE_DEDUP_INTERVAL) != null
+        ? getLongValue(CACHE_DEDUP_INTERVAL, DEFAULT_CACHE_DEDUP_INTERVAL)
+        : Long.parseLong(LogFeederUtil.getStringProperty("logfeeder.cache.dedup.interval", String.valueOf(DEFAULT_CACHE_DEDUP_INTERVAL)));
+
+      setCache(new LRUCache(cacheSize, filePath, cacheDedupInterval, cacheLastDedupEnabled));
+    }
+  }
+
   public boolean isTail() {
     return tail;
   }
@@ -257,6 +302,22 @@ public abstract class Input extends ConfigBlock implements Runnable {
     return thread;
   }
 
+  public LRUCache getCache() {
+    return cache;
+  }
+
+  public void setCache(LRUCache cache) {
+    this.cache = cache;
+  }
+
+  public String getCacheKeyField() {
+    return cacheKeyField;
+  }
+
+  public void setCacheKeyField(String cacheKeyField) {
+    this.cacheKeyField = cacheKeyField;
+  }
+
   @Override
   public String getNameForThread() {
     if (filePath != null) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/9bc97c4b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/cache/LRUCache.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/cache/LRUCache.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/cache/LRUCache.java
new file mode 100644
index 0000000..d9cfef8
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/cache/LRUCache.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.cache;
+
+import com.google.common.collect.EvictingQueue;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * LRU cache for handle de-duplications per input files.
+ * It won't put already existing entries into the cache map if de-duplication interval not higher then a specific value
+ * or if the new value is the most recently used one (in case of lastDedupEnabled is true)
+ */
+public class LRUCache {
+  private final LinkedHashMap<String, Long> keyValueMap;
+  private final String fileName;
+  private final long dedupInterval;
+  private final boolean lastDedupEnabled;
+  private final EvictingQueue<String> mostRecentLogs;
+
+  public LRUCache(final int limit, final String fileName, final long dedupInterval, boolean lastDedupEnabled) {
+    this.fileName = fileName;
+    this.dedupInterval = dedupInterval;
+    this.lastDedupEnabled = lastDedupEnabled;
+    this.mostRecentLogs = EvictingQueue.create(1); // for now, we will just store 1 mru entry
+    keyValueMap = new LinkedHashMap<String, Long>(16, 0.75f, true) {
+      @Override
+      protected boolean removeEldestEntry(final Map.Entry<String, Long> eldest) {
+        return size() > limit;
+      }
+    };
+  }
+
+  public boolean isEntryReplaceable(String key, Long value) {
+    boolean result = true;
+    Long existingValue = keyValueMap.get(key);
+    if (existingValue == null) {
+      result = true;
+    } else if (lastDedupEnabled && mostRecentLogs.contains(key)) { // TODO: get peek element if mostRecentLogs will contain more than 1 element
+      result = false;
+    } else if (Math.abs(value - existingValue) < dedupInterval) {
+      result = false;
+    }
+    mostRecentLogs.add(key);
+    return result;
+  }
+
+  public void put(String key, Long value) {
+    if (isEntryReplaceable(key, value)) {
+      keyValueMap.put(key, value);
+    }
+  }
+
+  public Long get(String key) {
+    mostRecentLogs.add(key);
+    return keyValueMap.get(key);
+  }
+
+  public String getMRUKey() {
+    return mostRecentLogs.peek();
+  }
+
+  public int size() {
+    return keyValueMap.size();
+  }
+
+  public long getDedupInterval() {
+    return dedupInterval;
+  }
+
+  public boolean containsKey(String key) {
+    return keyValueMap.containsKey(key);
+  }
+
+  public String getFileName() {
+    return this.fileName;
+  }
+
+  public boolean isLastDedupEnabled() {
+    return lastDedupEnabled;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/9bc97c4b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
index eb3ae01..6a7fad7 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
@@ -24,6 +24,7 @@ import java.util.Calendar;
 import java.util.Date;
 import java.util.Map;
 
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.commons.lang.time.DateUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -80,6 +81,7 @@ public class MapperDate extends Mapper {
         if (isEpoch) {
           long ms = Long.parseLong(value.toString()) * 1000;
           value = new Date(ms);
+          jsonObj.put(LogFeederConstants.IN_MEMORY_TIMESTAMP, ((Date) value).getTime());
         } else if (targetDateFormatter != null) {
           if (srcDateFormatter != null) {
             Date srcDate = srcDateFormatter.parse(value.toString());
@@ -97,8 +99,10 @@ public class MapperDate extends Mapper {
               }
             }
             value = targetDateFormatter.format(srcDate);
+            jsonObj.put(LogFeederConstants.IN_MEMORY_TIMESTAMP, srcDate.getTime());
           } else {
             value = targetDateFormatter.parse(value.toString());
+            jsonObj.put(LogFeederConstants.IN_MEMORY_TIMESTAMP, ((Date) value).getTime());
           }
         } else {
           return value;

http://git-wip-us.apache.org/repos/asf/ambari/blob/9bc97c4b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java
new file mode 100644
index 0000000..fcf2695
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.input.cache.LRUCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+
+/**
+ * Filter for outputs based on input configs, which can drop lines if the filter applies.
+ */
+public class OutputLineFilter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(OutputLineFilter.class);
+
+  /**
+   * Applies filter based on input cache (on service log only).
+   * Get the message and in-memory timestamp for log line. If both are not empty, evaluate that log line needs to be filtered out or not.
+   */
+  public Boolean apply(Map<String, Object> lineMap, Input input) {
+    boolean isLogFilteredOut = false;
+    LRUCache inputLruCache = input.getCache();
+    if (inputLruCache != null && "service".equals(input.getConfigs().get(LogFeederConstants.ROW_TYPE))) {
+      String logMessage = (String) lineMap.get(input.getCacheKeyField());
+      Long timestamp = null;
+      if (lineMap.containsKey((LogFeederConstants.IN_MEMORY_TIMESTAMP))) {
+        timestamp = (Long) lineMap.get(LogFeederConstants.IN_MEMORY_TIMESTAMP);
+      }
+      if (logMessage != null && timestamp != null) {
+        isLogFilteredOut = !inputLruCache.isEntryReplaceable(logMessage, timestamp);
+        if (!isLogFilteredOut) {
+          inputLruCache.put(logMessage, timestamp);
+        } else {
+          LOG.debug("Log line filtered out: {} (file: {}, dedupInterval: {}, lastDedupEnabled: {})",
+            logMessage, inputLruCache.getFileName(), inputLruCache.getDedupInterval(), inputLruCache.isLastDedupEnabled());
+        }
+      }
+    }
+    if (lineMap.containsKey(LogFeederConstants.IN_MEMORY_TIMESTAMP)) {
+      lineMap.remove(LogFeederConstants.IN_MEMORY_TIMESTAMP);
+    }
+    return isLogFilteredOut;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/9bc97c4b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
index 86b5c57..3c80e50 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
@@ -30,8 +30,10 @@ import java.util.UUID;
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
 import org.apache.ambari.logfeeder.input.Input;
 import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.input.cache.LRUCache;
 import org.apache.ambari.logfeeder.logconfig.FilterLogData;
 import org.apache.ambari.logfeeder.metrics.MetricData;
+import org.apache.ambari.logfeeder.util.DateUtil;
 import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.ambari.logfeeder.util.MurmurHash;
 import org.apache.commons.lang3.StringUtils;
@@ -51,6 +53,8 @@ public class OutputManager {
   private static long docCounter = 0;
   private MetricData messageTruncateMetric = new MetricData(null, false);
 
+  private OutputLineFilter outputLineFilter = new OutputLineFilter();
+
   public List<Output> getOutputs() {
     return outputs;
   }
@@ -138,8 +142,8 @@ public class OutputManager {
         jsonObj.put("message_md5", "" + MurmurHash.hash64A(logMessage.getBytes(), 31174077));
       }
     }
-    
-    if (FilterLogData.INSTANCE.isAllowed(jsonObj, inputMarker)) {
+    if (FilterLogData.INSTANCE.isAllowed(jsonObj, inputMarker)
+      && !outputLineFilter.apply(jsonObj, inputMarker.input)) {
       for (Output output : input.getOutputList()) {
         try {
           output.write(jsonObj, inputMarker);

http://git-wip-us.apache.org/repos/asf/ambari/blob/9bc97c4b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
index 06d8db2..643dafc 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.TimeZone;
 
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
 import org.apache.ambari.logfeeder.common.LogfeederException;
 import org.apache.ambari.logfeeder.input.InputMarker;
 import org.apache.ambari.logfeeder.output.OutputManager;
@@ -76,6 +77,7 @@ public class FilterJSONTest {
     Map<String, Object> jsonParams = capture.getValue();
 
     assertEquals("Incorrect decoding: log time", dateString, jsonParams.remove("logtime"));
+    assertEquals("Incorrect decoding: in memory timestamp", d.getTime(), jsonParams.remove(LogFeederConstants.IN_MEMORY_TIMESTAMP));
     assertEquals("Incorrect decoding: line number", 100l, jsonParams.remove("line_number"));
     assertTrue("jsonParams are not empty!", jsonParams.isEmpty());
   }
@@ -100,6 +102,7 @@ public class FilterJSONTest {
     Map<String, Object> jsonParams = capture.getValue();
 
     assertEquals("Incorrect decoding: log time", dateString, jsonParams.remove("logtime"));
+    assertEquals("Incorrect decoding: in memory timestamp", d.getTime(), jsonParams.remove(LogFeederConstants.IN_MEMORY_TIMESTAMP));
     assertEquals("Incorrect decoding: some field", "abc", jsonParams.remove("some_field"));
     assertTrue("jsonParams are not empty!", jsonParams.isEmpty());
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/9bc97c4b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/cache/LRUCacheTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/cache/LRUCacheTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/cache/LRUCacheTest.java
new file mode 100644
index 0000000..dd97d27
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/cache/LRUCacheTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.cache;
+
+import org.joda.time.DateTime;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class LRUCacheTest {
+
+  private LRUCache underTest;
+
+  @Before
+  public void setUp() {
+    underTest = new LRUCache(4, "/mypath", Long.parseLong("1000"), true);
+  }
+
+  @Test
+  public void testLruCachePut() {
+    // GIVEN
+    // WHEN
+    underTest.put("mymessage1", 1000L);
+    underTest.put("mymessage2", 1000L);
+    underTest.put("mymessage3", 1000L);
+    underTest.put("mymessage4", 1000L);
+    underTest.put("mymessage5", 1000L);
+    underTest.put("mymessage1", 1500L);
+    underTest.put("mymessage1", 3500L);
+    underTest.put("mymessage5", 1700L);
+    // THEN
+    assertEquals((Long) 1500L, underTest.get("mymessage1"));
+    assertEquals((Long) 1000L, underTest.get("mymessage5"));
+    assertEquals(underTest.getMRUKey(), "mymessage5");
+    assertEquals(4, underTest.size());
+    assertFalse(underTest.containsKey("mymessage2"));
+  }
+
+  @Test
+  public void testLruCacheFilterMruKeys() {
+    // GIVEN
+    // WHEN
+    underTest.put("mymessage1", 1000L);
+    underTest.put("mymessage1", 3000L);
+    underTest.put("mymessage1", 5000L);
+    underTest.put("mymessage1", 7000L);
+    // THEN
+    assertEquals((Long) 1000L, underTest.get("mymessage1"));
+  }
+
+  @Test
+  public void testLruCacheDoNotFilterMruKeysIfLastDedupDisabled() {
+    // GIVEN
+    underTest = new LRUCache(4, "/mypath", 1000, false);
+    // WHEN
+    underTest.put("mymessage1", 1000L);
+    underTest.put("mymessage1", 3000L);
+    // THEN
+    assertEquals((Long) 3000L, underTest.get("mymessage1"));
+  }
+
+  @Test
+  public void testLruCacheFilterByDedupInterval() {
+    // GIVEN
+    // WHEN
+    underTest.put("mymessage1", 1000L);
+    underTest.put("mymessage2", 1000L);
+    underTest.put("mymessage1", 1250L);
+    underTest.put("mymessage2", 1500L);
+    underTest.put("mymessage1", 1500L);
+    underTest.put("mymessage2", 2100L);
+    // THEN
+    assertEquals((Long) 1000L, underTest.get("mymessage1"));
+    assertEquals((Long) 2100L, underTest.get("mymessage2"));
+  }
+
+  @Test
+  public void testLruCacheWithDates() {
+    // GIVEN
+    DateTime firstDate = DateTime.now();
+    DateTime secondDate = firstDate.plusMillis(500);
+    // WHEN
+    underTest.put("mymessage1", firstDate.toDate().getTime());
+    underTest.put("mymessage2", firstDate.toDate().getTime());
+    underTest.put("mymessage1", secondDate.toDate().getTime());
+    // THEN
+    assertEquals((Long) firstDate.toDate().getTime(), underTest.get("mymessage1"));
+    assertEquals((Long) firstDate.toDate().getTime(), underTest.get("mymessage2"));
+  }
+
+  @Test
+  public void testLruCacheWithDatesReachDedupInterval() {
+    // GIVEN
+    DateTime firstDate = DateTime.now();
+    DateTime secondDate = firstDate.plusMillis(1500);
+    // WHEN
+    underTest.put("mymessage1", firstDate.toDate().getTime());
+    underTest.put("mymessage2", firstDate.toDate().getTime());
+    underTest.put("mymessage1", secondDate.toDate().getTime());
+    // THEN
+    assertEquals((Long) secondDate.toDate().getTime(), underTest.get("mymessage1"));
+    assertEquals((Long) firstDate.toDate().getTime(), underTest.get("mymessage2"));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/9bc97c4b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java
index 08680f6..8beecda 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java
@@ -24,6 +24,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
 import org.apache.commons.lang3.time.DateUtils;
 import org.apache.log4j.Logger;
 import org.junit.Test;
@@ -52,6 +53,7 @@ public class MapperDateTest {
 
     assertEquals("Value wasn't matched properly", d, mappedValue);
     assertEquals("Value wasn't put into jsonObj", d, jsonObj.remove("someField"));
+    assertEquals("Value wasn't put into jsonObj", d.getTime(), jsonObj.remove(LogFeederConstants.IN_MEMORY_TIMESTAMP));
     assertTrue("jsonObj is not empty", jsonObj.isEmpty());
   }
 
@@ -73,6 +75,7 @@ public class MapperDateTest {
 
     assertEquals("Value wasn't matched properly", d, mappedValue);
     assertEquals("Value wasn't put into jsonObj", d, jsonObj.remove("someField"));
+    assertEquals("Value wasn't put into jsonObj", d.getTime(), jsonObj.remove(LogFeederConstants.IN_MEMORY_TIMESTAMP));
     assertTrue("jsonObj is not empty", jsonObj.isEmpty());
   }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/9bc97c4b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputLineFilterTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputLineFilterTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputLineFilterTest.java
new file mode 100644
index 0000000..1ccc319
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputLineFilterTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.input.cache.LRUCache;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class OutputLineFilterTest {
+
+  private static final String CACHE_KEY_FIELD = "log_message";
+  private static final String DEFAULT_DUMMY_MESSAGE = "myMessage";
+
+  private OutputLineFilter underTest;
+  private Input inputMock;
+
+  @Before
+  public void setUp() {
+    underTest = new OutputLineFilter();
+    inputMock = EasyMock.mock(Input.class);
+  }
+
+  @Test
+  public void testApplyWithFilterOutByDedupInterval() {
+    // GIVEN
+    EasyMock.expect(inputMock.getCache()).andReturn(createLruCache(DEFAULT_DUMMY_MESSAGE, 100L, false));
+    EasyMock.expect(inputMock.getConfigs()).andReturn(generateInputConfigs());
+    EasyMock.expect(inputMock.getCacheKeyField()).andReturn(CACHE_KEY_FIELD);
+    EasyMock.replay(inputMock);
+    // WHEN
+    boolean result = underTest.apply(generateLineMap(), inputMock);
+    // THEN
+    EasyMock.verify(inputMock);
+    assertTrue(result);
+  }
+
+  @Test
+  public void testApplyDoNotFilterOutDataByDedupInterval() {
+    // GIVEN
+    EasyMock.expect(inputMock.getCache()).andReturn(createLruCache(DEFAULT_DUMMY_MESSAGE, 10L, false));
+    EasyMock.expect(inputMock.getConfigs()).andReturn(generateInputConfigs());
+    EasyMock.expect(inputMock.getCacheKeyField()).andReturn(CACHE_KEY_FIELD);
+    EasyMock.replay(inputMock);
+    // WHEN
+    boolean result = underTest.apply(generateLineMap(), inputMock);
+    // THEN
+    EasyMock.verify(inputMock);
+    assertFalse(result);
+  }
+
+  @Test
+  public void testApplyWithFilterOutByDedupLast() {
+    // GIVEN
+    EasyMock.expect(inputMock.getCache()).andReturn(createLruCache(DEFAULT_DUMMY_MESSAGE, 10L, true));
+    EasyMock.expect(inputMock.getConfigs()).andReturn(generateInputConfigs());
+    EasyMock.expect(inputMock.getCacheKeyField()).andReturn(CACHE_KEY_FIELD);
+    EasyMock.replay(inputMock);
+    // WHEN
+    boolean result = underTest.apply(generateLineMap(), inputMock);
+    // THEN
+    EasyMock.verify(inputMock);
+    assertTrue(result);
+  }
+
+  @Test
+  public void testApplyDoNotFilterOutDataByDedupLast() {
+    // GIVEN
+    EasyMock.expect(inputMock.getCache()).andReturn(createLruCache("myMessage2", 10L, true));
+    EasyMock.expect(inputMock.getConfigs()).andReturn(generateInputConfigs());
+    EasyMock.expect(inputMock.getCacheKeyField()).andReturn(CACHE_KEY_FIELD);
+    EasyMock.replay(inputMock);
+    // WHEN
+    boolean result = underTest.apply(generateLineMap(), inputMock);
+    // THEN
+    EasyMock.verify(inputMock);
+    assertFalse(result);
+  }
+
+  @Test
+  public void testApplyWithoutLruCache() {
+    // GIVEN
+    EasyMock.expect(inputMock.getCache()).andReturn(null);
+    EasyMock.replay(inputMock);
+    // WHEN
+    boolean result = underTest.apply(generateLineMap(), inputMock);
+    // THEN
+    EasyMock.verify(inputMock);
+    assertFalse(result);
+  }
+
+  @Test
+  public void testApplyWithoutInMemoryTimestamp() {
+    // GIVEN
+    EasyMock.expect(inputMock.getCache()).andReturn(createLruCache(DEFAULT_DUMMY_MESSAGE, 100L, true));
+    EasyMock.expect(inputMock.getConfigs()).andReturn(generateInputConfigs());
+    EasyMock.expect(inputMock.getCacheKeyField()).andReturn(CACHE_KEY_FIELD);
+    EasyMock.replay(inputMock);
+    Map<String, Object> lineMap = generateLineMap();
+    lineMap.remove(LogFeederConstants.IN_MEMORY_TIMESTAMP);
+    // WHEN
+    boolean result = underTest.apply(lineMap, inputMock);
+    // THEN
+    EasyMock.verify(inputMock);
+    assertFalse(result);
+  }
+
+  @Test
+  public void testApplyWithoutLogMessage() {
+    // GIVEN
+    EasyMock.expect(inputMock.getCache()).andReturn(createLruCache(DEFAULT_DUMMY_MESSAGE, 100L, true));
+    EasyMock.expect(inputMock.getConfigs()).andReturn(generateInputConfigs());
+    EasyMock.expect(inputMock.getCacheKeyField()).andReturn(CACHE_KEY_FIELD);
+    EasyMock.replay(inputMock);
+    Map<String, Object> lineMap = generateLineMap();
+    lineMap.remove(CACHE_KEY_FIELD);
+    // WHEN
+    boolean result = underTest.apply(lineMap, inputMock);
+    // THEN
+    EasyMock.verify(inputMock);
+    assertFalse(result);
+  }
+
+  private Map<String, Object> generateLineMap() {
+    Map<String, Object> lineMap = new HashMap<>();
+    lineMap.put(CACHE_KEY_FIELD, "myMessage");
+    lineMap.put(LogFeederConstants.IN_MEMORY_TIMESTAMP, 150L);
+    return lineMap;
+  }
+
+  private Map<String, Object> generateInputConfigs() {
+    Map<String, Object> inputConfigs = new HashMap<>();
+    inputConfigs.put(LogFeederConstants.ROW_TYPE, "service");
+    return inputConfigs;
+  }
+
+  private LRUCache createLruCache(String defaultKey, long defaultValue, boolean lastDedupEanabled) {
+    LRUCache lruCache = new LRUCache(4, "myfilepath", 100, lastDedupEanabled);
+    lruCache.put(defaultKey, defaultValue);
+    return lruCache;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/9bc97c4b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputManagerTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputManagerTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputManagerTest.java
index a080fa8..0a0a195 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputManagerTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputManagerTest.java
@@ -103,8 +103,9 @@ public class OutputManagerTest {
     expect(mockInput.isUseEventMD5()).andReturn(false);
     expect(mockInput.isGenEventMD5()).andReturn(false);
     expect(mockInput.getConfigs()).andReturn(Collections.<String, Object> emptyMap());
+    expect(mockInput.getCache()).andReturn(null);
     expect(mockInput.getOutputList()).andReturn(Arrays.asList(output1, output2, output3));
-    
+
     output1.write(jsonObj, inputMarker); expectLastCall();
     output2.write(jsonObj, inputMarker); expectLastCall();
     output3.write(jsonObj, inputMarker); expectLastCall();

http://git-wip-us.apache.org/repos/asf/ambari/blob/9bc97c4b/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
----------------------------------------------------------------------
diff --git a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties b/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
index 879b786..068bc3a 100644
--- a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
+++ b/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
@@ -28,3 +28,8 @@ logfeeder.log.filter.enable=true
 logfeeder.solr.config.interval=5
 logfeeder.solr.core.config.name=history
 logfeeder.solr.zk_connect_string=localhost:9983
+logfeeder.cache.enabled=true
+logfeeder.cache.size=100
+logfeeder.cache.key.field=log_message
+logfeeder.cache.dedup.interval=1000
+logfeeder.cache.last.dedup.enabled=true

http://git-wip-us.apache.org/repos/asf/ambari/blob/9bc97c4b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-zookeeper.json
----------------------------------------------------------------------
diff --git a/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-zookeeper.json b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-zookeeper.json
index 122a9e1..d3685a4 100644
--- a/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-zookeeper.json
+++ b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-zookeeper.json
@@ -3,7 +3,10 @@
     {
       "type": "zookeeper",
       "rowtype": "service",
-      "path": "/root/test-logs/zookeeper/zookeeper-test-log.txt"
+      "path": "/root/test-logs/zookeeper/zookeeper-test-log.txt",
+      "cache_enabled" : "true",
+      "cache_size" : "10",
+      "cache_dedup_interval" : "1000"
     }
   ],
   "filter": [

http://git-wip-us.apache.org/repos/asf/ambari/blob/9bc97c4b/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/configuration/logfeeder-properties.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/configuration/logfeeder-properties.xml b/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/configuration/logfeeder-properties.xml
index 2acb57e..ca14c1f 100644
--- a/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/configuration/logfeeder-properties.xml
+++ b/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/configuration/logfeeder-properties.xml
@@ -64,4 +64,60 @@
     </value-attributes>
     <on-ambari-upgrade add="true"/>
   </property>
+  <property>
+    <name>logfeeder.cache.enabled</name>
+    <value>false</value>
+    <description>
+      Enable input cache for every monitored input file. The cache stores log lines, based on the data, duplications can be dropped.
+    </description>
+    <display-name>Input cache enabled</display-name>
+    <value-attributes>
+      <type>boolean</type>
+    </value-attributes>
+    <on-ambari-upgrade add="true"/>
+  </property>
+  <property>
+    <name>logfeeder.cache.size</name>
+    <value>100</value>
+    <description>Size of the input caches</description>
+    <display-name>Input cache size</display-name>
+    <value-attributes>
+      <type>int</type>
+    </value-attributes>
+    <on-ambari-upgrade add="true"/>
+  </property>
+  <property>
+    <name>logfeeder.cache.dedup.interval</name>
+    <value>1000</value>
+    <description>
+      If input cache is enabled, Log Feeder can drop any duplicated line during log processing,
+      but only if the duplicated lines/messages are in the same interval (in milliseconds) with the original message/line.
+    </description>
+    <display-name>Input cache dedup interval</display-name>
+    <value-attributes>
+      <type>int</type>
+    </value-attributes>
+    <on-ambari-upgrade add="true"/>
+  </property>
+  <property>
+    <name>logfeeder.cache.last.dedup.enabled</name>
+    <value>false</value>
+    <description>
+      If last dedup is enabled for input cache, Log Feeder will drop every new line (message), which is the same as the last line.
+    </description>
+    <display-name>Input cache last dedup</display-name>
+    <value-attributes>
+      <type>boolean</type>
+    </value-attributes>
+    <on-ambari-upgrade add="true"/>
+  </property>
+  <property>
+    <name>logfeeder.cache.key.field</name>
+    <value>log_message</value>
+    <description>
+      Key field, which will be used as keys in the Input cache. (by defalt, log_message represets the message part of processed data)
+    </description>
+    <display-name>Input cache key field</display-name>
+    <on-ambari-upgrade add="true"/>
+  </property>
 </configuration>


Mime
View raw message