Return-Path: X-Original-To: apmail-ambari-commits-archive@www.apache.org Delivered-To: apmail-ambari-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EF49619D46 for ; Sat, 9 Apr 2016 10:15:02 +0000 (UTC) Received: (qmail 88747 invoked by uid 500); 9 Apr 2016 10:15:02 -0000 Delivered-To: apmail-ambari-commits-archive@ambari.apache.org Received: (qmail 88565 invoked by uid 500); 9 Apr 2016 10:15:02 -0000 Mailing-List: contact commits-help@ambari.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: ambari-dev@ambari.apache.org Delivered-To: mailing list commits@ambari.apache.org Received: (qmail 85653 invoked by uid 99); 9 Apr 2016 10:14:58 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 09 Apr 2016 10:14:58 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C8392DFF94; Sat, 9 Apr 2016 10:14:58 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: oleewere@apache.org To: commits@ambari.apache.org Date: Sat, 09 Apr 2016 10:15:44 -0000 Message-Id: <538f06b7064f4cc6a74323838f87943f@git.apache.org> In-Reply-To: <63d0311e3ef1479090f903b1b8d63b11@git.apache.org> References: <63d0311e3ef1479090f903b1b8d63b11@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [48/61] [abbrv] [partial] ambari git commit: AMBARI-15679. 
Initial commit for LogSearch service definition (oleewre) http://git-wip-us.apache.org/repos/asf/ambari/blob/f7294694/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java new file mode 100644 index 0000000..9b2a717 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.ambari.logfeeder.filter; + +import java.io.BufferedInputStream; +import java.io.InputStreamReader; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +import oi.thekraken.grok.api.Grok; +import oi.thekraken.grok.api.exception.GrokException; + +import org.apache.ambari.logfeeder.LogFeederUtil; +import org.apache.ambari.logfeeder.MetricCount; +import org.apache.ambari.logfeeder.input.InputMarker; +import org.apache.commons.lang3.StringUtils; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +import com.google.gson.reflect.TypeToken; + +public class FilterGrok extends Filter { + static private Logger logger = Logger.getLogger(FilterGrok.class); + + private static final String GROK_PATTERN_FILE = "grok-patterns"; + + String messagePattern = null; + String multilinePattern = null; + + Grok grokMultiline = null; + Grok grokMessage = null; + + StringBuilder strBuff = null; + String currMultilineJsonStr = null; + + InputMarker firstInputMarker = null; + InputMarker savedInputMarker = null; + + String sourceField = null; + boolean removeSourceField = true; + + Set namedParamList = new HashSet(); + Set multiLineamedParamList = new HashSet(); + + Type jsonType = new TypeToken>() { + }.getType(); + + public MetricCount grokErrorMetric = new MetricCount(); + + @Override + public void init() throws Exception { + super.init(); + + try { + grokErrorMetric.metricsName = "filter.error.grok"; + // Get the Grok file patterns + messagePattern = escapePattern(getStringValue("message_pattern")); + multilinePattern = escapePattern(getStringValue("multiline_pattern")); + sourceField = getStringValue("source_field"); + removeSourceField = getBooleanValue("remove_source_field", + removeSourceField); + + logger.info("init() done. 
grokPattern=" + messagePattern + + ", multilinePattern=" + multilinePattern + ", " + + getShortDescription()); + if (StringUtils.isEmpty(messagePattern)) { + logger.error("message_pattern is not set for filter."); + return; + } + extractNamedParams(messagePattern, namedParamList); + + grokMessage = new Grok(); + // grokMessage.addPatternFromReader(r); + loadPatterns(grokMessage); + grokMessage.compile(messagePattern); + if (!StringUtils.isEmpty(multilinePattern)) { + extractNamedParams(multilinePattern, multiLineamedParamList); + + grokMultiline = new Grok(); + loadPatterns(grokMultiline); + grokMultiline.compile(multilinePattern); + } + } catch (Throwable t) { + logger.fatal( + "Caught exception while initializing Grok. multilinePattern=" + + multilinePattern + ", messagePattern=" + + messagePattern, t); + grokMessage = null; + grokMultiline = null; + } + + } + + /** + * @param stringValue + * @return + */ + private String escapePattern(String inPattern) { + String inStr = inPattern; + if (inStr != null) { + if (inStr.contains("(?m)") && !inStr.contains("(?s)")) { + inStr = inStr.replaceFirst("(?m)", "(?s)"); + } + // inStr = inStr.replaceAll("\\[", "\\\\["); + // inStr = inStr.replaceAll("\\]", "\\\\]"); + // inStr = inStr.replaceAll("\\(", "\\\\("); + // inStr = inStr.replaceAll("\\)", "\\\\)"); + } + return inStr; + } + + private void extractNamedParams(String patternStr, Set paramList) { + String grokRegEx = "%\\{" + "(?" + "(?[A-z0-9]+)" + + "(?::(?[A-z0-9_:]+))?" + ")" + "(?:=(?" + + "(?:" + "(?:[^{}]+|\\.+)+" + ")+" + ")" + ")?" 
+ "\\}"; + + Pattern pattern = Pattern.compile(grokRegEx); + java.util.regex.Matcher matcher = pattern.matcher(patternStr); + while (matcher.find()) { + String subname = matcher.group(3); + if (subname != null) { + paramList.add(subname); + } + } + } + + private boolean loadPatterns(Grok grok) { + InputStreamReader grokPatternsReader = null; + logger.info("Loading pattern file " + GROK_PATTERN_FILE); + try { + BufferedInputStream fileInputStream = (BufferedInputStream) this + .getClass().getClassLoader() + .getResourceAsStream(GROK_PATTERN_FILE); + if (fileInputStream == null) { + logger.fatal("Couldn't load grok-patterns file " + + GROK_PATTERN_FILE + ". Things will not work"); + return false; + } + grokPatternsReader = new InputStreamReader(fileInputStream); + } catch (Throwable t) { + logger.fatal("Error reading grok-patterns file " + GROK_PATTERN_FILE + + " from classpath. Grok filtering will not work.", t); + return false; + } + try { + grok.addPatternFromReader(grokPatternsReader); + } catch (GrokException e) { + logger.fatal( + "Error loading patterns from grok-patterns reader for file " + + GROK_PATTERN_FILE, e); + return false; + } + + return true; + } + + /* + * (non-Javadoc) + * + * @see org.apache.ambari.logfeeder.filter.Filter#apply(java.lang.String) + */ + @Override + public void apply(String inputStr, InputMarker inputMarker) { + if (grokMessage == null) { + return; + } + + if (grokMultiline != null) { + // Check if new line + String jsonStr = grokMultiline.capture(inputStr); + if (!"{}".equals(jsonStr)) { + // New line + if (strBuff != null) { + savedInputMarker.beginLineNumber = firstInputMarker.lineNumber; + // Construct JSON object and add only the interested named + // parameters + Map jsonObj = Collections + .synchronizedMap(new HashMap()); + try { + // Handle message parsing + applyMessage(strBuff.toString(), jsonObj, + currMultilineJsonStr); + } finally { + strBuff = null; + savedInputMarker = null; + firstInputMarker = null; + } + } + 
currMultilineJsonStr = jsonStr; + } + + if (strBuff == null) { + strBuff = new StringBuilder(); + firstInputMarker = inputMarker; + } else { + // strBuff.append(System.lineSeparator()); + strBuff.append('\r'); + strBuff.append('\n'); + } + strBuff.append(inputStr); + savedInputMarker = inputMarker; + } else { + savedInputMarker = inputMarker; + Map jsonObj = Collections + .synchronizedMap(new HashMap()); + applyMessage(inputStr, jsonObj, null); + } + } + + @Override + public void apply(Map jsonObj, InputMarker inputMarker) { + if (sourceField != null) { + savedInputMarker = inputMarker; + applyMessage((String) jsonObj.get(sourceField), jsonObj, null); + if (removeSourceField) { + jsonObj.remove(sourceField); + } + } + } + + /** + * @param inputStr + * @param jsonObj + */ + private void applyMessage(String inputStr, Map jsonObj, + String multilineJsonStr) { + String jsonStr = grokParse(inputStr); + + boolean parseError = false; + if ("{}".equals(jsonStr)) { + parseError = true; + // Error parsing string. + logParseError(inputStr); + + if (multilineJsonStr == null) { + // TODO: Should we just add this as raw message in solr? 
+ return; + } + } + + if (parseError) { + jsonStr = multilineJsonStr; + } + Map jsonSrc = LogFeederUtil.getGson().fromJson(jsonStr, + jsonType); + for (String namedParam : namedParamList) { + if (jsonSrc.get(namedParam) != null) { + jsonObj.put(namedParam, jsonSrc.get(namedParam)); + } + } + if (parseError) { + // Add error tags + @SuppressWarnings("unchecked") + List tagsList = (List) jsonObj.get("tags"); + if (tagsList == null) { + tagsList = new ArrayList(); + jsonObj.put("tags", tagsList); + } + tagsList.add("error_grok_parsing"); + if (sourceField == null) { + // For now let's put the raw message in log_message, so it is + // will be searchable + jsonObj.put("log_message", inputStr); + } + } + + super.apply(jsonObj, savedInputMarker); + statMetric.count++; + } + + public String grokParse(String inputStr) { + String jsonStr = grokMessage.capture(inputStr); + return jsonStr; + } + + private void logParseError(String inputStr) { + grokErrorMetric.count++; + final String LOG_MESSAGE_KEY = this.getClass().getSimpleName() + + "_PARSEERROR"; + int inputStrLength = inputStr != null ? inputStr.length() : 0; + LogFeederUtil.logErrorMessageByInterval( + LOG_MESSAGE_KEY, + "Error parsing string. length=" + inputStrLength + + ", input=" + input.getShortDescription() + + ". 
First upto 100 characters=" + + LogFeederUtil.subString(inputStr, 100), null, logger, + Level.WARN); + } + + @Override + public void flush() { + if (strBuff != null) { + // Handle message parsing + Map jsonObj = Collections + .synchronizedMap(new HashMap()); + applyMessage(strBuff.toString(), jsonObj, currMultilineJsonStr); + strBuff = null; + savedInputMarker = null; + } + super.flush(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.ambari.logfeeder.ConfigBlock#getShortDescription() + */ + @Override + public String getShortDescription() { + return "filter:filter=grok,regex=" + messagePattern; + } + + @Override + public void addMetricsContainers(List metricsList) { + super.addMetricsContainers(metricsList); + metricsList.add(grokErrorMetric); + } + + @Override + public void logStat() { + super.logStat(); + // Printing stat for grokErrors + logStatForMetric(grokErrorMetric, "Stat: Grok Errors"); + + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/f7294694/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java new file mode 100644 index 0000000..c4da3cb --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logfeeder.filter; + +import java.util.List; +import java.util.Map; +import java.util.StringTokenizer; + +import org.apache.ambari.logfeeder.LogFeederUtil; +import org.apache.ambari.logfeeder.MetricCount; +import org.apache.ambari.logfeeder.input.InputMarker; +import org.apache.commons.lang3.StringUtils; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +public class FilterKeyValue extends Filter { + static private Logger logger = Logger.getLogger(FilterKeyValue.class); + + String sourceField = null; + String valueSplit = "="; + String fieldSplit = "\t"; + + public MetricCount errorMetric = new MetricCount(); + + @Override + public void init() throws Exception { + super.init(); + errorMetric.metricsName = "filter.error.keyvalue"; + + sourceField = getStringValue("source_field"); + valueSplit = getStringValue("value_split", valueSplit); + fieldSplit = getStringValue("field_split", fieldSplit); + + logger.info("init() done. source_field=" + sourceField + + ", value_split=" + valueSplit + ", " + ", field_split=" + + fieldSplit + ", " + getShortDescription()); + if (StringUtils.isEmpty(sourceField)) { + logger.fatal("source_field is not set for filter. 
This filter will not be applied"); + return; + } + + } + + /* + * (non-Javadoc) + * + * @see org.apache.ambari.logfeeder.filter.Filter#apply(java.lang.String) + */ + @Override + public void apply(String inputStr, InputMarker inputMarker) { + apply(LogFeederUtil.toJSONObject(inputStr), inputMarker); + } + + @Override + public void apply(Map jsonObj, InputMarker inputMarker) { + if (sourceField == null) { + return; + } + Object valueObj = jsonObj.get(sourceField); + if (valueObj != null) { + StringTokenizer fieldTokenizer = new StringTokenizer( + valueObj.toString(), fieldSplit); + while (fieldTokenizer.hasMoreTokens()) { + String nv = fieldTokenizer.nextToken(); + StringTokenizer nvTokenizer = new StringTokenizer(nv, + valueSplit); + while (nvTokenizer.hasMoreTokens()) { + String name = nvTokenizer.nextToken(); + if (nvTokenizer.hasMoreTokens()) { + String value = nvTokenizer.nextToken(); + jsonObj.put(name, value); + } else { + // Unbalanced name value pairs + logParseError("name=" + name + ", pair=" + nv + + ", field=" + sourceField + ", field_value=" + + valueObj); + } + } + } + } + super.apply(jsonObj, inputMarker); + statMetric.count++; + } + + private void logParseError(String inputStr) { + errorMetric.count++; + final String LOG_MESSAGE_KEY = this.getClass().getSimpleName() + + "_PARSEERROR"; + LogFeederUtil + .logErrorMessageByInterval( + LOG_MESSAGE_KEY, + "Error parsing string. length=" + inputStr.length() + + ", input=" + input.getShortDescription() + + ". 
First upto 100 characters=" + + LogFeederUtil.subString(inputStr, 100), null, logger, + Level.ERROR); + } + + /* + * (non-Javadoc) + * + * @see org.apache.ambari.logfeeder.ConfigBlock#getShortDescription() + */ + @Override + public String getShortDescription() { + return "filter:filter=keyvalue,regex=" + sourceField; + } + + @Override + public void addMetricsContainers(List metricsList) { + super.addMetricsContainers(metricsList); + metricsList.add(errorMetric); + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/f7294694/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/JSONFilterCode.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/JSONFilterCode.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/JSONFilterCode.java new file mode 100644 index 0000000..5c4d30e --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/JSONFilterCode.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.ambari.logfeeder.filter; + +import java.util.Map; + +import org.apache.ambari.logfeeder.LogFeederUtil; +import org.apache.ambari.logfeeder.input.InputMarker; +import org.apache.log4j.Logger; + + +public class JSONFilterCode extends Filter { + private static Logger logger = Logger.getLogger(JSONFilterCode.class); + + @Override + public void apply(String inputStr, InputMarker inputMarker) { + Map jsonMap = LogFeederUtil.toJSONObject(inputStr); + // linenumber + Double lineNumberD = (Double) jsonMap.get("line_number"); + if (lineNumberD != null) { + long lineNumber = lineNumberD.longValue(); + jsonMap.put("line_number", lineNumber); + } + // logtime + String timeStampStr = (String) jsonMap.get("logtime"); + if (timeStampStr != null && !timeStampStr.isEmpty()) { + String logtime = LogFeederUtil.getDate(timeStampStr); + jsonMap.put("logtime", logtime); + } + super.apply(jsonMap, inputMarker); + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/f7294694/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java new file mode 100644 index 0000000..ec75f2d --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logfeeder.input; + +import java.io.File; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.ambari.logfeeder.ConfigBlock; +import org.apache.ambari.logfeeder.InputMgr; +import org.apache.ambari.logfeeder.MetricCount; +import org.apache.ambari.logfeeder.OutputMgr; +import org.apache.ambari.logfeeder.filter.Filter; +import org.apache.ambari.logfeeder.output.Output; +import org.apache.log4j.Logger; + +public abstract class Input extends ConfigBlock implements Runnable { + static private Logger logger = Logger.getLogger(Input.class); + + OutputMgr outputMgr; + InputMgr inputMgr; + + List outputList = new ArrayList(); + + Filter firstFilter = null; + Thread thread; + private boolean isClosed = false; + String filePath = null; + String type = null; + + boolean tail = true; + boolean useEventMD5 = false; + boolean genEventMD5 = true; + + public MetricCount readBytesMetric = new MetricCount(); + + /** + * This method will be called from the thread spawned for the output. 
This + * method should only exit after all data are read from the source or the + * process is exiting + * + * @throws Exception + */ + abstract void start() throws Exception; + + @Override + public void init() throws Exception { + super.init(); + tail = getBooleanValue("tail", tail); + useEventMD5 = getBooleanValue("use_event_md5_as_id", useEventMD5); + genEventMD5 = getBooleanValue("gen_event_md5", genEventMD5); + + if (firstFilter != null) { + firstFilter.init(); + } + } + + @Override + public String getNameForThread() { + if (filePath != null) { + try { + return (type + "=" + (new File(filePath)).getName()); + } catch (Throwable ex) { + logger.warn("Couldn't get basename for filePath=" + filePath, + ex); + } + } + return super.getNameForThread() + ":" + type; + } + + /* + * (non-Javadoc) + * + * @see java.lang.Runnable#run() + */ + @Override + public void run() { + try { + logger.info("Started to monitor. " + getShortDescription()); + start(); + } catch (Exception e) { + logger.error("Error writing to output.", e); + } + logger.info("Exiting thread. " + getShortDescription()); + } + + public void outputLine(String line, InputMarker marker) { + statMetric.count++; + readBytesMetric.count += (line.length()); + + if (firstFilter != null) { + firstFilter.apply(line, marker); + } else { + // TODO: For now, let's make filter mandatory, so that no one + // accidently forgets to write filter + // outputMgr.write(line, this); + } + } + + /** + * + */ + public void flush() { + if (firstFilter != null) { + firstFilter.flush(); + } + } + + public boolean monitor() { + if (isReady()) { + logger.info("Starting thread. " + getShortDescription()); + thread = new Thread(this, getNameForThread()); + thread.start(); + return true; + } else { + return false; + } + } + + public void checkIn(InputMarker inputMarker) { + // Default implementation is to ignore. 
+ } + + /** + * This is generally used by final checkin + */ + public void checkIn() { + + } + + /** + * @return + */ + public boolean isReady() { + return true; + } + + public boolean isTail() { + return tail; + } + + public void setTail(boolean tail) { + this.tail = tail; + } + + public boolean isUseEventMD5() { + return useEventMD5; + } + + public void setUseEventMD5(boolean useEventMD5) { + this.useEventMD5 = useEventMD5; + } + + public boolean isGenEventMD5() { + return genEventMD5; + } + + public void setGenEventMD5(boolean genEventMD5) { + this.genEventMD5 = genEventMD5; + } + + @Override + public void setDrain(boolean drain) { + logger.info("Request to drain. " + getShortDescription()); + super.setDrain(drain); + ; + try { + thread.interrupt(); + } catch (Throwable t) { + // ignore + } + } + + public Filter getFirstFilter() { + return firstFilter; + } + + public void setFirstFilter(Filter filter) { + firstFilter = filter; + } + + public void setInputMgr(InputMgr inputMgr) { + this.inputMgr = inputMgr; + } + + public void setOutputMgr(OutputMgr outputMgr) { + this.outputMgr = outputMgr; + } + + public String getFilePath() { + return filePath; + } + + public void setFilePath(String filePath) { + this.filePath = filePath; + } + + public void close() { + logger.info("Close called. 
" + getShortDescription()); + + try { + if (firstFilter != null) { + firstFilter.close(); + } else { + outputMgr.close(); + } + } catch (Throwable t) { + // Ignore + } + isClosed = true; + } + + public void setClosed(boolean isClosed) { + this.isClosed = isClosed; + } + + public boolean isClosed() { + return isClosed; + } + + @Override + public void loadConfig(Map map) { + super.loadConfig(map); + String typeValue = getStringValue("type"); + if (typeValue != null) { + // Explicitly add type and value to field list + contextFields.put("type", typeValue); + @SuppressWarnings("unchecked") + Map addFields = (Map) map + .get("add_fields"); + if (addFields == null) { + addFields = new HashMap(); + map.put("add_fields", addFields); + } + addFields.put("type", typeValue); + } + } + + @Override + public String getShortDescription() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void logStat() { + super.logStat(); + logStatForMetric(readBytesMetric, "Stat: Bytes Read"); + + if (firstFilter != null) { + firstFilter.logStat(); + } + } + + @Override + public String toString() { + return getShortDescription(); + } + + /** + * + */ + public void rollOver() { + // Only some inputs support it. E.g. 
InputFile + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public Date getEventTime() { + return null; + } + + public List getOutputList() { + return outputList; + } + + /** + * @param output + */ + public void addOutput(Output output) { + outputList.add(output); + } + + /** + * @param metricsList + */ + public void addMetricsContainers(List metricsList) { + super.addMetricsContainers(metricsList); + if (firstFilter != null) { + firstFilter.addMetricsContainers(metricsList); + } + metricsList.add(readBytesMetric); + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/f7294694/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java new file mode 100644 index 0000000..420610a --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java @@ -0,0 +1,562 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.input; + +import java.io.BufferedReader; +import java.io.EOFException; +import java.io.File; +import java.io.FileFilter; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +import org.apache.ambari.logfeeder.LogFeederUtil; +import org.apache.ambari.logfeeder.input.reader.LogsearchReaderFactory; +import org.apache.commons.io.filefilter.WildcardFileFilter; +import org.apache.commons.lang3.StringUtils; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.solr.common.util.Base64; + +public class InputFile extends Input { + static private Logger logger = Logger.getLogger(InputFile.class); + + // String startPosition = "beginning"; + String logPath = null; + boolean isStartFromBegining = true; + + boolean isReady = false; + File[] logPathFiles = null; + Object fileKey = null; + String base64FileKey = null; + + private boolean isRolledOver = false; + boolean addWildCard = false; + + long lastCheckPointTimeMS = 0; + int checkPointIntervalMS = 5 * 1000; // 5 seconds + RandomAccessFile checkPointWriter = null; + Map jsonCheckPoint = null; + + File checkPointFile = null; + + private InputMarker lastCheckPointInputMarker = null; + + private String checkPointExtension = ".cp"; + + @Override + public void init() throws Exception { + logger.info("init() called"); + statMetric.metricsName = "input.files.read_lines"; + readBytesMetric.metricsName = "input.files.read_bytes"; + checkPointExtension = LogFeederUtil.getStringProperty( + "logfeeder.checkpoint.extension", checkPointExtension); + + // Let's close the file and set it to true after we 
start monitoring it + setClosed(true); + logPath = getStringValue("path"); + tail = getBooleanValue("tail", tail); + addWildCard = getBooleanValue("add_wild_card", addWildCard); + checkPointIntervalMS = getIntValue("checkpoint.interval.ms", + checkPointIntervalMS); + + if (logPath == null || logPath.isEmpty()) { + logger.error("path is empty for file input. " + + getShortDescription()); + return; + } + + String startPosition = getStringValue("start_position"); + if (StringUtils.isEmpty(startPosition) + || startPosition.equalsIgnoreCase("beginning") + || startPosition.equalsIgnoreCase("begining")) { + isStartFromBegining = true; + } + + if (!tail) { + // start position end doesn't apply if we are not tailing + isStartFromBegining = true; + } + + setFilePath(logPath); + boolean isFileReady = isReady(); + + logger.info("File to monitor " + logPath + ", tail=" + tail + + ", addWildCard=" + addWildCard + ", isReady=" + isFileReady); + + super.init(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.ambari.logfeeder.input.Input#isReady() + */ + @Override + public boolean isReady() { + if (!isReady) { + // Let's try to check whether the file is available + logPathFiles = getActualFiles(logPath); + if (logPathFiles != null && logPathFiles.length > 0 + && logPathFiles[0].isFile()) { + + if (isTail() && logPathFiles.length > 1) { + logger.warn("Found multiple files (" + logPathFiles.length + + ") for the file filter " + filePath + + ". Will use only the first one. Using " + + logPathFiles[0].getAbsolutePath()); + } + logger.info("File filter " + filePath + " expanded to " + + logPathFiles[0].getAbsolutePath()); + isReady = true; + } else { + logger.debug(logPath + " file doesn't exist. 
Ignoring for now"); + } + } + return isReady; + } + + private File[] getActualFiles(String searchPath) { + if (addWildCard) { + if (!searchPath.endsWith("*")) { + searchPath = searchPath + "*"; + } + } + File checkFile = new File(searchPath); + if (checkFile.isFile()) { + return new File[]{checkFile}; + } + // Let's do wild card search + // First check current folder + File checkFiles[] = findFileForWildCard(searchPath, new File(".")); + if (checkFiles == null || checkFiles.length == 0) { + // Let's check from the parent folder + File parentDir = (new File(searchPath)).getParentFile(); + if (parentDir != null) { + String wildCard = (new File(searchPath)).getName(); + checkFiles = findFileForWildCard(wildCard, parentDir); + } + } + return checkFiles; + } + + private File[] findFileForWildCard(String searchPath, File dir) { + logger.debug("findFileForWildCard(). filePath=" + searchPath + ", dir=" + + dir + ", dir.fullpath=" + dir.getAbsolutePath()); + FileFilter fileFilter = new WildcardFileFilter(searchPath); + return dir.listFiles(fileFilter); + } + + @Override + synchronized public void checkIn(InputMarker inputMarker) { + super.checkIn(inputMarker); + if (checkPointWriter != null) { + try { + int lineNumber = LogFeederUtil.objectToInt( + jsonCheckPoint.get("line_number"), 0, "line_number"); + if (lineNumber > inputMarker.lineNumber) { + // Already wrote higher line number for this input + return; + } + // If interval is greater than last checkPoint time, then write + long currMS = System.currentTimeMillis(); + if (!isClosed() + && (currMS - lastCheckPointTimeMS) < checkPointIntervalMS) { + // Let's save this one so we can update the check point file + // on flush + lastCheckPointInputMarker = inputMarker; + return; + } + lastCheckPointTimeMS = currMS; + + jsonCheckPoint.put("line_number", "" + + new Integer(inputMarker.lineNumber)); + jsonCheckPoint.put("last_write_time_ms", "" + new Long(currMS)); + jsonCheckPoint.put("last_write_time_date", new Date()); + + 
String jsonStr = LogFeederUtil.getGson().toJson(jsonCheckPoint); + + // Let's rewind + checkPointWriter.seek(0); + checkPointWriter.writeInt(jsonStr.length()); + checkPointWriter.write(jsonStr.getBytes()); + + if (isClosed()) { + final String LOG_MESSAGE_KEY = this.getClass() + .getSimpleName() + "_FINAL_CHECKIN"; + LogFeederUtil.logErrorMessageByInterval( + LOG_MESSAGE_KEY, + "Wrote final checkPoint, input=" + + getShortDescription() + + ", checkPointFile=" + + checkPointFile.getAbsolutePath() + + ", checkPoint=" + jsonStr, null, logger, + Level.INFO); + } + } catch (Throwable t) { + final String LOG_MESSAGE_KEY = this.getClass().getSimpleName() + + "_CHECKIN_EXCEPTION"; + LogFeederUtil + .logErrorMessageByInterval(LOG_MESSAGE_KEY, + "Caught exception checkIn. , input=" + + getShortDescription(), t, logger, + Level.ERROR); + } + } + + } + + @Override + public void checkIn() { + super.checkIn(); + if (lastCheckPointInputMarker != null) { + checkIn(lastCheckPointInputMarker); + } + } + + @Override + public void rollOver() { + logger.info("Marking this input file for rollover. " + + getShortDescription()); + isRolledOver = true; + } + + /* + * (non-Javadoc) + * + * @see org.apache.ambari.logfeeder.input.Input#monitor() + */ + @Override + void start() throws Exception { + if (logPathFiles == null || logPathFiles.length == 0) { + return; + } + + if (isTail()) { + // Just process the first file + processFile(logPathFiles[0]); + } else { + for (File file : logPathFiles) { + try { + processFile(file); + if (isClosed() || isDrain()) { + logger.info("isClosed or isDrain. Now breaking loop."); + break; + } + } catch (Throwable t) { + logger.error( + "Error processing file=" + file.getAbsolutePath(), + t); + } + } + } + // Call the close for the input. Which should flush to the filters and + // output + close(); + } + + @Override + public void close() { + super.close(); + logger.info("close() calling checkPoint checkIn(). 
" + + getShortDescription()); + checkIn(); + } + + private void processFile(File logPathFile) throws FileNotFoundException, + IOException { + logger.info("Monitoring logPath=" + logPath + ", logPathFile=" + + logPathFile); + BufferedReader br = null; + checkPointFile = null; + checkPointWriter = null; + jsonCheckPoint = null; + int resumeFromLineNumber = 0; + + int lineCount = 0; + try { + setFilePath(logPathFile.getAbsolutePath()); +// br = new BufferedReader(new FileReader(logPathFile)); + br = new BufferedReader(LogsearchReaderFactory.INSTANCE.getReader(logPathFile)); + + // Whether to send to output from the beginning. + boolean resume = isStartFromBegining; + + // Seems FileWatch is not reliable, so let's only use file key + // comparison + // inputMgr.monitorSystemFileChanges(this); + fileKey = getFileKey(logPathFile); + base64FileKey = Base64.byteArrayToBase64(fileKey.toString() + .getBytes()); + logger.info("fileKey=" + fileKey + ", base64=" + base64FileKey + + ". " + getShortDescription()); + + if (isTail()) { + try { + // Let's see if there is a checkpoint for this file + logger.info("Checking existing checkpoint file. " + + getShortDescription()); + + String fileBase64 = Base64.byteArrayToBase64(fileKey + .toString().getBytes()); + String checkPointFileName = fileBase64 + + checkPointExtension; + File checkPointFolder = inputMgr.getCheckPointFolderFile(); + checkPointFile = new File(checkPointFolder, + checkPointFileName); + checkPointWriter = new RandomAccessFile(checkPointFile, + "rw"); + + try { + int contentSize = checkPointWriter.readInt(); + byte b[] = new byte[contentSize]; + int readSize = checkPointWriter.read(b, 0, contentSize); + if (readSize != contentSize) { + logger.error("Couldn't read expected number of bytes from checkpoint file. 
expected=" + + contentSize + + ", read=" + + readSize + + ", checkPointFile=" + + checkPointFile + + ", input=" + getShortDescription()); + } else { + // Create JSON string + String jsonCheckPointStr = new String(b, 0, + readSize); + jsonCheckPoint = LogFeederUtil + .toJSONObject(jsonCheckPointStr); + + resumeFromLineNumber = LogFeederUtil.objectToInt( + jsonCheckPoint.get("line_number"), 0, + "line_number"); + + if (resumeFromLineNumber > 0) { + // Let's read from last line read + resume = false; + } + logger.info("CheckPoint. checkPointFile=" + + checkPointFile + ", json=" + + jsonCheckPointStr + + ", resumeFromLineNumber=" + + resumeFromLineNumber + ", resume=" + + resume); + } + } catch (EOFException eofEx) { + logger.info("EOFException. Will reset checkpoint file " + + checkPointFile.getAbsolutePath() + " for " + + getShortDescription()); + } + if (jsonCheckPoint == null) { + // This seems to be first time, so creating the initial + // checkPoint object + jsonCheckPoint = new HashMap(); + jsonCheckPoint.put("file_path", filePath); + jsonCheckPoint.put("file_key", fileBase64); + } + + } catch (Throwable t) { + logger.error( + "Error while configuring checkpoint file. Will reset file. checkPointFile=" + + checkPointFile, t); + } + } + + setClosed(false); + int sleepStep = 2; + int sleepIteration = 0; + while (true) { + try { + if (isDrain()) { + break; + } + + String line = br.readLine(); + if (line == null) { + if (!resume) { + resume = true; + } + sleepIteration++; + try { + // Since FileWatch service is not reliable, we will + // check + // file inode every n seconds after no write + if (sleepIteration > 4) { + Object newFileKey = getFileKey(logPathFile); + if (newFileKey != null) { + if (fileKey == null + || !newFileKey.equals(fileKey)) { + logger.info("File key is different. Calling rollover. oldKey=" + + fileKey + + ", newKey=" + + newFileKey + + ". " + + getShortDescription()); + // File has rotated. 
+ rollOver(); + } + } + } + // Flush on the second iteration + if (!tail && sleepIteration >= 2) { + logger.info("End of file. Done with filePath=" + + logPathFile.getAbsolutePath() + + ", lineCount=" + lineCount); + flush(); + break; + } else if (sleepIteration == 2) { + flush(); + } else if (sleepIteration >= 2) { + if (isRolledOver) { + isRolledOver = false; + // Close existing file + try { + logger.info("File is rolled over. Closing current open file." + + getShortDescription() + + ", lineCount=" + lineCount); + br.close(); + } catch (Exception ex) { + logger.error("Error closing file" + + getShortDescription()); + break; + } + try { + // Open new file + logger.info("Opening new rolled over file." + + getShortDescription()); +// br = new BufferedReader(new FileReader( +// logPathFile)); + br = new BufferedReader(LogsearchReaderFactory. + INSTANCE.getReader(logPathFile)); + lineCount = 0; + fileKey = getFileKey(logPathFile); + base64FileKey = Base64 + .byteArrayToBase64(fileKey + .toString().getBytes()); + logger.info("fileKey=" + fileKey + + ", base64=" + base64FileKey + + ", " + getShortDescription()); + } catch (Exception ex) { + logger.error("Error opening rolled over file. " + + getShortDescription()); + // Let's add this to monitoring and exit + // this + // thread + logger.info("Added input to not ready list." + + getShortDescription()); + isReady = false; + inputMgr.addToNotReady(this); + break; + } + logger.info("File is successfully rolled over. " + + getShortDescription()); + continue; + } + } + Thread.sleep(sleepStep * 1000); + sleepStep = (sleepStep * 2); + sleepStep = sleepStep > 10 ? 10 : sleepStep; + } catch (InterruptedException e) { + logger.info("Thread interrupted." + + getShortDescription()); + } + } else { + lineCount++; + sleepStep = 1; + sleepIteration = 0; + + if (!resume && lineCount > resumeFromLineNumber) { + logger.info("Resuming to read from last line. 
lineCount=" + + lineCount + + ", input=" + + getShortDescription()); + resume = true; + } + if (resume) { + InputMarker marker = new InputMarker(); + marker.fileKey = fileKey; + marker.base64FileKey = base64FileKey; + marker.filePath = filePath; + marker.input = this; + marker.lineNumber = lineCount; + outputLine(line, marker); + } + } + } catch (Throwable t) { + final String LOG_MESSAGE_KEY = this.getClass() + .getSimpleName() + "_READ_LOOP_EXCEPTION"; + LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY, + "Caught exception in read loop. lineNumber=" + + lineCount + ", input=" + + getShortDescription(), t, logger, + Level.ERROR); + + } + } + } finally { + if (br != null) { + logger.info("Closing reader." + getShortDescription() + + ", lineCount=" + lineCount); + try { + br.close(); + } catch (Throwable t) { + // ignore + } + } + } + } + + /** + * @param logPathFile2 + * @return + */ + static public Object getFileKey(File file) { + try { + Path fileFullPath = Paths.get(file.getAbsolutePath()); + if (fileFullPath != null) { + BasicFileAttributes basicAttr = Files.readAttributes( + fileFullPath, BasicFileAttributes.class); + return basicAttr.fileKey(); + } + } catch (Throwable ex) { + logger.error("Error getting file attributes for file=" + file, ex); + } + return file.toString(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.ambari.logfeeder.input.Input#getShortDescription() + */ + @Override + public String getShortDescription() { + return "input:source=" + + getStringValue("source") + + ", path=" + + (logPathFiles != null && logPathFiles.length > 0 ? 
logPathFiles[0] + .getAbsolutePath() : getStringValue("path")); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/f7294694/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java new file mode 100644 index 0000000..6196068 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.ambari.logfeeder.input; + +/** + * This file contains the file inode, line number of the log currently been read + */ +public class InputMarker { + public int lineNumber = 0; + public int beginLineNumber = 0; + public Input input; + public String filePath; + public Object fileKey = null; + public String base64FileKey = null; + + @Override + public String toString() { + return "InputMarker [lineNumber=" + lineNumber + ", input=" + + input.getShortDescription() + "]"; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/f7294694/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java new file mode 100644 index 0000000..9c46c4e --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.input.reader; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.zip.GZIPInputStream; + +import org.apache.log4j.Logger; + +public class GZIPReader extends InputStreamReader { + + private static Logger logger = Logger.getLogger(GZIPReader.class); + + public GZIPReader(String fileName) throws FileNotFoundException { + super(getStream(fileName)); + logger.info("Created GZIPReader for file : " + fileName); + } + + public GZIPReader(File file) throws FileNotFoundException { + super(getStream(file.getName())); + } + + private static InputStream getStream(String fileName) { + InputStream gzipStream = null; + InputStream fileStream = null; + try { + fileStream = new FileInputStream(fileName); + gzipStream = new GZIPInputStream(fileStream); + } catch (Exception e) { + logger.error(e, e.getCause()); + } + return gzipStream; + } + + /** + * validating file based on magic number + * + * @param fileName + * @return + */ + public static boolean isValidFile(String fileName) { + // TODO make it generic and put in factory itself + InputStream is = null; + try { + is = new FileInputStream(fileName); + byte[] signature = new byte[2]; + int nread = is.read(signature); // read the gzip signature + return nread == 2 && signature[0] == (byte) 0x1f && signature[1] == (byte) 0x8b; + } catch (IOException e) { + return false; + } finally { + if (is != null) { + try { + is.close(); + } catch (IOException e) { + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/f7294694/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java ---------------------------------------------------------------------- diff --git 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java new file mode 100644 index 0000000..a231807 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.ambari.logfeeder.input.reader; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.Reader; + +import org.apache.log4j.Logger; + +public enum LogsearchReaderFactory { + INSTANCE; + private static Logger logger = Logger + .getLogger(LogsearchReaderFactory.class); + + /** + * @param fileName + * @return + * @throws FileNotFoundException + */ + public Reader getReader(File file) throws FileNotFoundException { + logger.debug("Inside reader factory for file:" + file); + if (GZIPReader.isValidFile(file.getAbsolutePath())) { + logger.info("Reading file " + file + " as gzip file"); + return new GZIPReader(file.getAbsolutePath()); + } else { + return new FileReader(file); + } + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/f7294694/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java new file mode 100644 index 0000000..fc12458 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logfeeder.logconfig; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.TimeZone; + +import org.apache.ambari.logfeeder.LogFeederUtil; +import org.apache.ambari.logfeeder.util.SolrUtil; +import org.apache.ambari.logfeeder.view.VLogfeederFilter; +import org.apache.ambari.logfeeder.view.VLogfeederFilterWrapper; +import org.apache.log4j.Logger; + +public class FetchConfigFromSolr extends Thread { + private static Logger logger = Logger.getLogger(FetchConfigFromSolr.class); + private static VLogfeederFilterWrapper logfeederFilterWrapper = null; + private static int solrConfigInterval = 5;// 5 sec; + private static long delay; + private static String endTimeDateFormat = "yyyy-MM-dd'T'HH:mm:ss.SSS";//2016-04-05T04:30:00.000Z + private static String sysTimeZone = "GMT"; + + public FetchConfigFromSolr() { + this.setName(this.getClass().getSimpleName()); + } + + @Override + public void run() { + solrConfigInterval = LogFeederUtil.getIntProperty("logfeeder.solr.config.internal", solrConfigInterval); + delay = 1000 * solrConfigInterval; + do { + logger.debug("Updating config from solr after every " + solrConfigInterval + " sec."); + pullConfigFromSolr(); + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + logger.error(e.getLocalizedMessage(), e.getCause()); + } + } while (true); + } + + private synchronized void pullConfigFromSolr() { + HashMap configDocMap = 
SolrUtil.getInstance().getConfigDoc(); + if (configDocMap != null) { + String configJson = (String) configDocMap.get(LogFeederConstants.VALUES); + if (configJson != null) { + logfeederFilterWrapper = LogFeederUtil.getGson().fromJson(configJson, VLogfeederFilterWrapper.class); + } + } + } + + public static boolean isFilterExpired(VLogfeederFilter logfeederFilter) { + boolean isFilterExpired = false;// default is false + if (logfeederFilter != null) { + Date filterEndDate = parseFilterExpireDate(logfeederFilter); + if (filterEndDate != null) { + Date currentDate = getCurrentDate(); + if (currentDate.compareTo(filterEndDate) >= 0) { + logger.debug("Filter for Component :" + logfeederFilter.getLabel() + " and Hosts :" + + listToStr(logfeederFilter.getHosts()) + "Filter is expired because of filter endTime : " + + dateToStr(filterEndDate) + " is older than currentTime :" + dateToStr(currentDate)); + isFilterExpired = true; + } + } + } + return isFilterExpired; + } + + public static String dateToStr(Date date) { + if (date == null) { + return ""; + } + SimpleDateFormat formatter = new SimpleDateFormat(endTimeDateFormat); + TimeZone timeZone = TimeZone.getTimeZone(sysTimeZone); + formatter.setTimeZone(timeZone); + return formatter.format(date); + } + + public static Date parseFilterExpireDate(VLogfeederFilter vLogfeederFilter) { + String expiryTime = vLogfeederFilter.getExpiryTime(); + if (expiryTime != null && !expiryTime.isEmpty()) { + SimpleDateFormat formatter = new SimpleDateFormat(endTimeDateFormat); + TimeZone timeZone = TimeZone.getTimeZone(sysTimeZone); + formatter.setTimeZone(timeZone); + try { + return formatter.parse(expiryTime); + } catch (ParseException e) { + logger.error("Filter have invalid ExpiryTime : " + expiryTime + " for component :" + vLogfeederFilter.getLabel() + + " and hosts :" + listToStr(vLogfeederFilter.getHosts())); + } + } + return null; + } + + public static List getAllowedLevels(String hostName, VLogfeederFilter componentFilter) { + String 
componentName = componentFilter.getLabel(); + List hosts = componentFilter.getHosts(); + List defaultLevels = componentFilter.getDefaultLevels(); + List overrideLevels = componentFilter.getOverrideLevels(); + if (LogFeederUtil.isListContains(hosts, hostName, false)) { + if (isFilterExpired(componentFilter)) { + // pick default + logger.debug("Filter for component " + componentName + " and host :" + hostName + " is expired at " + + componentFilter.getExpiryTime()); + return defaultLevels; + } else { + // return tmp filter levels + return overrideLevels; + } + } else { + return defaultLevels; + } + } + + public static VLogfeederFilter findComponentFilter(String componentName) { + if (logfeederFilterWrapper != null) { + HashMap filter = logfeederFilterWrapper.getFilter(); + if (filter != null) { + VLogfeederFilter componentFilter = filter.get(componentName); + if (componentFilter != null) { + return componentFilter; + } + } + } + logger.trace("Filter is not there for component :" + componentName); + return null; + } + + + public static Date getCurrentDate() { + TimeZone.setDefault(TimeZone.getTimeZone(sysTimeZone)); + Date date = new Date(); + return date; + } + + public static String listToStr(List strList) { + StringBuilder out = new StringBuilder("["); + if (strList != null) { + int counter = 0; + for (Object o : strList) { + if (counter > 0) { + out.append(","); + } + out.append(o.toString()); + counter++; + } + } + out.append("]"); + return out.toString(); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/f7294694/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java new 
file mode 100644 index 0000000..f61dc1b --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/**
 * String constants shared by the log feeder filter configuration code.
 */
public class LogFeederConstants {

  // General
  public static final String ALL = "all";
  public static final String NAME = "log_feeder_config";

  // Solr field names of an indexed log event
  public static final String SOLR_HOST = "host";
  public static final String SOLR_COMPONENT = "type";
  public static final String SOLR_LEVEL = "level";

  // Field names of the user config (history) document
  public static final String ID = "id";
  public static final String USER_NAME = "username";
  public static final String FILTER_NAME = "filtername";
  public static final String ROW_TYPE = "rowtype";
  public static final String VALUES = "jsons";

}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logfeeder.logconfig; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.ambari.logfeeder.LogFeederUtil; +import org.apache.log4j.Logger; + +public enum LogfeederScheduler { + + INSTANCE; + + private Logger logger = Logger.getLogger(LogfeederScheduler.class); + + private static boolean running = false; + + public synchronized void start() { + boolean filterEnable = LogFeederUtil.getBooleanProperty("logfeeder.log.filter.enable", false); + if (!filterEnable) { + logger.info("Logfeeder filter Scheduler is disabled."); + return; + } + if (!running) { + for (Thread thread : getThreadList()) { + thread.start(); + } + running = true; + logger.info("Logfeeder Scheduler started!"); + } else { + logger.warn("Logfeeder Scheduler is already running."); + } + } + + private List getThreadList() { + List tasks = new ArrayList(); + tasks.add(new FetchConfigFromSolr()); + return tasks; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/f7294694/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java new file mode 100644 index 0000000..3748445 --- /dev/null +++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logfeeder.logconfig.filter; + +import java.util.List; +import java.util.Map; + +import org.apache.ambari.logfeeder.LogFeederUtil; +import org.apache.ambari.logfeeder.logconfig.FetchConfigFromSolr; +import org.apache.ambari.logfeeder.logconfig.LogFeederConstants; +import org.apache.ambari.logfeeder.view.VLogfeederFilter; +import org.apache.log4j.Logger; + +public class ApplyLogFilter extends DefaultDataFilter { + + private static Logger logger = Logger.getLogger(ApplyLogFilter.class); + + @Override + public boolean applyFilter(Map jsonObj, boolean defaultValue) { + if (isEmpty(jsonObj)) { + logger.warn("Output jsonobj is empty"); + return defaultValue; + } + String hostName = (String) jsonObj.get(LogFeederConstants.SOLR_HOST); + if (isNotEmpty(hostName)) { + String componentName = (String) jsonObj.get(LogFeederConstants.SOLR_COMPONENT); + if (isNotEmpty(componentName)) { + String level = (String) jsonObj.get(LogFeederConstants.SOLR_LEVEL); + if (isNotEmpty(level)) { + // find component filter + 
VLogfeederFilter componentFilter = FetchConfigFromSolr.findComponentFilter(componentName); + if (componentFilter == null) { + //return default value if there is no filter found for particular component + return defaultValue; + } + List allowedLevels = FetchConfigFromSolr.getAllowedLevels(hostName, componentFilter); + return LogFeederUtil.isListContains(allowedLevels, level, false); + } + } + } + return defaultValue; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/f7294694/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/DefaultDataFilter.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/DefaultDataFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/DefaultDataFilter.java new file mode 100644 index 0000000..9e98c6a --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/DefaultDataFilter.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/**
 * Default filter to allow everything: {@link #applyFilter(Map, boolean)}
 * simply echoes the caller-supplied default. Also offers small emptiness
 * helpers for subclasses.
 */
public class DefaultDataFilter {

  protected static final boolean CASE_SENSITIVE = false;

  /**
   * Returns the caller-supplied default without inspecting the record.
   *
   * @param outputJsonObj the log record (ignored by this implementation)
   * @param defaultValue  the decision to return
   * @return {@code defaultValue}
   */
  public boolean applyFilter(Map outputJsonObj, boolean defaultValue) {
    return defaultValue;
  }

  /**
   * @param map the map to test, may be {@code null}
   * @return {@code true} when {@code map} is {@code null} or has no entries
   */
  public boolean isEmpty(Map map) {
    return map == null || map.isEmpty();
  }

  /**
   * @param str the string to test, may be {@code null}
   * @return {@code true} when {@code str} is {@code null} or contains only
   *         characters removed by {@link String#trim()}
   */
  public boolean isEmpty(String str) {
    return str == null || str.trim().isEmpty();
  }

  /**
   * @param str the string to test, may be {@code null}
   * @return the negation of {@link #isEmpty(String)}
   */
  public boolean isNotEmpty(String str) {
    return !isEmpty(str);
  }

}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logfeeder.logconfig.filter; + +import java.util.Map; + +import org.apache.ambari.logfeeder.LogFeederUtil; +import org.apache.ambari.logfeeder.logconfig.filter.ApplyLogFilter; +import org.apache.log4j.Logger; + +/** + * Read configuration from solr and filter the log + */ +public enum FilterLogData { + INSTANCE; + private ApplyLogFilter applyLogFilter = new ApplyLogFilter(); + private static Logger logger = Logger.getLogger(FilterLogData.class); + // by default allow every log + boolean defaultValue = true; + + public boolean isAllowed(String jsonBlock) { + if (jsonBlock == null || jsonBlock.isEmpty()) { + return defaultValue; + } + Map jsonObj = LogFeederUtil.toJSONObject(jsonBlock); + return applyLogFilter.applyFilter(jsonObj, defaultValue); + } + + public boolean isAllowed(Map jsonObj) { + boolean isAllowed = applyLogFilter.applyFilter(jsonObj, defaultValue); + if (!isAllowed) { + logger.trace("Filter block the content :" + LogFeederUtil.getGson().toJson(jsonObj)); + } + return isAllowed; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/f7294694/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java new file mode 100644 index 0000000..5b89d4b --- /dev/null +++ 
/**
 * Base class for field mappers. It stores the descriptive settings passed to
 * {@link #init(String, String, String, Object)} and provides a pass-through
 * {@link #apply(Map, Object)}.
 */
public abstract class Mapper {
  String inputDesc;
  String fieldName;
  String mapClassCode;

  /**
   * Remembers the descriptive settings of this mapper.
   *
   * @param inputDesc    description of the input this mapper belongs to
   * @param fieldName    name of the field the mapper works on
   * @param mapClassCode code identifying the mapper class
   * @param mapConfigs   mapper specific configuration (unused here)
   * @return always {@code true}
   */
  @SuppressWarnings("hiding")
  public boolean init(String inputDesc, String fieldName,
                      String mapClassCode, Object mapConfigs) {
    this.mapClassCode = mapClassCode;
    this.fieldName = fieldName;
    this.inputDesc = inputDesc;
    return true;
  }

  /**
   * Pass-through transformation.
   *
   * @param jsonObj the record being processed (not touched here)
   * @param value   the current field value
   * @return {@code value}, unchanged
   */
  public Object apply(Map jsonObj, Object value) {
    return value;
  }

  @Override
  public String toString() {
    StringBuilder text = new StringBuilder("mapClass=");
    text.append(mapClassCode);
    text.append(", input=").append(inputDesc);
    text.append(", fieldName=").append(fieldName);
    return text.toString();
  }

}
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java new file mode 100644 index 0000000..107e7e4 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ambari.logfeeder.mapper; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Map; + +import org.apache.ambari.logfeeder.LogFeederUtil; +import org.apache.commons.lang3.StringUtils; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +public class MapperDate extends Mapper { + Logger logger = Logger.getLogger(MapperDate.class); + + String dateFormat = null; + SimpleDateFormat dateFormatter = null; + boolean isEpoch = false; + + @SuppressWarnings("hiding") + @Override + public boolean init(String inputDesc, String fieldName, + String mapClassCode, Object mapConfigs) { + super.init(inputDesc, fieldName, mapClassCode, mapConfigs); + if (!(mapConfigs instanceof Map)) { + logger.fatal("Can't initialize object. mapConfigs class is not of type Map. 
" + + mapConfigs.getClass().getName() + + ", map=" + + this.toString()); + return false; + } + @SuppressWarnings("unchecked") + Map mapObjects = (Map) mapConfigs; + dateFormat = (String) mapObjects.get("date_pattern"); + if (StringUtils.isEmpty(dateFormat)) { + logger.fatal("Date format for map is empty. " + this.toString()); + } else { + logger.info("Date mapper format is " + dateFormat); + + if (dateFormat.equalsIgnoreCase("epoch")) { + isEpoch = true; + return true; + } else { + try { + dateFormatter = new SimpleDateFormat(dateFormat); + return true; + } catch (Throwable ex) { + logger.fatal("Error creating date format. format=" + + dateFormat + ". " + this.toString()); + } + } + } + return false; + } + + @Override + public Object apply(Map jsonObj, Object value) { + if (value != null) { + try { + if (isEpoch) { + // First convert to long + long ms = Long.parseLong(value.toString()) * 1000; + value = new Date(ms); + } else if (dateFormatter != null) { + value = dateFormatter.parse(value.toString()); + } else { + return value; + } + jsonObj.put(fieldName, value); + } catch (Throwable t) { + LogFeederUtil.logErrorMessageByInterval(this.getClass() + .getSimpleName() + ":apply", + "Error applying date transformation. isEpoch=" + + isEpoch + ", dateFormat=" + dateFormat + + ", value=" + value + ". 
" + this.toString(), + t, logger, Level.ERROR); + } + } + return value; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/f7294694/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java new file mode 100644 index 0000000..99c33ed --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.ambari.logfeeder.mapper; + +import java.util.Map; + +import org.apache.ambari.logfeeder.LogFeederUtil; +import org.apache.commons.lang3.StringUtils; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +/** + * Overrides the value for the field + */ +public class MapperFieldName extends Mapper { + Logger logger = Logger.getLogger(MapperFieldName.class); + String newValue = null; + + @SuppressWarnings("hiding") + @Override + public boolean init(String inputDesc, String fieldName, + String mapClassCode, Object mapConfigs) { + super.init(inputDesc, fieldName, mapClassCode, mapConfigs); + if (!(mapConfigs instanceof Map)) { + logger.fatal("Can't initialize object. mapConfigs class is not of type Map. " + + mapConfigs.getClass().getName()); + return false; + } + @SuppressWarnings("unchecked") + Map mapObjects = (Map) mapConfigs; + newValue = (String) mapObjects.get("new_fieldname"); + if (StringUtils.isEmpty(newValue)) { + logger.fatal("Map field value is empty."); + return false; + } + return true; + } + + @Override + public Object apply(Map jsonObj, Object value) { + if (newValue != null) { + // Remove the old one + jsonObj.remove(fieldName); + // Add with new key name + jsonObj.put(newValue, value); + } else { + LogFeederUtil.logErrorMessageByInterval(this.getClass() + .getSimpleName() + ":apply", + "New fieldName is null, so transformation is not applied. 
" + + this.toString(), null, logger, Level.ERROR); + } + return value; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/f7294694/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java new file mode 100644 index 0000000..9810ceb --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.ambari.logfeeder.mapper; + +import java.util.Map; + +import org.apache.ambari.logfeeder.LogFeederUtil; +import org.apache.commons.lang3.StringUtils; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +/** + * Overrides the value for the field + */ +public class MapperFieldValue extends Mapper { + Logger logger = Logger.getLogger(MapperFieldValue.class); + String prevValue = null; + String newValue = null; + + @SuppressWarnings("hiding") + @Override + public boolean init(String inputDesc, String fieldName, + String mapClassCode, Object mapConfigs) { + super.init(inputDesc, fieldName, mapClassCode, mapConfigs); + if (!(mapConfigs instanceof Map)) { + logger.fatal("Can't initialize object. mapConfigs class is not of type Map. " + + mapConfigs.getClass().getName()); + return false; + } + @SuppressWarnings("unchecked") + Map mapObjects = (Map) mapConfigs; + prevValue = (String) mapObjects.get("pre_value"); + newValue = (String) mapObjects.get("post_value"); + if (StringUtils.isEmpty(newValue)) { + logger.fatal("Map field value is empty."); + return false; + } + return true; + } + + @Override + public Object apply(Map jsonObj, Object value) { + if (newValue != null) { + if (prevValue != null) { + if (prevValue.equalsIgnoreCase(value.toString())) { + value = newValue; + jsonObj.put(fieldName, value); + } + } + } else { + LogFeederUtil.logErrorMessageByInterval( + this.getClass().getSimpleName() + ":apply", + "New value is null, so transformation is not applied. " + + this.toString(), null, logger, Level.ERROR); + } + return value; + } + +}