chukwa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asrab...@apache.org
Subject svn commit: r799496 - in /hadoop/chukwa/trunk/src: java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ java/org/apache/hadoop/chukwa/datacollection/writer/ test/org/apache/hadoop/chukwa/datacollection/agent/ test/org/apache/hadoop/chukwa/datacollection/writer/
Date Fri, 31 Jul 2009 02:55:56 GMT
Author: asrabkin
Date: Fri Jul 31 02:55:56 2009
New Revision: 799496

URL: http://svn.apache.org/viewvc?rev=799496&view=rev
Log:
CHUKWA-358. Realtime feed from collector, now with improved threading model

Modified:
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/agent/TestCmd.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java?rev=799496&r1=799495&r2=799496&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java Fri Jul 31 02:55:56 2009
@@ -194,9 +194,9 @@
     if (pingAtt != null) {
       out.println("Date:" + ServletCollector.statTime);
       out.println("Now:" + System.currentTimeMillis());
-      out.println("numberHTTPConnection:"
+      out.println("numberHTTPConnection in time window:"
           + ServletCollector.numberHTTPConnection);
-      out.println("numberchunks:" + ServletCollector.numberchunks);
+      out.println("numberchunks in time window:" + ServletCollector.numberchunks);
       out.println("lifetimechunks:" + ServletCollector.lifetimechunks);
     } else {
       out.println("<html><body><h2>Chukwa servlet running</h2>");

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java?rev=799496&r1=799495&r2=799496&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java Fri Jul 31 02:55:56 2009
@@ -17,9 +17,9 @@
  */
 package org.apache.hadoop.chukwa.datacollection.writer;
 
-import java.util.Iterator;
-import java.util.List;
-import java.util.ArrayList;
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.regex.PatternSyntaxException;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.util.DumpChunks;
@@ -52,17 +52,23 @@
  *              
  *  In english: clients should connect and say either "RAW " or "WRITABLE " 
  *  followed by a filter.  (Note that the keyword is followed by exactly one space.)
- *  They'll then receive either a sequence of byte arrays or of writable-serialized
+ *  They'll then receive either a sequence of byte arrays or of writable-serialized.
+ *  
+ *  Option chukwaCollector.tee.keepalive controls using TCP keepalive. Defaults to true.
  *  
  */
 public class SocketTeeWriter implements PipelineableWriter {
 
   public static final String WRITABLE = "WRITABLE";
   public static final String RAW = "RAW";
+  static boolean USE_KEEPALIVE = true;
   static final int DEFAULT_PORT = 9094;
+  static int QUEUE_LENGTH = 1000;
+  
   static Logger log = Logger.getLogger(SocketTeeWriter.class);
   volatile boolean running = true;
   int timeout;
+//  private final ExecutorService pool;
   
   /**
    * Listens for incoming connections, spawns a Tee to deal with each.
@@ -71,7 +77,9 @@
     ServerSocket s;
     public SocketListenThread(Configuration conf) throws IOException {
       int portno = conf.getInt("chukwaCollector.tee.port", DEFAULT_PORT);
+      USE_KEEPALIVE = conf.getBoolean("chukwaCollector.tee.keepalive", true);
       s = new ServerSocket(portno);
+      setDaemon(true);
     }
     
     public void run() {
@@ -89,7 +97,9 @@
     
     public void shutdown() {
       try{
-        s.close();
+        //running was set to false by caller.
+        s.close(); //to break out of run loop
+        this.interrupt();
       } catch(IOException e) {
         
       }
@@ -106,19 +116,48 @@
     DataOutputStream out;
     DumpChunks.Filter rules;
     boolean sendRawBytes;
+    final BlockingQueue<Chunk> sendQ;
     public Tee(Socket s) throws IOException {
       sock = s;
       //now initialize asynchronously
-      run();
-//      new Thread(this).start();
+      sendQ = new ArrayBlockingQueue<Chunk>(QUEUE_LENGTH);
+      
+    Thread t = new Thread(this);
+    t.setDaemon(true);
+    t.start();
+  }
+    
+    public void run() {
+      setup();
+      try {
+        while(sock.isConnected()) {
+          Chunk c = sendQ.take();
+          
+          if(sendRawBytes) {
+            byte[] data = c.getData();
+            out.writeInt(data.length);
+            out.write(data);
+          } else
+            c.write(out);
+        }
+      } catch(IOException e) {
+        log.info("lost tee", e);
+        synchronized(tees) {
+          tees.remove(this);
+        }
+      } catch(InterruptedException e) {
+        //exit quietly
+      }
     }
+    
     /**
      * initializes the tee.
      */
-    public void run() {
+    public void setup() {
       try {   //outer try catches IOExceptions
        try { //inner try catches Pattern Syntax errors
         sock.setSoTimeout(timeout);
+        sock.setKeepAlive(USE_KEEPALIVE);
         in = new BufferedReader(new InputStreamReader(sock.getInputStream()));
         String cmd = in.readLine();
         if(!cmd.contains(" ")) {
@@ -157,26 +196,19 @@
       }
     }
     
-    public void maybeSend(Chunk c) throws IOException {
-      if(rules.matches(c)) {
-        if(sendRawBytes) {
-          byte[] data = c.getData();
-          out.writeInt(data.length);
-          out.write(data);
-        } else
-          c.write(out);
-      }
-    }
-    
     public void close() {
       try {
         out.close();
         in.close();
       } catch(Exception e) {}
     }
+
+    public void handle(Chunk c) {
+      if(rules.matches(c)) 
+        sendQ.add(c);
+    }
   }
-  
-  
+
   /////////////////Main class SocketTeeWriter//////////////////////
   
   
@@ -196,14 +228,8 @@
       Iterator<Tee> loop = tees.iterator();
       while(loop.hasNext()) {
         Tee t = loop.next();
-        try {
-          for(Chunk c: chunks) {
-            t.maybeSend(c);
-          }
-        } catch(IOException e) {
-          t.close();
-          loop.remove(); //drop failed tee from list.
-          log.info("lost connection: "+ e.toString());
+        for(Chunk c: chunks) {
+          t.handle(c);
         }
       }
     }

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/agent/TestCmd.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/agent/TestCmd.java?rev=799496&r1=799495&r2=799496&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/agent/TestCmd.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/agent/TestCmd.java Fri Jul 31 02:55:56 2009
@@ -23,6 +23,7 @@
 import org.apache.hadoop.chukwa.datacollection.adaptor.ChukwaTestAdaptor;
 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent.AlreadyRunningException;
 import org.apache.hadoop.chukwa.datacollection.test.ConsoleOutConnector;
+import org.apache.hadoop.conf.Configuration;
 import junit.framework.TestCase;
 
 public class TestCmd extends TestCase {
@@ -30,7 +31,9 @@
   public void testAddCmdWithParam() {
     ChukwaAgent agent;
     try {
-      agent = new ChukwaAgent();
+      Configuration conf = new Configuration();
+      conf.set("chukwaAgent.control.port", "0");
+      agent = new ChukwaAgent(conf);
       ConsoleOutConnector conn = new ConsoleOutConnector(agent, true);
       conn.start();
       String l = agent
@@ -60,7 +63,9 @@
   public void testAddCmdWithoutParam1() {
     ChukwaAgent agent;
     try {
-      agent = new ChukwaAgent();
+      Configuration conf = new Configuration();
+      conf.set("chukwaAgent.control.port", "0");
+      agent = new ChukwaAgent(conf);
       ConsoleOutConnector conn = new ConsoleOutConnector(agent, true);
       conn.start();
       String name = agent
@@ -89,7 +94,9 @@
   public void testAddCmdWithoutParam2() {
     ChukwaAgent agent;
     try {
-      agent = new ChukwaAgent();
+      Configuration conf = new Configuration();
+      conf.set("chukwaAgent.control.port", "0");
+      agent = new ChukwaAgent(conf);
       ConsoleOutConnector conn = new ConsoleOutConnector(agent, true);
       conn.start();
       String n = agent

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java?rev=799496&r1=799495&r2=799496&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java Fri Jul 31 02:55:56 2009
@@ -86,18 +86,31 @@
     s.close();
     
     Socket s2 = new Socket("localhost", SocketTeeWriter.DEFAULT_PORT);
-    s2.getOutputStream().write((SocketTeeWriter.RAW+" content=.*c.*\n").getBytes());
+    s2.getOutputStream().write((SocketTeeWriter.RAW+" content=.*d.*\n").getBytes());
     dis = new DataInputStream(s2.getInputStream());
     dis.readFully(new byte[3]); //read "OK\n"
     l = new ArrayList<Chunk>();
-    l.add(new ChunkImpl("dt3", "name", 1, new byte[] {'c'}, null));
+    l.add(new ChunkImpl("dt3", "name", 1, new byte[] {'d'}, null));
     psw.add(l);
+    assertEquals(4, CaptureWriter.outputs.size());
+
     int len = dis.readInt();
     assertTrue(len == 1);
     byte[] data = new byte[100];
     int read = dis.read(data);
     assertTrue(read == 1);
-    assertTrue(data[0] == 'c');
+    assertTrue(data[0] == 'd');
+    
+    s2.close();
+    dis.close();
+    
+    l = new ArrayList<Chunk>();
+    l.add(new ChunkImpl("dt3", "name", 3, new byte[] {'c', 'a', 'd'}, null));
+    psw.add(l);
+    assertEquals(5, CaptureWriter.outputs.size());
+//    Thread.sleep(1000);
+   
+    
   }
   
 }



Mime
View raw message