chukwa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asrab...@apache.org
Subject svn commit: r832077 - in /hadoop/chukwa/trunk: ./ src/docs/src/documentation/content/xdocs/ src/java/org/apache/hadoop/chukwa/datacollection/writer/ src/test/org/apache/hadoop/chukwa/datacollection/writer/
Date Mon, 02 Nov 2009 19:42:12 GMT
Author: asrabkin
Date: Mon Nov  2 19:42:11 2009
New Revision: 832077

URL: http://svn.apache.org/viewvc?rev=832077&view=rev
Log:
CHUKWA-408. Optional metadata in real-time feed.

Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/src/docs/src/documentation/content/xdocs/collector.xml
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=832077&r1=832076&r2=832077&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Mon Nov  2 19:42:11 2009
@@ -2,6 +2,15 @@
 
 Trunk (unreleased changes)
 
+  NEW FEATURES
+
+  IMPROVEMENTS
+
+    CHUKWA-408. Add optional metadata to real-time feed. (asrabkin)
+
+  BUG FIXES
+  
+
 Release 0.3
 
   NEW FEATURES

Modified: hadoop/chukwa/trunk/src/docs/src/documentation/content/xdocs/collector.xml
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/docs/src/documentation/content/xdocs/collector.xml?rev=832077&r1=832076&r2=832077&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/docs/src/documentation/content/xdocs/collector.xml (original)
+++ hadoop/chukwa/trunk/src/docs/src/documentation/content/xdocs/collector.xml Mon Nov  2 19:42:11 2009
@@ -101,7 +101,9 @@
 	  	Specifying "WRITABLE" will cause the chunks to be written using Hadoop's 
 	  	Writable serialization framework. "RAW" will send the internal data of the
 	  	Chunk, without any metadata, prefixed by its length encoded as a 32-bit int,
-	  	big-endian.  
+	  	big-endian.  "HEADER" is similar to "RAW", but with a one-line header in
+	  	front of the content. Header format is <code>hostname</code> 
+	  	<code>datatype</code> <code>stream name</code> <code>offset</code>, separated by spaces.
 	  	</p>
 	  	<p>
 	  	The filter will be inactivated when the socket is closed.

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java?rev=832077&r1=832076&r2=832077&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java Mon Nov  2 19:42:11 2009
@@ -61,6 +61,10 @@
 
   public static final String WRITABLE = "WRITABLE";
   public static final String RAW = "RAW";
+  public static final String ASCII_HEADER = "HEADER";
+  
+  static enum DataFormat {Raw, Writable, Header};
+  
   static boolean USE_KEEPALIVE = true;
   static final int DEFAULT_PORT = 9094;
   static int QUEUE_LENGTH = 1000;
@@ -115,17 +119,18 @@
     BufferedReader in;
     DataOutputStream out;
     Filter rules;
-    boolean sendRawBytes;
+    DataFormat fmt;
     final BlockingQueue<Chunk> sendQ;
+    
     public Tee(Socket s) throws IOException {
       sock = s;
       //now initialize asynchronously
       sendQ = new ArrayBlockingQueue<Chunk>(QUEUE_LENGTH);
       
-    Thread t = new Thread(this);
-    t.setDaemon(true);
-    t.start();
-  }
+      Thread t = new Thread(this);
+      t.setDaemon(true);
+      t.start();
+    }
     
     public void run() {
       setup();
@@ -133,15 +138,23 @@
         while(sock.isConnected()) {
           Chunk c = sendQ.take();
           
-          if(sendRawBytes) {
+          if(fmt == DataFormat.Raw) {
             byte[] data = c.getData();
             out.writeInt(data.length);
             out.write(data);
-          } else
+          } else if(fmt == DataFormat.Writable)
             c.write(out);
+          else {
+            byte[] data = c.getData();
+            byte[] header = (c.getSource()+ " " + c.getDataType() + " " + c.getStreamName()+ " "+  
+                c.getSeqID()+"\n").getBytes(); 
+            out.writeInt(data.length+ header.length);
+            out.write(header);
+            out.write(data);
+          }
         }
       } catch(IOException e) {
-        log.info("lost tee", e);
+        log.info("lost tee: "+ e.toString());
         synchronized(tees) {
           tees.remove(this);
         }
@@ -168,14 +181,22 @@
         }
         String uppercased = cmd.substring(0, cmd.indexOf(' ')).toUpperCase();
         if(RAW.equals(uppercased))
-          sendRawBytes = true;
-        else if(!WRITABLE.equals(uppercased)) {
+          fmt = DataFormat.Raw;
+        else if(WRITABLE.equals(uppercased))
+          fmt = DataFormat.Writable;
+        else if(ASCII_HEADER.equals(uppercased))
+          fmt = DataFormat.Header;
+        else {
           throw new PatternSyntaxException("bad command '" + uppercased+
-              "' -- starts with neither '"+ RAW+ "' nor '"+ WRITABLE+"'.", cmd, -1);
+              "' -- starts with neither '"+ RAW+ "' nor '"+ WRITABLE + " nor " 
+              + ASCII_HEADER+"'.", cmd, -1);
         }
         
         String cmdAfterSpace = cmd.substring(cmd.indexOf(' ')+1);
-        rules = new Filter(cmdAfterSpace);
+        if(cmdAfterSpace.toLowerCase().equals("all"))
+          rules = Filter.ALL;
+        else
+          rules = new Filter(cmdAfterSpace);
 
           //now that we read everything OK we can add ourselves to list, and return.
         synchronized(tees) {

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java?rev=832077&r1=832076&r2=832077&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java Mon Nov  2 19:42:11 2009
@@ -108,8 +108,26 @@
     l.add(new ChunkImpl("dt3", "name", 3, new byte[] {'c', 'a', 'd'}, null));
     psw.add(l);
     assertEquals(5, CaptureWriter.outputs.size());
-//    Thread.sleep(1000);
-   
+    
+    
+    Socket s3 = new Socket("localhost", SocketTeeWriter.DEFAULT_PORT);
+    s3.getOutputStream().write((SocketTeeWriter.ASCII_HEADER+" all\n").getBytes());
+    dis = new DataInputStream(s3.getInputStream());
+    dis.readFully(new byte[3]); //read "OK\n"
+    l = new ArrayList<Chunk>();
+    chunk= new ChunkImpl("dataTypeFoo", "streamName", 4, new byte[] {'t','e','x','t'}, null);
+    chunk.setSource("hostNameFoo");
+    l.add(chunk);
+    psw.add(l);
+    assertEquals(6, CaptureWriter.outputs.size());
+    len = dis.readInt();
+    data = new byte[len];
+    read = dis.read(data);
+    String rcvd = new String(data);
+    System.out.println("got: " + rcvd);
+    assertTrue(rcvd.equals("hostNameFoo dataTypeFoo streamName 4\ntext"));
+    s3.close();
+    dis.close();
     
   }
   



Mime
View raw message