chukwa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asrab...@apache.org
Subject svn commit: r1033672 - in /incubator/chukwa/trunk: ./ src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ src/java/org/apache/hadoop/chukwa/inputtools/log4j/
Date Wed, 10 Nov 2010 19:40:48 GMT
Author: asrabkin
Date: Wed Nov 10 19:40:48 2010
New Revision: 1033672

URL: http://svn.apache.org/viewvc?rev=1033672&view=rev
Log:
CHUKWA-546. SocketAdaptor. Contributed by Eric Yang.

Added:
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java
Modified:
    incubator/chukwa/trunk/CHANGES.txt
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java
    incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4JMetricsContext.java

Modified: incubator/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/CHANGES.txt?rev=1033672&r1=1033671&r2=1033672&view=diff
==============================================================================
--- incubator/chukwa/trunk/CHANGES.txt (original)
+++ incubator/chukwa/trunk/CHANGES.txt Wed Nov 10 19:40:48 2010
@@ -24,6 +24,8 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
 
+    CHUKWA-546. SocketAdaptor. (Eric Yang via asrabkin)
+
     CHUKWA-493. Change release artifact naming. (Eric Yang via asrabkin)
 
     CHUKWA-525. Extensible DirTailingAdaptor. (Jaydeep Ayachit via asrabkin).

Added: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java?rev=1033672&view=auto
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java
(added)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java
Wed Nov 10 19:40:48 2010
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.net.*;
+import java.util.ArrayList;
+
+import org.apache.hadoop.chukwa.*;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.apache.log4j.spi.LoggerRepository;
+import org.apache.log4j.spi.LoggingEvent;
+
+/**
+ * SocketAdaptor listens on a TCP port, reads serialized log4j LoggingEvent
+ * objects from each connection and adds each one to dest as a Chunk.  Usage:
+ * 
+ * add SocketAdaptor [DataType] [Port] [SequenceNumber]
+ * 
+ */
+public class SocketAdaptor extends AbstractAdaptor {
+  PatternLayout layout = new PatternLayout("%d{ISO8601} %p %c: %m%n"); // renders each received event as text
+
+  private final static Logger log = Logger.getLogger(SocketAdaptor.class);
+  volatile boolean running = true; // loop flag read by Dispatcher and Worker; cleared in shutdown()
+  volatile long bytesReceived = 0; // byte length of the most recently formatted event (set in Worker.run)
+  private int port = 9095; // listen port; replaced by parseArgs
+  
+  class Dispatcher extends Thread { // accept loop: starts one Worker thread per incoming connection
+    private int port;
+    private ServerSocket listener; // stays null until run() has bound the port
+    
+    public Dispatcher(int port) {
+      this.port = port;
+    }
+    
+    public void run() { // binds the port, then accepts connections while running is true
+      try{
+        listener = new ServerSocket(port);
+        Socket server;
+
+        while(running){
+          server = listener.accept(); // blocks; listener.close() in shutdown() ends it with an IOException
+          Worker connection = new Worker(server);
+          Thread t = new Thread(connection); // worker threads are not tracked after being started
+          t.start();
+        }
+      } catch (IOException ioe) { // covers both bind failure and the close-triggered exit
+        log.error("SocketAdaptor Dispatcher problem:", ioe);
+      }
+    }
+    
+    public void shutdown() { // closes the listening socket
+      try {
+        listener.close(); // NOTE(review): listener is still null if this runs before run() binds the port
+      } catch (IOException e) { // close failure is ignored
+      }
+    }
+  }
+  
+  class Worker implements Runnable { // serves a single accepted connection
+    private ObjectInputStream ois; // stays null until run() has opened the stream
+    private Socket server;
+    
+    public Worker(Socket server) {
+      this.server = server;
+    }
+    
+    public void run() { // reads events until running is false or the stream fails, then closes everything
+      LoggingEvent event;
+
+      try {
+        ois = new ObjectInputStream(
+                           new BufferedInputStream(server.getInputStream()));
+        if (ois != null) { // always true here: the constructor above either returns an object or throws
+          while(running) {
+            // read an event from the wire
+            event = (LoggingEvent) ois.readObject(); // NOTE(review): deserializes whatever the peer sends
+            byte[] bytes = layout.format(event).getBytes(); // getBytes() uses the platform default charset
+            bytesReceived=bytes.length; // NOTE(review): overwrites the shared field, does not accumulate
+            Chunk c = new ChunkImpl(type, java.net.InetAddress.getLocalHost().getHostName(),
bytesReceived, bytes, SocketAdaptor.this); // NOTE(review): third argument presumably a sequence id/offset - confirm against ChunkImpl
+            dest.add(c);
+          }
+        }
+      } catch(java.io.EOFException e) { // end of the object stream
+        log.info("Caught java.io.EOFException closing conneciton.");
+      } catch(java.net.SocketException e) {
+        log.info("Caught java.net.SocketException closing conneciton.");
+      } catch(InterruptedIOException e) {
+        Thread.currentThread().interrupt(); // restore the interrupt flag for this thread
+        log.info("Caught java.io.InterruptedIOException: "+e);
+        log.info("Closing connection.");
+      } catch(IOException e) {
+        log.info("Caught java.io.IOException: "+e);
+        log.info("Closing connection.");
+      } catch(Exception e) { // includes ClassNotFoundException/ClassCastException from readObject and the cast
+        log.error("Unexpected exception. Closing conneciton.", e);
+      } finally { // always release the stream and the socket, whatever ended the loop
+        if (ois != null) {
+           try {
+              ois.close();
+           } catch(Exception e) {
+              log.info("Could not close connection.", e);
+           }
+        }
+        if (server != null) {
+          try {
+            server.close();
+          } catch(InterruptedIOException e) {
+              Thread.currentThread().interrupt();
+          } catch(IOException ex) { // close failure is ignored
+          }
+        }
+      }
+    }
+    
+    public void shutdown() { // closes stream and socket; no caller is visible in this class
+      try {
+        ois.close(); // NOTE(review): ois is still null if run() has not opened the stream yet
+        server.close(); // skipped when ois.close() throws
+      } catch (IOException e) { // close failure is ignored
+      }
+    }
+  }
+  
+  Dispatcher disp; // created in start(); null before that
+  
+  @Override
+  public String parseArgs(String s) { // treats the whole argument string as the port and returns it unchanged
+    port = Integer.parseInt(s); // throws NumberFormatException when s is not a plain integer
+    return s;
+  }
+
+  @Override
+  public void start(long offset) throws AdaptorException { // starts the Dispatcher thread; offset is not used
+    try {
+      disp = new Dispatcher(port);
+      disp.setDaemon(true);
+      disp.start();      
+    } catch (Exception e) {
+      throw new AdaptorException(ExceptionUtil.getStackTrace(e)); // stack trace is carried as the message text
+    }
+  }
+
+  @Override
+  public String getCurrentStatus() { // returns the type field and the port, separated by a space
+    return type + " " + port;
+  }
+
+  @Override
+  public long shutdown(AdaptorShutdownPolicy shutdownPolicy) // shutdownPolicy is not consulted
+      throws AdaptorException {
+    try {
+      running = false; // lets Dispatcher and Worker loops finish after their current blocking call
+      disp.shutdown();
+    } catch(Exception e) {} // swallows every failure, e.g. a null disp when start() was never called
+    return 0; // always returns 0
+  }
+
+}

Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java?rev=1033672&r1=1033671&r2=1033672&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java
(original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java
Wed Nov 10 19:40:48 2010
@@ -51,7 +51,6 @@ import org.json.JSONObject;
 @Table(name="Hadoop",columnFamily="Hadoop_mapred_job"),
 @Table(name="Hadoop",columnFamily="Hadoop_rpc_metrics")
 })
-@Deprecated
 public class HadoopMetricsProcessor extends AbstractProcessor {
   public static final String jvm = "Hadoop_jvm_metrics";
   public static final String mapred = "Hadoop_mapred_metrics";
@@ -84,9 +83,11 @@ public class HadoopMetricsProcessor exte
       OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
       throws Throwable {
     try {
-      String dStr = recordEntry.substring(0, 23);
-      int start = 24;
-      int idx = recordEntry.indexOf(' ', start);
+      // Look for syslog PRI, if PRI is not found, start from offset of 0.
+      int idx = recordEntry.indexOf('>', 0);  
+      String dStr = recordEntry.substring(idx+1, idx+23);
+      int start = idx + 25;
+      idx = recordEntry.indexOf(' ', start);
       // String level = recordEntry.substring(start, idx);
       start = idx + 1;
       idx = recordEntry.indexOf(' ', start);

Modified: incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4JMetricsContext.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4JMetricsContext.java?rev=1033672&r1=1033671&r2=1033672&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4JMetricsContext.java
(original)
+++ incubator/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4JMetricsContext.java
Wed Nov 10 19:40:48 2010
@@ -17,7 +17,14 @@
  */
 package org.apache.hadoop.chukwa.inputtools.log4j;
 
-
+/**
+ * Log4JMetricsContext is a plugin for reporting Hadoop Metrics through
+ * syslog protocol.  Usage:
+ * 
+ * Copy hadoop-metrics.properties file from CHUKWA_HOME/conf to HADOOP_HOME/conf.
+ * Copy chukwa-hadoop-*-client.jar and json.jar to HADOOP_HOME/lib
+ * 
+ */
 import java.io.File;
 import java.io.IOException;
 
@@ -36,18 +43,15 @@ public class Log4JMetricsContext extends
   static final Object lock = new Object();
 
   /* Configuration attribute names */
-  protected static final String  OUTPUT_DIR_PROPERTY = "directory";
   protected static final String PERIOD_PROPERTY = "period";
-  protected static final String ADD_UUID_PROPERTY = "uuid";
-  
+  protected static final String HOST_PROPERTY = "host";
+  protected static final String PORT_PROPERTY = "port";  
 
-  protected static final String user = System.getProperty("user.name");
-  
-  protected String outputDir = null;
   protected int period = 0;
-  protected boolean needUUID = false;
+  protected String host = "localhost";
+  protected int port = 9095;
   
-  /** Creates a new instance of FileContext */
+  /** Creates a new instance of MetricsContext */
   public Log4JMetricsContext() {
   }
 
@@ -68,22 +72,13 @@ public class Log4JMetricsContext extends
       this.period = period;
       log.info("Log4JMetricsContext." + contextName + ".period=" + period);
     }
-    
-    outputDir = getAttribute(OUTPUT_DIR_PROPERTY);
-    if (outputDir == null) {
-      log.warn("Log4JMetricsContext." + contextName + "."+ OUTPUT_DIR_PROPERTY + " is null");
-      throw new MetricsException("Invalid output directory: " + outputDir);
-    }
-    File fOutputDir = new File(outputDir);
-    if (!fOutputDir.exists()) {
-      fOutputDir.mkdirs();
-    }
-    log.info("Log4JMetricsContext." + contextName + "." + OUTPUT_DIR_PROPERTY +"=" + outputDir);
-    
-    String uuid = getAttribute(ADD_UUID_PROPERTY);
-    if (uuid != null && uuid.equalsIgnoreCase("true")) {
-      needUUID = true;
-      log.info("Log4JMetricsContext." + contextName + "." + ADD_UUID_PROPERTY +" has been
activated."); 
+    String host = getAttribute(HOST_PROPERTY);
+    if (host != null) {
+      this.host = host;
+    }
+    String port = getAttribute(PORT_PROPERTY);
+    if (port != null) {
+      this.port = Integer.parseInt(port);
     }
   }
 
@@ -95,22 +90,10 @@ public class Log4JMetricsContext extends
         if (out == null) {
           PatternLayout layout = new PatternLayout("%d{ISO8601} %p %c: %m%n");
           
-          org.apache.hadoop.chukwa.inputtools.log4j.ChukwaDailyRollingFileAppender appender
= new org.apache.hadoop.chukwa.inputtools.log4j.ChukwaDailyRollingFileAppender();
+          org.apache.log4j.net.SocketAppender appender = new org.apache.log4j.net.SocketAppender(host,
port);
+          
           appender.setName("chukwa.metrics." + contextName);
           appender.setLayout(layout);
-          appender.setAppend(true);
-          if (needUUID) {
-            appender.setFile(outputDir + File.separator + "chukwa-" + user
-                + "-" + contextName + "-" + System.currentTimeMillis()
-                + ".log");
-          } else {
-            appender.setFile(outputDir + File.separator + "chukwa-" + user
-                + "-" + contextName 
-                + ".log");
-          }
-
-          appender.setRecordType( contextName);
-          appender.setDatePattern(".yyyy-MM-dd");
           
           Logger logger = Logger.getLogger("chukwa.metrics." + contextName);
           logger.setAdditivity(false);



Mime
View raw message