Return-Path: Delivered-To: apmail-hadoop-chukwa-commits-archive@minotaur.apache.org Received: (qmail 6539 invoked from network); 23 Mar 2010 18:33:40 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 23 Mar 2010 18:33:40 -0000 Received: (qmail 18330 invoked by uid 500); 23 Mar 2010 18:33:40 -0000 Delivered-To: apmail-hadoop-chukwa-commits-archive@hadoop.apache.org Received: (qmail 18310 invoked by uid 500); 23 Mar 2010 18:33:40 -0000 Mailing-List: contact chukwa-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: chukwa-dev@hadoop.apache.org Delivered-To: mailing list chukwa-commits@hadoop.apache.org Received: (qmail 18302 invoked by uid 99); 23 Mar 2010 18:33:40 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 23 Mar 2010 18:33:40 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 23 Mar 2010 18:33:34 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 9B81B238897F; Tue, 23 Mar 2010 18:33:12 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r926711 - in /hadoop/chukwa/branches/chukwa-0.4: ./ src/java/org/apache/hadoop/chukwa/datacollection/collector/ src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ src/java/org/apache/hadoop/chukwa/datacollection/writer/ Date: Tue, 23 Mar 2010 18:33:12 -0000 To: chukwa-commits@hadoop.apache.org From: asrabkin@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100323183312.9B81B238897F@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: asrabkin Date: Tue Mar 23 18:33:12 2010 New Revision: 926711 URL: http://svn.apache.org/viewvc?rev=926711&view=rev Log: 
CHUKWA-445. Realtime display at collector. Added: hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java Modified: hadoop/chukwa/branches/chukwa-0.4/CHANGES.txt hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java Modified: hadoop/chukwa/branches/chukwa-0.4/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.4/CHANGES.txt?rev=926711&r1=926710&r2=926711&view=diff ============================================================================== --- hadoop/chukwa/branches/chukwa-0.4/CHANGES.txt (original) +++ hadoop/chukwa/branches/chukwa-0.4/CHANGES.txt Tue Mar 23 18:33:12 2010 @@ -4,6 +4,8 @@ Trunk (unreleased changes) NEW FEATURES + CHUKWA-445. Realtime display at collector. (asrabkin) + CHUKWA-454. DirTailingAdaptor can filter files. (Gerrit Jansen van Vuuren via asrabkin) CHUKWA-449. Utility to generate sequence file from log file. 
(Bill Graham via asrabkin) Modified: hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java?rev=926711&r1=926710&r2=926711&view=diff ============================================================================== --- hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java (original) +++ hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java Tue Mar 23 18:33:12 2010 @@ -22,8 +22,7 @@ package org.apache.hadoop.chukwa.datacol import org.mortbay.jetty.*; import org.mortbay.jetty.nio.SelectChannelConnector; import org.mortbay.jetty.servlet.*; -import org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet; -import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector; +import org.apache.hadoop.chukwa.datacollection.collector.servlet.*; import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector; import org.apache.hadoop.chukwa.datacollection.writer.*; import org.apache.hadoop.chukwa.util.DaemonWatcher; @@ -107,6 +106,10 @@ public class CollectorStub { if(conf.getBoolean(HttpConnector.ASYNC_ACKS_OPT, false)) root.addServlet(new ServletHolder(new CommitCheckServlet(conf)), "/"+CommitCheckServlet.DEFAULT_PATH); + if(conf.getBoolean(LogDisplayServlet.ENABLED_OPT, false)) + root.addServlet(new ServletHolder(new LogDisplayServlet(conf)), "/"+LogDisplayServlet.DEFAULT_PATH); + + root.setAllowNullPathInfo(false); // Add in any user-specified servlets Added: hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java URL: 
http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java?rev=926711&view=auto ============================================================================== --- hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java (added) +++ hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java Tue Mar 23 18:33:12 2010 @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.chukwa.datacollection.collector.servlet; + +import javax.servlet.ServletConfig; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.apache.log4j.Logger; +import java.io.*; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.*; +import org.apache.hadoop.chukwa.*; +import org.apache.hadoop.chukwa.datacollection.writer.ExtractorWriter; +import org.apache.hadoop.conf.Configuration; + +public class LogDisplayServlet extends HttpServlet { + + /* + static class StreamName { + byte[] md5; + public StreamName(Chunk c) { + + } + @Override + public int hashCode() { + int x=0; + for(int i=0; i< md5.length; ++i) { + x ^= (md5[i] << 4 * i); + } + return x; + } + + public boolean equals(Object x) { + if(x instanceof StreamName) + return Arrays.equals(md5, ((StreamName)x).md5); + else return false; + } + }*/ + + public static final String DEFAULT_PATH = "logs"; + public static final String ENABLED_OPT = "chukwaCollector.showLogs.enabled"; + public static final String BUF_SIZE_OPT = "chukwaCollector.showLogs.buffer"; + long BUF_SIZE = 1024* 1024; + + Configuration conf; + Map> chunksBySID = new HashMap>(); + Queue receivedSIDs = new LinkedList(); + long totalStoredSize = 0; + + private static final long serialVersionUID = -4602082382919009285L; + protected static Logger log = Logger.getLogger(LogDisplayServlet.class); + + public LogDisplayServlet() { + conf = new Configuration(); + ExtractorWriter.recipient = this; + } + + public LogDisplayServlet(Configuration c) { + conf = c; + ExtractorWriter.recipient = this; + } + + public void init(ServletConfig servletConf) throws ServletException { + BUF_SIZE = conf.getLong(BUF_SIZE_OPT, BUF_SIZE); + } + + private String getSID(Chunk c) { + try { + MessageDigest md; + md = MessageDigest.getInstance("MD5"); + + 
md.update(c.getSource().getBytes()); + md.update(c.getStreamName().getBytes()); + md.update(c.getTags().getBytes()); + StringBuilder sb = new StringBuilder(); + byte[] bytes = md.digest(); + for(int i=0; i < bytes.length; ++i) { + if( (bytes[i] & 0xF0) == 0) + sb.append('0'); + sb.append( Integer.toHexString(0xFF & bytes[i]) ); + } + return sb.toString(); + } catch(NoSuchAlgorithmException n) { + log.fatal(n); + System.exit(0); + return null; + } + } + + + private void pruneOldEntries() { + while(totalStoredSize > BUF_SIZE) { + String queueToPrune = receivedSIDs.remove(); + Deque stream = chunksBySID.get(queueToPrune); + assert !stream.isEmpty() : " expected a chunk in stream with ID " + queueToPrune; + Chunk c = stream.poll(); + if(c != null) + totalStoredSize -= c.getData().length; + if(stream.isEmpty()) { //remove empty deques and their names. + chunksBySID.remove(queueToPrune); + } + } + } + + public synchronized void add(List chunks) { + for(Chunk c : chunks) { + String sid = getSID(c); + Deque stream = chunksBySID.get(sid); + if(stream == null) { + stream = new LinkedList(); + chunksBySID.put(sid, stream); + } + stream.add(c); + receivedSIDs.add(sid); + totalStoredSize += c.getData().length; + } + pruneOldEntries(); + } + + + @Override + protected synchronized void doGet(HttpServletRequest req, HttpServletResponse resp) + throws ServletException, IOException { + + PrintStream out = new PrintStream(new BufferedOutputStream(resp.getOutputStream())); + resp.setStatus(200); + String path = req.getServletPath(); + String streamID = req.getParameter("sid"); + if (streamID != null) { + try { + Deque chunks = chunksBySID.get(streamID); + if(chunks != null) { + String streamName = getFriendlyName(chunks.peek()); + out.println("Chukwa:Received Data

Data from "+ streamName + "

"); + out.println("
");
+          for(Chunk c: chunks) {
+            out.write(c.getData());
+          }
+          out.println("

Back to list of streams"); + } else + out.println("No data"); + } catch(Exception e) { + out.println("No data"); + } + out.println(""); + } else { + out.println("Chukwa:Received Data

Recently-seen streams

"); + } + out.flush(); + } + + private String getFriendlyName(Chunk chunk) { + if(chunk != null) + return chunk.getTags() + "/" + chunk.getSource() + "/" + chunk.getStreamName(); + else return "null"; + } + + +} Added: hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java URL: http://svn.apache.org/viewvc/hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java?rev=926711&view=auto ============================================================================== --- hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java (added) +++ hadoop/chukwa/branches/chukwa-0.4/src/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java Tue Mar 23 18:33:12 2010 @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.hadoop.chukwa.datacollection.writer;

import java.util.List;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.collector.servlet.LogDisplayServlet;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter.CommitStatus;
import org.apache.hadoop.conf.Configuration;

/**
 * Pipeline writer stage that tees every chunk batch to a
 * {@link LogDisplayServlet} (for realtime display at the collector) and then
 * passes the batch through to the next writer in the pipeline unchanged.
 *
 * NOTE(review): the generic parameter on add() was stripped by the mail
 * archiver and has been restored here (raw List otherwise) — see svn r926711.
 */
public class ExtractorWriter extends PipelineableWriter {

  // Static hand-off point: LogDisplayServlet registers itself here in its
  // constructor; null until (and unless) that servlet is enabled.
  public static LogDisplayServlet recipient;

  @Override
  public void close() throws WriterException {
    // Guard like add() does: this writer may be the last stage, so next
    // can legitimately be null.
    if (next != null)
      next.close();
  }

  @Override
  public void init(Configuration c) throws WriterException {
    // No per-writer state to initialize; the recipient is wired statically.
  }

  /**
   * Forwards the batch to the display servlet (if one registered) and then
   * downstream. Returns the downstream commit status, or COMMIT_OK when this
   * is the final stage.
   */
  public CommitStatus add(List<Chunk> chunks) throws WriterException {
    if (recipient != null)
      recipient.add(chunks);
    if (next != null)
      return next.add(chunks); // pass data through
    else
      return ChukwaWriter.COMMIT_OK;
  }

}