chukwa-commits mailing list archives

From asrab...@apache.org
Subject svn commit: r797659 - in /hadoop/chukwa/trunk: ./ contrib/chukwa-pig/ src/docs/src/documentation/content/xdocs/ src/java/org/apache/hadoop/chukwa/datacollection/agent/ src/java/org/apache/hadoop/chukwa/datacollection/writer/ src/java/org/apache/hadoop/...
Date Fri, 24 Jul 2009 21:18:41 GMT
Author: asrabkin
Date: Fri Jul 24 21:18:40 2009
New Revision: 797659

URL: http://svn.apache.org/viewvc?rev=797659&view=rev
Log:
CHUKWA-358. Pluggable real-time monitoring at collector

Added:
    hadoop/chukwa/trunk/src/docs/src/documentation/content/xdocs/collector.xml
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java
Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/contrib/chukwa-pig/chukwa-pig.jar
    hadoop/chukwa/trunk/src/docs/src/documentation/content/xdocs/index.xml
    hadoop/chukwa/trunk/src/docs/src/documentation/content/xdocs/programming.xml
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpChunks.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/CaptureWriter.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/CollectorTest.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/util/TestDumpChunks.java

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=797659&r1=797658&r2=797659&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Fri Jul 24 21:18:40 2009
@@ -4,6 +4,8 @@
 
   NEW FEATURES
 
+    CHUKWA-358. Real-time monitoring at collector. (asrabkin)
+
     CHUKWA-352. Xtrace in contrib. (asrabkin)
 
     CHUKWA-346. Simplified sink archiver. (asrabkin)

Modified: hadoop/chukwa/trunk/contrib/chukwa-pig/chukwa-pig.jar
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/contrib/chukwa-pig/chukwa-pig.jar?rev=797659&r1=797658&r2=797659&view=diff
==============================================================================
Binary files - no diff available.

Added: hadoop/chukwa/trunk/src/docs/src/documentation/content/xdocs/collector.xml
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/docs/src/documentation/content/xdocs/collector.xml?rev=797659&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/docs/src/documentation/content/xdocs/collector.xml (added)
+++ hadoop/chukwa/trunk/src/docs/src/documentation/content/xdocs/collector.xml Fri Jul 24 21:18:40 2009
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<!DOCTYPE document PUBLIC "-//APACHE//DTD Documentation V2.0//EN" "http://forrest.apache.org/dtd/document-v20.dtd">
+
+<document>
+  <header>
+    <title>Chukwa Collector Setup Guide</title>
+  </header>
+  <body>
+  	<section>
+  	  <title>Basic Operation</title>
+  		<p>Chukwa Collectors are responsible for accepting incoming data from Agents,
+  		and storing the data.  Most commonly, collectors simply write to HDFS.  
+  		In this mode, the filesystem to write to is determined by the option
+  		<code>writer.hdfs.filesystem</code> in  <code>chukwa-collector-conf.xml</code>.
+  		 This is the only option that you really need to specify to get a working 
+  		 collector.
+  		</p>
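+  		<p>A minimal sketch of that entry in <code>chukwa-collector-conf.xml</code>
+  		(the filesystem URI below is only an illustrative value; substitute your own
+  		namenode):
+  		</p>
+  		<source>
+&#60;property&#62;
+  &#60;name&#62;writer.hdfs.filesystem&#60;/name&#62;
+  &#60;value&#62;hdfs://localhost:9000/&#60;/value&#62;
+&#60;/property&#62;
+  		</source>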
+  	</section>
+  	
+  	<section><title>Configuration Knobs</title>
+  	<p>There are a number of additional "standard" knobs worth knowing about. These
+  	are mostly documented in <code>chukwa-collector-conf.xml</code>.</p>
+  	</section>
+  	
+  	<section><title>Advanced options</title>
+  	<p>
+  	  There are some advanced options, not necessarily documented in the
+  	collector conf file, that are helpful in using Chukwa in nonstandard ways.
+  	</p> <p>
+	    While normally Chukwa writes sequence files to HDFS, it's possible to
+	    specify an alternate Writer class. The option 
+	    <code>chukwaCollector.writerClass</code> specifies a Java class to instantiate
+	    and use as a writer. See the <code>ChukwaWriter</code> javadoc for details.
+	  </p>  <p>
+	  	One particularly useful Writer class is <code>PipelineStageWriter</code>, which
+	  	lets you string together a series of <code>PipelineableWriters</code>
+	  	for pre-processing or post-processing incoming data.
+	  	As an example, the SocketTeeWriter class allows other programs to get incoming chunks
+	  	fed to them over a socket by the collector.
+	  	</p>
+	  	
+	  	<p>Stages in the pipeline should be listed, comma-separated, in the option 
+	  	<code>chukwaCollector.pipeline</code>.</p>
+	  	
+	  	<source>
+&#60;property&#62;
+  &#60;name&#62;chukwaCollector.writerClass&#60;/name&#62;
+  &#60;value&#62;org.apache.hadoop.chukwa.datacollection.writer.PipelineStageWriter&#60;/value&#62;
+&#60;/property&#62;
+
+&#60;property&#62;
+  &#60;name&#62;chukwaCollector.pipeline&#60;/name&#62;
+  &#60;value&#62;org.apache.hadoop.chukwa.datacollection.writer.SocketTeeWriter,org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter&#60;/value&#62;
+&#60;/property&#62;
+	  	</source>
+	  	
+	  	<section>
+	  	<title>SocketTeeWriter</title>
+	  	<p>
+	  		The <code>SocketTeeWriter</code> allows external processes to watch
+	  	the stream of chunks passing through the collector. This allows certain kinds
+	  	of real-time monitoring to be done on top of Chukwa.</p>  
+	  	
+	  	 <p>  
+	  	    SocketTeeWriter listens on a port (specified by conf option
+	  	 <code>chukwaCollector.tee.port</code>, defaulting to 9094).  Applications
+	  	 that want Chunks should connect to that port, and issue a command of the form
+	  	 <code>RAW|WRITABLE &#60;filter&#62;\n</code>. Filters use the same syntax
+	  	 as the <a href="programming.html#Reading+data+from+the+sink+or+the+archive">
+	  	 Dump command</a>.  If the filter is accepted, the Writer will respond 
+	  	 <code>OK\n</code>.
+	  	 </p>
+	  	 <p>
+	  	 Subsequently, Chunks matching the filter will be serialized and sent back over the socket.
+	  	Specifying "WRITABLE" will cause the chunks to be written using Hadoop's 
+	  	Writable serialization framework. "RAW" will send the internal data of the
+	  	Chunk, without any metadata, prefixed by its length encoded as a 32-bit int,
+	  	big-endian.  
+	  	</p>
+	  	<p>
+	  	The filter will be inactivated when the socket is closed.
+	  	</p>
+
+	  	<source>
+Socket s2 = new Socket("host", SocketTeeWriter.DEFAULT_PORT);
+s2.getOutputStream().write("RAW datatype=XTrace\n".getBytes());
+dis = new DataInputStream(s2.getInputStream());
+dis.readFully(new byte[3]); //read "OK\n"
+while(true) {
+   int len = dis.readInt();
+   byte[] data = new byte[len];
+   dis.readFully(data);
+   DoSomethingUsing(data);
+}
+	  	</source>
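+	  	<p>
+	  	In WRITABLE mode, each matching chunk arrives as a Writable-serialized
+	  	<code>ChunkImpl</code>, which can be read back with <code>ChunkImpl.read()</code>.
+	  	Below is a minimal sketch; error handling is omitted and the filter is
+	  	just an illustrative value.
+	  	</p>
+	  	<source>
+Socket s = new Socket("host", SocketTeeWriter.DEFAULT_PORT);
+s.getOutputStream().write("WRITABLE datatype=XTrace\n".getBytes());
+DataInputStream dis = new DataInputStream(s.getInputStream());
+dis.readFully(new byte[3]); //read "OK\n"
+while(true) {
+   ChunkImpl c = ChunkImpl.read(dis); //deserialize the next matching chunk
+   DoSomethingUsing(c.getData());
+}
+	  	</source>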
+	  	</section>
+	  	
+
+  	</section>
+  </body>
+</document>
\ No newline at end of file

Modified: hadoop/chukwa/trunk/src/docs/src/documentation/content/xdocs/index.xml
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/docs/src/documentation/content/xdocs/index.xml?rev=797659&r1=797658&r2=797659&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/docs/src/documentation/content/xdocs/index.xml (original)
+++ hadoop/chukwa/trunk/src/docs/src/documentation/content/xdocs/index.xml Fri Jul 24 21:18:40 2009
@@ -23,6 +23,18 @@
   </header>
   
   <body>
+  	<p>
+  	  Log processing was one of the original purposes of MapReduce. Unfortunately,
+  	  using Hadoop for MapReduce processing of logs is somewhat troublesome. 
+  	  Logs are generated incrementally across many machines, but Hadoop MapReduce
+  	  works best on a small number of large files. And HDFS doesn't currently
+  	  support appends, making it difficult to keep the distributed copy fresh.
+  	  </p>
+  	  <p>
+  	  Chukwa is a Hadoop subproject devoted to bridging that gap between logs
+  	   and MapReduce.  Chukwa is a scalable distributed monitoring and analysis
+  	   system, particularly well suited to processing logs from Hadoop and other large systems.  
+  	</p>
       <p>
         The Chukwa Documentation provides the information you need to get 
         started using Chukwa.
@@ -33,7 +45,9 @@
         shows you how to setup and deploy Chukwa. 
       </p>
      <p> If you want to configure the Chukwa agent process, to control what's 
-     collected, you should read the <a href="agent.html">Agent Guide</a>.
+     collected, you should read the <a href="agent.html">Agent Guide</a>. There's
+     also a  <a href="collector.html">Collector Guide</a> describing that part of 
+     the pipeline.
      </p>
      <p>And if you want to use collected data, read the 
      <a href="programming.html">User and Programming Guide</a></p>

Modified: hadoop/chukwa/trunk/src/docs/src/documentation/content/xdocs/programming.xml
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/docs/src/documentation/content/xdocs/programming.xml?rev=797659&r1=797658&r2=797659&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/docs/src/documentation/content/xdocs/programming.xml (original)
+++ hadoop/chukwa/trunk/src/docs/src/documentation/content/xdocs/programming.xml Fri Jul 24 21:18:40 2009
@@ -39,7 +39,7 @@
 <title>Sink File Format</title>
 <p>
 As data is collected, Chukwa dumps it into <em>sink files</em> in HDFS. By
-default, these are located in <code>/chukwa/logs</code>.  If the file name ends
+default, these are located in <code>hdfs:///chukwa/logs</code>.  If the file name ends
 in .chukwa, that means the file is still being written to. Every few minutes, 
 the collector will close the file, and rename the file to '*.done'.  This 
 marks the file as available for processing.</p>
@@ -79,7 +79,7 @@
 <p>The simple archiver moves every <code>.done</code> file out of the sink, and 
 then runs a MapReduce job to group the data. Output Chunks will be placed into 
 files with names of the form 
-<code>/chukwa/archive/clustername/Datatype_date.arc</code>.  
+<code>hdfs:///chukwa/archive/clustername/Datatype_date.arc</code>.  
 Date corresponds to when the data was collected; Datatype is the datatype of 
 each Chunk. 
 </p>
@@ -101,8 +101,8 @@
 </p>
 
 <p>
-The <code>dump</code> tool a search pattern as its first argument, followed by
-a list of files or file-globs.  It will then print the contents of every data
+The <code>dump</code> tool takes a search pattern as its first argument, followed
+by a list of files or file-globs.  It will then print the contents of every data
 stream in those files that matches the pattern. (A data stream is a sequence of
 chunks with the same host, source, and datatype.)  Data is printed in order,
 with duplicates removed.  No metadata is printed.  Separate streams are 
@@ -113,7 +113,7 @@
 matches the glob pattern.  Note the use of single quotes to pass glob patterns
 through to the application, preventing the shell from expanding them.</p>
 <source>
-bin/dump.sh 'datatype=.*' 'hdfs://host:9000/chukwa/archive/*.arc
+$CHUKWA_HOME/bin/dump.sh 'datatype=.*' 'hdfs://host:9000/chukwa/archive/*.arc'
 </source>
 
 <p>
@@ -123,8 +123,8 @@
 is of the form <code>metadatafield=regex</code>, where 
 <code>metadatafield</code> is one of the Chukwa metadata fields, and 
 <code>regex</code> is a regular expression.  The valid metadata field names are:
-<code>datatype</code>, <code>host</code>, <code>cluster</code>, and 
-<code>name</code>.  
+<code>datatype</code>, <code>host</code>, <code>cluster</code>, 
+<code>content</code> and <code>name</code>.  
 </p>
 
 <p>A stream matches the search pattern only if every rule matches. So to 

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java?rev=797659&r1=797658&r2=797659&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java Fri Jul 24 21:18:40 2009
@@ -66,7 +66,7 @@
         } else
           return null;
       } catch (Exception e2) {
-        log.warn("Error instantiating new adaptor"+ className +  " by classname"
+        log.warn("Error instantiating new adaptor "+ className +  " by classname"
             + " and also with \"o.a.h.c.datacollection.adaptor\" prefix added", e2);
         return null;
       }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java?rev=797659&r1=797658&r2=797659&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java Fri Jul 24 21:18:40 2009
@@ -25,6 +25,12 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
 
+/**
+ * A pipeline of PipelineableWriter stages.
+ * Controlled by the option 'chukwaCollector.pipeline', which should be a
+ * comma-separated list of class names. 
+ * 
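+ * For example (value borrowed from the collector setup guide added in this commit):
+ *   chukwaCollector.pipeline=org.apache.hadoop.chukwa.datacollection.writer.SocketTeeWriter,org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter
+ * 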
+ */
 public class PipelineStageWriter implements ChukwaWriter {
   Logger log = Logger.getLogger(PipelineStageWriter.class);
 
@@ -79,6 +85,7 @@
               + " at end of processing pipeline isn't a ChukwaWriter");
           throw new WriterException("bad pipeline");
         } else {
+          ((ChukwaWriter)st).init(conf);
           if (lastWriter != null)
             lastWriter.setNextStage((ChukwaWriter) st);
           else

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java?rev=797659&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java Fri Jul 24 21:18:40 2009
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.writer;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.regex.PatternSyntaxException;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.util.DumpChunks;
+import static org.apache.hadoop.chukwa.util.DumpChunks.Filter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.mortbay.log.Log;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.io.*;
+
+/**
+ * Effectively a "Tee" in the writer pipeline.
+ * Accepts incoming connections on port specified by chukwaCollector.tee.port.
+ * Defaults to 9094
+ * 
+ * Protocol is as follows:
+ * Client ---> TeeWriter   "RAW | WRITABLE <filter>" 
+ *                  as per DumpChunks.
+ *                  
+ * TeeWriter ---> Client "OK\n"                 
+ *   In RAW mode               
+ * TeeWriter ---> Client (length(int)  byte[length])*
+ *              An indefinite sequence of length, followed by byte array.
+ *              
+ *  In Writable mode
+ * TeeWriter ---> Client    (Chunk serialized as Writable)*
+ *              An indefinite sequence of serialized chunks
+ *              
+ *  In English: clients should connect and say either "RAW " or "WRITABLE " 
+ *  followed by a filter.  (Note that the keyword is followed by exactly one space.)
+ *  They'll then receive either a sequence of byte arrays or of Writable-serialized
+ *  Chunks.
+ */
+public class SocketTeeWriter implements PipelineableWriter {
+
+  public static final String WRITABLE = "WRITABLE";
+  public static final String RAW = "RAW";
+  static final int DEFAULT_PORT = 9094;
+  static Logger log = Logger.getLogger(SocketTeeWriter.class);
+  volatile boolean running = true;
+  int timeout;
+  
+  /**
+   * Listens for incoming connections, spawns a Tee to deal with each.
+   */
+  class SocketListenThread extends Thread {
+    ServerSocket s;
+    public SocketListenThread(Configuration conf) throws IOException {
+      int portno = conf.getInt("chukwaCollector.tee.port", DEFAULT_PORT);
+      s = new ServerSocket(portno);
+    }
+    
+    public void run() {
+      log.info("listen thread started");
+      try{
+        while(running) {
+          Socket sock = s.accept();
+          log.info("got connection from " + sock.getInetAddress());
+          new Tee(sock);
+        }
+      } catch(IOException e) {
+        
+      }
+    }
+    
+    public void shutdown() {
+      try{
+        s.close();
+      } catch(IOException e) {
+        
+      }
+    }
+  }
+  
+  /////////////////Internal class Tee//////////////////////
+  /**
+   * Manages a single socket connection
+   */
+  class Tee implements Runnable {
+    Socket sock;
+    BufferedReader in;
+    DataOutputStream out;
+    DumpChunks.Filter rules;
+    boolean sendRawBytes;
+    public Tee(Socket s) throws IOException {
+      sock = s;
+      //initialize inline; the asynchronous (threaded) version below is commented out
+      run();
+//      new Thread(this).start();
+    }
+    /**
+     * initializes the tee.
+     */
+    public void run() {
+      try {   //outer try catches IOExceptions
+       try { //inner try catches Pattern Syntax errors
+        sock.setSoTimeout(timeout);
+        in = new BufferedReader(new InputStreamReader(sock.getInputStream()));
+        String cmd = in.readLine();
+        if(!cmd.contains(" ")) {
+          
+          throw new PatternSyntaxException(
+              "command should be keyword pattern, but no ' ' seen", cmd, -1);
+        }
+        String uppercased = cmd.substring(0, cmd.indexOf(' ')).toUpperCase();
+        if(RAW.equals(uppercased))
+          sendRawBytes = true;
+        else if(!WRITABLE.equals(uppercased)) {
+          throw new PatternSyntaxException("bad command '" + uppercased+
+              "' -- starts with neither '"+ RAW+ "' nor '"+ WRITABLE+"'.", cmd, -1);
+        }
+        
+        String cmdAfterSpace = cmd.substring(cmd.indexOf(' ')+1);
+        rules = new DumpChunks.Filter(cmdAfterSpace);
+        out = new DataOutputStream(sock.getOutputStream());
+
+          //now that we read everything OK we can add ourselves to list, and return.
+        synchronized(tees) {
+          tees.add(this);
+        }
+        out.write("OK\n".getBytes());
+        log.info("tee to " + sock.getInetAddress() + " established");
+      } catch(PatternSyntaxException e) {
+          out.write(e.toString().getBytes());
+          out.writeByte('\n');
+          out.close();
+          in.close();
+          sock.close();
+          log.warn(e);
+        }//end inner catch
+      } catch(IOException e) { //end outer catch
+         log.warn(e);
+      }
+    }
+    
+    public void maybeSend(Chunk c) throws IOException {
+      if(rules.matches(c)) {
+        if(sendRawBytes) {
+          byte[] data = c.getData();
+          out.writeInt(data.length);
+          out.write(data);
+        } else
+          c.write(out);
+      }
+    }
+    
+    public void close() {
+      try {
+        out.close();
+        in.close();
+      } catch(Exception e) {}
+    }
+  }
+  
+  
+  /////////////////Main class SocketTeeWriter//////////////////////
+  
+  
+  SocketListenThread listenThread;
+  List<Tee> tees;
+  ChukwaWriter next;
+  
+  @Override
+  public void setNextStage(ChukwaWriter next) {
+    this.next = next;
+  }
+
+  @Override
+  public void add(List<Chunk> chunks) throws WriterException {
+    next.add(chunks); //pass data through
+    synchronized(tees) {
+      Iterator<Tee> loop = tees.iterator();
+      while(loop.hasNext()) {
+        Tee t = loop.next();
+        try {
+          for(Chunk c: chunks) {
+            t.maybeSend(c);
+          }
+        } catch(IOException e) {
+          t.close();
+          loop.remove(); //drop failed tee from list.
+          log.info("lost connection: "+ e.toString());
+        }
+      }
+    }
+  }
+
+  @Override
+  public void close() throws WriterException {
+    next.close();
+    running = false;
+    listenThread.shutdown();
+  }
+
+  @Override
+  public void init(Configuration c) throws WriterException {
+    try {
+      listenThread = new SocketListenThread(c);
+      listenThread.start();
+    } catch (IOException e) {
+      throw new WriterException(e);
+    }
+    tees = new ArrayList<Tee>();
+  }
+
+}

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpChunks.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpChunks.java?rev=797659&r1=797658&r2=797659&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpChunks.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/DumpChunks.java Fri Jul 24 21:18:40 2009
@@ -25,8 +25,7 @@
 import java.util.*;
 import java.io.*;
 import org.apache.commons.lang.ArrayUtils;
-import org.apache.hadoop.chukwa.ChukwaArchiveKey;
-import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.*;
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.chukwa.extraction.engine.RecordUtil;
 import org.apache.hadoop.fs.FileSystem;
@@ -37,7 +36,7 @@
 
 public class DumpChunks {
 
-  static class SearchRule {
+  private static class SearchRule {
     Pattern p;
     String targ;
     
@@ -46,7 +45,7 @@
       this.targ = t;
     }
     
-    boolean matches(ChunkImpl chunk) {
+    boolean matches(Chunk chunk) {
       if(targ.equals("datatype")) {
         return p.matcher(chunk.getDataType()).matches();
       } else if(targ.equals("name")) {
@@ -56,6 +55,10 @@
       } else if(targ.equals("cluster")) {
         String cluster = RecordUtil.getClusterName(chunk);
         return p.matcher(cluster).matches();
+      } else if(targ.equals("content")) {
+        String content = new String(chunk.getData());
+        Matcher m = p.matcher(content);
+        return m.matches();
       }
       else { 
         assert false: "unknown target: " +targ;
@@ -63,9 +66,62 @@
       }
     }
     
+    public String toString() {
+      return targ + "=" +p.toString();
+    }
+    
   }
   
-  static final String[] SEARCH_TARGS = {"datatype", "name", "host", "cluster"};
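+  /**
+   * A conjunction of search rules, parsed from a pattern list of the form
+   * "field1=regex1&field2=regex2" (see SEPARATOR).  Illustrative usage, with
+   * made-up field values:
+   *
+   *   Filter f = new Filter("datatype=SysLog&host=node1.*");
+   *   boolean hit = f.matches(chunk);
+   */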
+  public static class Filter {
+    List<SearchRule> compiledPatterns;
+    
+    public Filter(String listOfPatterns) throws  PatternSyntaxException{
+      compiledPatterns = new ArrayList<SearchRule>();
+      //FIXME: could escape these
+      String[] patterns = listOfPatterns.split(SEPARATOR);
+      for(String p: patterns) {
+        int equalsPos = p.indexOf('=');
+        
+        if(equalsPos < 0 || equalsPos > (p.length() -2)) {
+          throw new PatternSyntaxException(
+              "pattern must be of form targ=pattern", p, -1);
+        }
+        
+        String targ = p.substring(0, equalsPos);
+        if(!ArrayUtils.contains(SEARCH_TARGS, targ)) {
+          throw new PatternSyntaxException(
+              "pattern doesn't start with recognized search target", p, -1);
+        }
+        
+        Pattern pat = Pattern.compile(p.substring(equalsPos+1), Pattern.DOTALL);
+        compiledPatterns.add(new SearchRule(pat, targ));
+      }
+    }
+
+    public boolean matches(Chunk chunk) {
+      for(SearchRule r: compiledPatterns) {
+        if(!r.matches(chunk))
+          return false;
+      }
+      return true;
+    }
+    
+    public int size() {
+      return compiledPatterns.size();
+    }
+    
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append(compiledPatterns.get(0));
+      for(int i=1; i < compiledPatterns.size(); ++i) {
+        sb.append(" & ");
+        sb.append(compiledPatterns.get(i));
+      }
+      return sb.toString();
+    }
+  }//end class
+  
+  static final String[] SEARCH_TARGS = {"datatype", "name", "host", "cluster", "content"};
 
     static final String SEPARATOR="&";
   /**
@@ -81,14 +137,20 @@
       System.out.println("usage: Dump pattern1,pattern2,pattern3... file1 file2 file3...");
       System.exit(-1);
     }
-    System.err.println("Patterns:" + args[0]);
+    
     for(int i=1; i < args.length; ++i)
         System.err.println("FileGlob: " + args[i]);
 
     ChukwaConfiguration conf = new ChukwaConfiguration();
+
+
+    dump(args, conf, System.out);
+  }
+  
+  static FileSystem getFS(Configuration conf, String uri) throws IOException, URISyntaxException {
     FileSystem fs;
-    if(args[1].contains("://")) {
-      fs = FileSystem.get(new URI(args[1]), conf);
+    if(uri.contains("://")) {
+      fs = FileSystem.get(new URI(uri), conf);
     } else {
       String fsName = conf.get("writer.hdfs.filesystem");
       if(fsName == null)
@@ -97,25 +159,40 @@
         fs = FileSystem.get(conf);
     }
     System.err.println("filesystem is " + fs.getUri());
-
-    dump(args, conf, fs, System.out);
+    return fs;
   }
 
-  static void dump(String[] args, Configuration conf,
-      FileSystem fs, PrintStream out) throws IOException {
-    List<SearchRule> patterns = buildPatterns(args[0]);
+  static void dump(String[] args, Configuration conf, PrintStream out) throws IOException, URISyntaxException {
+    
+    int filterArg = 0;
+    boolean summarize = false;
+    if(args[0].equals("-s")) {
+      filterArg++;
+      summarize = true;
+    }
+    
+    Filter patterns = new Filter(args[filterArg]);
+
+    System.err.println("Patterns:" + patterns);
     ArrayList<Path> filesToSearch = new ArrayList<Path>();
 
-    Map<String, SortedMap<Long, ChunkImpl> > matchCatalog = new HashMap<String, SortedMap<Long, ChunkImpl> >();
-    
-    for(int i=1; i < args.length; ++i){
+
+    FileSystem fs = getFS(conf, args[filterArg + 1]);
+    for(int i=filterArg + 1; i < args.length; ++i){
       Path[] globbedPaths = FileUtil.stat2Paths(fs.globStatus(new Path(args[i])));
-      for(Path p: globbedPaths)
-        filesToSearch.add(p);
+      if(globbedPaths != null)
+        for(Path p: globbedPaths)
+          filesToSearch.add(p);
     }
     
     System.err.println("expands to " + filesToSearch.size() + " actual files");
 
+    DumpChunks dc;
+    if(summarize)
+      dc = new DumpAndSummarize();
+    else 
+      dc= new DumpChunks();
+    
     try {
       for(Path p: filesToSearch) {
       
@@ -124,50 +201,55 @@
         ChukwaArchiveKey key = new ChukwaArchiveKey();
         ChunkImpl chunk = ChunkImpl.getBlankChunk();
         while (r.next(key, chunk)) {
-          if(matchesPattern(patterns, chunk)) {
-            updateMatchCatalog(matchCatalog, key.getStreamName(), chunk);
+          if(patterns.matches(chunk)) {
+            dc.updateMatchCatalog(key.getStreamName(), chunk);
             chunk = ChunkImpl.getBlankChunk();
           }
         }
       }
       
-      for(SortedMap<Long, ChunkImpl> stream: matchCatalog.values()) {
-        printNoDups(stream, out);
-        out.println("\n--------------------");
-      }
+      dc.displayResults(out);
       
     } catch (Exception e) {
       e.printStackTrace();
     }
   }
-  
-  private static void printNoDups(SortedMap<Long, ChunkImpl> stream, OutputStream out) throws IOException {
-    long nextToPrint = 0;
 
-   System.err.println("---- map starts at "+ stream.firstKey());
-    for(Map.Entry<Long, ChunkImpl> e: stream.entrySet()) {
-      if(e.getKey() >= nextToPrint) {
-        System.err.println("---- printing bytes starting at " + e.getKey());
-        out.write(e.getValue().getData());
-        nextToPrint = e.getValue().getSeqID();
-      } else if(e.getValue().getSeqID() < nextToPrint) {
-        continue; //data already printed
-      } else {
-        //tricky case: chunk overlaps with already-printed data, but not completely
-        ChunkImpl c = e.getValue();
-        long chunkStartPos = e.getKey();
-        int numToPrint = (int) (c.getSeqID() - nextToPrint);
-        int printStartOffset = (int) ( nextToPrint -  chunkStartPos);
-        out.write(c.getData(), printStartOffset, numToPrint);
-        nextToPrint = c.getSeqID();
+  public DumpChunks() {
+    matchCatalog = new HashMap<String, SortedMap<Long, ChunkImpl> >();
+  }
+
+  Map<String, SortedMap<Long, ChunkImpl>> matchCatalog;
+  
+  protected void displayResults(PrintStream out) throws IOException{
+    for(SortedMap<Long, ChunkImpl> stream: matchCatalog.values()) {
+      long nextToPrint = 0;
+      if(stream.firstKey() > 0)
+        System.err.println("---- map starts at "+ stream.firstKey());
+      for(Map.Entry<Long, ChunkImpl> e: stream.entrySet()) {
+        if(e.getKey() >= nextToPrint) {
+          if(e.getKey() > nextToPrint)
+            System.err.println("---- printing bytes starting at " + e.getKey());
+          
+          out.write(e.getValue().getData());
+          nextToPrint = e.getValue().getSeqID();
+        } else if(e.getValue().getSeqID() < nextToPrint) {
+          continue; //data already printed
+        } else {
+          //tricky case: chunk overlaps with already-printed data, but not completely
+          ChunkImpl c = e.getValue();
+          long chunkStartPos = e.getKey();
+          int numToPrint = (int) (c.getSeqID() - nextToPrint);
+          int printStartOffset = (int) ( nextToPrint -  chunkStartPos);
+          out.write(c.getData(), printStartOffset, numToPrint);
+          nextToPrint = c.getSeqID();
+        }
       }
+      out.println("\n--------------------");
     }
-    
   }
-
-  private static void updateMatchCatalog(
-      Map<String, SortedMap<Long, ChunkImpl>> matchCatalog, String streamName,
-      ChunkImpl chunk) {
+ 
+  protected void updateMatchCatalog(String streamName,  ChunkImpl chunk) {
 
     SortedMap<Long, ChunkImpl> chunksInStream = matchCatalog.get(streamName);
     if(chunksInStream == null ) {
@@ -184,41 +266,29 @@
       if(chunk.getLength() > prevMatch.getLength())
         chunksInStream.put (startPos, chunk);
     }
-    
   }
 
-  static List<SearchRule> buildPatterns(String listOfPatterns) throws
-  PatternSyntaxException{
-    List<SearchRule> compiledPatterns = new ArrayList<SearchRule>();
-    //FIXME: could escape these
-    String[] patterns = listOfPatterns.split(SEPARATOR);
-    for(String p: patterns) {
-      int equalsPos = p.indexOf('=');
-      
-      if(equalsPos < 0 || equalsPos > (p.length() -2)) {
-        throw new PatternSyntaxException(
-            "pattern must be of form targ=pattern", p, -1);
-      }
-      
-      String targ = p.substring(0, equalsPos);
-      if(!ArrayUtils.contains(SEARCH_TARGS, targ)) {
-        throw new PatternSyntaxException(
-            "pattern doesn't start with recognized search target", p, -1);
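+  /**
+   * Variant of DumpChunks, selected by the -s flag, that prints one line per
+   * matching stream (the stream name and its match count) instead of the chunk data.
+   */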
+  static class DumpAndSummarize extends DumpChunks {
+    Map<String, Integer> matchCounts = new LinkedHashMap<String, Integer>();
+    
+
+    protected void displayResults(PrintStream out) throws IOException{
+      for(Map.Entry<String, Integer> s: matchCounts.entrySet()) {
+        out.print(s.getKey());
+        out.print(" ");
+        out.println(s.getValue());
       }
-      
-      Pattern pat = Pattern.compile(p.substring(equalsPos+1));
-      compiledPatterns.add(new SearchRule(pat, targ));
+        
     }
     
-    return compiledPatterns;
-  }
-
-  static boolean matchesPattern(List<SearchRule> matchers, ChunkImpl chunk) {
-    for(SearchRule r: matchers) {
-      if(!r.matches(chunk))
-        return false;
+    protected void updateMatchCatalog(String streamName,  ChunkImpl chunk) {
+      Integer i = matchCounts.get(streamName);
+      if(i != null)
+        matchCounts.put(streamName, i+1);
+      else
+        matchCounts.put(streamName, new Integer(1));
     }
-    return true;
+    
   }
 
 }

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/CaptureWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/CaptureWriter.java?rev=797659&r1=797658&r2=797659&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/CaptureWriter.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/CaptureWriter.java Fri Jul 24 21:18:40 2009
@@ -24,9 +24,12 @@
 import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
 import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
 import org.apache.hadoop.conf.Configuration;
-
+/**
+ * Dumps received chunks into a public static array.
+ * This class is intended for unit tests, only.
+ */
 public class CaptureWriter implements ChukwaWriter {
-  static ArrayList<Chunk> outputs = new ArrayList<Chunk>();
+  public static ArrayList<Chunk> outputs = new ArrayList<Chunk>();
 
   @Override
   public void add(List<Chunk> chunks) throws WriterException {

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/CollectorTest.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/CollectorTest.java?rev=797659&r1=797658&r2=797659&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/CollectorTest.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/CollectorTest.java Fri Jul 24 21:18:40 2009
@@ -37,9 +37,7 @@
     try {
       Configuration conf = new Configuration();
       conf.set("chukwaCollector.chunkSuppressBufferSize", "10");
-      conf
-          .set(
-              "chukwaCollector.pipeline",
+      conf.set("chukwaCollector.pipeline",
               "org.apache.hadoop.chukwa.datacollection.writer.Dedup,"// note
                                                                      // comma
                   + "org.apache.hadoop.chukwa.datacollection.collector.CaptureWriter");

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java?rev=797659&r1=797658&r2=797659&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java Fri Jul 24 21:18:40 2009
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.chukwa.datacollection.writer;
 
 import java.io.File;

Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java?rev=797659&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java Fri Jul 24 21:18:40 2009
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.writer;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.Chunk;
+import java.util.ArrayList;
+import org.apache.hadoop.chukwa.datacollection.collector.CaptureWriter;
+import java.net.*;
+import java.io.*;
+
+public class TestSocketTee  extends TestCase{
+  public void testSocketTee() throws Exception {
+    
+    Configuration conf = new Configuration();  
+    
+    conf.set("chukwaCollector.pipeline",
+        SocketTeeWriter.class.getCanonicalName()+","// note comma
+        + CaptureWriter.class.getCanonicalName());
+    
+    conf.set("chukwaCollector.writerClass", 
+        PipelineStageWriter.class.getCanonicalName());
+    
+    PipelineStageWriter psw = new PipelineStageWriter();
+    psw.init(conf);
+
+    System.out.println("pipeline established; now pushing a chunk");
+    ArrayList<Chunk> l = new ArrayList<Chunk>();
+    l.add(new ChunkImpl("dt", "name", 1, new byte[] {'a'}, null));
+    psw.add(l);
+    //push a chunk through. It should get written, but the socket tee shouldn't do anything.
+    assertEquals(1, CaptureWriter.outputs.size());
+    //now connect and set up a filter.
+    
+    System.out.println("connecting to localhost");
+    Socket s = new Socket("localhost", SocketTeeWriter.DEFAULT_PORT);
+ //   s.setSoTimeout(2000);
+    DataOutputStream dos = new DataOutputStream (s.getOutputStream());
+    dos.write((SocketTeeWriter.WRITABLE + " datatype=dt3\n").getBytes());
+    DataInputStream dis = new DataInputStream(s.getInputStream());
+
+    System.out.println("command sent");
+
+    dis.readFully(new byte[3]);
+    //push a chunk not matching filter -- nothing should happen.
+    l = new ArrayList<Chunk>();
+    l.add(new ChunkImpl("dt2", "name", 1, new byte[] {'b'}, null));
+    psw.add(l);
+    assertEquals(2, CaptureWriter.outputs.size());
+
+    System.out.println("sent nonmatching chunk");
+
+    //and now one that does match -- data should be available to read off the socket
+
+    l = new ArrayList<Chunk>();
+    l.add(new ChunkImpl("dt3", "name", 1, new byte[] {'c'}, null));
+    psw.add(l);
+    assertEquals(3, CaptureWriter.outputs.size());
+
+    System.out.println("sent matching chunk");
+    
+    System.out.println("reading...");
+    ChunkImpl chunk = ChunkImpl.read(dis);
+    assertTrue(chunk.getDataType().equals("dt3"));
+    System.out.println(chunk);
+
+    dis.close();
+    dos.close();
+    s.close();
+    
+    Socket s2 = new Socket("localhost", SocketTeeWriter.DEFAULT_PORT);
+    s2.getOutputStream().write((SocketTeeWriter.RAW+" content=.*c.*\n").getBytes());
+    dis = new DataInputStream(s2.getInputStream());
+    dis.readFully(new byte[3]); //read "OK\n"
+    l = new ArrayList<Chunk>();
+    l.add(new ChunkImpl("dt3", "name", 1, new byte[] {'c'}, null));
+    psw.add(l);
+    int len = dis.readInt();
+    assertTrue(len == 1);
+    byte[] data = new byte[100];
+    int read = dis.read(data);
+    assertTrue(read == 1);
+    assertTrue(data[0] == 'c');
+  }
+  
+}

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/util/TestDumpChunks.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/util/TestDumpChunks.java?rev=797659&r1=797658&r2=797659&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/util/TestDumpChunks.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/util/TestDumpChunks.java Fri Jul 24 21:18:40 2009
@@ -39,6 +39,7 @@
     SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(conf, out,
         ChukwaArchiveKey.class, ChunkImpl.class,
         SequenceFile.CompressionType.NONE, null);
+    
     for (ChunkImpl chunk: chunks) {
       ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
       
@@ -57,23 +58,33 @@
   
   public void testBasicPatternMatching() {
    try {
-     List<DumpChunks.SearchRule> rules = DumpChunks.buildPatterns("host=foo.*&cluster=bar&datatype=Data");
+     DumpChunks.Filter rules = new DumpChunks.Filter("host=foo.*&cluster=bar&datatype=Data");
      assertEquals(3, rules.size());
      byte[] dat = "someText".getBytes();
      ChunkImpl chunkNone = new ChunkImpl("badData","aname", dat.length, dat, null);
-     assertFalse(DumpChunks.matchesPattern(rules, chunkNone));
+     assertFalse(rules.matches(chunkNone));
 
+       //do the right thing on a non-match
      ChunkImpl chunkSome = new ChunkImpl("badData", "aname", dat.length, dat, null);
      chunkSome.setSource("fooly");
      chunkSome.addTag("cluster=\"bar\"");
-     assertFalse(DumpChunks.matchesPattern(rules, chunkSome));
+     assertFalse(rules.matches( chunkSome));
 
      ChunkImpl chunkAll = new ChunkImpl("Data", "aname", dat.length, dat, null);
      chunkAll.setSource("fooly");
      chunkAll.addTag("cluster=\"bar\"");
 
      System.out.println("chunk is " + chunkAll);
-     assertTrue(DumpChunks.matchesPattern(rules, chunkAll));
+     assertTrue(rules.matches(chunkAll));
+     
+       //check that we match content correctly
+     rules = new DumpChunks.Filter("content=someText");
+     assertTrue(rules.matches(chunkAll));
+     rules = new DumpChunks.Filter("content=some");
+     assertFalse(rules.matches( chunkAll));
+     rules = new DumpChunks.Filter("datatype=Data&content=.*some.*");
+     assertTrue(rules.matches( chunkAll));
+
    } catch(Exception e) {
      fail("exception " + e);
    } 
@@ -107,8 +118,6 @@
     assertTrue(new String(capture.toByteArray()).startsWith("testing\n---"));
     //now test for matches.
     
-    
-    
   }
 
 }


