flume-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From khadar basha <khadar...@gmail.com>
Subject Re: Log4JAppender Ignoring the Logging Pattern. Getting only description.
Date Fri, 10 Aug 2012 05:46:29 GMT
Hi All,

Finally i am able to send the logs in log4j format. I have modified the
Log4jAppender.java  to include the formatted message into FlumeEvent's
body.
I verified through programaically.Its working fine. It is Working fine with
log4j.xml.  by using log4j.properties it is not taking the format. Not sure
anything need to be done.

Modified Log4jAppender.java
=====================
package org.apache.flume.clients.log4jappender;

import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Layout;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LoggingEvent;

/**
 *
 * Appends Log4j Events to an external Flume client which is decribed by
 * the Log4j configuration file. The appender takes two required parameters:
 *<p>
 *<strong>Hostname</strong> : This is the hostname of the first hop
 *at which Flume (through an AvroSource) is listening for events.
 *</p>
 *<p>
 *<strong>Port</strong> : This the port on the above host where the Flume
 *Source is listening for events.
 *</p>
 *A sample log4j properties file which appends to a source would look like:
 *<pre><p>
 *log4j.appender.out2 = org.apache.flume.clients.log4jappender.Log4jAppender
 *log4j.appender.out2.Port = 25430
 *log4j.appender.out2.Hostname = foobarflumesource.com
 *log4j.logger.org.apache.flume.clients.log4jappender = DEBUG,out2</p></pre>
 *<p><i>Note: Change the last line to the package of the class(es), that
will
 *do the appending.For example if classes from the package
 *com.bar.foo are appending, the last line would be:</i></p>
 *<pre><p>log4j.logger.com.bar.foo = DEBUG,out2</p></pre>
 *
 *
 */
public class Log4jAppender extends AppenderSkeleton {

  private String hostname;
  private int port;
  private RpcClient rpcClient = null;



  /**
   * If this constructor is used programmatically rather than from a log4j
conf
   * you must set the <tt>port</tt> and <tt>hostname</tt> and then
call
   * <tt>activateOptions()</tt> before calling <tt>append()</tt>.
   */
  public Log4jAppender(){
  super();
  System.out.println("Inside Constructor"+layout);
  }

  /**
   * Sets the hostname and port. Even if these are passed the
   * <tt>activateOptions()</tt> function must be called before calling
   * <tt>append()</tt>, else <tt>append()</tt> will throw an Exception.
   * @param hostname The first hop where the client should connect to.
   * @param port The port to connect on the host.
   *
   */
  public Log4jAppender(String hostname, int port){
    this.hostname = hostname;
    this.port = port;
    System.out.println("Inside Constructor from "+layout);
  }


  public Log4jAppender(String hostname, int port, Layout layout){
    this.hostname = hostname;
    this.port = port;
    this.layout = layout;
 }

  /**
   * Append the LoggingEvent, to send to the first Flume hop.
   * @param event The LoggingEvent to be appended to the flume.
   * @throws FlumeException if the appender was closed,
   * or the hostname and port were not setup, there was a timeout, or there
   * was a connection error.
   */
  @Override
  public synchronized void append(LoggingEvent event) throws FlumeException{
    //If rpcClient is null, it means either this appender object was never
    //setup by setting hostname and port and then calling activateOptions
    //or this appender object was closed by calling close(), so we throw an
    //exception to show the appender is no longer accessible.
    if(rpcClient == null){
      throw new FlumeException("Cannot Append to Appender!" +
          "Appender either closed or not setup correctly!");
    }

    if(!rpcClient.isActive()){
      reconnect();
    }

    //Client created first time append is called.
    Map<String, String> hdrs = new HashMap<String, String>();
    hdrs.put(Log4jAvroHeaders.LOGGER_NAME.toString(),
event.getLoggerName());
    hdrs.put(Log4jAvroHeaders.TIMESTAMP.toString(),
        String.valueOf(event.getTimeStamp()));

    //To get the level back simply use
    //LoggerEvent.toLevel(hdrs.get(Integer.parseInt(
    //Log4jAvroHeaders.LOG_LEVEL.toString()))
    hdrs.put(Log4jAvroHeaders.LOG_LEVEL.toString(),
        String.valueOf(event.getLevel().toInt()));
    hdrs.put(Log4jAvroHeaders.MESSAGE_ENCODING.toString(), "UTF8");

    StringBuilder body = null;
    if(layout != null){

    body = new StringBuilder(layout.format(event));

        if(layout.ignoresThrowable()){
          String[] s = event.getThrowableStrRep();

          if (s != null) {

            int len = s.length;

            for (int i = 0; i < len; i++) {
              body.append(s[i]);
              body.append('\n');
            }
            body.setLength(body.length()-1);
          }

        }

    }

    if(layout == null) System.out.println("====== Layout is NULL ======");

    Event flumeEvent = EventBuilder.withBody((layout == null) ?
event.getMessage().toString(): body.toString(),
        Charset.forName("UTF8"), hdrs);

    try {
      rpcClient.append(flumeEvent);
    } catch (EventDeliveryException e) {
      String msg = "Flume append() failed.";
      LogLog.error(msg);
      throw new FlumeException(msg + " Exception follows.", e);
    }
  }

  //This function should be synchronized to make sure one thread
  //does not close an appender another thread is using, and hence risking
  //a null pointer exception.
  /**
   * Closes underlying client.
   * If <tt>append()</tt> is called after this function is called,
   * it will throw an exception.
   * @throws FlumeException if errors occur during close
   */
  @Override
  public synchronized void close() throws FlumeException{
    //Any append calls after this will result in an Exception.
    if (rpcClient != null) {
      rpcClient.close();
      rpcClient = null;
    }
  }

  @Override
  public boolean requiresLayout() {
    return false;
  }

  /**
   * Set the first flume hop hostname.
   * @param hostname The first hop where the client should connect to.
   */
  public void setHostname(String hostname){
    this.hostname = hostname;
  }

  /**
   * Set the port on the hostname to connect to.
   * @param port The port to connect on the host.
   */
  public void setPort(int port){
    this.port = port;
  }

  /**
   * Activate the options set using <tt>setPort()</tt>
   * and <tt>setHostname()</tt>
   * @throws FlumeException if the <tt>hostname</tt> and
   *  <tt>port</tt> combination is invalid.
   */
  @Override
  public void activateOptions() throws FlumeException{
    try {
      rpcClient = RpcClientFactory.getDefaultInstance(hostname, port);
    } catch (FlumeException e) {
      String errormsg = "RPC client creation failed! " +
          e.getMessage();
      LogLog.error(errormsg);
      throw e;
    }
  }

  /**
   * Make it easy to reconnect on failure
   * @throws FlumeException
   */
  private void reconnect() throws FlumeException {
    close();
    activateOptions();
  }

}


Program to send the log messages:
==========================
 public void test(){
     try {
            Log4jAppender appender = new Log4jAppender();
                appender.setHostname("flume_agent_host");
                appender.setPort(41414);
                appender.setLayout(new PatternLayout("%d [%c] (%t)
<%X{user} %X{field}> %m"));
             //   appender.setReconnectAttempts(100);

               appender.activateOptions();

                log.addAppender(appender);

                MDC.put("user", "chris");
              //  while (true) {
                    MDC.put("field", UUID.randomUUID().toString());
                    log.info("=====> Hello World");
                    try {
                        throw new Exception("Testing");
                    } catch (Exception e) {
                        log.error("Gone wrong ===>", e);
                    }
                //}
                    System.in.read();
                    System.in.read();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }



Thanks,
Khadar



On Fri, Jul 27, 2012 at 8:55 PM, Ralph Goers <ralph.goers@dslextreme.com>wrote:

> Khadar,
>
> I am not sure if your reply was meant as a response to my comment or just
> as additional information. The Log4j 2 FlumeAppender is not part of Flume.
> See
> http://logging.apache.org/log4j/2.x/manual/appenders.html#FlumeAvroAppender.
> However, Log4j 2 is still waiting for its first release so you will have to
> build it yourself if you want to try it.
>
> Ralph
>
> On Jul 27, 2012, at 6:58 AM, khadar basha wrote:
>
> FlumeLog4jAvroAppender is available as part of 0.94. But its not available
> in 1.2.*Log4jAppender *is from flume-ng-log4jappender-1.2.0.jar
>
> I think it is replace as part of 1.2.
>
> On Fri, Jul 27, 2012 at 6:31 PM, Ralph Goers <rgoers@apache.org> wrote:
>
>> You might consider looking at Log4j 2. It has a Flume Appender that
>> records the whole formatted log message in the body. In addition, it will
>> record the MDC fields as well.
>>
>> Sent from my iPad
>>
>> On Jul 27, 2012, at 5:49 AM, khadar basha <khadarskb@gmail.com> wrote:
>>
>> Hi
>>
>> I am using Flume1.2. Using avro source and hdfs sink. Sending message
>> using the Logger channel from application server. For that i am using the
>> *org.apache.flume.clients.log4jappender.Log4jAppender *in MyApp.
>>
>> But i am getting only body of the message (description). loosing the
>> time, thread, Level information.
>>
>> flume-conf.properties file
>> ==================
>> agent2Test1.sources = seqGenSrc
>> agent2Test1.channels = memoryChannel
>> agent2Test1.sinks = loggerSink
>>
>> # For each one of the sources, the type is defined
>> agent2Test1.sources.seqGenSrc.type = avro
>> agent2Test1.sources.seqGenSrc.bind=localhost
>> agent2Test1.sources.seqGenSrc.port=41414
>>
>> # interceptors for host and date
>> agent2Test1.sources.seqGenSrc.interceptors = time hostInterceptor
>> agent2Test1.sources.seqGenSrc.interceptors.hostInterceptor.type =
>> org.apache.flume.interceptor.HostInterceptor$Builder
>> agent2Test1.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader =
>> host
>> agent2Test1.sources.seqGenSrc.interceptors.hostInterceptor.useIP = false
>> agent2Test1.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader.preserveExisting
>> = false
>> agent2Test1.sources.seqGenSrc.interceptors.time.type =
>> org.apache.flume.interceptor.TimestampInterceptor$Builder
>>
>> # The channel can be defined as follows.
>> agent2Test1.sources.seqGenSrc.channels = memoryChannel
>>
>> # Each sink's type must be defined
>> agent2Test1.sinks.loggerSink.type = hdfs
>> agent2Test1.sinks.loggerSink.hdfs.path =
>> hdfs://hadoopHost:8020/data/%Y/%m/%d/%{host}/Logs
>>
>> agent2Test1.sinks.loggerSink.hdfs.fileType = DataStream
>>
>> #Specify the channel the sink should use
>> agent2Test1.sinks.loggerSink.channel = memoryChannel
>>
>> # Each channel's type is defined.
>> agent2Test1.channels.memoryChannel.type = memory
>>
>> # Other config values specific to each type of channel(sink or source)
>> # can be defined as well
>> # In this case, it specifies the capacity of the memory channel
>> agent2Test1.channels.memoryChannel.capacity = 1000
>>
>>
>>
>> Sample java program to generate the log message:
>> =====================================
>>
>>
>> package com.test;
>>
>>
>> import org.apache.flume.clients.log4jappender.Log4jAppender;
>> import org.apache.log4j.Logger;
>> import org.apache.log4j.MDC;
>> import org.apache.log4j.PatternLayout;
>>
>> import java.util.UUID;
>>
>>
>> public class Main {
>>     static Logger log = Logger.getLogger(Main.class);
>>
>>     public static void main(String[] args) {
>>         try {
>>          Log4jAppender appender = new Log4jAppender();
>>             appender.setHostname("localhost");
>>             appender.setPort(41414);
>>             appender.setLayout(new PatternLayout("%d [%c] (%t) <%X{user}
>> %X{field}> %m"));
>>          //   appender.setReconnectAttempts(100);
>>
>>            appender.activateOptions();
>>
>>             log.addAppender(appender);
>>
>>             MDC.put("user", "chris");
>>           //  while (true) {
>>                 MDC.put("field", UUID.randomUUID().toString());
>>                 log.info("=====> Hello World");
>>                 try {
>>                     throw new Exception("Testing");
>>                 } catch (Exception e) {
>>                     log.error("Gone wrong ===>", e);
>>                 }
>>             //}
>>                 System.in.read();
>>                 System.in.read();
>>         }
>>         catch (Exception e) {
>>             e.printStackTrace();
>>         }
>>     }
>> }
>>
>>
>>
>> I am missing any config here ?
>>
>>
>>
>> --
>> Thanks,
>> Khadar
>>
>>
>
>
> --
> Thanks,
> Khadar
>
>
>


-- 
Thanks,
Khadar

Mime
View raw message