Subject: svn commit: r1023543 - in /incubator/chukwa/trunk: ./ conf/ src/java/org/apache/hadoop/chukwa/datacollection/adaptor/
Date: Sun, 17 Oct 2010 18:14:28 -0000
To: chukwa-commits@incubator.apache.org
From: eyang@apache.org
Reply-To: chukwa-dev@incubator.apache.org
Message-Id: <20101017181428.2FAD823888E7@eris.apache.org>

Author: eyang
Date: Sun Oct 17 18:14:27 2010
New Revision: 1023543

URL: http://svn.apache.org/viewvc?rev=1023543&view=rev
Log:
CHUKWA-530. Implemented Syslog Adaptor for mapping Syslog facility name to Chukwa data type.
(Eric Yang)

Added:
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java
Modified:
    incubator/chukwa/trunk/CHANGES.txt
    incubator/chukwa/trunk/build.xml
    incubator/chukwa/trunk/conf/chukwa-agent-conf.xml.template
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/UDPAdaptor.java

Modified: incubator/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/CHANGES.txt?rev=1023543&r1=1023542&r2=1023543&view=diff
==============================================================================
--- incubator/chukwa/trunk/CHANGES.txt (original)
+++ incubator/chukwa/trunk/CHANGES.txt Sun Oct 17 18:14:27 2010
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
 
   NEW FEATURES
 
+    CHUKWA-530. Implemented Syslog Adaptor for mapping Syslog facility name to Chukwa data type. (Eric Yang)
+
     CHUKWA-419. Replaced system metrics collection with Sigar. (Eric Yang)
 
     CHUKWA-444. Added HBaseWriter for storing time series data in HBase for faster random read/write.
    (Eric Yang)

Modified: incubator/chukwa/trunk/build.xml
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/build.xml?rev=1023543&r1=1023542&r2=1023543&view=diff
==============================================================================
--- incubator/chukwa/trunk/build.xml (original)
+++ incubator/chukwa/trunk/build.xml Sun Oct 17 18:14:27 2010
@@ -494,6 +494,13 @@
+
+
+
+
+
+
+

Modified: incubator/chukwa/trunk/conf/chukwa-agent-conf.xml.template
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/conf/chukwa-agent-conf.xml.template?rev=1023543&r1=1023542&r2=1023543&view=diff
==============================================================================
--- incubator/chukwa/trunk/conf/chukwa-agent-conf.xml.template (original)
+++ incubator/chukwa/trunk/conf/chukwa-agent-conf.xml.template Sun Oct 17 18:14:27 2010
@@ -75,4 +75,9 @@
     <value>20000</value>
     <description>the number of milliseconds to wait between searches for a collector</description>
   </property>
+
+  <property>
+    <name>syslog.adaptor.port.9095.facility.LOCAL1</name>
+    <value>HADOOP</value>
+  </property>
 </configuration>

Added: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java?rev=1023543&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java (added)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java Sun Oct 17 18:14:27 2010
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import java.io.IOException;
+import java.net.*;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import org.apache.hadoop.chukwa.*;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.log4j.Logger;
+
+/**
+ * SyslogAdaptor reads UDP syslog messages from a port and converts each message to a
+ * Chukwa Chunk for transport from Chukwa Agent to Chukwa Collector.  Usage:
+ *
+ * add SyslogAdaptor [DataType] [Port] [SequenceNumber]
+ *
+ * The syslog protocol facility name is mapped to a Chukwa Data Type
+ * by SyslogAdaptor, hence each UDP port can support up to 24 data streams.
+ *
+ * The Data Type mapping can be overridden in the Chukwa Agent configuration file, e.g.:
+ *
+ * <property>
+ *   <name>syslog.adaptor.port.9095.facility.LOCAL1</name>
+ *   <value>HADOOP</value>
+ * </property>
+ *
+ * When demux takes place, data received on port 9095 with facility name LOCAL1 will
+ * be processed by the demux parser for data type "HADOOP".
+ */
+public class SyslogAdaptor extends UDPAdaptor {
+
+  private final static Logger log = Logger.getLogger(SyslogAdaptor.class);
+  public enum FacilityType { KERN, USER, MAIL, DAEMON, AUTH, SYSLOG, LPR, NEWS, UUCP, CRON, AUTHPRIV, FTP, NTP, AUDIT, ALERT, CLOCK, LOCAL0, LOCAL1, LOCAL2, LOCAL3, LOCAL4, LOCAL5, LOCAL6, LOCAL7 }
+  public HashMap<Integer, String> facilityMap;
+  DatagramSocket ds;
+  volatile boolean running = true;
+  volatile long bytesReceived = 0;
+
+  public SyslogAdaptor() {
+    facilityMap = new HashMap<Integer, String>(FacilityType.values().length);
+  }
+
+  public void send(byte[] buf, DatagramPacket dp) throws InterruptedException, IOException {
+    StringBuilder source = new StringBuilder();
+    source.append(dp.getAddress());
+    String dataType = type;
+    byte[] trimmedBuf = Arrays.copyOf(buf, dp.getLength());
+    String rawPRI = new String(trimmedBuf, 1, 4);
+    int i = rawPRI.indexOf(">");
+    if (i <= 3 && i > -1) {
+      String priorityStr = rawPRI.substring(0,i);
+      int priority = 0;
+      int facility = 0;
+      try {
+        priority = Integer.parseInt(priorityStr);
+        facility = (priority >> 3) << 3;
+        facility = facility / 8;
+        dataType = facilityMap.get(facility);
+      } catch (NumberFormatException nfe) {
+        log.warn("Unsupported format detected by SyslogAdaptor:"+trimmedBuf);
+      }
+    }
+
+    bytesReceived += trimmedBuf.length;
+    Chunk c = new ChunkImpl(dataType, source.toString(), bytesReceived, trimmedBuf, SyslogAdaptor.this);
+    dest.add(c);
+  }
+
+  @Override
+  public String parseArgs(String s) {
+    portno = Integer.parseInt(s);
+    ChukwaConfiguration cc = new ChukwaConfiguration();
+    for(FacilityType e : FacilityType.values()) {
+      StringBuilder buffer = new StringBuilder();
+      buffer.append("syslog.adaptor.port.");
+      buffer.append(portno);
+      buffer.append(".facility.");
+      buffer.append(e.name());
+      String dataType = cc.get(buffer.toString(), e.name());
+      facilityMap.put(e.ordinal(), dataType);
+    }
+    return s;
+  }
+
+}

Modified:
incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/UDPAdaptor.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/UDPAdaptor.java?rev=1023543&r1=1023542&r2=1023543&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/UDPAdaptor.java (original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/UDPAdaptor.java Sun Oct 17 18:14:27 2010
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.chukwa.datacollection.adaptor;
 
+import java.io.IOException;
 import java.net.*;
 import java.util.Arrays;
 import org.apache.hadoop.chukwa.*;
@@ -36,16 +37,12 @@ public class UDPAdaptor extends Abstract
   class ListenThread extends Thread {
     public void run() {
       log.info("UDP adaptor " + adaptorID + " started on port " + portno + " offset =" + bytesReceived);
-      byte[] buf = new byte[1024];
+      byte[] buf = new byte[65535];
       DatagramPacket dp = new DatagramPacket(buf, buf.length);
       try {
         while(running) {
           ds.receive(dp);
-          log.info("got a UDP message");
-          byte[] trimmedBuf = Arrays.copyOf(buf, dp.getLength());
-          bytesReceived += trimmedBuf.length;
-          Chunk c = new ChunkImpl(type, source, bytesReceived, trimmedBuf, UDPAdaptor.this);
-          dest.add(c);
+          send(buf, dp);
         }
       } catch(Exception e) {
         if(running)
@@ -54,6 +51,13 @@ public class UDPAdaptor extends Abstract
     }
   }
   ListenThread lt;
+
+  public void send(byte[] buf, DatagramPacket dp) throws InterruptedException, IOException {
+    byte[] trimmedBuf = Arrays.copyOf(buf, dp.getLength());
+    bytesReceived += trimmedBuf.length;
+    Chunk c = new ChunkImpl(type, source, bytesReceived, trimmedBuf, UDPAdaptor.this);
+    dest.add(c);
+  }
 
   @Override
   public String parseArgs(String s) {
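
For readers following the facility-to-data-type mapping that SyslogAdaptor performs in this commit, the lookup can be sketched in isolation as below. This is a minimal standalone sketch, not the committed code: the class name SyslogFacilityDemo and the methods facilityOf, buildFacilityMap and dataTypeFor are hypothetical, and a plain Map<String, String> stands in for ChukwaConfiguration. Unlike the committed send() method, the sketch also assumes that packets too short to hold a PRI header, or carrying a facility code outside 0..23, should fall back to the adaptor's default data type.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical standalone sketch; not part of the Chukwa source tree.
public class SyslogFacilityDemo {

  // Facility names in numeric order, mirroring FacilityType in the diff.
  enum FacilityType { KERN, USER, MAIL, DAEMON, AUTH, SYSLOG, LPR, NEWS, UUCP, CRON,
    AUTHPRIV, FTP, NTP, AUDIT, ALERT, CLOCK,
    LOCAL0, LOCAL1, LOCAL2, LOCAL3, LOCAL4, LOCAL5, LOCAL6, LOCAL7 }

  // Returns the facility code from a leading "<PRI>" header,
  // or -1 when the header is missing, malformed, or out of range.
  public static int facilityOf(byte[] packet, int length) {
    if (length < 3 || packet[0] != '<') {
      return -1;
    }
    int end = -1;
    // PRI has at most three digits, so '>' can only sit at index 2..4.
    for (int i = 1; i < Math.min(length, 5); i++) {
      if (packet[i] == '>') {
        end = i;
        break;
      }
    }
    if (end < 2) {
      return -1;
    }
    try {
      String pri = new String(packet, 1, end - 1, StandardCharsets.US_ASCII);
      int facility = Integer.parseInt(pri) >> 3; // PRI = facility * 8 + severity
      return (facility >= 0 && facility < FacilityType.values().length) ? facility : -1;
    } catch (NumberFormatException e) {
      return -1;
    }
  }

  // Builds the facility-code -> data-type table for one port. The conf map is a
  // stand-in (assumption) for the agent configuration; keys follow the pattern
  // syslog.adaptor.port.<port>.facility.<NAME>, defaulting to the facility name.
  public static Map<Integer, String> buildFacilityMap(int port, Map<String, String> conf) {
    Map<Integer, String> map = new HashMap<>();
    for (FacilityType f : FacilityType.values()) {
      String key = "syslog.adaptor.port." + port + ".facility." + f.name();
      map.put(f.ordinal(), conf.getOrDefault(key, f.name()));
    }
    return map;
  }

  // Picks the data type for a packet, using fallback when no facility is found.
  public static String dataTypeFor(byte[] packet, int length,
                                   Map<Integer, String> map, String fallback) {
    int facility = facilityOf(packet, length);
    return facility < 0 ? fallback : map.get(facility);
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    conf.put("syslog.adaptor.port.9095.facility.LOCAL1", "HADOOP");
    Map<Integer, String> map = buildFacilityMap(9095, conf);

    // PRI 142 = 17 * 8 + 6, i.e. facility LOCAL1 with severity 6.
    byte[] msg = "<142>Oct 17 18:14:27 host app: hello".getBytes(StandardCharsets.US_ASCII);
    System.out.println(dataTypeFor(msg, msg.length, map, "Default")); // prints "HADOOP"
  }
}
```

Shifting the priority right by three bits is equivalent to the (priority >> 3) << 3 followed by / 8 in the committed code for non-negative values; the sketch simply writes it in one step.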