To: chukwa-commits@hadoop.apache.org
From: asrabkin@apache.org
Subject: svn commit: r832077 - in /hadoop/chukwa/trunk: ./ src/docs/src/documentation/content/xdocs/ src/java/org/apache/hadoop/chukwa/datacollection/writer/ src/test/org/apache/hadoop/chukwa/datacollection/writer/
Date: Mon, 02 Nov 2009 19:42:12 -0000

Author: asrabkin
Date: Mon Nov 2 19:42:11 2009
New Revision: 832077

URL: http://svn.apache.org/viewvc?rev=832077&view=rev
Log:
CHUKWA-408. Optional metadata in real-time feed.

Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/src/docs/src/documentation/content/xdocs/collector.xml
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=832077&r1=832076&r2=832077&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Mon Nov 2 19:42:11 2009
@@ -2,6 +2,15 @@
 
 Trunk (unreleased changes)
 
+  NEW FEATURES
+
+  IMPROVEMENTS
+
+    CHUKWA-408. Add optional metadata to real-time feed. (asrabkin)
+
+  BUG FIXES
+
+
 Release 0.3
 
   NEW FEATURES

Modified: hadoop/chukwa/trunk/src/docs/src/documentation/content/xdocs/collector.xml
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/docs/src/documentation/content/xdocs/collector.xml?rev=832077&r1=832076&r2=832077&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/docs/src/documentation/content/xdocs/collector.xml (original)
+++ hadoop/chukwa/trunk/src/docs/src/documentation/content/xdocs/collector.xml Mon Nov 2 19:42:11 2009
@@ -101,7 +101,9 @@
 Specifying "WRITABLE" will cause the chunks to be written using Hadoop's
 Writable serialization framework. "RAW" will send the internal data of the
 Chunk, without any metadata, prefixed by its length encoded as a 32-bit int,
-big-endian.
+big-endian. "HEADER" is similar to "RAW", but with a one-line header in
+front of the content. Header format is hostname
+datatype stream name offset, separated by spaces.
 
 The filter will be inactivated when the socket is closed.
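For reference, here is a minimal, self-contained client sketch for the new HEADER format (not part of this commit), written against the protocol described above and exercised by the test further down: connect to the tee port (SocketTeeWriter.DEFAULT_PORT is 9094), send the one-line command, consume the "OK\n" acknowledgment, then read length-prefixed records. The host name, the assumption that payloads are printable text, and the endless read loop are all illustrative.

    import java.io.DataInputStream;
    import java.io.OutputStream;
    import java.net.Socket;

    public class HeaderTeeClient {
      public static void main(String[] args) throws Exception {
        // SocketTeeWriter.DEFAULT_PORT is 9094; "localhost" is illustrative.
        Socket sock = new Socket("localhost", 9094);
        OutputStream out = sock.getOutputStream();
        out.write("HEADER all\n".getBytes());  // HEADER format, no filtering
        out.flush();

        DataInputStream in = new DataInputStream(sock.getInputStream());
        in.readFully(new byte[3]);  // consume the "OK\n" acknowledgment

        while (true) {  // runs until the collector closes the socket
          int len = in.readInt();   // 32-bit big-endian: header + data length
          byte[] buf = new byte[len];
          in.readFully(buf);
          String record = new String(buf);
          int nl = record.indexOf('\n');
          // Header line: "hostname datatype streamname offset"
          System.out.println("header: " + record.substring(0, nl));
          System.out.println("data:   " + record.substring(nl + 1));
        }
      }
    }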
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java?rev=832077&r1=832076&r2=832077&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java Mon Nov 2 19:42:11 2009
@@ -61,6 +61,10 @@
 
   public static final String WRITABLE = "WRITABLE";
   public static final String RAW = "RAW";
+  public static final String ASCII_HEADER = "HEADER";
+
+  static enum DataFormat {Raw, Writable, Header};
+
   static boolean USE_KEEPALIVE = true;
   static final int DEFAULT_PORT = 9094;
   static int QUEUE_LENGTH = 1000;
@@ -115,17 +119,18 @@
     BufferedReader in;
     DataOutputStream out;
     Filter rules;
-    boolean sendRawBytes;
+    DataFormat fmt;
     final BlockingQueue<Chunk> sendQ;
+
     public Tee(Socket s) throws IOException {
       sock = s;
       //now initialize asynchronously
      sendQ = new ArrayBlockingQueue<Chunk>(QUEUE_LENGTH);
-      Thread t = new Thread(this);
-      t.setDaemon(true);
-      t.start();
-    }
+      Thread t = new Thread(this);
+      t.setDaemon(true);
+      t.start();
+    }
 
     public void run() {
       setup();
@@ -133,15 +138,23 @@
         while(sock.isConnected()) {
           Chunk c = sendQ.take();
 
-          if(sendRawBytes) {
+          if(fmt == DataFormat.Raw) {
             byte[] data = c.getData();
             out.writeInt(data.length);
             out.write(data);
-          } else
+          } else if(fmt == DataFormat.Writable)
             c.write(out);
+          else {
+            byte[] data = c.getData();
+            byte[] header = (c.getSource()+ " " + c.getDataType() + " " + c.getStreamName()+ " "+
+                c.getSeqID()+"\n").getBytes();
+            out.writeInt(data.length+ header.length);
+            out.write(header);
+            out.write(data);
+          }
         }
       } catch(IOException e) {
-        log.info("lost tee", e);
+        log.info("lost tee: "+ e.toString());
         synchronized(tees) {
           tees.remove(this);
         }
@@ -168,14 +181,22 @@
       }
       String uppercased = cmd.substring(0, cmd.indexOf(' ')).toUpperCase();
       if(RAW.equals(uppercased))
-        sendRawBytes = true;
-      else if(!WRITABLE.equals(uppercased)) {
+        fmt = DataFormat.Raw;
+      else if(WRITABLE.equals(uppercased))
+        fmt = DataFormat.Writable;
+      else if(ASCII_HEADER.equals(uppercased))
+        fmt = DataFormat.Header;
+      else {
         throw new PatternSyntaxException("bad command '" + uppercased+
-            "' -- starts with neither '"+ RAW+ "' nor '"+ WRITABLE+"'.", cmd, -1);
+            "' -- starts with neither '"+ RAW+ "' nor '"+ WRITABLE + " nor " +
+            ASCII_HEADER+"'.", cmd, -1);
       }
       String cmdAfterSpace = cmd.substring(cmd.indexOf(' ')+1);
-      rules = new Filter(cmdAfterSpace);
+      if(cmdAfterSpace.toLowerCase().equals("all"))
+        rules = Filter.ALL;
+      else
+        rules = new Filter(cmdAfterSpace);
 
       //now that we read everything OK we can add ourselves to list, and return.
       synchronized(tees) {
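One consequence of the header layout written above: fields are separated by single spaces and the writer does not escape spaces inside the stream name. A hypothetical client-side helper (the HeaderParser class below is not part of the patch) can still recover a space-containing stream name by splitting from both ends, assuming the source and data type themselves never contain spaces:

    public class HeaderParser {
      // Splits "source datatype streamname seqID". Source and data type are
      // assumed space-free and seqID is the final token, so a stream name
      // containing spaces survives intact between the second and last separator.
      static String[] parseHeader(String line) {
        int first = line.indexOf(' ');
        int second = line.indexOf(' ', first + 1);
        int last = line.lastIndexOf(' ');
        return new String[] {
          line.substring(0, first),           // source (hostname)
          line.substring(first + 1, second),  // data type
          line.substring(second + 1, last),   // stream name
          line.substring(last + 1)            // sequence ID (stream offset)
        };
      }

      public static void main(String[] args) {
        String[] f = parseHeader("hostNameFoo dataTypeFoo streamName 4");
        System.out.println(f[0] + " | " + f[1] + " | " + f[2] + " | " + f[3]);
      }
    }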
Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java?rev=832077&r1=832076&r2=832077&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestSocketTee.java Mon Nov 2 19:42:11 2009
@@ -108,8 +108,26 @@
     l.add(new ChunkImpl("dt3", "name", 3, new byte[] {'c', 'a', 'd'}, null));
     psw.add(l);
     assertEquals(5, CaptureWriter.outputs.size());
-//    Thread.sleep(1000);
-
+
+
+    Socket s3 = new Socket("localhost", SocketTeeWriter.DEFAULT_PORT);
+    s3.getOutputStream().write((SocketTeeWriter.ASCII_HEADER+" all\n").getBytes());
+    dis = new DataInputStream(s3.getInputStream());
+    dis.readFully(new byte[3]); //read "OK\n"
+    l = new ArrayList<Chunk>();
+    chunk = new ChunkImpl("dataTypeFoo", "streamName", 4, new byte[] {'t','e','x','t'}, null);
+    chunk.setSource("hostNameFoo");
+    l.add(chunk);
+    psw.add(l);
+    assertEquals(6, CaptureWriter.outputs.size());
+    len = dis.readInt();
+    data = new byte[len];
+    read = dis.read(data);
+    String rcvd = new String(data);
+    System.out.println("got: " + rcvd);
+    assertTrue(rcvd.equals("hostNameFoo dataTypeFoo streamName 4\ntext"));
+    s3.close();
+    dis.close();
   }
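For readers checking the arithmetic in the assertion above: the header "hostNameFoo dataTypeFoo streamName 4\n" is 37 bytes and the payload "text" is 4, so the length prefix is 41 and the whole frame is 45 bytes. A standalone sketch reproducing that framing (the FramingDemo class is illustrative and reuses the test's values; it is not part of the commit):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;

    public class FramingDemo {
      public static void main(String[] args) throws Exception {
        byte[] header = "hostNameFoo dataTypeFoo streamName 4\n".getBytes();
        byte[] data = "text".getBytes();

        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeInt(header.length + data.length);  // 37 + 4 = 41
        out.write(header);
        out.write(data);
        out.flush();

        System.out.println("frame size: " + buf.size() + " bytes");  // 4 + 41 = 45
      }
    }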