chukwa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ey...@apache.org
Subject svn commit: r1023543 - in /incubator/chukwa/trunk: ./ conf/ src/java/org/apache/hadoop/chukwa/datacollection/adaptor/
Date Sun, 17 Oct 2010 18:14:28 GMT
Author: eyang
Date: Sun Oct 17 18:14:27 2010
New Revision: 1023543

URL: http://svn.apache.org/viewvc?rev=1023543&view=rev
Log:
CHUKWA-530. Implemented Syslog Adaptor for mappig Syslog facility name to Chukwa
 data type.  (Eric Yang)

Added:
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java
Modified:
    incubator/chukwa/trunk/CHANGES.txt
    incubator/chukwa/trunk/build.xml
    incubator/chukwa/trunk/conf/chukwa-agent-conf.xml.template
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/UDPAdaptor.java

Modified: incubator/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/CHANGES.txt?rev=1023543&r1=1023542&r2=1023543&view=diff
==============================================================================
--- incubator/chukwa/trunk/CHANGES.txt (original)
+++ incubator/chukwa/trunk/CHANGES.txt Sun Oct 17 18:14:27 2010
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
 
   NEW FEATURES
 
+    CHUKWA-530. Implemented Syslog Adaptor for mappig Syslog facility name to Chukwa data
type.  (Eric Yang)
+
     CHUKWA-419. Replaced system metrics collection with Sigar. (Eric Yang)
 
     CHUKWA-444. Added HBaseWriter for storing time series data in HBase for faster random
read/write. (Eric Yang)

Modified: incubator/chukwa/trunk/build.xml
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/build.xml?rev=1023543&r1=1023542&r2=1023543&view=diff
==============================================================================
--- incubator/chukwa/trunk/build.xml (original)
+++ incubator/chukwa/trunk/build.xml Sun Oct 17 18:14:27 2010
@@ -494,6 +494,13 @@
 				</replacetokens>
 			</filterchain>
                 </copy>
+                <copy file="${conf.dir}/chukwa-agent-conf.xml.template" tofile="${test.build.dir}/conf/chukwa-agent-conf.xml">
+			<filterchain>
+				<replacetokens>
+					<token key="TODO-CLUSTER-NAME" value="${TODO-CLUSTER-NAME}" />
+				</replacetokens>
+			</filterchain>
+                </copy>
                 <copy file="${basedir}/conf/chukwa-demux-conf.xml.template" tofile="${test.build.dir}/conf/chukwa-demux-conf.xml">
 			<filterchain>
 		

Modified: incubator/chukwa/trunk/conf/chukwa-agent-conf.xml.template
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/conf/chukwa-agent-conf.xml.template?rev=1023543&r1=1023542&r2=1023543&view=diff
==============================================================================
--- incubator/chukwa/trunk/conf/chukwa-agent-conf.xml.template (original)
+++ incubator/chukwa/trunk/conf/chukwa-agent-conf.xml.template Sun Oct 17 18:14:27 2010
@@ -75,4 +75,9 @@
     <value>20000</value>
     <description>the number of milliseconds to wait between searches for a collector</description>
   </property>
+
+  <property>
+    <name>syslog.adaptor.port.9095.facility.LOCAL1</name>
+    <value>HADOOP</value>
+  </property>
 </configuration>

Added: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java?rev=1023543&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java
(added)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java
Sun Oct 17 18:14:27 2010
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import java.io.IOException;
+import java.net.*;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import org.apache.hadoop.chukwa.*;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.log4j.Logger;
+
+/**
+ * SyslogAdaptor reads UDP syslog message from a port and convert the message to Chukwa
+ * Chunk for transport from Chukwa Agent to Chukwa Collector.  Usage:
+ * 
+ * add SyslogAdaptor [DataType] [Port] [SequenceNumber]
+ * 
+ * Syslog protocol facility name is mapped to Chukwa Data Type 
+ * by SyslogAdaptor, hence each UDP port can support up to 24 data streams.
+ * 
+ * Data Type mapping can be overwritten in Chukwa Agent Configuration file, i.e.:
+ * 
+ * <property>
+ *   <name>syslog.adaptor.port.9095.facility.LOCAL1</name>
+ *   <value>HADOOP</value>
+ * </property>
+ * 
+ * When demux takes place, data received on port 9095 with facility name LOCAL0 will
+ * be processed by demux parser for data type "HADOOP".
+ */
+public class SyslogAdaptor extends UDPAdaptor {
+
+  private final static Logger log = Logger.getLogger(SyslogAdaptor.class);
+  public enum FacilityType { KERN, USER, MAIL, DAEMON, AUTH, SYSLOG, LPR, NEWS, UUCP, CRON,
AUTHPRIV, FTP, NTP, AUDIT, ALERT, CLOCK, LOCAL0, LOCAL1, LOCAL2, LOCAL3, LOCAL4, LOCAL5, LOCAL6,
LOCAL7 }
+  public HashMap<Integer, String> facilityMap;
+  DatagramSocket ds;
+  volatile boolean running = true;
+  volatile long bytesReceived = 0;
+  
+  public SyslogAdaptor() {
+    facilityMap = new HashMap<Integer, String>(FacilityType.values().length);
+  }
+  
+  public void send(byte[] buf, DatagramPacket dp) throws InterruptedException, IOException
{
+    StringBuilder source = new StringBuilder();
+    source.append(dp.getAddress());
+    String dataType = type;
+    byte[] trimmedBuf =  Arrays.copyOf(buf, dp.getLength());
+    String rawPRI = new String(trimmedBuf, 1, 4);
+    int i = rawPRI.indexOf(">");
+    if (i <= 3 && i > -1) {
+      String priorityStr = rawPRI.substring(0,i);
+      int priority = 0;
+      int facility = 0;
+      try {
+        priority = Integer.parseInt(priorityStr);
+        facility = (priority >> 3) << 3;
+        facility = facility / 8;
+        dataType = facilityMap.get(facility); 
+      } catch (NumberFormatException nfe) {
+        log.warn("Unsupported format detected by SyslogAdaptor:"+trimmedBuf);
+      }
+    }
+
+    bytesReceived += trimmedBuf.length;
+    Chunk c = new ChunkImpl(dataType, source.toString(), bytesReceived, trimmedBuf, SyslogAdaptor.this);
+    dest.add(c);
+  }
+  
+  @Override
+  public String parseArgs(String s) {
+    portno = Integer.parseInt(s);
+    ChukwaConfiguration cc = new ChukwaConfiguration();
+    for(FacilityType e : FacilityType.values()) {
+      StringBuilder buffer = new StringBuilder();
+      buffer.append("syslog.adaptor.port.");
+      buffer.append(portno);
+      buffer.append(".facility.");
+      buffer.append(e.name());
+      String dataType = cc.get(buffer.toString(), e.name());
+      facilityMap.put(e.ordinal(), dataType);
+    }
+    return s;
+  }
+
+}

Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/UDPAdaptor.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/UDPAdaptor.java?rev=1023543&r1=1023542&r2=1023543&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/UDPAdaptor.java
(original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/UDPAdaptor.java
Sun Oct 17 18:14:27 2010
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.chukwa.datacollection.adaptor;
 
+import java.io.IOException;
 import java.net.*;
 import java.util.Arrays;
 import org.apache.hadoop.chukwa.*;
@@ -36,16 +37,12 @@ public class UDPAdaptor extends Abstract
   class ListenThread extends Thread {
     public void run() {
       log.info("UDP adaptor " + adaptorID + " started on port " + portno + " offset =" +
bytesReceived);
-      byte[] buf = new byte[1024];
+      byte[] buf = new byte[65535];
       DatagramPacket dp = new DatagramPacket(buf, buf.length);
       try {
         while(running) {
           ds.receive(dp);
-          log.info("got a UDP message");
-          byte[] trimmedBuf =  Arrays.copyOf(buf, dp.getLength());
-          bytesReceived += trimmedBuf.length;
-          Chunk c = new ChunkImpl(type, source, bytesReceived, trimmedBuf, UDPAdaptor.this);
-          dest.add(c);
+          send(buf, dp);
         }
       } catch(Exception e) {
         if(running)
@@ -54,6 +51,13 @@ public class UDPAdaptor extends Abstract
     }
   }
   ListenThread lt;
+
+  public void send(byte[] buf, DatagramPacket dp) throws InterruptedException, IOException
{
+    byte[] trimmedBuf =  Arrays.copyOf(buf, dp.getLength());
+    bytesReceived += trimmedBuf.length;
+    Chunk c = new ChunkImpl(type, source, bytesReceived, trimmedBuf, UDPAdaptor.this);
+    dest.add(c);
+  }
   
   @Override
   public String parseArgs(String s) {



Mime
View raw message