Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0B7E4200B61 for ; Tue, 9 Aug 2016 15:18:54 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 09FF1160AA5; Tue, 9 Aug 2016 13:18:54 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 57E38160AA4 for ; Tue, 9 Aug 2016 15:18:52 +0200 (CEST) Received: (qmail 50374 invoked by uid 500); 9 Aug 2016 13:18:51 -0000 Mailing-List: contact commits-help@ambari.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: ambari-dev@ambari.apache.org Delivered-To: mailing list commits@ambari.apache.org Received: (qmail 50359 invoked by uid 99); 9 Aug 2016 13:18:51 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 Aug 2016 13:18:51 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 25B44DFE1A; Tue, 9 Aug 2016 13:18:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: oleewere@apache.org To: commits@ambari.apache.org Message-Id: <0dc8fa6a66e6450fa2c084b4ba63a4d5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: ambari git commit: AMBARI-17785. Provide support for S3 as a first class destination for log events (Hemanth Yamijala via oleewere) Date: Tue, 9 Aug 2016 13:18:51 +0000 (UTC) archived-at: Tue, 09 Aug 2016 13:18:54 -0000 Repository: ambari Updated Branches: refs/heads/trunk f8511d4e5 -> 6dcf63819 AMBARI-17785. 
Provide support for S3 as a first class destination for log events (Hemanth Yamijala via oleewere) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6dcf6381 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6dcf6381 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6dcf6381 Branch: refs/heads/trunk Commit: 6dcf638192e8abff967c405cc9ef01b4dfc3b764 Parents: f8511d4 Author: oleewere Authored: Tue Aug 9 15:08:13 2016 +0200 Committer: oleewere Committed: Tue Aug 9 15:08:13 2016 +0200 ---------------------------------------------------------------------- .../ambari/logfeeder/input/InputMarker.java | 1 - .../ambari/logfeeder/output/OutputS3File.java | 227 ++++++++++++------- .../logfeeder/output/S3LogPathResolver.java | 54 +++++ .../logfeeder/output/S3OutputConfiguration.java | 114 ++++++++++ .../ambari/logfeeder/output/S3Uploader.java | 163 +++++++++++++ .../logfeeder/output/spool/LogSpooler.java | 91 +++++++- .../org/apache/ambari/logfeeder/s3/S3Util.java | 8 +- .../logfeeder/output/OutputS3FileTest.java | 198 ++++++++++++++++ .../logfeeder/output/S3LogPathResolverTest.java | 51 +++++ .../ambari/logfeeder/output/S3UploaderTest.java | 164 ++++++++++++++ .../logfeeder/output/spool/LogSpoolerTest.java | 182 ++++++++++++--- 11 files changed, 1123 insertions(+), 130 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/6dcf6381/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java index 8def4b9..48a7f1d 100644 --- 
a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java @@ -32,5 +32,4 @@ public class InputMarker { return "InputMarker [lineNumber=" + lineNumber + ", input=" + input.getShortDescription() + "]"; } - } http://git-wip-us.apache.org/repos/asf/ambari/blob/6dcf6381/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java index f42195c..cbc1045 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java @@ -18,83 +18,95 @@ */ package org.apache.ambari.logfeeder.output; -import java.io.File; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - +import com.google.common.annotations.VisibleForTesting; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import org.apache.ambari.logfeeder.LogFeeder; import org.apache.ambari.logfeeder.LogFeederUtil; import org.apache.ambari.logfeeder.filter.Filter; import org.apache.ambari.logfeeder.input.InputMarker; +import org.apache.ambari.logfeeder.output.spool.LogSpooler; +import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext; +import org.apache.ambari.logfeeder.output.spool.RolloverCondition; +import org.apache.ambari.logfeeder.output.spool.RolloverHandler; import org.apache.ambari.logfeeder.s3.S3Util; 
-import org.apache.ambari.logfeeder.util.CompressionUtil; -import org.apache.ambari.logfeeder.util.PlaceholderUtil; +import org.apache.log4j.Logger; + +import java.io.File; +import java.util.*; +import java.util.Map.Entry; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; /** - * Write log file into s3 bucket + * Write log file into s3 bucket. + * + * This class supports two modes of upload: + *
<ul>
+ * <li>A one time upload of files matching a pattern</li>
+ * <li>A batch mode, asynchronous, periodic upload of files</li>
+ * </ul>
*/ -public class OutputS3File extends Output { +public class OutputS3File extends Output implements RolloverCondition, RolloverHandler { + + public static final String INPUT_ATTRIBUTE_TYPE = "type"; + public static final String GLOBAL_CONFIG_S3_PATH_SUFFIX = "global.config.json"; + static private Logger logger = Logger.getLogger(OutputS3File.class); + + private LogSpooler logSpooler; + private S3OutputConfiguration s3OutputConfiguration; + private S3Uploader s3Uploader; + @Override + public void init() throws Exception { + super.init(); + s3OutputConfiguration = S3OutputConfiguration.fromConfigBlock(this); + } private static boolean uploadedGlobalConfig = false; + /** + * Copy local log files and corresponding config to S3 bucket one time. + * @param inputFile The file to be copied + * @param inputMarker Contains information about the configuration to be uploaded. + */ @Override public void copyFile(File inputFile, InputMarker inputMarker) { - String bucketName = getStringValue("s3_bucket"); - String s3LogDir = getStringValue("s3_log_dir"); - HashMap contextParam = buildContextParam(); - s3LogDir = PlaceholderUtil.replaceVariables(s3LogDir, contextParam); - String s3AccessKey = getStringValue("s3_access_key"); - String s3SecretKey = getStringValue("s3_secret_key"); - String compressionAlgo = getStringValue("compression_algo"); - String fileName = inputFile.getName(); - // create tmp compressed File - String tmpDir = LogFeederUtil.getLogfeederTempDir(); - File outputFile = new File(tmpDir + fileName + "_" - + new Date().getTime() + "." + compressionAlgo); - outputFile = CompressionUtil.compressFile(inputFile, outputFile, - compressionAlgo); - String type = inputMarker.input.getStringValue("type"); - String s3Path = s3LogDir + S3Util.INSTANCE.S3_PATH_SEPARATOR + type - + S3Util.INSTANCE.S3_PATH_SEPARATOR + fileName + "." 
- + compressionAlgo; - S3Util.INSTANCE.uploadFileTos3(bucketName, s3Path, outputFile, s3AccessKey, - s3SecretKey); - // delete local compressed file - outputFile.deleteOnExit(); - ArrayList> filters = new ArrayList>(); + String type = inputMarker.input.getStringValue(INPUT_ATTRIBUTE_TYPE); + S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, + S3Util.INSTANCE, false, type); + String resolvedPath = s3Uploader.uploadFile(inputFile, + inputMarker.input.getStringValue(INPUT_ATTRIBUTE_TYPE)); + + uploadConfig(inputMarker, type, s3OutputConfiguration, resolvedPath); + } + + private void uploadConfig(InputMarker inputMarker, String type, + S3OutputConfiguration s3OutputConfiguration, String resolvedPath) { + + ArrayList> filters = new ArrayList<>(); addFilters(filters, inputMarker.input.getFirstFilter()); - Map inputConfig = new HashMap(); + Map inputConfig = new HashMap<>(); inputConfig.putAll(inputMarker.input.getConfigs()); - String s3CompletePath = S3Util.INSTANCE.S3_PATH_START_WITH + bucketName - + S3Util.INSTANCE.S3_PATH_SEPARATOR + s3Path; + String s3CompletePath = S3Util.S3_PATH_START_WITH + s3OutputConfiguration.getS3BucketName() + + S3Util.S3_PATH_SEPARATOR + resolvedPath; inputConfig.put("path", s3CompletePath); - ArrayList> inputConfigList = new ArrayList>(); + ArrayList> inputConfigList = new ArrayList<>(); inputConfigList.add(inputConfig); // set source s3_file // remove global config from filter config removeGlobalConfig(inputConfigList); removeGlobalConfig(filters); // write config into s3 file - String s3Key = getComponentConfigFileName(type); - Map config = new HashMap(); + Map config = new HashMap<>(); config.put("filter", filters); config.put("input", inputConfigList); - writeConfigToS3(config, bucketName, s3AccessKey, s3SecretKey, contextParam, - s3Key); + writeConfigToS3(config, getComponentConfigFileName(type), s3OutputConfiguration); // write global config - writeGlobalConfig(); + writeGlobalConfig(s3OutputConfiguration); } - public void 
addFilters(ArrayList> filters, Filter filter) { + private void addFilters(ArrayList> filters, Filter filter) { if (filter != null) { Map filterConfig = new HashMap(); filterConfig.putAll(filter.getConfigs()); @@ -105,38 +117,28 @@ public class OutputS3File extends Output { } } - public void writeConfigToS3(Map config, String bucketName, - String accessKey, String secretKey, HashMap contextParam, - String s3Key) { - String s3ConfigDir = getStringValue("s3_config_dir"); - s3ConfigDir = PlaceholderUtil.replaceVariables(s3ConfigDir, contextParam); + private void writeConfigToS3(Map configToWrite, String s3KeySuffix, + S3OutputConfiguration s3OutputConfiguration) { Gson gson = new GsonBuilder().setPrettyPrinting().create(); - String configJson = gson.toJson(config); + String configJson = gson.toJson(configToWrite); - s3Key = s3ConfigDir + S3Util.INSTANCE.S3_PATH_SEPARATOR + s3Key; - S3Util.INSTANCE.writeIntoS3File(configJson, bucketName, s3Key, accessKey, - secretKey); - } + String s3ResolvedKey = new S3LogPathResolver(). 
+ getResolvedPath(getStringValue("s3_config_dir"), s3KeySuffix, s3OutputConfiguration.getCluster()); - public String getComponentConfigFileName(String componentName) { - String fileName = "input.config-" + componentName + ".json"; - return fileName; + S3Util.INSTANCE.writeIntoS3File(configJson, s3OutputConfiguration.getS3BucketName(), + s3ResolvedKey, s3OutputConfiguration.getS3AccessKey(), + s3OutputConfiguration.getS3SecretKey()); } - public HashMap buildContextParam() { - HashMap contextParam = new HashMap(); - contextParam.put("host", LogFeederUtil.hostName); - contextParam.put("ip", LogFeederUtil.ipAddress); - String cluster = getNVList("add_fields").get("cluster"); - contextParam.put("cluster", cluster); - return contextParam; + private String getComponentConfigFileName(String componentName) { + return "input.config-" + componentName + ".json"; } - + private Map getGlobalConfig() { Map globalConfig = LogFeeder.globalMap; if (globalConfig == null) { - globalConfig = new HashMap(); + globalConfig = new HashMap<>(); } return globalConfig; } @@ -163,7 +165,7 @@ public class OutputS3File extends Output { * write global config in s3 file Invoke only once */ @SuppressWarnings("unchecked") - private synchronized void writeGlobalConfig() { + private synchronized void writeGlobalConfig(S3OutputConfiguration s3OutputConfiguration) { if (!uploadedGlobalConfig) { Map globalConfig = LogFeederUtil.cloneObject(getGlobalConfig()); //updating global config before write to s3 @@ -174,7 +176,7 @@ public class OutputS3File extends Output { Map addFields = (Map) globalConfig .get("add_fields"); if (addFields == null) { - addFields = new HashMap(); + addFields = new HashMap<>(); } addFields.put("ip", LogFeederUtil.ipAddress); addFields.put("host", LogFeederUtil.hostName); @@ -189,20 +191,85 @@ public class OutputS3File extends Output { globalConfig.put("add_fields", addFields); Map config = new HashMap(); config.put("global", globalConfig); - String s3AccessKey = 
getStringValue("s3_access_key"); - String s3SecretKey = getStringValue("s3_secret_key"); - String bucketName = getStringValue("s3_bucket"); - String s3Key = "global.config.json"; - HashMap contextParam = buildContextParam(); - writeConfigToS3(config, bucketName, s3AccessKey, s3SecretKey, - contextParam, s3Key); + writeConfigToS3(config, GLOBAL_CONFIG_S3_PATH_SUFFIX, s3OutputConfiguration); uploadedGlobalConfig = true; } } + /** + * Write a log line to local file, to upload to S3 bucket asynchronously. + * + * This method uses a {@link LogSpooler} to spool the log lines to a local file. + + * @param block The log event to upload + * @param inputMarker Contains information about the log file feeding the lines. + * @throws Exception + */ @Override public void write(String block, InputMarker inputMarker) throws Exception { - throw new UnsupportedOperationException( - "write method is not yet supported for output=s3_file"); + if (logSpooler == null) { + logSpooler = createSpooler(inputMarker.input.getFilePath()); + s3Uploader = createUploader(inputMarker.input.getStringValue(INPUT_ATTRIBUTE_TYPE)); + } + logSpooler.add(block); + } + + @VisibleForTesting + protected S3Uploader createUploader(String logType) { + S3Uploader uploader = new S3Uploader(s3OutputConfiguration, S3Util.INSTANCE, true, logType); + uploader.startUploaderThread(); + return uploader; + } + + @VisibleForTesting + protected LogSpooler createSpooler(String filePath) { + String spoolDirectory = LogFeederUtil.getLogfeederTempDir() + "/s3/service"; + logger.info(String.format("Creating spooler with spoolDirectory=%s, filePath=%s", + spoolDirectory, filePath)); + return new LogSpooler(spoolDirectory, new File(filePath).getName()+"-", this, this, + s3OutputConfiguration.getRolloverTimeThresholdSecs()); + } + + /** + * Check whether the locally spooled file should be rolled over, based on file size. 
+ * + * @param currentSpoolerContext {@link LogSpoolerContext} that holds state about the file being checked + * for rollover. + * @return true if sufficient size has been reached based on {@link S3OutputConfiguration#getRolloverSizeThresholdBytes()}, + * false otherwise + */ + @Override + public boolean shouldRollover(LogSpoolerContext currentSpoolerContext) { + File spoolFile = currentSpoolerContext.getActiveSpoolFile(); + long currentSize = spoolFile.length(); + boolean result = (currentSize >= s3OutputConfiguration.getRolloverSizeThresholdBytes()); + if (result) { + logger.info(String.format("Rolling over %s, current size %d, threshold size %d", spoolFile, currentSize, + s3OutputConfiguration.getRolloverSizeThresholdBytes())); + } + return result; + } + + /** + * Stops dependent objects that consume resources. + */ + @Override + public void close() { + if (s3Uploader != null) { + s3Uploader.stopUploaderThread(); + } + if (logSpooler != null) { + logSpooler.close(); + } + } + + /** + * Adds the locally spooled file to the {@link S3Uploader} to be uploaded asynchronously. + * + * @param rolloverFile The file that has been rolled over. 
+ */ + @Override + public void handleRollover(File rolloverFile) { + s3Uploader.addFileForUpload(rolloverFile.getAbsolutePath()); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/6dcf6381/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java new file mode 100644 index 0000000..1bbf33e --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.logfeeder.output; + +import org.apache.ambari.logfeeder.LogFeederUtil; +import org.apache.ambari.logfeeder.s3.S3Util; +import org.apache.ambari.logfeeder.util.PlaceholderUtil; + +import java.util.HashMap; + +/** + * A utility class that resolves variables like hostname, IP address and cluster name in S3 paths. 
+ */ +public class S3LogPathResolver { + + /** + * Construct a full S3 path by resolving variables in the path name including hostname, IP address + * and cluster name + * @param baseKeyPrefix The prefix which can contain the variables. + * @param keySuffix The suffix appended to the prefix after variable expansion + * @param cluster The name of the cluster + * @return full S3 path. + */ + public String getResolvedPath(String baseKeyPrefix, String keySuffix, String cluster) { + HashMap contextParam = buildContextParam(cluster); + String resolvedKeyPrefix = PlaceholderUtil.replaceVariables(baseKeyPrefix, contextParam); + return resolvedKeyPrefix + S3Util.S3_PATH_SEPARATOR + keySuffix; + } + + private HashMap buildContextParam(String cluster) { + HashMap contextParam = new HashMap<>(); + contextParam.put("host", LogFeederUtil.hostName); + contextParam.put("ip", LogFeederUtil.ipAddress); + contextParam.put("cluster", cluster); + return contextParam; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/6dcf6381/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java new file mode 100644 index 0000000..fb597d3 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.logfeeder.output; + +import org.apache.ambari.logfeeder.ConfigBlock; + +import java.util.HashMap; +import java.util.Map; + +/** + * Holds all configuration relevant for S3 upload. + */ +public class S3OutputConfiguration { + + public static final String SPOOL_DIR_KEY = "spool_dir"; + public static final String ROLLOVER_SIZE_THRESHOLD_BYTES_KEY = "rollover_size_threshold_bytes"; + public static final Long DEFAULT_ROLLOVER_SIZE_THRESHOLD_BYTES = 10 * 1024 * 1024L; + public static final String ROLLOVER_TIME_THRESHOLD_SECS_KEY = "rollover_time_threshold_secs"; + public static final Long DEFAULT_ROLLOVER_TIME_THRESHOLD_SECS = 3600L; + public static final String S3_BUCKET_NAME_KEY = "s3_bucket"; + public static final String S3_LOG_DIR_KEY = "s3_log_dir"; + public static final String S3_ACCESS_KEY = "s3_access_key"; + public static final String S3_SECRET_KEY = "s3_secret_key"; + public static final String COMPRESSION_ALGO_KEY = "compression_algo"; + public static final String ADDITIONAL_FIELDS_KEY = "add_fields"; + public static final String CLUSTER_KEY = "cluster"; + + private Map configs; + + S3OutputConfiguration(Map configs) { + this.configs = configs; + } + + public String getS3BucketName() { + return (String) configs.get(S3_BUCKET_NAME_KEY); + } + + public String getS3Path() { + return (String) configs.get(S3_LOG_DIR_KEY); + } + + public String getS3AccessKey() { + return (String) configs.get(S3_ACCESS_KEY); + } + + public String getS3SecretKey() { + return (String) configs.get(S3_SECRET_KEY); + } + + public String getCompressionAlgo() { + return (String) configs.get(COMPRESSION_ALGO_KEY); + } + + public Long 
getRolloverSizeThresholdBytes() { + return (Long) configs.get(ROLLOVER_SIZE_THRESHOLD_BYTES_KEY); + } + + public Long getRolloverTimeThresholdSecs() { + return (Long) configs.get(ROLLOVER_TIME_THRESHOLD_SECS_KEY); + } + + @SuppressWarnings("unchecked") + public String getCluster() { + return ((Map) configs.get(ADDITIONAL_FIELDS_KEY)).get(CLUSTER_KEY); + } + + public static S3OutputConfiguration fromConfigBlock(ConfigBlock configBlock) { + Map configs = new HashMap<>(); + String[] stringValuedKeysToCopy = new String[] { + SPOOL_DIR_KEY, S3_BUCKET_NAME_KEY, S3_LOG_DIR_KEY, + S3_ACCESS_KEY, S3_SECRET_KEY, COMPRESSION_ALGO_KEY + }; + + for (String key : stringValuedKeysToCopy) { + String value = configBlock.getStringValue(key); + if (value != null) { + configs.put(key, value); + } + } + + String[] longValuedKeysToCopy = new String[] { + ROLLOVER_SIZE_THRESHOLD_BYTES_KEY, ROLLOVER_TIME_THRESHOLD_SECS_KEY + }; + + Long[] defaultValuesForLongValuedKeys = new Long[] { + DEFAULT_ROLLOVER_SIZE_THRESHOLD_BYTES, DEFAULT_ROLLOVER_TIME_THRESHOLD_SECS + }; + + for (int i = 0; i < longValuedKeysToCopy.length; i++) { + configs.put(longValuedKeysToCopy[i], + configBlock.getLongValue(longValuedKeysToCopy[i], defaultValuesForLongValuedKeys[i])); + } + + configs.put(ADDITIONAL_FIELDS_KEY, configBlock.getNVList(ADDITIONAL_FIELDS_KEY)); + + return new S3OutputConfiguration(configs); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/6dcf6381/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java new file mode 100644 index 0000000..dec685f --- /dev/null +++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.logfeeder.output; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.ambari.logfeeder.LogFeederUtil; +import org.apache.ambari.logfeeder.s3.S3Util; +import org.apache.ambari.logfeeder.util.CompressionUtil; +import org.apache.log4j.Logger; + +import java.io.File; +import java.util.Date; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A class that handles the uploading of files to S3. + * + * This class can be used to upload a file one time, or start a daemon thread that can + * be used to upload files added to a queue one after the other. When used to upload + * files via a queue, one instance of this class is created for each file handled in + * {@link org.apache.ambari.logfeeder.input.InputFile}. 
+ */ +public class S3Uploader implements Runnable { + public static final String POISON_PILL = "POISON-PILL"; + private static Logger logger = Logger.getLogger(S3Uploader.class); + + private final S3OutputConfiguration s3OutputConfiguration; + private final S3Util s3UtilInstance; + private final boolean deleteOnEnd; + private String logType; + private final BlockingQueue fileContextsToUpload; + private AtomicBoolean stopRunningThread = new AtomicBoolean(false); + + public S3Uploader(S3OutputConfiguration s3OutputConfiguration, S3Util s3UtilInstance, boolean deleteOnEnd, + String logType) { + this.s3OutputConfiguration = s3OutputConfiguration; + this.s3UtilInstance = s3UtilInstance; + this.deleteOnEnd = deleteOnEnd; + this.logType = logType; + this.fileContextsToUpload = new LinkedBlockingQueue<>(); + } + + /** + * Starts a thread that can be used to upload files from a queue. + * + * Add files to be uploaded using the method {@link #addFileForUpload(String)}. + * If this thread is started, it must be stopped using the method {@link #stopUploaderThread()}. + */ + void startUploaderThread() { + Thread s3UploaderThread = new Thread(this, "s3-uploader-thread-"+logType); + s3UploaderThread.setDaemon(true); + s3UploaderThread.start(); + } + + /** + * Stops the thread used to upload files from a queue. + * + * This method must be called to cleanly free up resources, typically on shutdown of the process. + * Note that this method does not drain any remaining files, and instead stops the thread + * as soon as any file being currently uploaded is complete. + */ + void stopUploaderThread() { + stopRunningThread.set(true); + boolean offerStatus = fileContextsToUpload.offer(POISON_PILL); + if (!offerStatus) { + logger.warn("Could not add poison pill to interrupt uploader thread."); + } + } + + /** + * Add a file to a queue to upload asynchronously. + * @param fileToUpload Full path to the local file which must be uploaded. 
+ */ + void addFileForUpload(String fileToUpload) { + boolean offerStatus = fileContextsToUpload.offer(fileToUpload); + if (!offerStatus) { + logger.error("Could not add file " + fileToUpload + " for upload."); + } + } + + @Override + public void run() { + while (!stopRunningThread.get()) { + try { + String fileNameToUpload = fileContextsToUpload.take(); + if (POISON_PILL.equals(fileNameToUpload)) { + logger.warn("Found poison pill while waiting for files to upload, exiting"); + return; + } + uploadFile(new File(fileNameToUpload), logType); + } catch (InterruptedException e) { + logger.error("Interrupted while waiting for elements from fileContextsToUpload", e); + return; + } + } + } + + /** + * Upload the given file to S3. + * + * The file which should be available locally, is first compressed using the compression + * method specified by {@link S3OutputConfiguration#getCompressionAlgo()}. This compressed + * file is what is uploaded to S3. + * @param fileToUpload the file to upload + * @param logType the name of the log which is used in the S3 path constructed. + * @return + */ + String uploadFile(File fileToUpload, String logType) { + String bucketName = s3OutputConfiguration.getS3BucketName(); + String s3AccessKey = s3OutputConfiguration.getS3AccessKey(); + String s3SecretKey = s3OutputConfiguration.getS3SecretKey(); + String compressionAlgo = s3OutputConfiguration.getCompressionAlgo(); + + String keySuffix = fileToUpload.getName() + "." + compressionAlgo; + String s3Path = new S3LogPathResolver(). 
+ getResolvedPath(s3OutputConfiguration.getS3Path()+S3Util.S3_PATH_SEPARATOR+logType, + keySuffix, s3OutputConfiguration.getCluster()); + logger.info(String.format("keyPrefix=%s, keySuffix=%s, s3Path=%s", + s3OutputConfiguration.getS3Path(), keySuffix, s3Path)); + File sourceFile = createCompressedFileForUpload(fileToUpload, compressionAlgo); + + logger.info("Starting S3 upload " + sourceFile + " -> " + bucketName + ", " + s3Path); + s3UtilInstance.uploadFileTos3(bucketName, s3Path, sourceFile, s3AccessKey, + s3SecretKey); + + // delete local compressed file + sourceFile.delete(); + if (deleteOnEnd) { + logger.info("Deleting input file as required"); + if (!fileToUpload.delete()) { + logger.error("Could not delete file " + fileToUpload.getAbsolutePath() + " after upload to S3"); + } + } + return s3Path; + } + + @VisibleForTesting + protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) { + File outputFile = new File(fileToUpload.getParent(), fileToUpload.getName() + "_" + + new Date().getTime() + "." 
+ compressionAlgo); + outputFile = CompressionUtil.compressFile(fileToUpload, outputFile, + compressionAlgo); + return outputFile; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/6dcf6381/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java index 306326a..fb263ba 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java @@ -25,6 +25,9 @@ import org.apache.log4j.Logger; import java.io.*; import java.util.Date; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.atomic.AtomicBoolean; /** * A class that manages local storage of log events before they are uploaded to the output destinations. @@ -36,6 +39,7 @@ import java.util.Date; * {@link RolloverHandler} to trigger the handling of the rolled over file. */ public class LogSpooler { + public static final long TIME_BASED_ROLLOVER_DISABLED_THRESHOLD = 0; static private Logger logger = Logger.getLogger(LogSpooler.class); static final String fileDateFormat = "yyyy-MM-dd-HH-mm-ss"; @@ -46,6 +50,8 @@ public class LogSpooler { private PrintWriter currentSpoolBufferedWriter; private File currentSpoolFile; private LogSpoolerContext currentSpoolerContext; + private Timer rolloverTimer; + private AtomicBoolean rolloverInProgress = new AtomicBoolean(false); /** * Create an instance of the LogSpooler. 
@@ -59,11 +65,34 @@ public class LogSpooler { */ public LogSpooler(String spoolDirectory, String sourceFileNamePrefix, RolloverCondition rolloverCondition, RolloverHandler rolloverHandler) { + this(spoolDirectory, sourceFileNamePrefix, rolloverCondition, rolloverHandler, + TIME_BASED_ROLLOVER_DISABLED_THRESHOLD); + } + + /** + * Create an instance of the LogSpooler. + * @param spoolDirectory The directory under which spooler files are created. + * Should be unique per instance of {@link Output} + * @param sourceFileNamePrefix The prefix with which the locally spooled files are created. + * @param rolloverCondition An object of type {@link RolloverCondition} that will be used to + * determine when to rollover. + * @param rolloverHandler An object of type {@link RolloverHandler} that will be called when + * there should be a rollover. + * @param rolloverTimeThresholdSecs Setting a non-zero value enables time-based rollover of + * spool files. Setting it to 0 disables this functionality. 
+ */ + public LogSpooler(String spoolDirectory, String sourceFileNamePrefix, RolloverCondition rolloverCondition, + RolloverHandler rolloverHandler, long rolloverTimeThresholdSecs) { this.spoolDirectory = spoolDirectory; this.sourceFileNamePrefix = sourceFileNamePrefix; this.rolloverCondition = rolloverCondition; this.rolloverHandler = rolloverHandler; - initializeSpoolFile(); + if (rolloverTimeThresholdSecs != TIME_BASED_ROLLOVER_DISABLED_THRESHOLD) { + rolloverTimer = new Timer("log-spooler-timer-" + sourceFileNamePrefix, true); + rolloverTimer.scheduleAtFixedRate(new LogSpoolerRolloverTimerTask(), + rolloverTimeThresholdSecs*1000, rolloverTimeThresholdSecs*1000); + } + initializeSpoolState(); } private void initializeSpoolDirectory() { @@ -77,9 +106,9 @@ public class LogSpooler { } } - private void initializeSpoolFile() { + private void initializeSpoolState() { initializeSpoolDirectory(); - currentSpoolFile = new File(spoolDirectory, getCurrentFileName()); + currentSpoolFile = initializeSpoolFile(); try { currentSpoolBufferedWriter = initializeSpoolWriter(currentSpoolFile); } catch (IOException e) { @@ -87,7 +116,12 @@ public class LogSpooler { + ", error message: " + e.getLocalizedMessage(), e); } currentSpoolerContext = new LogSpoolerContext(currentSpoolFile); - logger.info("Initialized spool file at path: " + currentSpoolFile.getAbsolutePath()); + logger.info("Initialized spool file at path: " + currentSpoolFile); + } + + @VisibleForTesting + protected File initializeSpoolFile() { + return new File(spoolDirectory, getCurrentFileName()); } @VisibleForTesting @@ -103,11 +137,12 @@ public class LogSpooler { * it is ready to rollover the file. * @param logEvent The log event to spool. 
+ */ - public void add(String logEvent) { + public synchronized void add(String logEvent) { currentSpoolBufferedWriter.println(logEvent); currentSpoolerContext.logEventSpooled(); if (rolloverCondition.shouldRollover(currentSpoolerContext)) { - rollover(); + logger.info("Trying to rollover based on rollover condition"); + tryRollover(); } } @@ -121,17 +156,49 @@ public class LogSpooler { public void rollover() { logger.info("Rollover condition detected, rolling over file: " + currentSpoolFile); currentSpoolBufferedWriter.flush(); - currentSpoolBufferedWriter.close(); - rolloverHandler.handleRollover(currentSpoolFile); - logger.info("Invoked rollover handler with file: " + currentSpoolFile); - initializeSpoolFile(); + if (currentSpoolFile.length()==0) { + logger.info("No data in file " + currentSpoolFile + ", not doing rollover"); + } else { + currentSpoolBufferedWriter.close(); + rolloverHandler.handleRollover(currentSpoolFile); + logger.info("Invoked rollover handler with file: " + currentSpoolFile); + initializeSpoolState(); + } + boolean status = rolloverInProgress.compareAndSet(true, false); + if (!status) { + logger.error("Should have reset rollover flag!!"); + } } - @VisibleForTesting - protected String getCurrentFileName() { + private synchronized void tryRollover() { + if (rolloverInProgress.compareAndSet(false, true)) { + rollover(); + } else { + logger.warn("Ignoring rollover call as rollover already in progress for file " + + currentSpoolFile); + } + } + + private String getCurrentFileName() { Date currentDate = new Date(); String dateStr = DateUtil.dateToString(currentDate, fileDateFormat); return sourceFileNamePrefix + dateStr; } + /** + * Cancels any time-based rollover task, if started. 
+ */ + public void close() { + if (rolloverTimer != null) { + rolloverTimer.cancel(); + } + } + + private class LogSpoolerRolloverTimerTask extends TimerTask { + @Override + public void run() { + logger.info("Trying rollover based on time"); + tryRollover(); + } + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/6dcf6381/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java index ced2b5c..db187be 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java @@ -44,13 +44,13 @@ import com.amazonaws.services.s3.transfer.Upload; /** * Utility to connect to s3 */ -public enum S3Util { - INSTANCE; +public class S3Util { + public static final S3Util INSTANCE = new S3Util(); private static final Logger LOG = Logger.getLogger(S3Util.class); - public final String S3_PATH_START_WITH = "s3://"; - public final String S3_PATH_SEPARATOR = "/"; + public static final String S3_PATH_START_WITH = "s3://"; + public static final String S3_PATH_SEPARATOR = "/"; public AmazonS3 getS3Client(String accessKey, String secretKey) { AWSCredentials awsCredentials = AWSUtil.INSTANCE.createAWSCredentials( http://git-wip-us.apache.org/repos/asf/ambari/blob/6dcf6381/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java new file mode 100644 index 0000000..20a4f1f --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.logfeeder.output; + +import org.apache.ambari.logfeeder.input.Input; +import org.apache.ambari.logfeeder.input.InputMarker; +import org.apache.ambari.logfeeder.output.spool.LogSpooler; +import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; + +import static org.easymock.EasyMock.*; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + + +public class OutputS3FileTest { + + private Map configMap; + + @Before + public void setupConfiguration() { + configMap = new HashMap<>(); + String[] configKeys = new String[] { + S3OutputConfiguration.SPOOL_DIR_KEY, + S3OutputConfiguration.S3_BUCKET_NAME_KEY, + S3OutputConfiguration.S3_LOG_DIR_KEY, + S3OutputConfiguration.S3_ACCESS_KEY, + S3OutputConfiguration.S3_SECRET_KEY, + S3OutputConfiguration.COMPRESSION_ALGO_KEY, + S3OutputConfiguration.ADDITIONAL_FIELDS_KEY + }; + Map additionalKeys = new HashMap<>(); + additionalKeys.put(S3OutputConfiguration.CLUSTER_KEY, "cl1"); + Object[] configValues = new Object[] { + "/var/ambari-logsearch/logfeeder", + "s3_bucket_name", + "logs", + "ABCDEFGHIJ1234", + "amdfbldkfdlf", + "gz", + additionalKeys + }; + for (int i = 0; i < configKeys.length; i++) { + configMap.put(configKeys[i], configValues[i]); + } + } + + @Test + public void shouldSpoolLogEventToNewSpooler() throws Exception { + + InputMarker inputMarker = mock(InputMarker.class); + Input input = mock(Input.class); + inputMarker.input = input; + expect(input.getFilePath()).andReturn("/var/log/hdfs-namenode.log"); + expect(input.getStringValue(OutputS3File.INPUT_ATTRIBUTE_TYPE)).andReturn("hdfs-namenode"); + final LogSpooler spooler = mock(LogSpooler.class); + spooler.add("log event block"); + final S3Uploader s3Uploader = mock(S3Uploader.class); + replay(input, inputMarker, spooler, s3Uploader); + + OutputS3File outputS3File = new 
OutputS3File() { + @Override + protected LogSpooler createSpooler(String filePath) { + return spooler; + } + + @Override + protected S3Uploader createUploader(String logType) { + return s3Uploader; + } + }; + outputS3File.loadConfig(configMap); + outputS3File.init(); + outputS3File.write("log event block", inputMarker); + verify(spooler); + } + + @Test + public void shouldReuseSpoolerForSamePath() throws Exception { + InputMarker inputMarker = mock(InputMarker.class); + Input input = mock(Input.class); + inputMarker.input = input; + expect(input.getFilePath()).andReturn("/var/log/hdfs-namenode.log"); + expect(input.getStringValue(OutputS3File.INPUT_ATTRIBUTE_TYPE)).andReturn("hdfs-namenode"); + final LogSpooler spooler = mock(LogSpooler.class); + spooler.add("log event block1"); + spooler.add("log event block2"); + final S3Uploader s3Uploader = mock(S3Uploader.class); + replay(input, inputMarker, spooler, s3Uploader); + + OutputS3File outputS3File = new OutputS3File() { + private boolean firstCallComplete; + @Override + protected LogSpooler createSpooler(String filePath) { + if (!firstCallComplete) { + firstCallComplete = true; + return spooler; + } + throw new IllegalStateException("Shouldn't call createSpooler for same path."); + } + + @Override + protected S3Uploader createUploader(String logType) { + return s3Uploader; + } + }; + outputS3File.loadConfig(configMap); + outputS3File.init(); + outputS3File.write("log event block1", inputMarker); + outputS3File.write("log event block2", inputMarker); + verify(spooler); + } + + @Test + public void shouldRolloverWhenSufficientSizeIsReached() throws Exception { + + String thresholdSize = Long.toString(15 * 1024 * 1024L); + LogSpoolerContext logSpoolerContext = mock(LogSpoolerContext.class); + File activeSpoolFile = mock(File.class); + expect(activeSpoolFile.length()).andReturn(20*1024*1024L); + expect(logSpoolerContext.getActiveSpoolFile()).andReturn(activeSpoolFile); + replay(logSpoolerContext, activeSpoolFile); + + 
OutputS3File outputS3File = new OutputS3File(); + configMap.put(S3OutputConfiguration.ROLLOVER_SIZE_THRESHOLD_BYTES_KEY, thresholdSize); + outputS3File.loadConfig(configMap); + outputS3File.init(); + + assertTrue(outputS3File.shouldRollover(logSpoolerContext)); + } + + @Test + public void shouldNotRolloverBeforeSufficientSizeIsReached() throws Exception { + String thresholdSize = Long.toString(15 * 1024 * 1024L); + LogSpoolerContext logSpoolerContext = mock(LogSpoolerContext.class); + File activeSpoolFile = mock(File.class); + expect(activeSpoolFile.length()).andReturn(10*1024*1024L); + expect(logSpoolerContext.getActiveSpoolFile()).andReturn(activeSpoolFile); + replay(logSpoolerContext, activeSpoolFile); + + OutputS3File outputS3File = new OutputS3File(); + configMap.put(S3OutputConfiguration.ROLLOVER_SIZE_THRESHOLD_BYTES_KEY, thresholdSize); + outputS3File.loadConfig(configMap); + outputS3File.init(); + + assertFalse(outputS3File.shouldRollover(logSpoolerContext)); + } + + @Test + public void shouldUploadFileOnRollover() throws Exception { + InputMarker inputMarker = mock(InputMarker.class); + Input input = mock(Input.class); + inputMarker.input = input; + expect(input.getFilePath()).andReturn("/var/log/hdfs-namenode.log"); + expect(input.getStringValue(OutputS3File.INPUT_ATTRIBUTE_TYPE)).andReturn("hdfs-namenode"); + final LogSpooler spooler = mock(LogSpooler.class); + spooler.add("log event block1"); + final S3Uploader s3Uploader = mock(S3Uploader.class); + s3Uploader.addFileForUpload("/var/ambari-logsearch/logfeeder/hdfs-namenode.log.gz"); + replay(input, inputMarker, spooler, s3Uploader); + + OutputS3File outputS3File = new OutputS3File() { + @Override + protected LogSpooler createSpooler(String filePath) { + return spooler; + } + @Override + protected S3Uploader createUploader(String logType) { + return s3Uploader; + } + }; + outputS3File.write("log event block1", inputMarker); + outputS3File.handleRollover(new 
File("/var/ambari-logsearch/logfeeder/hdfs-namenode.log.gz")); + + verify(s3Uploader); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/6dcf6381/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java new file mode 100644 index 0000000..49cee56 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.logfeeder.output; + +import org.apache.ambari.logfeeder.LogFeederUtil; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class S3LogPathResolverTest { + + @Test + public void shouldResolveHostName() { + String resolvedPath = new S3LogPathResolver().getResolvedPath("my_s3_path/$host", "filename.log", "cl1"); + assertEquals("my_s3_path/" + LogFeederUtil.hostName + "/filename.log", resolvedPath); + } + + @Test + public void shouldResolveIpAddress() { + String resolvedPath = new S3LogPathResolver().getResolvedPath("my_s3_path/$ip", "filename.log", "cl1"); + assertEquals("my_s3_path/" + LogFeederUtil.ipAddress + "/filename.log", resolvedPath); + } + + @Test + public void shouldResolveCluster() { + String resolvedPath = new S3LogPathResolver().getResolvedPath("my_s3_path/$cluster", "filename.log", "cl1"); + assertEquals("my_s3_path/cl1/filename.log", resolvedPath); + } + + @Test + public void shouldResolveCombinations() { + String resolvedPath = new S3LogPathResolver().getResolvedPath("my_s3_path/$cluster/$host", "filename.log", "cl1"); + assertEquals("my_s3_path/cl1/"+ LogFeederUtil.hostName + "/filename.log", resolvedPath); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/6dcf6381/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java new file mode 100644 index 0000000..a0c398e --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.logfeeder.output; + +import org.apache.ambari.logfeeder.s3.S3Util; +import org.junit.Test; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; + +import static org.easymock.EasyMock.*; +import static org.junit.Assert.assertEquals; + +public class S3UploaderTest { + + public static final String TEST_BUCKET = "test_bucket"; + public static final String TEST_PATH = "test_path"; + public static final String GZ = "gz"; + public static final String LOG_TYPE = "hdfs_namenode"; + public static final String ACCESS_KEY_VALUE = "accessKeyValue"; + public static final String SECRET_KEY_VALUE = "secretKeyValue"; + + @Test + public void shouldUploadToS3ToRightBucket() { + File fileToUpload = mock(File.class); + String fileName = "hdfs_namenode.log.123343493473948"; + expect(fileToUpload.getName()).andReturn(fileName); + final File compressedFile = mock(File.class); + Map configs = setupS3Configs(); + + S3OutputConfiguration s3OutputConfiguration = new S3OutputConfiguration(configs); + S3Util s3Util = mock(S3Util.class); + String s3Key = String.format("%s/%s/%s.%s", TEST_PATH, LOG_TYPE, fileName, GZ); + s3Util.uploadFileTos3(TEST_BUCKET, s3Key, compressedFile, ACCESS_KEY_VALUE, SECRET_KEY_VALUE); + 
expect(compressedFile.delete()).andReturn(true); + expect(fileToUpload.delete()).andReturn(true); + replay(fileToUpload, compressedFile, s3Util); + + S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, s3Util, true, LOG_TYPE) { + @Override + protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) { + return compressedFile; + } + }; + String resolvedPath = s3Uploader.uploadFile(fileToUpload, LOG_TYPE); + + verify(s3Util); + assertEquals("test_path/hdfs_namenode/hdfs_namenode.log.123343493473948.gz", resolvedPath); + } + + @Test + public void shouldCleanupLocalFilesOnSuccessfulUpload() { + File fileToUpload = mock(File.class); + String fileName = "hdfs_namenode.log.123343493473948"; + expect(fileToUpload.getName()).andReturn(fileName); + final File compressedFile = mock(File.class); + Map configs = setupS3Configs(); + + S3OutputConfiguration s3OutputConfiguration = new S3OutputConfiguration(configs); + S3Util s3Util = mock(S3Util.class); + String s3Key = String.format("%s/%s/%s.%s", TEST_PATH, LOG_TYPE, fileName, GZ); + s3Util.uploadFileTos3(TEST_BUCKET, s3Key, compressedFile, ACCESS_KEY_VALUE, SECRET_KEY_VALUE); + expect(compressedFile.delete()).andReturn(true); + expect(fileToUpload.delete()).andReturn(true); + replay(fileToUpload, compressedFile, s3Util); + + S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, s3Util, true, LOG_TYPE) { + @Override + protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) { + return compressedFile; + } + }; + s3Uploader.uploadFile(fileToUpload, LOG_TYPE); + + verify(fileToUpload); + verify(compressedFile); + } + + @Test + public void shouldNotCleanupUncompressedFileIfNotRequired() { + File fileToUpload = mock(File.class); + String fileName = "hdfs_namenode.log.123343493473948"; + expect(fileToUpload.getName()).andReturn(fileName); + final File compressedFile = mock(File.class); + Map configs = setupS3Configs(); + + S3OutputConfiguration 
s3OutputConfiguration = new S3OutputConfiguration(configs); + S3Util s3Util = mock(S3Util.class); + String s3Key = String.format("%s/%s/%s.%s", TEST_PATH, LOG_TYPE, fileName, GZ); + s3Util.uploadFileTos3(TEST_BUCKET, s3Key, compressedFile, ACCESS_KEY_VALUE, SECRET_KEY_VALUE); + expect(compressedFile.delete()).andReturn(true); + replay(fileToUpload, compressedFile, s3Util); + + S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, s3Util, false, LOG_TYPE) { + @Override + protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) { + return compressedFile; + } + }; + s3Uploader.uploadFile(fileToUpload, LOG_TYPE); + + verify(fileToUpload); + verify(compressedFile); + } + + @Test + public void shouldExpandVariablesInPath() { + File fileToUpload = mock(File.class); + String fileName = "hdfs_namenode.log.123343493473948"; + expect(fileToUpload.getName()).andReturn(fileName); + final File compressedFile = mock(File.class); + Map configs = setupS3Configs(); + configs.put(S3OutputConfiguration.S3_LOG_DIR_KEY, "$cluster/"+TEST_PATH); + + + S3OutputConfiguration s3OutputConfiguration = new S3OutputConfiguration(configs); + S3Util s3Util = mock(S3Util.class); + String s3Key = String.format("%s/%s/%s/%s.%s", "cl1", TEST_PATH, LOG_TYPE, fileName, GZ); + s3Util.uploadFileTos3(TEST_BUCKET, s3Key, compressedFile, ACCESS_KEY_VALUE, SECRET_KEY_VALUE); + expect(compressedFile.delete()).andReturn(true); + expect(fileToUpload.delete()).andReturn(true); + replay(fileToUpload, compressedFile, s3Util); + + S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, s3Util, true, LOG_TYPE) { + @Override + protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) { + return compressedFile; + } + }; + s3Uploader.uploadFile(fileToUpload, LOG_TYPE); + + verify(s3Util); + } + + private Map setupS3Configs() { + Map configs = new HashMap<>(); + configs.put(S3OutputConfiguration.S3_BUCKET_NAME_KEY, TEST_BUCKET); + 
configs.put(S3OutputConfiguration.S3_LOG_DIR_KEY, TEST_PATH); + configs.put(S3OutputConfiguration.S3_ACCESS_KEY, ACCESS_KEY_VALUE); + configs.put(S3OutputConfiguration.S3_SECRET_KEY, SECRET_KEY_VALUE); + configs.put(S3OutputConfiguration.COMPRESSION_ALGO_KEY, GZ); + Map nameValueMap = new HashMap<>(); + nameValueMap.put(S3OutputConfiguration.CLUSTER_KEY, "cl1"); + configs.put(S3OutputConfiguration.ADDITIONAL_FIELDS_KEY, nameValueMap); + return configs; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/6dcf6381/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java index 7d9d78a..7a47039 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java @@ -43,7 +43,6 @@ public class LogSpoolerTest { private String spoolDirectory; private static final String SOURCE_FILENAME_PREFIX = "hdfs-namenode.log"; - private static final String FILE_SUFFIX = "currentFile"; @Mock private RolloverCondition rolloverCondition; @@ -61,13 +60,13 @@ public class LogSpoolerTest { final PrintWriter spoolWriter = mock(PrintWriter.class); spoolWriter.println("log event"); - final File spoolFile = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX); - LogSpoolerContext logSpoolerContext = new LogSpoolerContext(spoolFile); + final File mockFile = setupInputFileExpectations(); + LogSpoolerContext logSpoolerContext = new LogSpoolerContext(mockFile); expect(rolloverCondition.shouldRollover( 
cmp(logSpoolerContext, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))). andReturn(false); - replay(spoolWriter, rolloverCondition); + replay(spoolWriter, rolloverCondition, mockFile); LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX, rolloverCondition, rolloverHandler) { @@ -77,8 +76,8 @@ public class LogSpoolerTest { } @Override - protected String getCurrentFileName() { - return SOURCE_FILENAME_PREFIX + FILE_SUFFIX; + protected File initializeSpoolFile() { + return mockFile; } }; logSpooler.add("log event"); @@ -86,20 +85,26 @@ public class LogSpoolerTest { verify(spoolWriter); } + private File setupInputFileExpectations() { + final File mockFile = mock(File.class); + expect(mockFile.length()).andReturn(10240L); + return mockFile; + } + @Test public void shouldIncrementSpooledEventsCount() { final PrintWriter spoolWriter = mock(PrintWriter.class); spoolWriter.println("log event"); - final File spoolFile = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX); - LogSpoolerContext logSpoolerContext = new LogSpoolerContext(spoolFile); + final File mockFile = setupInputFileExpectations(); + LogSpoolerContext logSpoolerContext = new LogSpoolerContext(mockFile); logSpoolerContext.logEventSpooled(); expect(rolloverCondition.shouldRollover( cmp(logSpoolerContext, new LogSpoolerEventCountComparator(), LogicalOperator.EQUAL))). 
andReturn(false); - replay(spoolWriter, rolloverCondition); + replay(spoolWriter, rolloverCondition, mockFile); LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX, rolloverCondition, rolloverHandler) { @@ -109,8 +114,8 @@ public class LogSpoolerTest { } @Override - protected String getCurrentFileName() { - return SOURCE_FILENAME_PREFIX + FILE_SUFFIX; + protected File initializeSpoolFile() { + return mockFile; } }; logSpooler.add("log event"); @@ -125,14 +130,14 @@ public class LogSpoolerTest { spoolWriter.flush(); spoolWriter.close(); - File spoolFile = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX); - LogSpoolerContext logSpoolerContext = new LogSpoolerContext(spoolFile); + final File mockFile = setupInputFileExpectations(); + LogSpoolerContext logSpoolerContext = new LogSpoolerContext(mockFile); expect(rolloverCondition.shouldRollover( cmp(logSpoolerContext, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))). andReturn(true); - rolloverHandler.handleRollover(spoolFile); + rolloverHandler.handleRollover(mockFile); - replay(spoolWriter, rolloverCondition, rolloverHandler); + replay(spoolWriter, rolloverCondition, rolloverHandler, mockFile); LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX, rolloverCondition, rolloverHandler) { @@ -143,8 +148,8 @@ public class LogSpoolerTest { } @Override - protected String getCurrentFileName() { - return SOURCE_FILENAME_PREFIX + FILE_SUFFIX; + protected File initializeSpoolFile() { + return mockFile; } }; logSpooler.add("log event"); @@ -161,22 +166,22 @@ public class LogSpoolerTest { spoolWriter1.flush(); spoolWriter1.close(); - File spoolFile1 = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX + "_1"); - File spoolFile2 = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX + "_2"); + final File mockFile1 = setupInputFileExpectations(); + final File mockFile2 = setupInputFileExpectations(); - LogSpoolerContext logSpoolerContext1 = 
new LogSpoolerContext(spoolFile1); + LogSpoolerContext logSpoolerContext1 = new LogSpoolerContext(mockFile1); expect(rolloverCondition.shouldRollover( cmp(logSpoolerContext1, new LogSpoolerFileComparator(), LogicalOperator.EQUAL)) ).andReturn(true); - LogSpoolerContext logSpoolerContext2 = new LogSpoolerContext(spoolFile2); + LogSpoolerContext logSpoolerContext2 = new LogSpoolerContext(mockFile2); expect(rolloverCondition.shouldRollover( cmp(logSpoolerContext2, new LogSpoolerFileComparator(), LogicalOperator.EQUAL)) ).andReturn(false); - rolloverHandler.handleRollover(spoolFile1); + rolloverHandler.handleRollover(mockFile1); - replay(spoolWriter1, spoolWriter2, rolloverCondition, rolloverHandler); + replay(spoolWriter1, spoolWriter2, rolloverCondition, rolloverHandler, mockFile1, mockFile2); LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX, rolloverCondition, rolloverHandler) { @@ -193,11 +198,11 @@ public class LogSpoolerTest { } @Override - protected String getCurrentFileName() { + protected File initializeSpoolFile() { if (!wasRolledOver) { - return SOURCE_FILENAME_PREFIX + FILE_SUFFIX + "_1"; + return mockFile1; } else { - return SOURCE_FILENAME_PREFIX + FILE_SUFFIX + "_2"; + return mockFile2; } } }; @@ -214,14 +219,14 @@ public class LogSpoolerTest { spoolWriter.flush(); spoolWriter.close(); - File spoolFile = new File(spoolDirectory, SOURCE_FILENAME_PREFIX + FILE_SUFFIX); - LogSpoolerContext logSpoolerContext = new LogSpoolerContext(spoolFile); + final File mockFile = setupInputFileExpectations(); + LogSpoolerContext logSpoolerContext = new LogSpoolerContext(mockFile); expect(rolloverCondition.shouldRollover( cmp(logSpoolerContext, new LogSpoolerFileComparator(), LogicalOperator.EQUAL)) ).andReturn(true); - rolloverHandler.handleRollover(spoolFile); + rolloverHandler.handleRollover(mockFile); - replay(spoolWriter, rolloverCondition, rolloverHandler); + replay(spoolWriter, rolloverCondition, rolloverHandler, mockFile); LogSpooler 
logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX, rolloverCondition, rolloverHandler) { @@ -232,8 +237,8 @@ public class LogSpoolerTest { } @Override - protected String getCurrentFileName() { - return SOURCE_FILENAME_PREFIX + FILE_SUFFIX; + protected File initializeSpoolFile() { + return mockFile; } }; logSpooler.add("log event"); @@ -241,10 +246,121 @@ public class LogSpoolerTest { verify(rolloverHandler); } + // Rollover twice - the second rollover should work if the "rolloverInProgress" + // flag is being reset correctly. Third file expectations being setup due + // to auto-initialization. + @Test + public void shouldResetRolloverInProgressFlag() { + final PrintWriter spoolWriter1 = mock(PrintWriter.class); + final PrintWriter spoolWriter2 = mock(PrintWriter.class); + final PrintWriter spoolWriter3 = mock(PrintWriter.class); + spoolWriter1.println("log event1"); + spoolWriter2.println("log event2"); + spoolWriter1.flush(); + spoolWriter1.close(); + spoolWriter2.flush(); + spoolWriter2.close(); + + final File mockFile1 = setupInputFileExpectations(); + final File mockFile2 = setupInputFileExpectations(); + final File mockFile3 = setupInputFileExpectations(); + + LogSpoolerContext logSpoolerContext1 = new LogSpoolerContext(mockFile1); + expect(rolloverCondition.shouldRollover( + cmp(logSpoolerContext1, new LogSpoolerFileComparator(), LogicalOperator.EQUAL)) + ).andReturn(true); + + LogSpoolerContext logSpoolerContext2 = new LogSpoolerContext(mockFile2); + expect(rolloverCondition.shouldRollover( + cmp(logSpoolerContext2, new LogSpoolerFileComparator(), LogicalOperator.EQUAL)) + ).andReturn(true); + + rolloverHandler.handleRollover(mockFile1); + rolloverHandler.handleRollover(mockFile2); + + replay(spoolWriter1, spoolWriter2, rolloverCondition, rolloverHandler, mockFile1, mockFile2, mockFile3); + + LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX, + rolloverCondition, rolloverHandler) { + private int currentFileNum; + 
+ @Override + protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException { + PrintWriter spoolWriter = null; + switch (currentFileNum) { + case 0: + spoolWriter = spoolWriter1; + break; + case 1: + spoolWriter = spoolWriter2; + break; + case 2: + spoolWriter = spoolWriter3; + break; + } + currentFileNum++; + return spoolWriter; + } + + @Override + protected File initializeSpoolFile() { + switch (currentFileNum) { + case 0: + return mockFile1; + case 1: + return mockFile2; + case 2: + return mockFile3; + default: + return null; + } + } + }; + logSpooler.add("log event1"); + logSpooler.add("log event2"); + + verify(spoolWriter1, spoolWriter2, rolloverCondition); + } + + @Test + public void shouldNotRolloverZeroLengthFiles() { + final PrintWriter spoolWriter = mock(PrintWriter.class); + spoolWriter.println("log event"); + spoolWriter.flush(); + spoolWriter.close(); + + final File mockFile = mock(File.class); + expect(mockFile.length()).andReturn(0L); + + LogSpoolerContext logSpoolerContext = new LogSpoolerContext(mockFile); + expect(rolloverCondition.shouldRollover( + cmp(logSpoolerContext, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))). + andReturn(true); + + replay(spoolWriter, rolloverCondition, mockFile); + + LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX, + rolloverCondition, rolloverHandler) { + + @Override + protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException { + return spoolWriter; + } + + @Override + protected File initializeSpoolFile() { + return mockFile; + } + }; + logSpooler.add("log event"); + + verify(mockFile); + } + class LogSpoolerFileComparator implements Comparator { @Override public int compare(LogSpoolerContext o1, LogSpoolerContext o2) { - return o1.getActiveSpoolFile().compareTo(o2.getActiveSpoolFile()); + return o1.getActiveSpoolFile()==o2.getActiveSpoolFile() ? 0 : -1; } }