Return-Path:
Delivered-To: apmail-hadoop-chukwa-commits-archive@minotaur.apache.org
Received: (qmail 71599 invoked from network); 2 Nov 2009 19:42:43 -0000
Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3)
by minotaur.apache.org with SMTP; 2 Nov 2009 19:42:43 -0000
Received: (qmail 73839 invoked by uid 500); 2 Nov 2009 19:42:43 -0000
Delivered-To: apmail-hadoop-chukwa-commits-archive@hadoop.apache.org
Received: (qmail 73825 invoked by uid 500); 2 Nov 2009 19:42:43 -0000
Mailing-List: contact chukwa-commits-help@hadoop.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Reply-To: chukwa-dev@hadoop.apache.org
Delivered-To: mailing list chukwa-commits@hadoop.apache.org
Received: (qmail 73815 invoked by uid 99); 2 Nov 2009 19:42:43 -0000
Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230)
by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 02 Nov 2009 19:42:43 +0000
X-ASF-Spam-Status: No, hits=-2000.0 required=10.0
tests=ALL_TRUSTED
X-Spam-Check-By: apache.org
Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4)
by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 02 Nov 2009 19:42:33 +0000
Received: by eris.apache.org (Postfix, from userid 65534)
id 72F1823888A6; Mon, 2 Nov 2009 19:42:12 +0000 (UTC)
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
Subject: svn commit: r832077 - in /hadoop/chukwa/trunk: ./
src/docs/src/documentation/content/xdocs/
src/java/org/apache/hadoop/chukwa/datacollection/writer/
src/test/org/apache/hadoop/chukwa/datacollection/writer/
Date: Mon, 02 Nov 2009 19:42:12 -0000
To: chukwa-commits@hadoop.apache.org
From: asrabkin@apache.org
X-Mailer: svnmailer-1.0.8
Message-Id: <20091102194212.72F1823888A6@eris.apache.org>
X-Virus-Checked: Checked by ClamAV on apache.org
Author: asrabkin
Date: Mon Nov 2 19:42:11 2009
New Revision: 832077
URL: http://svn.apache.org/viewvc?rev=832077&view=rev
Log:
CHUKWA-408. Optional metadata in real-time feed.
Modified:
hadoop/chukwa/trunk/CHANGES.txt
hadoop/chukwa/trunk/src/docs/src/documentation/content/xdocs/collector.xml
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java
Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=832077&r1=832076&r2=832077&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Mon Nov 2 19:42:11 2009
@@ -2,6 +2,15 @@
Trunk (unreleased changes)
+ NEW FEATURES
+
+ IMPROVEMENTS
+
+ CHUKWA-408. Add optional metadata to real-time feed. (asrabkin)
+
+ BUG FIXES
+
+
Release 0.3
NEW FEATURES
Modified: hadoop/chukwa/trunk/src/docs/src/documentation/content/xdocs/collector.xml
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/docs/src/documentation/content/xdocs/collector.xml?rev=832077&r1=832076&r2=832077&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/docs/src/documentation/content/xdocs/collector.xml (original)
+++ hadoop/chukwa/trunk/src/docs/src/documentation/content/xdocs/collector.xml Mon Nov 2 19:42:11 2009
@@ -101,7 +101,9 @@
Specifying "WRITABLE" will cause the chunks to be written using Hadoop's
Writable serialization framework. "RAW" will send the internal data of the
Chunk, without any metadata, prefixed by its length encoded as a 32-bit int,
- big-endian.
+ big-endian. "HEADER" is similar to "RAW", but with a one-line header in
+ front of the content. Header format is hostname, datatype, stream name, and
+ offset, separated by spaces.
The filter will be inactivated when the socket is closed.
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java?rev=832077&r1=832076&r2=832077&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java Mon Nov 2 19:42:11 2009
@@ -61,6 +61,10 @@
public static final String WRITABLE = "WRITABLE";
public static final String RAW = "RAW";
+ public static final String ASCII_HEADER = "HEADER";
+
+ static enum DataFormat {Raw, Writable, Header};
+
static boolean USE_KEEPALIVE = true;
static final int DEFAULT_PORT = 9094;
static int QUEUE_LENGTH = 1000;
@@ -115,17 +119,18 @@
BufferedReader in;
DataOutputStream out;
Filter rules;
- boolean sendRawBytes;
+ DataFormat fmt;
final BlockingQueue sendQ;
+
public Tee(Socket s) throws IOException {
sock = s;
//now initialize asynchronously
sendQ = new ArrayBlockingQueue(QUEUE_LENGTH);
- Thread t = new Thread(this);
- t.setDaemon(true);
- t.start();
- }
+ Thread t = new Thread(this);
+ t.setDaemon(true);
+ t.start();
+ }
public void run() {
setup();
@@ -133,15 +138,23 @@
while(sock.isConnected()) {
Chunk c = sendQ.take();
- if(sendRawBytes) {
+ if(fmt == DataFormat.Raw) {
byte[] data = c.getData();
out.writeInt(data.length);
out.write(data);
- } else
+ } else if(fmt == DataFormat.Writable)
c.write(out);
+ else {
+ byte[] data = c.getData();
+ byte[] header = (c.getSource()+ " " + c.getDataType() + " " + c.getStreamName()+ " "+
+ c.getSeqID()+"\n").getBytes();
+ out.writeInt(data.length+ header.length);
+ out.write(header);
+ out.write(data);
+ }
}
} catch(IOException e) {
- log.info("lost tee", e);
+ log.info("lost tee: "+ e.toString());
synchronized(tees) {
tees.remove(this);
}
@@ -168,14 +181,22 @@
}
String uppercased = cmd.substring(0, cmd.indexOf(' ')).toUpperCase();
if(RAW.equals(uppercased))
- sendRawBytes = true;
- else if(!WRITABLE.equals(uppercased)) {
+ fmt = DataFormat.Raw;
+ else if(WRITABLE.equals(uppercased))
+ fmt = DataFormat.Writable;
+ else if(ASCII_HEADER.equals(uppercased))
+ fmt = DataFormat.Header;
+ else {
throw new PatternSyntaxException("bad command '" + uppercased+
- "' -- starts with neither '"+ RAW+ "' nor '"+ WRITABLE+"'.", cmd, -1);
+ "' -- starts with neither '"+ RAW+ "' nor '"+ WRITABLE + " nor "
+ + ASCII_HEADER+"'.", cmd, -1);
}
String cmdAfterSpace = cmd.substring(cmd.indexOf(' ')+1);
- rules = new Filter(cmdAfterSpace);
+ if(cmdAfterSpace.toLowerCase().equals("all"))
+ rules = Filter.ALL;
+ else
+ rules = new Filter(cmdAfterSpace);
//now that we read everything OK we can add ourselves to list, and return.
synchronized(tees) {
Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java?rev=832077&r1=832076&r2=832077&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java Mon Nov 2 19:42:11 2009
@@ -108,8 +108,26 @@
l.add(new ChunkImpl("dt3", "name", 3, new byte[] {'c', 'a', 'd'}, null));
psw.add(l);
assertEquals(5, CaptureWriter.outputs.size());
-// Thread.sleep(1000);
-
+
+
+ Socket s3 = new Socket("localhost", SocketTeeWriter.DEFAULT_PORT);
+ s3.getOutputStream().write((SocketTeeWriter.ASCII_HEADER+" all\n").getBytes());
+ dis = new DataInputStream(s3.getInputStream());
+ dis.readFully(new byte[3]); //read "OK\n"
+ l = new ArrayList();
+ chunk= new ChunkImpl("dataTypeFoo", "streamName", 4, new byte[] {'t','e','x','t'}, null);
+ chunk.setSource("hostNameFoo");
+ l.add(chunk);
+ psw.add(l);
+ assertEquals(6, CaptureWriter.outputs.size());
+ len = dis.readInt();
+ data = new byte[len];
+ read = dis.read(data);
+ String rcvd = new String(data);
+ System.out.println("got: " + rcvd);
+ assertTrue(rcvd.equals("hostNameFoo dataTypeFoo streamName 4\ntext"));
+ s3.close();
+ dis.close();
}