Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 527C7200BB9 for ; Mon, 7 Nov 2016 14:05:24 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 51253160B1F; Mon, 7 Nov 2016 13:05:24 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 41AD2160B17 for ; Mon, 7 Nov 2016 14:05:21 +0100 (CET) Received: (qmail 98926 invoked by uid 500); 7 Nov 2016 13:05:19 -0000 Mailing-List: contact commits-help@ambari.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: ambari-dev@ambari.apache.org Delivered-To: mailing list commits@ambari.apache.org Received: (qmail 98207 invoked by uid 99); 7 Nov 2016 13:05:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 07 Nov 2016 13:05:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 238BDF12EF; Mon, 7 Nov 2016 13:05:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jonathanhurley@apache.org To: commits@ambari.apache.org Date: Mon, 07 Nov 2016 13:05:36 -0000 Message-Id: <14b06d84a72d46e5bd118952e1930f1b@git.apache.org> In-Reply-To: <461452b9403a44a7aad35c447defd190@git.apache.org> References: <461452b9403a44a7aad35c447defd190@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [19/60] [abbrv] ambari git commit: AMBARI-18246. Clean up Log Feeder (Miklos Gergely via oleewere) archived-at: Mon, 07 Nov 2016 13:05:24 -0000 http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java index 5feb9c4..e13d9bd 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java @@ -21,7 +21,6 @@ package org.apache.ambari.logfeeder.input; import java.io.File; import java.util.ArrayList; -import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -29,88 +28,138 @@ import java.util.Map; import org.apache.ambari.logfeeder.common.ConfigBlock; import org.apache.ambari.logfeeder.common.LogfeederException; import org.apache.ambari.logfeeder.filter.Filter; -import org.apache.ambari.logfeeder.metrics.MetricCount; +import org.apache.ambari.logfeeder.metrics.MetricData; import org.apache.ambari.logfeeder.output.Output; -import org.apache.ambari.logfeeder.output.OutputMgr; +import org.apache.ambari.logfeeder.output.OutputManager; import org.apache.log4j.Logger; public abstract class Input extends ConfigBlock implements Runnable { - static private Logger logger = Logger.getLogger(Input.class); - - protected OutputMgr outputMgr; - protected InputMgr inputMgr; + private static final Logger LOG = Logger.getLogger(Input.class); + 
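The MetricCount -> MetricData rename runs through every file in this patch. The MetricData class itself is not part of these hunks, but from the call sites (new MetricData(name, isPointInTime), statMetric.value++, readBytesMetric.value += line.length(), filesCountMetric.value = getActiveFilesCount()) it appears to be a simple named counter/gauge holder. A rough sketch of what such a holder could look like is below; the class and field names are inferred from usage in this diff, not taken from the actual Ambari source:

    // Illustrative sketch only -- reconstructed from how MetricData is used in this patch.
    public class MetricDataSketch {
        public final String metricsName;     // e.g. "input.files.read_bytes" or "input.files.count"
        public final boolean isPointInTime;  // true for gauges (files count), false for counters (bytes read)
        public long value;                   // mutated directly by inputs/outputs, e.g. value += line.length()

        public MetricDataSketch(String metricsName, boolean isPointInTime) {
            this.metricsName = metricsName;
            this.isPointInTime = isPointInTime;
        }
    }
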
private static final boolean DEFAULT_TAIL = true; + private static final boolean DEFAULT_USE_EVENT_MD5 = false; + private static final boolean DEFAULT_GEN_EVENT_MD5 = true; + + protected InputManager inputManager; + protected OutputManager outputManager; private List outputList = new ArrayList(); - private Filter firstFilter = null; private Thread thread; - private boolean isClosed = false; - protected String filePath = null; - private String type = null; + private String type; + protected String filePath; + private Filter firstFilter; + private boolean isClosed; - protected boolean tail = true; - private boolean useEventMD5 = false; - private boolean genEventMD5 = true; + protected boolean tail; + private boolean useEventMD5; + private boolean genEventMD5; - protected MetricCount readBytesMetric = new MetricCount(); + protected MetricData readBytesMetric = new MetricData(getReadBytesMetricName(), false); + protected String getReadBytesMetricName() { + return null; + } + + @Override + public void loadConfig(Map map) { + super.loadConfig(map); + String typeValue = getStringValue("type"); + if (typeValue != null) { + // Explicitly add type and value to field list + contextFields.put("type", typeValue); + @SuppressWarnings("unchecked") + Map addFields = (Map) map.get("add_fields"); + if (addFields == null) { + addFields = new HashMap(); + map.put("add_fields", addFields); + } + addFields.put("type", typeValue); + } + } - /** - * This method will be called from the thread spawned for the output. This - * method should only exit after all data are read from the source or the - * process is exiting - */ - abstract void start() throws Exception; + public void setType(String type) { + this.type = type; + } + + public void setInputManager(InputManager inputManager) { + this.inputManager = inputManager; + } + + public void setOutputManager(OutputManager outputManager) { + this.outputManager = outputManager; + } + + public void addFilter(Filter filter) { + if (firstFilter == null) { + firstFilter = filter; + } else { + Filter f = firstFilter; + while (f.getNextFilter() != null) { + f = f.getNextFilter(); + } + f.setNextFilter(filter); + } + } + + public void addOutput(Output output) { + outputList.add(output); + } @Override public void init() throws Exception { super.init(); - tail = getBooleanValue("tail", tail); - useEventMD5 = getBooleanValue("use_event_md5_as_id", useEventMD5); - genEventMD5 = getBooleanValue("gen_event_md5", genEventMD5); + tail = getBooleanValue("tail", DEFAULT_TAIL); + useEventMD5 = getBooleanValue("use_event_md5_as_id", DEFAULT_USE_EVENT_MD5); + genEventMD5 = getBooleanValue("gen_event_md5", DEFAULT_GEN_EVENT_MD5); if (firstFilter != null) { firstFilter.init(); } } - @Override - public String getNameForThread() { - if (filePath != null) { - try { - return (type + "=" + (new File(filePath)).getName()); - } catch (Throwable ex) { - logger.warn("Couldn't get basename for filePath=" + filePath, - ex); - } + boolean monitor() { + if (isReady()) { + LOG.info("Starting thread. " + getShortDescription()); + thread = new Thread(this, getNameForThread()); + thread.start(); + return true; + } else { + return false; } - return super.getNameForThread() + ":" + type; } + public abstract boolean isReady(); + @Override public void run() { try { - logger.info("Started to monitor. " + getShortDescription()); + LOG.info("Started to monitor. 
" + getShortDescription()); start(); } catch (Exception e) { - logger.error("Error writing to output.", e); + LOG.error("Error writing to output.", e); } - logger.info("Exiting thread. " + getShortDescription()); + LOG.info("Exiting thread. " + getShortDescription()); } + /** + * This method will be called from the thread spawned for the output. This + * method should only exit after all data are read from the source or the + * process is exiting + */ + abstract void start() throws Exception; + protected void outputLine(String line, InputMarker marker) { - statMetric.count++; - readBytesMetric.count += (line.length()); + statMetric.value++; + readBytesMetric.value += (line.length()); if (firstFilter != null) { try { firstFilter.apply(line, marker); } catch (LogfeederException e) { - logger.error(e.getLocalizedMessage(),e); + LOG.error(e.getLocalizedMessage(), e); } } else { - // TODO: For now, let's make filter mandatory, so that no one - // accidently forgets to write filter - // outputMgr.write(line, this); + // TODO: For now, let's make filter mandatory, so that no one accidently forgets to write filter + // outputManager.write(line, this); } } @@ -120,60 +169,10 @@ public abstract class Input extends ConfigBlock implements Runnable { } } - public boolean monitor() { - if (isReady()) { - logger.info("Starting thread. " + getShortDescription()); - thread = new Thread(this, getNameForThread()); - thread.start(); - return true; - } else { - return false; - } - } - - public void checkIn(InputMarker inputMarker) { - // Default implementation is to ignore. - } - - /** - * This is generally used by final checkin - */ - public void checkIn() { - } - - public boolean isReady() { - return true; - } - - public boolean isTail() { - return tail; - } - - public void setTail(boolean tail) { - this.tail = tail; - } - - public boolean isUseEventMD5() { - return useEventMD5; - } - - public void setUseEventMD5(boolean useEventMD5) { - this.useEventMD5 = useEventMD5; - } - - public boolean isGenEventMD5() { - return genEventMD5; - } - - public void setGenEventMD5(boolean genEventMD5) { - this.genEventMD5 = genEventMD5; - } - @Override public void setDrain(boolean drain) { - logger.info("Request to drain. " + getShortDescription()); + LOG.info("Request to drain. " + getShortDescription()); super.setDrain(drain); - ; try { thread.interrupt(); } catch (Throwable t) { @@ -181,38 +180,36 @@ public abstract class Input extends ConfigBlock implements Runnable { } } - public Filter getFirstFilter() { - return firstFilter; - } - - public void setFirstFilter(Filter filter) { - firstFilter = filter; + public void addMetricsContainers(List metricsList) { + super.addMetricsContainers(metricsList); + if (firstFilter != null) { + firstFilter.addMetricsContainers(metricsList); + } + metricsList.add(readBytesMetric); } - public void setInputMgr(InputMgr inputMgr) { - this.inputMgr = inputMgr; - } + @Override + public void logStat() { + super.logStat(); + logStatForMetric(readBytesMetric, "Stat: Bytes Read"); - public void setOutputMgr(OutputMgr outputMgr) { - this.outputMgr = outputMgr; + if (firstFilter != null) { + firstFilter.logStat(); + } } - public String getFilePath() { - return filePath; - } + public abstract void checkIn(InputMarker inputMarker); - public void setFilePath(String filePath) { - this.filePath = filePath; - } + public abstract void lastCheckIn(); public void close() { - logger.info("Close called. " + getShortDescription()); + LOG.info("Close called. 
" + getShortDescription()); try { if (firstFilter != null) { firstFilter.close(); } else { - outputMgr.close(); + outputManager.close(); } } catch (Throwable t) { // Ignore @@ -220,86 +217,60 @@ public abstract class Input extends ConfigBlock implements Runnable { isClosed = true; } - public void setClosed(boolean isClosed) { - this.isClosed = isClosed; - } - - public boolean isClosed() { - return isClosed; - } - - @Override - public void loadConfig(Map map) { - super.loadConfig(map); - String typeValue = getStringValue("type"); - if (typeValue != null) { - // Explicitly add type and value to field list - contextFields.put("type", typeValue); - @SuppressWarnings("unchecked") - Map addFields = (Map) map - .get("add_fields"); - if (addFields == null) { - addFields = new HashMap(); - map.put("add_fields", addFields); - } - addFields.put("type", typeValue); - } + public boolean isTail() { + return tail; } - @Override - public String getShortDescription() { - return null; + public boolean isUseEventMD5() { + return useEventMD5; } - @Override - public void logStat() { - super.logStat(); - logStatForMetric(readBytesMetric, "Stat: Bytes Read"); - - if (firstFilter != null) { - firstFilter.logStat(); - } + public boolean isGenEventMD5() { + return genEventMD5; } - @Override - public String toString() { - return getShortDescription(); + public Filter getFirstFilter() { + return firstFilter; } - public void rollOver() { - // Only some inputs support it. E.g. InputFile + public String getFilePath() { + return filePath; } - public String getType() { - return type; + public void setFilePath(String filePath) { + this.filePath = filePath; } - public void setType(String type) { - this.type = type; + public void setClosed(boolean isClosed) { + this.isClosed = isClosed; } - public Date getEventTime() { - return null; + public boolean isClosed() { + return isClosed; } public List getOutputList() { return outputList; } - - public void addOutput(Output output) { - outputList.add(output); - } - - public void addMetricsContainers(List metricsList) { - super.addMetricsContainers(metricsList); - if (firstFilter != null) { - firstFilter.addMetricsContainers(metricsList); - } - metricsList.add(readBytesMetric); - } public Thread getThread(){ return thread; } + @Override + public String getNameForThread() { + if (filePath != null) { + try { + return (type + "=" + (new File(filePath)).getName()); + } catch (Throwable ex) { + LOG.warn("Couldn't get basename for filePath=" + filePath, ex); + } + } + return super.getNameForThread() + ":" + type; + } + + @Override + public String toString() { + return getShortDescription(); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java index c9f5ded..3737839 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java @@ -19,528 +19,99 @@ package org.apache.ambari.logfeeder.input; import java.io.BufferedReader; -import java.io.EOFException; import java.io.File; import java.io.FileFilter; import 
java.io.FileNotFoundException; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.attribute.BasicFileAttributes; -import java.util.Date; -import java.util.HashMap; -import java.util.Map; import org.apache.ambari.logfeeder.input.reader.LogsearchReaderFactory; -import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.ambari.logfeeder.util.FileUtil; import org.apache.commons.io.filefilter.WildcardFileFilter; -import org.apache.commons.lang3.StringUtils; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; +import org.apache.commons.lang3.ArrayUtils; import org.apache.solr.common.util.Base64; -public class InputFile extends Input { - private static final Logger logger = Logger.getLogger(InputFile.class); - - private String logPath = null; - private boolean isStartFromBegining = true; - - private boolean isReady = false; - private File[] logPathFiles = null; - private Object fileKey = null; - private String base64FileKey = null; - - private boolean isRolledOver = false; - private boolean addWildCard = false; - - private long lastCheckPointTimeMS = 0; - private int checkPointIntervalMS = 5 * 1000; // 5 seconds - private RandomAccessFile checkPointWriter = null; - private Map jsonCheckPoint = null; - - private File checkPointFile = null; - - private InputMarker lastCheckPointInputMarker = null; - - private String checkPointExtension = ".cp"; - - @Override - public void init() throws Exception { - logger.info("init() called"); - statMetric.metricsName = "input.files.read_lines"; - readBytesMetric.metricsName = "input.files.read_bytes"; - checkPointExtension = LogFeederUtil.getStringProperty( - "logfeeder.checkpoint.extension", checkPointExtension); - - // Let's close the file and set it to true after we start monitoring it - setClosed(true); - logPath = getStringValue("path"); - tail = getBooleanValue("tail", tail); - addWildCard = getBooleanValue("add_wild_card", addWildCard); - checkPointIntervalMS = getIntValue("checkpoint.interval.ms", - checkPointIntervalMS); - - if (logPath == null || logPath.isEmpty()) { - logger.error("path is empty for file input. " - + getShortDescription()); - return; - } - - String startPosition = getStringValue("start_position"); - if (StringUtils.isEmpty(startPosition) - || startPosition.equalsIgnoreCase("beginning") - || startPosition.equalsIgnoreCase("begining")) { - isStartFromBegining = true; - } - - if (!tail) { - // start position end doesn't apply if we are not tailing - isStartFromBegining = true; - } - - setFilePath(logPath); - boolean isFileReady = isReady(); - - logger.info("File to monitor " + logPath + ", tail=" + tail - + ", addWildCard=" + addWildCard + ", isReady=" + isFileReady); - - super.init(); - } +public class InputFile extends AbstractInputFile { @Override public boolean isReady() { if (!isReady) { // Let's try to check whether the file is available - logPathFiles = getActualFiles(logPath); - if (logPathFiles != null && logPathFiles.length > 0 - && logPathFiles[0].isFile()) { - - if (isTail() && logPathFiles.length > 1) { - logger.warn("Found multiple files (" + logPathFiles.length - + ") for the file filter " + filePath - + ". Will use only the first one. 
Using " - + logPathFiles[0].getAbsolutePath()); + logFiles = getActualFiles(logPath); + if (!ArrayUtils.isEmpty(logFiles) && logFiles[0].isFile()) { + if (tail && logFiles.length > 1) { + LOG.warn("Found multiple files (" + logFiles.length + ") for the file filter " + filePath + + ". Will use only the first one. Using " + logFiles[0].getAbsolutePath()); } - logger.info("File filter " + filePath + " expanded to " - + logPathFiles[0].getAbsolutePath()); + LOG.info("File filter " + filePath + " expanded to " + logFiles[0].getAbsolutePath()); isReady = true; } else { - logger.debug(logPath + " file doesn't exist. Ignoring for now"); + LOG.debug(logPath + " file doesn't exist. Ignoring for now"); } } return isReady; } private File[] getActualFiles(String searchPath) { - if (addWildCard) { - if (!searchPath.endsWith("*")) { - searchPath = searchPath + "*"; - } - } - File checkFile = new File(searchPath); - if (checkFile.isFile()) { - return new File[]{checkFile}; + File searchFile = new File(searchPath); + if (searchFile.isFile()) { + return new File[]{searchFile}; + } else { + FileFilter fileFilter = new WildcardFileFilter(searchFile.getName()); + return searchFile.getParentFile().listFiles(fileFilter); } - // Let's do wild card search - // First check current folder - File checkFiles[] = findFileForWildCard(searchPath, new File(".")); - if (checkFiles == null || checkFiles.length == 0) { - // Let's check from the parent folder - File parentDir = (new File(searchPath)).getParentFile(); - if (parentDir != null) { - String wildCard = (new File(searchPath)).getName(); - checkFiles = findFileForWildCard(wildCard, parentDir); - } - } - return checkFiles; - } - - private File[] findFileForWildCard(String searchPath, File dir) { - logger.debug("findFileForWildCard(). 
filePath=" + searchPath + ", dir=" - + dir + ", dir.fullpath=" + dir.getAbsolutePath()); - FileFilter fileFilter = new WildcardFileFilter(searchPath); - return dir.listFiles(fileFilter); - } - - @Override - synchronized public void checkIn(InputMarker inputMarker) { - super.checkIn(inputMarker); - if (checkPointWriter != null) { - try { - int lineNumber = LogFeederUtil.objectToInt( - jsonCheckPoint.get("line_number"), 0, "line_number"); - if (lineNumber > inputMarker.lineNumber) { - // Already wrote higher line number for this input - return; - } - // If interval is greater than last checkPoint time, then write - long currMS = System.currentTimeMillis(); - if (!isClosed() - && (currMS - lastCheckPointTimeMS) < checkPointIntervalMS) { - // Let's save this one so we can update the check point file - // on flush - lastCheckPointInputMarker = inputMarker; - return; - } - lastCheckPointTimeMS = currMS; - - jsonCheckPoint.put("line_number", "" - + new Integer(inputMarker.lineNumber)); - jsonCheckPoint.put("last_write_time_ms", "" + new Long(currMS)); - jsonCheckPoint.put("last_write_time_date", new Date()); - - String jsonStr = LogFeederUtil.getGson().toJson(jsonCheckPoint); - - // Let's rewind - checkPointWriter.seek(0); - checkPointWriter.writeInt(jsonStr.length()); - checkPointWriter.write(jsonStr.getBytes()); - - if (isClosed()) { - final String LOG_MESSAGE_KEY = this.getClass() - .getSimpleName() + "_FINAL_CHECKIN"; - LogFeederUtil.logErrorMessageByInterval( - LOG_MESSAGE_KEY, - "Wrote final checkPoint, input=" - + getShortDescription() - + ", checkPointFile=" - + checkPointFile.getAbsolutePath() - + ", checkPoint=" + jsonStr, null, logger, - Level.INFO); - } - } catch (Throwable t) { - final String LOG_MESSAGE_KEY = this.getClass().getSimpleName() - + "_CHECKIN_EXCEPTION"; - LogFeederUtil - .logErrorMessageByInterval(LOG_MESSAGE_KEY, - "Caught exception checkIn. , input=" - + getShortDescription(), t, logger, - Level.ERROR); - } - } - - } - - @Override - public void checkIn() { - super.checkIn(); - if (lastCheckPointInputMarker != null) { - checkIn(lastCheckPointInputMarker); - } - } - - @Override - public void rollOver() { - logger.info("Marking this input file for rollover. " - + getShortDescription()); - isRolledOver = true; } @Override void start() throws Exception { - - if (logPathFiles == null || logPathFiles.length == 0) { - return; - } boolean isProcessFile = getBooleanValue("process_file", true); if (isProcessFile) { - if (isTail()) { - processFile(logPathFiles[0]); + if (tail) { + processFile(logFiles[0]); } else { - for (File file : logPathFiles) { + for (File file : logFiles) { try { processFile(file); if (isClosed() || isDrain()) { - logger.info("isClosed or isDrain. Now breaking loop."); + LOG.info("isClosed or isDrain. Now breaking loop."); break; } } catch (Throwable t) { - logger.error("Error processing file=" + file.getAbsolutePath(), t); + LOG.error("Error processing file=" + file.getAbsolutePath(), t); } } } close(); - }else{ - copyFiles(logPathFiles); + } else { + copyFiles(logFiles); } - } @Override - public void close() { - super.close(); - logger.info("close() calling checkPoint checkIn(). 
" - + getShortDescription()); - checkIn(); - } - - private void processFile(File logPathFile) throws FileNotFoundException, - IOException { - logger.info("Monitoring logPath=" + logPath + ", logPathFile=" - + logPathFile); - BufferedReader br = null; - checkPointFile = null; - checkPointWriter = null; - jsonCheckPoint = null; - int resumeFromLineNumber = 0; - - int lineCount = 0; - try { - setFilePath(logPathFile.getAbsolutePath()); - br = new BufferedReader(LogsearchReaderFactory.INSTANCE.getReader(logPathFile)); - - // Whether to send to output from the beginning. - boolean resume = isStartFromBegining; - - // Seems FileWatch is not reliable, so let's only use file key comparison - fileKey = getFileKey(logPathFile); - base64FileKey = Base64.byteArrayToBase64(fileKey.toString() - .getBytes()); - logger.info("fileKey=" + fileKey + ", base64=" + base64FileKey - + ". " + getShortDescription()); - - if (isTail()) { - try { - logger.info("Checking existing checkpoint file. " - + getShortDescription()); - - String fileBase64 = Base64.byteArrayToBase64(fileKey - .toString().getBytes()); - String checkPointFileName = fileBase64 - + checkPointExtension; - File checkPointFolder = inputMgr.getCheckPointFolderFile(); - checkPointFile = new File(checkPointFolder, - checkPointFileName); - checkPointWriter = new RandomAccessFile(checkPointFile, - "rw"); - - try { - int contentSize = checkPointWriter.readInt(); - byte b[] = new byte[contentSize]; - int readSize = checkPointWriter.read(b, 0, contentSize); - if (readSize != contentSize) { - logger.error("Couldn't read expected number of bytes from checkpoint file. expected=" - + contentSize - + ", read=" - + readSize - + ", checkPointFile=" - + checkPointFile - + ", input=" + getShortDescription()); - } else { - String jsonCheckPointStr = new String(b, 0, readSize); - jsonCheckPoint = LogFeederUtil - .toJSONObject(jsonCheckPointStr); - - resumeFromLineNumber = LogFeederUtil.objectToInt( - jsonCheckPoint.get("line_number"), 0, - "line_number"); - - if (resumeFromLineNumber > 0) { - // Let's read from last line read - resume = false; - } - logger.info("CheckPoint. checkPointFile=" - + checkPointFile + ", json=" - + jsonCheckPointStr - + ", resumeFromLineNumber=" - + resumeFromLineNumber + ", resume=" - + resume); - } - } catch (EOFException eofEx) { - logger.info("EOFException. Will reset checkpoint file " - + checkPointFile.getAbsolutePath() + " for " - + getShortDescription()); - } - if (jsonCheckPoint == null) { - // This seems to be first time, so creating the initial - // checkPoint object - jsonCheckPoint = new HashMap(); - jsonCheckPoint.put("file_path", filePath); - jsonCheckPoint.put("file_key", fileBase64); - } - - } catch (Throwable t) { - logger.error( - "Error while configuring checkpoint file. Will reset file. checkPointFile=" - + checkPointFile, t); - } - } - - setClosed(false); - int sleepStep = 2; - int sleepIteration = 0; - while (true) { - try { - if (isDrain()) { - break; - } - - String line = br.readLine(); - if (line == null) { - if (!resume) { - resume = true; - } - sleepIteration++; - try { - // Since FileWatch service is not reliable, we will check - // file inode every n seconds after no write - if (sleepIteration > 4) { - Object newFileKey = getFileKey(logPathFile); - if (newFileKey != null) { - if (fileKey == null - || !newFileKey.equals(fileKey)) { - logger.info("File key is different. Calling rollover. oldKey=" - + fileKey - + ", newKey=" - + newFileKey - + ". " - + getShortDescription()); - // File has rotated. 
- rollOver(); - } - } - } - // Flush on the second iteration - if (!tail && sleepIteration >= 2) { - logger.info("End of file. Done with filePath=" - + logPathFile.getAbsolutePath() - + ", lineCount=" + lineCount); - flush(); - break; - } else if (sleepIteration == 2) { - flush(); - } else if (sleepIteration >= 2) { - if (isRolledOver) { - isRolledOver = false; - // Close existing file - try { - logger.info("File is rolled over. Closing current open file." - + getShortDescription() - + ", lineCount=" + lineCount); - br.close(); - } catch (Exception ex) { - logger.error("Error closing file" - + getShortDescription()); - break; - } - try { - logger.info("Opening new rolled over file." - + getShortDescription()); - br = new BufferedReader(LogsearchReaderFactory. - INSTANCE.getReader(logPathFile)); - lineCount = 0; - fileKey = getFileKey(logPathFile); - base64FileKey = Base64 - .byteArrayToBase64(fileKey - .toString().getBytes()); - logger.info("fileKey=" + fileKey - + ", base64=" + base64FileKey - + ", " + getShortDescription()); - } catch (Exception ex) { - logger.error("Error opening rolled over file. " - + getShortDescription()); - // Let's add this to monitoring and exit this thread - logger.info("Added input to not ready list." - + getShortDescription()); - isReady = false; - inputMgr.addToNotReady(this); - break; - } - logger.info("File is successfully rolled over. " - + getShortDescription()); - continue; - } - } - Thread.sleep(sleepStep * 1000); - sleepStep = (sleepStep * 2); - sleepStep = sleepStep > 10 ? 10 : sleepStep; - } catch (InterruptedException e) { - logger.info("Thread interrupted." - + getShortDescription()); - } - } else { - lineCount++; - sleepStep = 1; - sleepIteration = 0; - - if (!resume && lineCount > resumeFromLineNumber) { - logger.info("Resuming to read from last line. lineCount=" - + lineCount - + ", input=" - + getShortDescription()); - resume = true; - } - if (resume) { - InputMarker marker = new InputMarker(); - marker.base64FileKey = base64FileKey; - marker.input = this; - marker.lineNumber = lineCount; - outputLine(line, marker); - } - } - } catch (Throwable t) { - final String LOG_MESSAGE_KEY = this.getClass() - .getSimpleName() + "_READ_LOOP_EXCEPTION"; - LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY, - "Caught exception in read loop. lineNumber=" - + lineCount + ", input=" - + getShortDescription(), t, logger, - Level.ERROR); - - } - } - } finally { - if (br != null) { - logger.info("Closing reader." + getShortDescription() - + ", lineCount=" + lineCount); - try { - br.close(); - } catch (Throwable t) { - // ignore - } - } - } - } - - static public Object getFileKey(File file) { - try { - Path fileFullPath = Paths.get(file.getAbsolutePath()); - if (fileFullPath != null) { - BasicFileAttributes basicAttr = Files.readAttributes( - fileFullPath, BasicFileAttributes.class); - return basicAttr.fileKey(); - } - } catch (Throwable ex) { - logger.error("Error getting file attributes for file=" + file, ex); - } - return file.toString(); + protected BufferedReader openLogFile(File logFile) throws FileNotFoundException { + BufferedReader br = new BufferedReader(LogsearchReaderFactory.INSTANCE.getReader(logFile)); + fileKey = getFileKey(logFile); + base64FileKey = Base64.byteArrayToBase64(fileKey.toString().getBytes()); + LOG.info("fileKey=" + fileKey + ", base64=" + base64FileKey + ". 
" + getShortDescription()); + return br; } @Override - public String getShortDescription() { - return "input:source=" - + getStringValue("source") - + ", path=" - + (logPathFiles != null && logPathFiles.length > 0 ? logPathFiles[0] - .getAbsolutePath() : getStringValue("path")); + protected Object getFileKey(File logFile) { + return FileUtil.getFileKey(logFile); } - - public void copyFiles(File[] files) { + + private void copyFiles(File[] files) { boolean isCopyFile = getBooleanValue("copy_file", false); if (isCopyFile && files != null) { for (File file : files) { try { - InputMarker marker = new InputMarker(); - marker.input = this; - outputMgr.copyFile(file, marker); + InputMarker marker = new InputMarker(this, null, 0); + outputManager.copyFile(file, marker); if (isClosed() || isDrain()) { - logger.info("isClosed or isDrain. Now breaking loop."); + LOG.info("isClosed or isDrain. Now breaking loop."); break; } } catch (Throwable t) { - logger.error("Error processing file=" + file.getAbsolutePath(), t); + LOG.error("Error processing file=" + file.getAbsolutePath(), t); } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java new file mode 100644 index 0000000..8e70850 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.ambari.logfeeder.input; + +import java.io.EOFException; +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import org.apache.ambari.logfeeder.metrics.MetricData; +import org.apache.ambari.logfeeder.util.FileUtil; +import org.apache.ambari.logfeeder.util.LogFeederUtil; +import org.apache.commons.io.filefilter.WildcardFileFilter; +import org.apache.commons.lang3.StringUtils; +import org.apache.log4j.Logger; +import org.apache.solr.common.util.Base64; + +public class InputManager { + private static final Logger LOG = Logger.getLogger(InputManager.class); + + private static final String CHECKPOINT_SUBFOLDER_NAME = "logfeeder_checkpoints"; + public static final String DEFAULT_CHECKPOINT_EXTENSION = ".cp"; + + private List inputList = new ArrayList(); + private Set notReadyList = new HashSet(); + + private boolean isDrain = false; + private boolean isAnyInputTail = false; + + private File checkPointFolderFile = null; + + private MetricData filesCountMetric = new MetricData("input.files.count", true); + + private String checkPointExtension; + + private Thread inputIsReadyMonitor = null; + + public List getInputList() { + return inputList; + } + + public void add(Input input) { + inputList.add(input); + } + + public void removeInput(Input input) { + LOG.info("Trying to remove from inputList. " + input.getShortDescription()); + Iterator iter = inputList.iterator(); + while (iter.hasNext()) { + Input iterInput = iter.next(); + if (iterInput.equals(input)) { + LOG.info("Removing Input from inputList. " + input.getShortDescription()); + iter.remove(); + } + } + } + + private int getActiveFilesCount() { + int count = 0; + for (Input input : inputList) { + if (input.isReady()) { + count++; + } + } + return count; + } + + public void init() { + checkPointExtension = LogFeederUtil.getStringProperty("logfeeder.checkpoint.extension", DEFAULT_CHECKPOINT_EXTENSION); + for (Input input : inputList) { + try { + input.init(); + if (input.isTail()) { + isAnyInputTail = true; + } + } catch (Exception e) { + LOG.error("Error initializing input. " + input.getShortDescription(), e); + } + } + + if (isAnyInputTail) { + LOG.info("Determining valid checkpoint folder"); + boolean isCheckPointFolderValid = false; + // We need to keep track of the files we are reading. + String checkPointFolder = LogFeederUtil.getStringProperty("logfeeder.checkpoint.folder"); + if (!StringUtils.isEmpty(checkPointFolder)) { + checkPointFolderFile = new File(checkPointFolder); + isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile); + } + if (!isCheckPointFolderValid) { + // Let's try home folder + String userHome = LogFeederUtil.getStringProperty("user.home"); + if (userHome != null) { + checkPointFolderFile = new File(userHome, CHECKPOINT_SUBFOLDER_NAME); + LOG.info("Checking if home folder can be used for checkpoints. Folder=" + checkPointFolderFile); + isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile); + } + } + if (!isCheckPointFolderValid) { + // Let's use tmp folder + String tmpFolder = LogFeederUtil.getStringProperty("java.io.tmpdir"); + if (tmpFolder == null) { + tmpFolder = "/tmp"; + } + checkPointFolderFile = new File(tmpFolder, CHECKPOINT_SUBFOLDER_NAME); + LOG.info("Checking if tmps folder can be used for checkpoints. 
Folder=" + checkPointFolderFile); + isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile); + if (isCheckPointFolderValid) { + LOG.warn("Using tmp folder " + checkPointFolderFile + " to store check points. This is not recommended." + + "Please set logfeeder.checkpoint.folder property"); + } + } + + if (isCheckPointFolderValid) { + LOG.info("Using folder " + checkPointFolderFile + " for storing checkpoints"); + } + } + + } + + private boolean verifyCheckPointFolder(File folderPathFile) { + if (!folderPathFile.exists()) { + try { + if (!folderPathFile.mkdir()) { + LOG.warn("Error creating folder for check point. folder=" + folderPathFile); + } + } catch (Throwable t) { + LOG.warn("Error creating folder for check point. folder=" + folderPathFile, t); + } + } + + if (folderPathFile.exists() && folderPathFile.isDirectory()) { + // Let's check whether we can create a file + File testFile = new File(folderPathFile, UUID.randomUUID().toString()); + try { + testFile.createNewFile(); + return testFile.delete(); + } catch (IOException e) { + LOG.warn("Couldn't create test file in " + folderPathFile.getAbsolutePath() + " for checkPoint", e); + } + } + return false; + } + + public File getCheckPointFolderFile() { + return checkPointFolderFile; + } + + public void monitor() { + for (Input input : inputList) { + if (input.isReady()) { + input.monitor(); + } else { + if (input.isTail()) { + LOG.info("Adding input to not ready list. Note, it is possible this component is not run on this host. " + + "So it might not be an issue. " + input.getShortDescription()); + notReadyList.add(input); + } else { + LOG.info("Input is not ready, so going to ignore it " + input.getShortDescription()); + } + } + } + // Start the monitoring thread if any file is in tail mode + if (isAnyInputTail) { + inputIsReadyMonitor = new Thread("InputIsReadyMonitor") { + @Override + public void run() { + LOG.info("Going to monitor for these missing files: " + notReadyList.toString()); + while (true) { + if (isDrain) { + LOG.info("Exiting missing file monitor."); + break; + } + try { + Iterator iter = notReadyList.iterator(); + while (iter.hasNext()) { + Input input = iter.next(); + try { + if (input.isReady()) { + input.monitor(); + iter.remove(); + } + } catch (Throwable t) { + LOG.error("Error while enabling monitoring for input. " + input.getShortDescription()); + } + } + Thread.sleep(30 * 1000); + } catch (Throwable t) { + // Ignore + } + } + } + }; + inputIsReadyMonitor.start(); + } + } + + void addToNotReady(Input notReadyInput) { + notReadyList.add(notReadyInput); + } + + public void addMetricsContainers(List metricsList) { + for (Input input : inputList) { + input.addMetricsContainers(metricsList); + } + filesCountMetric.value = getActiveFilesCount(); + metricsList.add(filesCountMetric); + } + + public void logStats() { + for (Input input : inputList) { + input.logStat(); + } + + filesCountMetric.value = getActiveFilesCount(); + LogFeederUtil.logStatForMetric(filesCountMetric, "Stat: Files Monitored Count", ""); + } + + + public void cleanCheckPointFiles() { + + if (checkPointFolderFile == null) { + LOG.info("Will not clean checkPoint files. checkPointFolderFile=" + checkPointFolderFile); + return; + } + LOG.info("Cleaning checkPoint files. 
checkPointFolderFile=" + checkPointFolderFile.getAbsolutePath()); + try { + // Loop over the check point files and if filePath is not present, then move to closed + String searchPath = "*" + checkPointExtension; + FileFilter fileFilter = new WildcardFileFilter(searchPath); + File[] checkPointFiles = checkPointFolderFile.listFiles(fileFilter); + int totalCheckFilesDeleted = 0; + for (File checkPointFile : checkPointFiles) { + try (RandomAccessFile checkPointReader = new RandomAccessFile(checkPointFile, "r")) { + int contentSize = checkPointReader.readInt(); + byte b[] = new byte[contentSize]; + int readSize = checkPointReader.read(b, 0, contentSize); + if (readSize != contentSize) { + LOG.error("Couldn't read expected number of bytes from checkpoint file. expected=" + contentSize + ", read=" + + readSize + ", checkPointFile=" + checkPointFile); + } else { + String jsonCheckPointStr = new String(b, 0, readSize); + Map jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr); + + String logFilePath = (String) jsonCheckPoint.get("file_path"); + String logFileKey = (String) jsonCheckPoint.get("file_key"); + if (logFilePath != null && logFileKey != null) { + boolean deleteCheckPointFile = false; + File logFile = new File(logFilePath); + if (logFile.exists()) { + Object fileKeyObj = FileUtil.getFileKey(logFile); + String fileBase64 = Base64.byteArrayToBase64(fileKeyObj.toString().getBytes()); + if (!logFileKey.equals(fileBase64)) { + deleteCheckPointFile = true; + LOG.info("CheckPoint clean: File key has changed. old=" + logFileKey + ", new=" + fileBase64 + ", filePath=" + + logFilePath + ", checkPointFile=" + checkPointFile.getAbsolutePath()); + } + } else { + LOG.info("CheckPoint clean: Log file doesn't exist. filePath=" + logFilePath + ", checkPointFile=" + + checkPointFile.getAbsolutePath()); + deleteCheckPointFile = true; + } + if (deleteCheckPointFile) { + LOG.info("Deleting CheckPoint file=" + checkPointFile.getAbsolutePath() + ", logFile=" + logFilePath); + checkPointFile.delete(); + totalCheckFilesDeleted++; + } + } + } + } catch (EOFException eof) { + LOG.warn("Caught EOFException. Ignoring reading existing checkPoint file. " + checkPointFile); + } catch (Throwable t) { + LOG.error("Error while checking checkPoint file. " + checkPointFile, t); + } + } + LOG.info("Deleted " + totalCheckFilesDeleted + " checkPoint file(s). checkPointFolderFile=" + + checkPointFolderFile.getAbsolutePath()); + + } catch (Throwable t) { + LOG.error("Error while cleaning checkPointFiles", t); + } + } + + public void waitOnAllInputs() { + //wait on inputs + for (Input input : inputList) { + if (input != null) { + Thread inputThread = input.getThread(); + if (inputThread != null) { + try { + inputThread.join(); + } catch (InterruptedException e) { + // ignore + } + } + } + } + // wait on monitor + if (inputIsReadyMonitor != null) { + try { + this.close(); + inputIsReadyMonitor.join(); + } catch (InterruptedException e) { + // ignore + } + } + } + + public void checkInAll() { + for (Input input : inputList) { + input.lastCheckIn(); + } + } + + public void close() { + for (Input input : inputList) { + try { + input.setDrain(true); + } catch (Throwable t) { + LOG.error("Error while draining. 
input=" + input.getShortDescription(), t); + } + } + isDrain = true; + + // Need to get this value from property + int iterations = 30; + int waitTimeMS = 1000; + for (int i = 0; i < iterations; i++) { + boolean allClosed = true; + for (Input input : inputList) { + if (!input.isClosed()) { + try { + allClosed = false; + LOG.warn("Waiting for input to close. " + input.getShortDescription() + ", " + (iterations - i) + " more seconds"); + Thread.sleep(waitTimeMS); + } catch (Throwable t) { + // Ignore + } + } + } + if (allClosed) { + LOG.info("All inputs are closed. Iterations=" + i); + return; + } + } + + LOG.warn("Some inputs were not closed after " + iterations + " iterations"); + for (Input input : inputList) { + if (!input.isClosed()) { + LOG.warn("Input not closed. Will ignore it." + input.getShortDescription()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java index 48a7f1d..6767687 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java @@ -23,13 +23,18 @@ package org.apache.ambari.logfeeder.input; * This file contains the file inode, line number of the log currently been read */ public class InputMarker { - public int lineNumber = 0; - public Input input; - public String base64FileKey = null; - + public final Input input; + public final String base64FileKey; + public final int lineNumber; + + public InputMarker(Input input, String base64FileKey, int lineNumber) { + this.input = input; + this.base64FileKey = base64FileKey; + this.lineNumber = lineNumber; + } + @Override public String toString() { - return "InputMarker [lineNumber=" + lineNumber + ", input=" - + input.getShortDescription() + "]"; + return "InputMarker [lineNumber=" + lineNumber + ", input=" + input.getShortDescription() + "]"; } } http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMgr.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMgr.java deleted file mode 100644 index b18c9b0..0000000 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMgr.java +++ /dev/null @@ -1,451 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.ambari.logfeeder.input; - -import java.io.EOFException; -import java.io.File; -import java.io.FileFilter; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; - -import org.apache.ambari.logfeeder.metrics.MetricCount; -import org.apache.ambari.logfeeder.util.LogFeederUtil; -import org.apache.commons.io.filefilter.WildcardFileFilter; -import org.apache.log4j.Logger; -import org.apache.solr.common.util.Base64; - -public class InputMgr { - private static final Logger logger = Logger.getLogger(InputMgr.class); - - private List inputList = new ArrayList(); - private Set notReadyList = new HashSet(); - - private boolean isDrain = false; - private boolean isAnyInputTail = false; - - private String checkPointSubFolderName = "logfeeder_checkpoints"; - private File checkPointFolderFile = null; - - private MetricCount filesCountMetric = new MetricCount(); - - private String checkPointExtension = ".cp"; - - private Thread inputIsReadyMonitor = null; - - public List getInputList() { - return inputList; - } - - public void add(Input input) { - inputList.add(input); - } - - public void removeInput(Input input) { - logger.info("Trying to remove from inputList. " - + input.getShortDescription()); - Iterator iter = inputList.iterator(); - while (iter.hasNext()) { - Input iterInput = iter.next(); - if (iterInput.equals(input)) { - logger.info("Removing Input from inputList. " - + input.getShortDescription()); - iter.remove(); - } - } - } - - public int getActiveFilesCount() { - int count = 0; - for (Input input : inputList) { - if (input.isReady()) { - count++; - } - } - return count; - } - - public void init() { - filesCountMetric.metricsName = "input.files.count"; - filesCountMetric.isPointInTime = true; - - checkPointExtension = LogFeederUtil.getStringProperty( - "logfeeder.checkpoint.extension", checkPointExtension); - for (Input input : inputList) { - try { - input.init(); - if (input.isTail()) { - isAnyInputTail = true; - } - } catch (Exception e) { - logger.error( - "Error initializing input. " - + input.getShortDescription(), e); - } - } - - if (isAnyInputTail) { - logger.info("Determining valid checkpoint folder"); - boolean isCheckPointFolderValid = false; - // We need to keep track of the files we are reading. - String checkPointFolder = LogFeederUtil - .getStringProperty("logfeeder.checkpoint.folder"); - if (checkPointFolder != null && !checkPointFolder.isEmpty()) { - checkPointFolderFile = new File(checkPointFolder); - isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile); - } - if (!isCheckPointFolderValid) { - // Let's try home folder - String userHome = LogFeederUtil.getStringProperty("user.home"); - if (userHome != null) { - checkPointFolderFile = new File(userHome, - checkPointSubFolderName); - logger.info("Checking if home folder can be used for checkpoints. 
Folder=" - + checkPointFolderFile); - isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile); - } - } - if (!isCheckPointFolderValid) { - // Let's use tmp folder - String tmpFolder = LogFeederUtil - .getStringProperty("java.io.tmpdir"); - if (tmpFolder == null) { - tmpFolder = "/tmp"; - } - checkPointFolderFile = new File(tmpFolder, - checkPointSubFolderName); - logger.info("Checking if tmps folder can be used for checkpoints. Folder=" - + checkPointFolderFile); - isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile); - if (isCheckPointFolderValid) { - logger.warn("Using tmp folder " - + checkPointFolderFile - + " to store check points. This is not recommended." - + "Please set logfeeder.checkpoint.folder property"); - } - } - - if (isCheckPointFolderValid) { - logger.info("Using folder " + checkPointFolderFile - + " for storing checkpoints"); - } - } - - } - - public File getCheckPointFolderFile() { - return checkPointFolderFile; - } - - private boolean verifyCheckPointFolder(File folderPathFile) { - if (!folderPathFile.exists()) { - // Create the folder - try { - if (!folderPathFile.mkdir()) { - logger.warn("Error creating folder for check point. folder=" - + folderPathFile); - } - } catch (Throwable t) { - logger.warn("Error creating folder for check point. folder=" - + folderPathFile, t); - } - } - - if (folderPathFile.exists() && folderPathFile.isDirectory()) { - // Let's check whether we can create a file - File testFile = new File(folderPathFile, UUID.randomUUID() - .toString()); - try { - testFile.createNewFile(); - return testFile.delete(); - } catch (IOException e) { - logger.warn( - "Couldn't create test file in " - + folderPathFile.getAbsolutePath() - + " for checkPoint", e); - } - } - return false; - } - - public void monitor() { - for (Input input : inputList) { - if (input.isReady()) { - input.monitor(); - } else { - if (input.isTail()) { - logger.info("Adding input to not ready list. Note, it is possible this component is not run on this host. So it might not be an issue. " - + input.getShortDescription()); - notReadyList.add(input); - } else { - logger.info("Input is not ready, so going to ignore it " - + input.getShortDescription()); - } - } - } - // Start the monitoring thread if any file is in tail mode - if (isAnyInputTail) { - inputIsReadyMonitor = new Thread("InputIsReadyMonitor") { - @Override - public void run() { - logger.info("Going to monitor for these missing files: " - + notReadyList.toString()); - while (true) { - if (isDrain) { - logger.info("Exiting missing file monitor."); - break; - } - try { - Iterator iter = notReadyList.iterator(); - while (iter.hasNext()) { - Input input = iter.next(); - try { - if (input.isReady()) { - input.monitor(); - iter.remove(); - } - } catch (Throwable t) { - logger.error("Error while enabling monitoring for input. 
" - + input.getShortDescription()); - } - } - Thread.sleep(30 * 1000); - } catch (Throwable t) { - // Ignore - } - } - } - }; - inputIsReadyMonitor.start(); - } - } - - public void addToNotReady(Input notReadyInput) { - notReadyList.add(notReadyInput); - } - - public void addMetricsContainers(List metricsList) { - for (Input input : inputList) { - input.addMetricsContainers(metricsList); - } - filesCountMetric.count = getActiveFilesCount(); - metricsList.add(filesCountMetric); - } - - public void logStats() { - for (Input input : inputList) { - input.logStat(); - } - - filesCountMetric.count = getActiveFilesCount(); - LogFeederUtil.logStatForMetric(filesCountMetric, - "Stat: Files Monitored Count", null); - } - - public void close() { - for (Input input : inputList) { - try { - input.setDrain(true); - } catch (Throwable t) { - logger.error( - "Error while draining. input=" - + input.getShortDescription(), t); - } - } - isDrain = true; - - // Need to get this value from property - int iterations = 30; - int waitTimeMS = 1000; - int i = 0; - boolean allClosed = true; - for (i = 0; i < iterations; i++) { - allClosed = true; - for (Input input : inputList) { - if (!input.isClosed()) { - try { - allClosed = false; - logger.warn("Waiting for input to close. " - + input.getShortDescription() + ", " - + (iterations - i) + " more seconds"); - Thread.sleep(waitTimeMS); - } catch (Throwable t) { - // Ignore - } - } - } - if (allClosed) { - break; - } - } - if (!allClosed) { - logger.warn("Some inputs were not closed. Iterations=" + i); - for (Input input : inputList) { - if (!input.isClosed()) { - logger.warn("Input not closed. Will ignore it." - + input.getShortDescription()); - } - } - } else { - logger.info("All inputs are closed. Iterations=" + i); - } - - } - - public void checkInAll() { - for (Input input : inputList) { - input.checkIn(); - } - } - - public void cleanCheckPointFiles() { - - if (checkPointFolderFile == null) { - logger.info("Will not clean checkPoint files. checkPointFolderFile=" - + checkPointFolderFile); - return; - } - logger.info("Cleaning checkPoint files. checkPointFolderFile=" - + checkPointFolderFile.getAbsolutePath()); - try { - // Loop over the check point files and if filePath is not present, then move to closed - String searchPath = "*" + checkPointExtension; - FileFilter fileFilter = new WildcardFileFilter(searchPath); - File[] checkPointFiles = checkPointFolderFile.listFiles(fileFilter); - int totalCheckFilesDeleted = 0; - for (File checkPointFile : checkPointFiles) { - RandomAccessFile checkPointReader = null; - try { - checkPointReader = new RandomAccessFile(checkPointFile, "r"); - - int contentSize = checkPointReader.readInt(); - byte b[] = new byte[contentSize]; - int readSize = checkPointReader.read(b, 0, contentSize); - if (readSize != contentSize) { - logger.error("Couldn't read expected number of bytes from checkpoint file. 
expected=" - + contentSize - + ", read=" - + readSize - + ", checkPointFile=" + checkPointFile); - } else { - // Create JSON string - String jsonCheckPointStr = new String(b, 0, readSize); - Map jsonCheckPoint = LogFeederUtil - .toJSONObject(jsonCheckPointStr); - - String logFilePath = (String) jsonCheckPoint - .get("file_path"); - String logFileKey = (String) jsonCheckPoint - .get("file_key"); - if (logFilePath != null && logFileKey != null) { - boolean deleteCheckPointFile = false; - File logFile = new File(logFilePath); - if (logFile.exists()) { - Object fileKeyObj = InputFile - .getFileKey(logFile); - String fileBase64 = Base64 - .byteArrayToBase64(fileKeyObj - .toString().getBytes()); - if (!logFileKey.equals(fileBase64)) { - deleteCheckPointFile = true; - logger.info("CheckPoint clean: File key has changed. old=" - + logFileKey - + ", new=" - + fileBase64 - + ", filePath=" - + logFilePath - + ", checkPointFile=" - + checkPointFile.getAbsolutePath()); - } - } else { - logger.info("CheckPoint clean: Log file doesn't exist. filePath=" - + logFilePath - + ", checkPointFile=" - + checkPointFile.getAbsolutePath()); - deleteCheckPointFile = true; - } - if (deleteCheckPointFile) { - logger.info("Deleting CheckPoint file=" - + checkPointFile.getAbsolutePath() - + ", logFile=" + logFilePath); - checkPointFile.delete(); - totalCheckFilesDeleted++; - } - } - } - } catch (EOFException eof) { - logger.warn("Caught EOFException. Ignoring reading existing checkPoint file. " - + checkPointFile); - } catch (Throwable t) { - logger.error("Error while checking checkPoint file. " - + checkPointFile, t); - } finally { - if (checkPointReader != null) { - try { - checkPointReader.close(); - } catch (Throwable t) { - logger.error("Error closing checkPoint file. " - + checkPointFile, t); - } - } - } - } - logger.info("Deleted " + totalCheckFilesDeleted - + " checkPoint file(s). 
checkPointFolderFile=" - + checkPointFolderFile.getAbsolutePath()); - - } catch (Throwable t) { - logger.error("Error while cleaning checkPointFiles", t); - } - } - - public void waitOnAllInputs() { - //wait on inputs - if (inputList != null) { - for (Input input : inputList) { - if (input != null) { - Thread inputThread = input.getThread(); - if (inputThread != null) { - try { - inputThread.join(); - } catch (InterruptedException e) { - // ignore - } - } - } - } - } - // wait on monitor - if (inputIsReadyMonitor != null) { - try { - this.close(); - inputIsReadyMonitor.join(); - } catch (InterruptedException e) { - // ignore - } - } - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java index c9d28bd..f560379 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java @@ -19,201 +19,57 @@ package org.apache.ambari.logfeeder.input; import java.io.BufferedReader; -import java.io.EOFException; import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; -import java.io.RandomAccessFile; -import java.util.Date; -import java.util.HashMap; -import java.util.Map; -import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.ambari.logfeeder.util.S3Util; -import org.apache.commons.lang3.StringUtils; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; +import org.apache.commons.lang.ArrayUtils; import org.apache.solr.common.util.Base64; -public class InputS3File extends Input { - private static final Logger logger = Logger.getLogger(InputS3File.class); - - private String logPath = null; - private boolean isStartFromBegining = true; - - private boolean isReady = false; - private String[] s3LogPathFiles = null; - private Object fileKey = null; - private String base64FileKey = null; - - private boolean isRolledOver = false; - private boolean addWildCard = false; - - private long lastCheckPointTimeMS = 0; - private int checkPointIntervalMS = 5 * 1000; // 5 seconds - private RandomAccessFile checkPointWriter = null; - private Map jsonCheckPoint = null; - - private File checkPointFile = null; - - private InputMarker lastCheckPointInputMarker = null; - - private String checkPointExtension = ".cp"; - - - @Override - public void init() throws Exception { - logger.info("init() called"); - statMetric.metricsName = "input.files.read_lines"; - readBytesMetric.metricsName = "input.files.read_bytes"; - checkPointExtension = LogFeederUtil.getStringProperty( - "logfeeder.checkpoint.extension", checkPointExtension); - - // Let's close the file and set it to true after we start monitoring it - setClosed(true); - logPath = getStringValue("path"); - tail = getBooleanValue("tail", tail); - addWildCard = getBooleanValue("add_wild_card", addWildCard); - checkPointIntervalMS = getIntValue("checkpoint.interval.ms", - checkPointIntervalMS); - if (logPath == null || logPath.isEmpty()) { - logger.error("path is empty for file input. 
" + getShortDescription()); - return; - } - - String startPosition = getStringValue("start_position"); - if (StringUtils.isEmpty(startPosition) - || startPosition.equalsIgnoreCase("beginning") - || startPosition.equalsIgnoreCase("begining")) { - isStartFromBegining = true; - } - - if (!tail) { - // start position end doesn't apply if we are not tailing - isStartFromBegining = true; - } - - setFilePath(logPath); - boolean isFileReady = isReady(); - - logger.info("File to monitor " + logPath + ", tail=" + tail - + ", addWildCard=" + addWildCard + ", isReady=" + isFileReady); - - super.init(); - } +public class InputS3File extends AbstractInputFile { @Override public boolean isReady() { if (!isReady) { // Let's try to check whether the file is available - s3LogPathFiles = getActualFiles(logPath); - if (s3LogPathFiles != null && s3LogPathFiles.length > 0) { - if (isTail() && s3LogPathFiles.length > 1) { - logger.warn("Found multiple files (" + s3LogPathFiles.length - + ") for the file filter " + filePath - + ". Will use only the first one. Using " + s3LogPathFiles[0]); + logFiles = getActualFiles(logPath); + if (!ArrayUtils.isEmpty(logFiles)) { + if (tail && logFiles.length > 1) { + LOG.warn("Found multiple files (" + logFiles.length + ") for the file filter " + filePath + + ". Will use only the first one. Using " + logFiles[0].getAbsolutePath()); } - logger.info("File filter " + filePath + " expanded to " - + s3LogPathFiles[0]); + LOG.info("File filter " + filePath + " expanded to " + logFiles[0].getAbsolutePath()); isReady = true; } else { - logger.debug(logPath + " file doesn't exist. Ignoring for now"); + LOG.debug(logPath + " file doesn't exist. Ignoring for now"); } } return isReady; } - private String[] getActualFiles(String searchPath) { + private File[] getActualFiles(String searchPath) { // TODO search file on s3 - return new String[] { searchPath }; - } - - @Override - synchronized public void checkIn(InputMarker inputMarker) { - super.checkIn(inputMarker); - if (checkPointWriter != null) { - try { - int lineNumber = LogFeederUtil.objectToInt( - jsonCheckPoint.get("line_number"), 0, "line_number"); - if (lineNumber > inputMarker.lineNumber) { - // Already wrote higher line number for this input - return; - } - // If interval is greater than last checkPoint time, then write - long currMS = System.currentTimeMillis(); - if (!isClosed() - && (currMS - lastCheckPointTimeMS) < checkPointIntervalMS) { - // Let's save this one so we can update the check point file - // on flush - lastCheckPointInputMarker = inputMarker; - return; - } - lastCheckPointTimeMS = currMS; - - jsonCheckPoint.put("line_number", "" - + new Integer(inputMarker.lineNumber)); - jsonCheckPoint.put("last_write_time_ms", "" + new Long(currMS)); - jsonCheckPoint.put("last_write_time_date", new Date()); - - String jsonStr = LogFeederUtil.getGson().toJson(jsonCheckPoint); - - // Let's rewind - checkPointWriter.seek(0); - checkPointWriter.writeInt(jsonStr.length()); - checkPointWriter.write(jsonStr.getBytes()); - - if (isClosed()) { - final String LOG_MESSAGE_KEY = this.getClass().getSimpleName() - + "_FINAL_CHECKIN"; - LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY, - "Wrote final checkPoint, input=" + getShortDescription() - + ", checkPointFile=" + checkPointFile.getAbsolutePath() - + ", checkPoint=" + jsonStr, null, logger, Level.INFO); - } - } catch (Throwable t) { - final String LOG_MESSAGE_KEY = this.getClass().getSimpleName() - + "_CHECKIN_EXCEPTION"; - LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY, 
- "Caught exception checkIn. , input=" + getShortDescription(), t, - logger, Level.ERROR); - } - } - - } - - @Override - public void checkIn() { - super.checkIn(); - if (lastCheckPointInputMarker != null) { - checkIn(lastCheckPointInputMarker); - } - } - - @Override - public void rollOver() { - logger.info("Marking this input file for rollover. " - + getShortDescription()); - isRolledOver = true; + return new File[] { new File(searchPath) }; } @Override void start() throws Exception { - if (s3LogPathFiles == null || s3LogPathFiles.length == 0) { + if (ArrayUtils.isEmpty(logFiles)) { return; } - if (isTail()) { - processFile(s3LogPathFiles[0]); + if (tail) { + processFile(logFiles[0]); } else { - for (String s3FilePath : s3LogPathFiles) { + for (File s3FilePath : logFiles) { try { processFile(s3FilePath); if (isClosed() || isDrain()) { - logger.info("isClosed or isDrain. Now breaking loop."); + LOG.info("isClosed or isDrain. Now breaking loop."); break; } } catch (Throwable t) { - logger.error("Error processing file=" + s3FilePath, t); + LOG.error("Error processing file=" + s3FilePath, t); } } } @@ -221,244 +77,18 @@ public class InputS3File extends Input { } @Override - public void close() { - super.close(); - logger.info("close() calling checkPoint checkIn(). " - + getShortDescription()); - checkIn(); - } - - private void processFile(String logPathFile) throws FileNotFoundException, - IOException { - logger.info("Monitoring logPath=" + logPath + ", logPathFile=" - + logPathFile); - BufferedReader br = null; - checkPointFile = null; - checkPointWriter = null; - jsonCheckPoint = null; - int resumeFromLineNumber = 0; - - int lineCount = 0; - try { - setFilePath(logPathFile); - String s3AccessKey = getStringValue("s3_access_key"); - String s3SecretKey = getStringValue("s3_secret_key"); - br = S3Util.INSTANCE.getReader(logPathFile,s3AccessKey,s3SecretKey); - if(br==null){ - //log err - return; - } - - // Whether to send to output from the beginning. - boolean resume = isStartFromBegining; - - // Seems FileWatch is not reliable, so let's only use file key comparison - fileKey = getFileKey(logPathFile); - base64FileKey = Base64.byteArrayToBase64(fileKey.toString().getBytes()); - logger.info("fileKey=" + fileKey + ", base64=" + base64FileKey + ". " - + getShortDescription()); - - if (isTail()) { - try { - // Let's see if there is a checkpoint for this file - logger.info("Checking existing checkpoint file. " - + getShortDescription()); - - String fileBase64 = Base64.byteArrayToBase64(fileKey.toString() - .getBytes()); - String checkPointFileName = fileBase64 + checkPointExtension; - File checkPointFolder = inputMgr.getCheckPointFolderFile(); - checkPointFile = new File(checkPointFolder, checkPointFileName); - checkPointWriter = new RandomAccessFile(checkPointFile, "rw"); - - try { - int contentSize = checkPointWriter.readInt(); - byte b[] = new byte[contentSize]; - int readSize = checkPointWriter.read(b, 0, contentSize); - if (readSize != contentSize) { - logger - .error("Couldn't read expected number of bytes from checkpoint file. 
expected=" - + contentSize - + ", read=" - + readSize - + ", checkPointFile=" - + checkPointFile - + ", input=" - + getShortDescription()); - } else { - String jsonCheckPointStr = new String(b, 0, readSize); - jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr); - - resumeFromLineNumber = LogFeederUtil.objectToInt( - jsonCheckPoint.get("line_number"), 0, "line_number"); - - if (resumeFromLineNumber > 0) { - // Let's read from last line read - resume = false; - } - logger.info("CheckPoint. checkPointFile=" + checkPointFile - + ", json=" + jsonCheckPointStr + ", resumeFromLineNumber=" - + resumeFromLineNumber + ", resume=" + resume); - } - } catch (EOFException eofEx) { - logger.info("EOFException. Will reset checkpoint file " - + checkPointFile.getAbsolutePath() + " for " - + getShortDescription()); - } - if (jsonCheckPoint == null) { - // This seems to be first time, so creating the initial - // checkPoint object - jsonCheckPoint = new HashMap(); - jsonCheckPoint.put("file_path", filePath); - jsonCheckPoint.put("file_key", fileBase64); - } - - } catch (Throwable t) { - logger.error( - "Error while configuring checkpoint file. Will reset file. checkPointFile=" - + checkPointFile, t); - } - } - - setClosed(false); - int sleepStep = 2; - int sleepIteration = 0; - while (true) { - try { - if (isDrain()) { - break; - } - - String line = br.readLine(); - if (line == null) { - if (!resume) { - resume = true; - } - sleepIteration++; - try { - // Since FileWatch service is not reliable, we will check - // file inode every n seconds after no write - if (sleepIteration > 4) { - Object newFileKey = getFileKey(logPathFile); - if (newFileKey != null) { - if (fileKey == null || !newFileKey.equals(fileKey)) { - logger - .info("File key is different. Calling rollover. oldKey=" - + fileKey - + ", newKey=" - + newFileKey - + ". " - + getShortDescription()); - // File has rotated. - rollOver(); - } - } - } - // Flush on the second iteration - if (!tail && sleepIteration >= 2) { - logger.info("End of file. Done with filePath=" + logPathFile - + ", lineCount=" + lineCount); - flush(); - break; - } else if (sleepIteration == 2) { - flush(); - } else if (sleepIteration >= 2) { - if (isRolledOver) { - isRolledOver = false; - // Close existing file - try { - logger - .info("File is rolled over. Closing current open file." - + getShortDescription() + ", lineCount=" - + lineCount); - br.close(); - } catch (Exception ex) { - logger.error("Error closing file" + getShortDescription()); - break; - } - try { - // Open new file - logger.info("Opening new rolled over file." - + getShortDescription()); - br = S3Util.INSTANCE.getReader(logPathFile,s3AccessKey,s3SecretKey); - lineCount = 0; - fileKey = getFileKey(logPathFile); - base64FileKey = Base64.byteArrayToBase64(fileKey.toString() - .getBytes()); - logger.info("fileKey=" + fileKey + ", base64=" - + base64FileKey + ", " + getShortDescription()); - } catch (Exception ex) { - logger.error("Error opening rolled over file. " - + getShortDescription()); - // Let's add this to monitoring and exit this thread - logger.info("Added input to not ready list." - + getShortDescription()); - isReady = false; - inputMgr.addToNotReady(this); - break; - } - logger.info("File is successfully rolled over. " - + getShortDescription()); - continue; - } - } - Thread.sleep(sleepStep * 1000); - sleepStep = (sleepStep * 2); - sleepStep = sleepStep > 10 ? 10 : sleepStep; - } catch (InterruptedException e) { - logger.info("Thread interrupted." 
+ getShortDescription()); - } - } else { - lineCount++; - sleepStep = 1; - sleepIteration = 0; - - if (!resume && lineCount > resumeFromLineNumber) { - logger.info("Resuming to read from last line. lineCount=" - + lineCount + ", input=" + getShortDescription()); - resume = true; - } - if (resume) { - InputMarker marker = new InputMarker(); - marker.base64FileKey = base64FileKey; - marker.input = this; - marker.lineNumber = lineCount; - outputLine(line, marker); - } - } - } catch (Throwable t) { - final String LOG_MESSAGE_KEY = this.getClass().getSimpleName() - + "_READ_LOOP_EXCEPTION"; - LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY, - "Caught exception in read loop. lineNumber=" + lineCount - + ", input=" + getShortDescription(), t, logger, Level.ERROR); - - } - } - } finally { - if (br != null) { - logger.info("Closing reader." + getShortDescription() + ", lineCount=" - + lineCount); - try { - br.close(); - } catch (Throwable t) { - // ignore - } - } - } - } - - static public Object getFileKey(String s3FilePath) { - return s3FilePath.toString(); + protected BufferedReader openLogFile(File logPathFile) throws IOException { + String s3AccessKey = getStringValue("s3_access_key"); + String s3SecretKey = getStringValue("s3_secret_key"); + BufferedReader br = S3Util.getReader(logPathFile.getPath(), s3AccessKey, s3SecretKey); + fileKey = getFileKey(logPathFile); + base64FileKey = Base64.byteArrayToBase64(fileKey.toString().getBytes()); + LOG.info("fileKey=" + fileKey + ", base64=" + base64FileKey + ". " + getShortDescription()); + return br; } @Override - public String getShortDescription() { - return "input:source=" - + getStringValue("source") - + ", path=" - + (s3LogPathFiles != null && s3LogPathFiles.length > 0 ? s3LogPathFiles[0] - : getStringValue("path")); + protected Object getFileKey(File logFile) { + return logFile.getPath(); } - } http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java index 5ba56a5..743be69 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java @@ -18,7 +18,7 @@ */ package org.apache.ambari.logfeeder.input; -import java.net.Inet4Address; +import java.net.InetAddress; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; @@ -66,7 +66,7 @@ public class InputSimulate extends Input { Filter filter = new FilterJSON(); filter.setInput(this); - setFirstFilter(filter); + addFilter(filter); } private List getSimulatedLogTypes() { @@ -88,23 +88,18 @@ public class InputSimulate extends Input { return LOG_TEXT_PATTERN.replaceAll("", logMessagePattern); } - - @Override - public String getNameForThread() { - return "Simulated input"; - } @Override - public String getShortDescription() { - return "Simulated input"; + public boolean isReady() { + return true; } - + @Override void start() throws Exception { if (types.isEmpty()) return; - getFirstFilter().setOutputMgr(outputMgr); + getFirstFilter().setOutputManager(outputManager); while (true) { String 
type = imitateRandomLogFile(); @@ -129,10 +124,7 @@ public class InputSimulate extends Input { } private InputMarker getInputMarker(String type) throws Exception { - InputMarker marker = new InputMarker(); - marker.input = this; - marker.lineNumber = getLineNumber(type); - marker.base64FileKey = getBase64FileKey(); + InputMarker marker = new InputMarker(this, getBase64FileKey(), getLineNumber(type)); return marker; } @@ -147,7 +139,7 @@ public class InputSimulate extends Input { } private String getBase64FileKey() throws Exception { - String fileKey = Inet4Address.getLocalHost().getHostAddress() + "|" + filePath; + String fileKey = InetAddress.getLocalHost().getHostAddress() + "|" + filePath; return Base64.byteArrayToBase64(fileKey.getBytes()); } @@ -155,4 +147,20 @@ public class InputSimulate extends Input { Date d = new Date(); return String.format(logText, d.getTime(), level, marker.lineNumber); } + + @Override + public void checkIn(InputMarker inputMarker) {} + + @Override + public void lastCheckIn() {} + + @Override + public String getNameForThread() { + return "Simulated input"; + } + + @Override + public String getShortDescription() { + return "Simulated input"; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/51fdb2de/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java index a2a9db2..9ccc4f2 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java @@ -18,7 +18,6 @@ */ package org.apache.ambari.logfeeder.input.reader; -import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; @@ -30,15 +29,11 @@ import org.apache.log4j.Logger; class GZIPReader extends InputStreamReader { - private static Logger logger = Logger.getLogger(GZIPReader.class); + private static final Logger LOG = Logger.getLogger(GZIPReader.class); GZIPReader(String fileName) throws FileNotFoundException { super(getStream(fileName)); - logger.info("Created GZIPReader for file : " + fileName); - } - - GZIPReader(File file) throws FileNotFoundException { - super(getStream(file.getName())); + LOG.info("Created GZIPReader for file : " + fileName); } private static InputStream getStream(String fileName) { @@ -48,7 +43,7 @@ class GZIPReader extends InputStreamReader { fileStream = new FileInputStream(fileName); gzipStream = new GZIPInputStream(fileStream); } catch (Exception e) { - logger.error(e, e.getCause()); + LOG.error(e, e.getCause()); } return gzipStream; } @@ -58,21 +53,13 @@ class GZIPReader extends InputStreamReader { */ static boolean isValidFile(String fileName) { // TODO make it generic and put in factory itself - InputStream is = null; - try { - is = new FileInputStream(fileName); + + try (InputStream is = new FileInputStream(fileName)) { byte[] signature = new byte[2]; int nread = is.read(signature); // read the gzip signature return nread == 2 && signature[0] == (byte) 0x1f && signature[1] == (byte) 0x8b; } catch (IOException e) { return false; - } finally { - if (is != null) { - try 
{ - is.close(); - } catch (IOException e) { - } - } } } }
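
Both the checkpoint cleanup in InputManager and the checkIn()/processFile() logic removed from InputS3File earlier in this diff rely on the same small on-disk checkpoint format: a RandomAccessFile that starts with a 4-byte length written by writeInt(), followed by that many bytes of serialized JSON (file_path, file_key, line_number, ...). Since that detail is easy to lose in the flattened hunks, here is a minimal, self-contained sketch of the read and write sides. It is not part of this commit; the class and method names are hypothetical, and the explicit UTF-8 charset is an assumption (the actual Log Feeder code serializes the map with Gson via LogFeederUtil and uses the platform default charset).

    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.charset.StandardCharsets;

    // Hypothetical helper (not part of this commit) illustrating the checkpoint
    // layout used above: a 4-byte length prefix written with writeInt(),
    // followed by that many bytes of JSON.
    public class CheckpointIoSketch {

      // Overwrite the checkpoint in place; checkIn() does the same via seek(0).
      static void write(File checkpointFile, String json) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(checkpointFile, "rw")) {
          byte[] bytes = json.getBytes(StandardCharsets.UTF_8); // real code uses the default charset
          raf.seek(0);
          raf.writeInt(bytes.length);
          raf.write(bytes);
        }
      }

      // Returns the JSON payload, or null if fewer bytes than promised are present
      // (the cleanup code logs and skips such files; an empty file throws EOFException,
      // which the callers above catch and treat as "reset the checkpoint").
      static String read(File checkpointFile) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(checkpointFile, "r")) {
          int contentSize = raf.readInt();
          byte[] b = new byte[contentSize];
          int readSize = raf.read(b, 0, contentSize);
          return readSize == contentSize ? new String(b, 0, readSize, StandardCharsets.UTF_8) : null;
        }
      }
    }

One design note worth keeping in mind when reading the diff: because the writer always rewinds to offset 0 and never truncates the file, a shorter rewrite can leave stale bytes after the JSON; the length prefix is what keeps readers from picking those up.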