flume-user mailing list archives

From JP <jpnaidu...@gmail.com>
Subject Custom Serializer is not working
Date Fri, 03 Aug 2012 03:26:24 GMT
Hi,

I'm getting these errors:

2012-08-02 16:58:50,065 INFO source.AvroSource: Avro source seqGenSrc
started.
2012-08-02 16:59:02,463 INFO ipc.NettyServer: [id: 0x45cbda0a, /localhost => /localhost:41414] OPEN
2012-08-02 16:59:02,466 INFO ipc.NettyServer: [id: 0x45cbda0a, /localhost => /localhost:41414] BOUND: /localhost:41414
2012-08-02 16:59:02,466 INFO ipc.NettyServer: [id: 0x45cbda0a, /localhost:3770 => /localhost:41414] CONNECTED: /localhost:3770
2012-08-02 16:59:04,006 INFO hdfs.BucketWriter: Creating hdfs://localhost:8020/data/cssplogs/FlumeData.1343906943264.tmp
2012-08-02 16:59:04,167 ERROR serialization.EventSerializerFactory: Unable to instantiate Builder from org.apache.flume.serialization.CustomLogAvroEventSerializer
2012-08-02 16:59:04,168 WARN hdfs.HDFSEventSink: HDFS IO error
java.io.IOException: java.lang.NullPointerException
        at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:202)
        at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:48)
        at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:155)
        at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:152)
        at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:125)
        at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:152)
        at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:307)
        at org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:717)
        at org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:714)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.NullPointerException
        at org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:75)
        at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:188)
        ... 13 more
2012-08-02 16:59:05,239 INFO hdfs.BucketWriter: Creating hdfs://localhost:8020/data/cssplogs/FlumeData.1343906943265.tmp
2012-08-02 16:59:05,392 ERROR serialization.EventSerializerFactory: Unable to instantiate Builder from org.apache.flume.serialization.CustomLogAvroEventSerializer
2012-08-02 16:59:05,392 WARN hdfs.HDFSEventSink: HDFS IO error
java.io.IOException: java.lang.NullPointerException
        (stack trace identical to the one above)

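From the ERROR line, it looks like EventSerializerFactory resolves the
configured serializer name by reflection and expects a class implementing
EventSerializer.Builder. A rough sketch of that kind of lookup, just my
assumption from the error message, not the actual Flume source:

import org.apache.flume.serialization.EventSerializer;

public class BuilderLookupSketch {
  // Load the configured class and require that it implement
  // EventSerializer.Builder, else fail the way the factory seems to.
  static EventSerializer.Builder lookup(String configuredName) throws Exception {
    Class<?> c = Class.forName(configuredName);
    if (!EventSerializer.Builder.class.isAssignableFrom(c)) {
      throw new IllegalArgumentException(
          "Unable to instantiate Builder from " + configuredName);
    }
    return (EventSerializer.Builder) c.newInstance();
  }
}

If that reading is right, the serializer property may need to name the nested
builder (org.apache.flume.serialization.CustomLogAvroEventSerializer$Builder)
rather than the serializer class itself, but I have not verified that.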
-----------------------------------------------------------------------------------------------------------------


This is my Avro schema file:

{ "type": "record", "name": "LogEvent", "namespace":
"org.apache.flume.serialization",
  "fields": [
    { "name": "srno",  "type": "int" },
    { "name": "severity",  "type": "int" },
    { "name": "timestamp", "type": "long" },
    { "name": "hostname",  "type": "string" },
    { "name": "message",   "type": "string" }
  ]
}
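In case it helps, this is a minimal check I run to confirm the generated class
and the .avsc stay in sync; it assumes the schema above is saved as
logevent.avsc (the file name is just what I use locally):

import java.io.File;
import org.apache.avro.Schema;

public class SchemaCheck {
  public static void main(String[] args) throws Exception {
    // Parse the .avsc and print each field with its declared type.
    Schema schema = new Schema.Parser().parse(new File("logevent.avsc"));
    for (Schema.Field f : schema.getFields()) {
      System.out.println(f.name() + " : " + f.schema().getType());
    }
  }
}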

------------------------------------------------------------------------------------------------

This is the LogEvent class, generated with the Avro Maven plugin and then customized a little:

@SuppressWarnings("all")
public class LogEvent extends SpecificRecordBase implements SpecificRecord {
  public static final Schema _SCHEMA =
Schema.parse("{\"type\":\"record\",\"name\":\"LogEvent\",\"namespace\":\"org.apache.flume.serialization\",\"fields\":[{\"name\":\"srno\",\"type\":\"int\"},{\"name\":\"severity\",\"type\":\"int\"},{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"hostname\",\"type\":\"string\"},{\"name\":\"message\",\"type\":\"string\"}]}");
  public int srno;
  public String severity;
  public long timestamp;
  public String hostname;
  public String message;

  public Schema getSchema() { return _SCHEMA; }
  public Object get(int _field) {
    switch (_field) {
    case 0: return srno;
    case 1: return severity;
    case 2: return timestamp;
    case 3: return hostname;
    case 4: return message;
    default: throw new AvroRuntimeException("Bad index");
    }
  }

  @SuppressWarnings(value="unchecked")
  public void set(int _field, Object _value) {
    switch (_field) {
    // Avro may hand strings in as Utf8, so convert via toString() instead of
    // casting straight to String.
    case 0: srno = (Integer)_value; break;
    case 1: severity = _value.toString(); break;
    case 2: timestamp = (Long)_value; break;
    case 3: hostname = _value.toString(); break;
    case 4: message = _value.toString(); break;
    default: throw new AvroRuntimeException("Bad index");
    }
  }

    public void setSrno(int srno) {
        this.srno = srno;
    }
    public void setSeverity(String s) {
        severity = s;
    }
    public String getSeverity() {
        return severity;
    }

    public void setTimestamp(long t) {
        timestamp = t;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setHostname(String h) {
        hostname = h;
    }

    public String getHostname() {
        return hostname;
    }

    public void setMessage(String m) {
        message = m;
    }

    public String getMessage() {
        return message;
    }

  @Override
  public void put(int field, Object value) {
    // Delegate to set(); the auto-generated empty stub silently dropped any
    // value Avro tried to store in this record.
    set(field, value);
  }
}
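A quick sanity check for the customized class is an Avro file round trip. This
is only a sketch: the temp-file location is arbitrary, and I read the record
back generically so the check does not depend on the hand-edited put()/set():

import java.io.File;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificDatumWriter;

public class LogEventRoundTrip {
  public static void main(String[] args) throws Exception {
    LogEvent in = new LogEvent();
    in.setSrno(1);
    in.setSeverity("INFO");
    in.setTimestamp(System.currentTimeMillis());
    in.setHostname("localhost");
    in.setMessage("hello");

    // Write one record with the specific writer...
    File f = File.createTempFile("logevent", ".avro");
    DataFileWriter<LogEvent> writer =
        new DataFileWriter<LogEvent>(new SpecificDatumWriter<LogEvent>(in.getSchema()));
    writer.create(in.getSchema(), f);
    writer.append(in);
    writer.close();

    // ...then read it back generically; all five fields should print.
    DataFileReader<GenericRecord> reader =
        new DataFileReader<GenericRecord>(f, new GenericDatumReader<GenericRecord>());
    System.out.println(reader.next());
    reader.close();
  }
}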
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
agent2.sources = seqGenSrc
agent2.channels = memoryChannel
agent2.sinks = loggerSink

agent2.sources.seqGenSrc.type = avro
agent2.sources.seqGenSrc.bind = slcso-poc2-lnx
agent2.sources.seqGenSrc.port = 41414

#agent2.sources.seqGenSrc.interceptors = time hostInterceptor
#agent2.sources.seqGenSrc.interceptors.hostInterceptor.type = org.apache.flume.interceptor.HostInterceptor$Builder
#agent2.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader = host
#agent2.sources.seqGenSrc.interceptors.hostInterceptor.useIP = false
#agent2.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader.preserveExisting = false
#agent2.sources.seqGenSrc.interceptors.time.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

agent2.channels.memoryChannel.type = memory
agent2.channels.memoryChannel.capacity = 1000000
agent2.channels.memoryChannel.transactionCapacity = 1000000
agent2.channels.memoryChannel.keep-alive = 30

agent2.sources.seqGenSrc.channels = memoryChannel

agent2.sinks.loggerSink.type = hdfs
#agent2.sinks.loggerSink.hdfs.path = hdfs://10.105.39.204:8020/data/CspcLogs
agent2.sinks.loggerSink.hdfs.path = hdfs://slcso-poc4-lnx:8020/data/cssplogs
agent2.sinks.loggerSink.hdfs.fileType = DataStream
#agent2.sinks.loggerSink.hdfs.writeFormat = Text

agent2.sinks.loggerSink.channel = memoryChannel
#agent2.sinks.loggerSink.serializer = org.apache.flume.serialization.BodyTextEventSerializer
#agent2.sinks.loggerSink.serializer = avro_event
agent2.sinks.loggerSink.serializer = org.apache.flume.serialization.CustomLogAvroEventSerializer
agent2.sinks.loggerSink.serializer.compressionCodec = snappy
#agent2.sinks.loggerSink.serializer.syncIntervalBytes = 2048000
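For testing, I feed the Avro source with a small flume-ng-sdk client along
these lines (host and port come from the source definition above; the sample
body is made up):

import java.nio.charset.Charset;
import org.apache.flume.Event;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class SendOneEvent {
  public static void main(String[] args) throws Exception {
    // Connect to the avro source bound at slcso-poc2-lnx:41414.
    RpcClient client = RpcClientFactory.getDefaultInstance("slcso-poc2-lnx", 41414);
    try {
      // An rfc3164-style body, since the serializer's convert() parses
      // "<pri>MMM dd HH:mm:ss host message".
      Event event = EventBuilder.withBody(
          "<13>Aug 02 16:59:00 myhost Sample info message",
          Charset.forName("UTF-8"));
      client.append(event);
    } finally {
      client.close();
    }
  }
}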


-----------------------------------------------------------------------------------------------------------------
The following is my serializer class:

package org.apache.flume.serialization;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;

import com.google.common.base.Charsets;
import org.apache.avro.Schema;
import org.apache.flume.Context;
import org.apache.flume.Event;
// NB: the stock org.apache.flume.source.SyslogUtils defines SYSLOG_SEVERITY
// but, as far as I know, not SYSLOG_SRNO; that constant is my customization.
import org.apache.flume.source.SyslogUtils;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomLogAvroEventSerializer extends
        AbstractAvroEventSerializer<LogEvent> {

      private static final DateTimeFormatter dateFmt1 =
          DateTimeFormat.forPattern("MMM dd HH:mm:ss").withZoneUTC();

      private static final DateTimeFormatter dateFmt2 =
          DateTimeFormat.forPattern("MMM  d HH:mm:ss").withZoneUTC();

      private static final Logger logger =
          LoggerFactory.getLogger(CustomLogAvroEventSerializer.class);

      private final OutputStream out;
      private final Schema schema;

      public CustomLogAvroEventSerializer(OutputStream out) throws IOException {
        this.out = out;
        this.schema = new LogEvent().getSchema();
      }

      @Override
      protected OutputStream getOutputStream() {
        return out;
      }

      @Override
      protected Schema getSchema() {
        return schema;
      }

      // very simple rfc3164 parser
      @Override
      protected LogEvent convert(Event event) {
          LogEvent sle = new LogEvent();

        // Stringify body so it's easy to parse.
        // This is a pretty inefficient way to do it.
        String msg = new String(event.getBody(), Charsets.UTF_8);

        // parser read pointer
        int seek = 0;

        // Check Flume headers to see if we came from a SyslogTcp (or UDP) source,
        // which at the time of this writing only parses the priority.
        // This is a bit schizophrenic; it should parse all the fields or none.
        Map<String, String> headers = event.getHeaders();
        boolean fromSyslogSource = false;
        if (headers.containsKey(SyslogUtils.SYSLOG_SRNO)) {
          fromSyslogSource = true;
          // Read back through the same header key we just checked for.
          int srno = Integer.parseInt(headers.get(SyslogUtils.SYSLOG_SRNO));
          sle.setSrno(srno);
        } else {
          sle.setSrno(121);
        }
        if (headers.containsKey(SyslogUtils.SYSLOG_SEVERITY)) {
          fromSyslogSource = true;
          String severity = headers.get(SyslogUtils.SYSLOG_SEVERITY);
          sle.setSeverity(severity);
        }

        // assume the message was received raw (maybe via NetcatSource)
        // parse the priority string
        if (!fromSyslogSource) {
          if (msg.charAt(0) == '<') {
            int end = msg.indexOf(">");
            if (end > -1) {
              seek = end + 1;
              String priStr = msg.substring(1, end);
             // int priority = Integer.parseInt(priStr);
             // String severity = priStr;

              sle.setSeverity(priStr);
            }
          }
        }

        // parse the timestamp
        String timestampStr = msg.substring(seek, seek + 15);
        long ts = parseRfc3164Date(timestampStr);
        if (ts != 0) {
          sle.setTimestamp(ts);
          seek += 15 + 1; // space after timestamp
        }

        // parse the hostname
        int nextSpace = msg.indexOf(' ', seek);
        if (nextSpace > -1) {
          String hostname = msg.substring(seek, nextSpace);
          sle.setHostname(hostname);
          seek = nextSpace + 1;
        }

        // everything else is the message
        String actualMessage = msg.substring(seek);
        sle.setMessage(actualMessage);

        logger.debug("Serialized event as: {}", sle);

        return sle;
      }

      private static long parseRfc3164Date(String in) {
        DateTime date = null;
        try {
          date = dateFmt1.parseDateTime(in);
        } catch (IllegalArgumentException e) {
          // Ignore the exception; we act based on nullity of the date object.
          logger.debug("Date parse failed on ({}), trying single-digit date", in);
        }

        if (date == null) {
          try {
            date = dateFmt2.parseDateTime(in);
          } catch (IllegalArgumentException e) {
            // Ignore the exception; we act based on nullity of the date object.
            logger.debug("2nd date parse failed on ({}), unknown date format", in);
          }
        }

        // Hacky stuff to try and deal with boundary cases, i.e. new year's eve.
        // rfc3164 dates are really dumb.
        // NB: cannot handle replaying of old logs or going back to the future.
        if (date != null) {
          DateTime now = new DateTime();
          int year = now.getYear();
          DateTime corrected = date.withYear(year);

          // flume clock is ahead or there is some latency, and the year rolled
          if (corrected.isAfter(now) && corrected.minusMonths(1).isAfter(now)) {
            corrected = date.withYear(year - 1);
          // flume clock is behind and the year rolled
          } else if (corrected.isBefore(now) && corrected.plusMonths(1).isBefore(now)) {
            corrected = date.withYear(year + 1);
          }
          date = corrected;
        }

        if (date == null) {
          return 0;
        }

        return date.getMillis();
      }

      public static class Builder implements EventSerializer.Builder {

        @Override
        public EventSerializer build(Context context, OutputStream out) {
          CustomLogAvroEventSerializer writer = null;
          try {
            writer = new CustomLogAvroEventSerializer(out);
            writer.configure(context);
          } catch (IOException e) {
            logger.error("Unable to parse schema file. Exception follows.", e);
          }
          return writer;
        }
      }



}
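To exercise convert() without a full agent, a smoke test in the same package
works (same-package access reaches the protected method; the sample line and
class name are mine):

package org.apache.flume.serialization;

import java.io.ByteArrayOutputStream;

import com.google.common.base.Charsets;
import org.apache.flume.event.EventBuilder;

public class ConvertSmokeTest {
  public static void main(String[] args) throws Exception {
    CustomLogAvroEventSerializer s =
        new CustomLogAvroEventSerializer(new ByteArrayOutputStream());
    // "<13>" should land in severity; the timestamp, hostname, and message
    // fields are then filled in by the parser.
    LogEvent le = s.convert(EventBuilder.withBody(
        "<13>Aug 03 03:26:24 myhost something happened", Charsets.UTF_8));
    System.out.println(le.getSeverity() + " " + le.getHostname() + " " + le.getMessage());
  }
}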

Please advise. I need output like the lines below, and I want to customize the format the way log4j does:
-------------------------------------------------------------------------------------------------------------------------------------
172  [main] FATAL com.cisco.flume.FlumeTest  - Sample fatal message
188  [main] INFO  com.cisco.flume.FlumeTest  - Sample info message1
203  [main] WARN  com.cisco.flume.FlumeTest  - Sample warn message
219  [main] INFO  com.cisco.flume.FlumeTest  - Sample info message2
219  [main] INFO  com.cisco.flume.FlumeTest  - Sample info message3
266  [main] ERROR com.cisco.flume.FlumeTest  - Sample error message
282  [main] FATAL com.cisco.flume.FlumeTest  - Sample fatal message
282  [main] INFO  com.cisco.flume.FlumeTest  - Sample info message4
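Those lines match log4j's TTCC-style pattern (%r [%t] %-5p %c - %m%n). A
hypothetical formatter along those lines, mapping LogEvent fields onto that
shape; "main" and the hostname stand in for the thread and category columns,
which LogEvent does not carry:

public class TtccFormatter {
  // %r-style relative time, [thread], padded severity, category, message.
  public static String format(LogEvent e, long startMillis) {
    long relativeMillis = e.getTimestamp() - startMillis;
    return String.format("%-4d [%s] %-5s %s  - %s",
        relativeMillis, "main", e.getSeverity(), e.getHostname(), e.getMessage());
  }
}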


-- 
JP



