ambari-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From oleew...@apache.org
Subject ambari git commit: AMBARI-17046. Support loading logs to HDFS (Hayat Behlim via oleewere)
Date Mon, 04 Jul 2016 09:56:40 GMT
Repository: ambari
Updated Branches:
  refs/heads/trunk d6abf7377 -> 196657bf3


AMBARI-17046. Support loading logs to HDFS (Hayat Behlim via oleewere)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/196657bf
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/196657bf
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/196657bf

Branch: refs/heads/trunk
Commit: 196657bf37bfd462df11d092d831fbd975fdc777
Parents: d6abf73
Author: oleewere <oleewere@gmail.com>
Authored: Mon Jul 4 11:50:06 2016 +0200
Committer: oleewere <oleewere@gmail.com>
Committed: Mon Jul 4 11:50:06 2016 +0200

----------------------------------------------------------------------
 .../ambari-logsearch-logfeeder/pom.xml          |  15 +
 .../apache/ambari/logfeeder/ConfigBlock.java    |  14 +
 .../org/apache/ambari/logfeeder/LogFeeder.java  |   9 +-
 .../ambari/logfeeder/output/OutputHDFSFile.java | 273 +++++++++++++++++++
 .../apache/ambari/logfeeder/util/DateUtil.java  |  47 ++++
 .../logfeeder/util/LogfeederHDFSUtil.java       |  95 +++++++
 .../src/main/resources/alias_config.json        |   5 +-
 .../samples/config/output-hdfs-config.json      |  20 ++
 ambari-logsearch/pom.xml                        |   2 +
 9 files changed, 477 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/196657bf/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
index f450c53..25e4306 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
@@ -139,6 +139,21 @@
     <artifactId>aws-java-sdk-iam</artifactId>
     <version>1.11.5</version>
   </dependency>
+   <dependency>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-common</artifactId>
+    <version>${hadoop.version}</version>
+  </dependency>
+  <dependency>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-hdfs</artifactId>
+    <version>${hadoop.version}</version>
+  </dependency>
+  <dependency>
+    <groupId>commons-io</groupId>
+    <artifactId>commons-io</artifactId>
+    <version>${common.io.version}</version>
+  </dependency>
  </dependencies>
   <build>
     <finalName>LogFeeder</finalName>

http://git-wip-us.apache.org/repos/asf/ambari/blob/196657bf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java
index 088472e..19bb191 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java
@@ -204,6 +204,20 @@ public abstract class ConfigBlock {
     }
     return retValue;
   }
+  
+  public long getLongValue(String key, long defaultValue) {
+    String strValue = getStringValue(key);
+    Long retValue = defaultValue;
+    if (!StringUtils.isEmpty(strValue)) {
+      try {
+        retValue = Long.parseLong(strValue);
+      } catch (Throwable t) {
+        logger.error("Error parsing long value. key=" + key + ", value="
+            + strValue);
+      }
+    }
+    return retValue;
+  }
 
   public Map<String, String> getContextFields() {
     return contextFields;

http://git-wip-us.apache.org/repos/asf/ambari/blob/196657bf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
index c5d4fd5..b0c43bb 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
@@ -43,6 +43,7 @@ import org.apache.ambari.logfeeder.input.Input;
 import org.apache.ambari.logfeeder.logconfig.LogfeederScheduler;
 import org.apache.ambari.logfeeder.output.Output;
 import org.apache.ambari.logfeeder.util.FileUtil;
+import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
@@ -52,6 +53,8 @@ public class LogFeeder {
   static Logger logger = Logger.getLogger(LogFeeder.class);
 
   Collection<Output> outputList = new ArrayList<Output>();
+  
+  private static final int LOGFEEDER_SHUTDOWN_HOOK_PRIORITY = 30;
 
   OutputMgr outMgr = new OutputMgr();
   InputMgr inputMgr = new InputMgr();
@@ -448,8 +451,10 @@ public class LogFeeder {
 
   private void monitor() throws Exception {
     inputMgr.monitor();
-    Runtime.getRuntime().addShutdownHook(new JVMShutdownHook());
-
+    JVMShutdownHook logfeederJVMHook = new JVMShutdownHook();
+    ShutdownHookManager.get().addShutdownHook(logfeederJVMHook,
+        LOGFEEDER_SHUTDOWN_HOOK_PRIORITY);
+    
     statLoggerThread = new Thread("statLogger") {
 
       @Override

http://git-wip-us.apache.org/repos/asf/ambari/blob/196657bf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
new file mode 100644
index 0000000..9272636
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.output;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.util.DateUtil;
+import org.apache.ambari.logfeeder.util.LogfeederHDFSUtil;
+import org.apache.ambari.logfeeder.util.PlaceholderUtil;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+
+public class OutputHDFSFile extends Output {
+  private final static Logger logger = Logger.getLogger(OutputHDFSFile.class);
+
+  private ConcurrentLinkedQueue<File> localReadyFiles = new ConcurrentLinkedQueue<File>();
+
+  private final Object readyMonitor = new Object();
+
+  private Thread hdfsCopyThread = null;
+
+  private PrintWriter outWriter = null;
+  // local writer variables
+  private String localFilePath = null;
+  private String filenamePrefix = "service-logs-";
+  private String localFileDir = null;
+  private File localcurrentFile = null;
+  private Date localFileCreateTime = null;
+  private long localFileRolloverSec = 5 * 1 * 60;// 5 min by default
+
+  private String hdfsOutDir = null;
+  private String hdfsHost = null;
+  private String hdfsPort = null;
+  private FileSystem fileSystem = null;
+
+  private String fileDateFormat = "yyyy-MM-dd-HH-mm-ss";
+
+  @Override
+  public void init() throws Exception {
+    super.init();
+    hdfsOutDir = getStringValue("hdfs_out_dir");
+    hdfsHost = getStringValue("hdfs_host");
+    hdfsPort = getStringValue("hdfs_port");
+    localFileRolloverSec = getLongValue("rollover_sec", localFileRolloverSec);
+    filenamePrefix = getStringValue("file_name_prefix", filenamePrefix);
+    if (hdfsOutDir == null || hdfsOutDir.isEmpty()) {
+      logger
+          .error("Filepath config property <path> is not set in config file.");
+      return;
+    }
+    HashMap<String, String> contextParam = buildContextParam();
+    hdfsOutDir = PlaceholderUtil.replaceVariables(hdfsOutDir, contextParam);
+    logger.info("hdfs Output dir=" + hdfsOutDir);
+    localFileDir = LogFeederUtil.getLogfeederTempDir() + "hdfs/service/";
+    localFilePath = localFileDir;
+    this.startHDFSCopyThread();
+  }
+
+  @Override
+  public void close() {
+    logger.info("Closing file." + getShortDescription());
+    if (outWriter != null) {
+      try {
+        outWriter.flush();
+        outWriter.close();
+        addFileInReadyList(localcurrentFile);
+      } catch (Throwable t) {
+        // Ignore this exception
+      }
+    }
+    this.stopHDFSCopyThread();
+    isClosed = true;
+  }
+
+  @Override
+  synchronized public void write(String block, InputMarker inputMarker)
+      throws Exception {
+    if (block != null) {
+      buildOutWriter();
+      if (outWriter != null) {
+        statMetric.count++;
+        outWriter.println(block);
+        closeFileIfNeeded();
+      }
+    }
+  }
+
+  
+  @Override
+  public String getShortDescription() {
+    return "output:destination=hdfs,hdfsOutDir=" + hdfsOutDir;
+  }
+
+  private synchronized void closeFileIfNeeded() throws FileNotFoundException,
+      IOException {
+    if (outWriter == null) {
+      return;
+    }
+    // TODO: Close the file on absolute time. Currently it is implemented as
+    // relative time
+    if (System.currentTimeMillis() - localFileCreateTime.getTime() > localFileRolloverSec * 1000) {
+      logger.info("Closing file. Rolling over. name="
+          + localcurrentFile.getName() + ", filePath="
+          + localcurrentFile.getAbsolutePath());
+      try {
+        outWriter.flush();
+        outWriter.close();
+        addFileInReadyList(localcurrentFile);
+      } catch (Throwable t) {
+        logger
+            .error("Error on closing output writter. Exception will be ignored. name="
+                + localcurrentFile.getName()
+                + ", filePath="
+                + localcurrentFile.getAbsolutePath());
+      }
+
+      outWriter = null;
+      localcurrentFile = null;
+    }
+  }
+
+  public synchronized void buildOutWriter() {
+    if (outWriter == null) {
+      String currentFilePath = localFilePath + getCurrentFileName();
+      localcurrentFile = new File(currentFilePath);
+      if (localcurrentFile.getParentFile() != null) {
+        File parentDir = localcurrentFile.getParentFile();
+        if (!parentDir.isDirectory()) {
+          parentDir.mkdirs();
+        }
+      }
+      try {
+        outWriter = new PrintWriter(new BufferedWriter(new FileWriter(
+            localcurrentFile, true)));
+      } catch (IOException e) {
+        logger.error("= OutputHDFSFile.buidOutWriter failed for file :  "
+            + localcurrentFile.getAbsolutePath() + " Desc: "
+            + getShortDescription() + " errorMsg: " + e.getLocalizedMessage(),
+            e);
+      }
+      localFileCreateTime = new Date();
+      logger.info("Create file is successful. localFilePath="
+          + localcurrentFile.getAbsolutePath());
+    }
+  }
+
+  private void startHDFSCopyThread() {
+
+    hdfsCopyThread = new Thread("hdfsCopyThread") {
+      @Override
+      public void run() {
+        try {
+          while (true) {
+            Iterator<File> localFileIterator = localReadyFiles.iterator();
+            while (localFileIterator.hasNext()) {
+              File localFile = localFileIterator.next();
+              fileSystem = LogfeederHDFSUtil.INSTANCE.buildFileSystem(hdfsHost,
+                  hdfsPort);
+              if (fileSystem != null && localFile.exists()) {
+                String destFilePath = hdfsOutDir + "/" + localFile.getName();
+                String localPath = localFile.getAbsolutePath();
+                boolean overWrite = true;
+                boolean delSrc = true;
+                boolean isCopied = LogfeederHDFSUtil.INSTANCE.copyFromLocal(
+                    localFile.getAbsolutePath(), destFilePath, fileSystem,
+                    overWrite, delSrc);
+                if (isCopied) {
+                  logger.debug("File copy to hdfs hdfspath :" + destFilePath
+                      + " and deleted local file :" + localPath);
+                } else {
+                  // TODO Need to write retry logic, in next release we can
+                  // handle it
+                  logger.error("Hdfs file copy  failed for hdfspath :"
+                      + destFilePath + " and localpath :" + localPath);
+                }
+
+              }
+
+            }
+            try {
+              // wait till new file comes in reayList
+              synchronized (readyMonitor) {
+                if (localReadyFiles.size() == 0) {
+                  readyMonitor.wait();
+                }
+              }
+            } catch (InterruptedException e) {
+              // ignore
+            }
+          }
+        } catch (Exception e) {
+          logger
+              .error(
+                  "Exception in hdfsCopyThread errorMsg:"
+                      + e.getLocalizedMessage(), e);
+        }
+      }
+    };
+    hdfsCopyThread.setDaemon(true);
+    hdfsCopyThread.start();
+  }
+
+  private void stopHDFSCopyThread() {
+    if (hdfsCopyThread != null) {
+      logger.info("waiting till copy all local files to hdfs.......");
+      while (localReadyFiles.size() != 0) {
+
+      }
+      logger.info("calling interrupt method for hdfsCopyThread to stop it.");
+      hdfsCopyThread.interrupt();
+      LogfeederHDFSUtil.INSTANCE.closeFileSystem(fileSystem);
+    }
+  }
+
+  public String getCurrentFileName() {
+    Date currentDate = new Date();
+    String dateStr = DateUtil.dateToString(currentDate, fileDateFormat);
+    String fileName = filenamePrefix + dateStr;
+    return fileName;
+  }
+
+  public HashMap<String, String> buildContextParam() {
+    HashMap<String, String> contextParam = new HashMap<String, String>();
+    contextParam.put("host", LogFeederUtil.hostName);
+    return contextParam;
+  }
+
+  public void addFileInReadyList(File localFile) {
+    localReadyFiles.add(localFile);
+    try {
+      synchronized (readyMonitor) {
+        readyMonitor.notifyAll();
+      }
+    } catch (Exception exception) {
+      // ignore
+    }
+  }
+
+  @Override
+  public void copyFile(File inputFile, InputMarker inputMarker)
+      throws UnsupportedOperationException {
+    throw new UnsupportedOperationException(
+        "copyFile method is not yet supported for output=hdfs");     
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/196657bf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/DateUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/DateUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/DateUtil.java
new file mode 100644
index 0000000..1c0ce67
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/DateUtil.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.util;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.apache.log4j.Logger;
+
+public class DateUtil {
+  private static final Logger logger = Logger.getLogger(DateUtil.class);
+
+  public static String dateToString(Date date, String dateFormat) {
+    if (date == null || dateFormat == null || dateFormat.isEmpty()) {
+      return "";
+    }
+    try {
+      SimpleDateFormat formatter = new SimpleDateFormat(dateFormat);
+      return formatter.format(date).toString();
+    } catch (Exception e) {
+      logger.error("Error in coverting dateToString  format :" + dateFormat, e);
+    }
+    return "";
+  }
+
+  public static void main(String[] args) {
+    Date currentDate = new Date();
+    String fileDateFormat = "yyyy-MM-dd-HH-mm-ss";
+    System.out.println(dateToString(currentDate, fileDateFormat));
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/196657bf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogfeederHDFSUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogfeederHDFSUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogfeederHDFSUtil.java
new file mode 100644
index 0000000..fd96f8a
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogfeederHDFSUtil.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+public enum LogfeederHDFSUtil {
+  INSTANCE;
+  private static Logger logger = Logger.getLogger(LogfeederHDFSUtil.class);
+
+  public void createHDFSDir(String dirPath, FileSystem dfs) {
+    Path src = new Path(dirPath);
+    try {
+      if (dfs.isDirectory(src)) {
+        logger.info("hdfs dir dirPath=" + dirPath + "  is already exist.");
+        return;
+      }
+      boolean isDirCreated = dfs.mkdirs(src);
+      if (isDirCreated) {
+        logger.debug("HDFS dirPath=" + dirPath + " created successfully.");
+      } else {
+        logger.warn("HDFS dir creation failed dirPath=" + dirPath);
+      }
+    } catch (IOException e) {
+      logger.error("HDFS dir creation failed dirPath=" + dirPath, e.getCause());
+    }
+  }
+
+  public boolean copyFromLocal(String sourceFilepath, String destFilePath,
+      FileSystem fileSystem, boolean overwrite, boolean delSrc) {
+    Path src = new Path(sourceFilepath);
+    Path dst = new Path(destFilePath);
+    boolean isCopied = false;
+    try {
+      logger.info("copying localfile := " + sourceFilepath + " to hdfsPath := "
+          + destFilePath);
+      fileSystem.copyFromLocalFile(delSrc, overwrite, src, dst);
+      isCopied = true;
+    } catch (Exception e) {
+      logger.error("Error copying local file :" + sourceFilepath
+          + " to hdfs location : " + destFilePath, e);
+    }
+    return isCopied;
+  }
+
+  public FileSystem buildFileSystem(String hdfsHost, String hdfsPort) {
+    try {
+      Configuration configuration = buildHdfsConfiguration(hdfsHost, hdfsPort);
+      FileSystem fs = FileSystem.get(configuration);
+      return fs;
+    } catch (Exception e) {
+      logger.error("Exception is buildFileSystem :", e);
+    }
+    return null;
+  }
+
+  public void closeFileSystem(FileSystem fileSystem) {
+    if (fileSystem != null) {
+      try {
+        fileSystem.close();
+      } catch (IOException e) {
+        logger.error(e.getLocalizedMessage(), e.getCause());
+      }
+    }
+  }
+
+  public Configuration buildHdfsConfiguration(String hdfsHost, String hdfsPort) {
+    String url = "hdfs://" + hdfsHost + ":" + hdfsPort + "/";
+    Configuration configuration = new Configuration();
+    configuration.set("fs.default.name", url);
+    return configuration;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/196657bf/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
index 58bcdae..978f581 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
@@ -46,6 +46,9 @@
 		},
 		"s3_file": {
 			"klass": "org.apache.ambari.logfeeder.output.OutputS3File"
- 		}
+ 		},
+ 		"hdfs": {
+			"klass": "org.apache.ambari.logfeeder.output.OutputHDFSFile"
+  		}
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/196657bf/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/samples/config/output-hdfs-config.json
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/samples/config/output-hdfs-config.json b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/samples/config/output-hdfs-config.json
new file mode 100644
index 0000000..336934a
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/samples/config/output-hdfs-config.json
@@ -0,0 +1,20 @@
+{
+
+"output": [{
+		"comment": "Write log to hdfs",
+		"destination": "hdfs",
+		"hdfs_out_dir": "logfeeder/$HOST/service",
+		"file_name_prefix":"service-logs-",
+		"hdfs_host": "hdfs_host",
+		"hdfs_port": "8020",
+		"rollover_sec":"300",
+		"conditions": {
+			"fields": {
+				"rowtype": [
+					"service"
+				]
+			}
+		}
+	}]
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/196657bf/ambari-logsearch/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-logsearch/pom.xml b/ambari-logsearch/pom.xml
index 414fd8f..81da0f2 100644
--- a/ambari-logsearch/pom.xml
+++ b/ambari-logsearch/pom.xml
@@ -42,6 +42,8 @@
     <deb.architecture>amd64</deb.architecture>
     <deb.dependency.list>${deb.python.ver}</deb.dependency.list>
     <solr.version>5.5.2</solr.version>
+    <hadoop.version>2.7.2</hadoop.version>
+    <common.io.version>2.5</common.io.version>
   </properties>
   <build>
     <plugins>


Mime
View raw message