Subject: svn commit: r900924 - in /hadoop/chukwa/trunk: ./ src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ src/test/org/apache/hadoop/chukwa/datacollection/adaptor/
Date: Tue, 19 Jan 2010 19:39:32 -0000
To: chukwa-commits@hadoop.apache.org
From: asrabkin@apache.org
Reply-To: chukwa-dev@hadoop.apache.org

Author: asrabkin
Date: Tue Jan 19 19:39:31 2010
New Revision: 900924

URL: http://svn.apache.org/viewvc?rev=900924&view=rev
Log:
CHUKWA-448. Write-ahead buffering for arbitrary adaptors.

Added:
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/WriteaheadBuffered.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java
Removed:
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestMemBuffer.java
Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=900924&r1=900923&r2=900924&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Tue Jan 19 19:39:31 2010
@@ -4,6 +4,8 @@
 
 NEW FEATURES
 
+    CHUKWA-448. Write-ahead buffering for arbitrary adaptors. (asrabkin)
+
     CHUKWA-441. Added real time Hadoop activity monitor. (Eric Yang)
 
     CHUKWA-433. File-per-post writer for benchmark purposes. (asrabkin)
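
A note for readers skimming the diffs below: both buffering wrappers are engaged by prefixing the wrapper class name to an ordinary adaptor declaration, which is exactly how the new test drives them. A minimal sketch of starting a write-ahead-buffered UDP adaptor, assuming an agent built from this revision (the adaptor ID and port number are illustrative, borrowed from the test):

    // Sketch: chunks from the inner UDPAdaptor are journaled to disk until
    // the collector acknowledges them. "WriteaheadBuffered" names the
    // wrapper, "UDPAdaptor raw 9878" the inner adaptor spec, and the
    // trailing 0 is the starting offset.
    ChukwaAgent agent = new ChukwaAgent(new Configuration());
    agent.processAddCommand(
        "add adaptor_wal = WriteaheadBuffered UDPAdaptor raw 9878 0");
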
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java?rev=900924&r1=900923&r2=900924&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java Tue Jan 19 19:39:31 2010
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.chukwa.datacollection.adaptor;
 
+import static org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy.RESTARTING;
 import java.util.*;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
@@ -27,6 +28,9 @@
   static final String BUF_SIZE_OPT = "adaptor.memBufWrapper.size";
   static final int DEFAULT_BUF_SIZE = 1024*1024; //1 MB
 
+  //true by default. If you were willing to discard data, you didn't need Mem Buffers
+  static boolean BLOCK_WHEN_FULL = true;
+
   static class MemBuf {
     long dataSizeBytes;
     final long maxDataSize;
@@ -40,8 +44,11 @@
 
     synchronized void add(Chunk c) throws InterruptedException{
       int len = c.getData().length;
-      while(len + dataSizeBytes > maxDataSize)
-        wait();
+      if(BLOCK_WHEN_FULL)
+        while(len + dataSizeBytes > maxDataSize)
+          wait();
+      else
+        chunks.remove();
       dataSizeBytes += len;
       chunks.add(c);
     }
@@ -83,6 +90,7 @@
       ChunkReceiver dest) throws AdaptorException {
     try {
       String dummyAdaptorID = adaptorID;
+      this.adaptorID = adaptorID;
       this.dest = dest;
 
       long bufSize = manager.getConfiguration().getInt(BUF_SIZE_OPT, DEFAULT_BUF_SIZE);
@@ -95,10 +103,15 @@
       }
 
       //Drain buffer into output queue
-      for(Chunk c:myBuffer.chunks)
+      long offsetToStartAt = offset;
+      for(Chunk c:myBuffer.chunks) {
         dest.add(c);
+        long seq = c.getSeqID();
+        if(seq > offsetToStartAt)
+          offsetToStartAt = seq;
+      }
 
-      inner.start(dummyAdaptorID, innerType, offset, this);
+      inner.start(dummyAdaptorID, innerType, offsetToStartAt, this);
     } catch(InterruptedException e) {
       throw new AdaptorException(e);
     }
@@ -108,5 +121,13 @@
   public void committed(long l) {
     myBuffer.removeUpTo(l);
   }
+
+  @Override
+  public long shutdown(AdaptorShutdownPolicy p) throws AdaptorException {
+    if(p != RESTARTING)
+      buffers.remove(adaptorID);
+    return inner.shutdown(p);
+  }
+
 }
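
Two behavioral notes on the MemBuffered changes above: with BLOCK_WHEN_FULL left at its default of true, add() keeps the old blocking behavior, stalling the producing adaptor until committed() frees space; set it to false and the oldest buffered chunk is evicted instead. On restart, the wrapper now replays the buffered chunks and starts the inner adaptor at the highest replayed sequence ID, so nothing is read twice. The buffer cap itself comes from the agent configuration. A hedged sketch of tuning it (the config key is the BUF_SIZE_OPT constant above; the adaptor line and port are illustrative):

    // Sketch: raise the in-memory buffer cap from the 1 MB default to 4 MB,
    // then wrap a UDPAdaptor with MemBuffered.
    Configuration conf = new Configuration();
    conf.setInt("adaptor.memBufWrapper.size", 4 * 1024 * 1024);
    ChukwaAgent agent = new ChukwaAgent(conf);
    agent.processAddCommand("add adaptor_mem = MemBuffered UDPAdaptor raw 9878 0");
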
Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/WriteaheadBuffered.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/WriteaheadBuffered.java?rev=900924&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/WriteaheadBuffered.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/WriteaheadBuffered.java Tue Jan 19 19:39:31 2010
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import java.util.*;
+import java.io.*;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
+import org.apache.log4j.Logger;
+import static org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy.*;
+
+public class WriteaheadBuffered extends AbstractWrapper {
+  Logger log = Logger.getLogger(WriteaheadBuffered.class);
+  static final String BUF_DIR_OPT = "adaptor.writeaheadWrapper.dir";
+  static String BUF_DIR = "/tmp"; //default buffer directory
+  static long COMPACT_AT = 1024 * 1024; //compact when it can free at least this much storage
+
+  File outBuf;
+  DataOutputStream outToDisk;
+  long fSize, highestSentOffset;
+
+
+  @Override
+  public synchronized void add(Chunk event) throws InterruptedException {
+    try {
+      event.write(outToDisk);
+      outToDisk.flush();
+      fSize += event.getData().length;
+      long seq = event.getSeqID();
+      if(seq > highestSentOffset)
+        highestSentOffset = seq;
+    } catch(IOException e) {
+      log.error(e);
+    }
+    dest.add(event);
+  }
+
+  @Override
+  public void start(String adaptorID, String type, long offset,
+      ChunkReceiver dest) throws AdaptorException {
+    try {
+      String dummyAdaptorID = adaptorID;
+      this.dest = dest;
+
+      outBuf = new File(BUF_DIR, adaptorID);
+      long newOffset = offset;
+      if(outBuf.length() > 0) {
+        DataInputStream dis = new DataInputStream(new FileInputStream(outBuf));
+        //send chunks that are outstanding
+        while(dis.available() > 0) {
+          Chunk c = ChunkImpl.read(dis);
+          fSize += c.getData().length;
+          long seq = c.getSeqID();
+          if(seq > offset) {
+            dest.add(c);
+            newOffset = seq;
+          }
+        }
+        dis.close();
+      }
+      outToDisk = new DataOutputStream(new FileOutputStream(outBuf, true));
+
+      inner.start(dummyAdaptorID, innerType, newOffset, this);
+    } catch(IOException e) {
+      throw new AdaptorException(e);
+    } catch(InterruptedException e) {
+      throw new AdaptorException(e);
+    }
+  }
+
+  @Override
+  public synchronized void committed(long l) {
+
+    try {
+      long bytesOutstanding = highestSentOffset - l;
+      if(fSize - bytesOutstanding > COMPACT_AT) {
+        fSize = 0;
+        outToDisk.close();
+        File outBufTmp = new File(outBuf.getAbsoluteFile(), outBuf.getName() + ".tmp");
+        outBuf.renameTo(outBufTmp);
+        outToDisk = new DataOutputStream(new FileOutputStream(outBuf, false));
+        DataInputStream dis = new DataInputStream(new FileInputStream(outBufTmp));
+        while(dis.available() > 0) {
+          Chunk c = ChunkImpl.read(dis);
+          if(c.getSeqID() > l) { //not yet committed
+            c.write(outToDisk);
+            fSize += c.getData().length;
+          }
+        }
+        dis.close();
+        outBufTmp.delete();
+      }
+    } catch(IOException e) {
+      log.error(e);
+      //should this be fatal?
+    }
+  }
+
+  @Override
+  public long shutdown(AdaptorShutdownPolicy p) throws AdaptorException {
+    if(p != RESTARTING)
+      outBuf.delete();
+    return inner.shutdown(p);
+  }
+
+}
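
The compaction in committed() deserves a gloss: the on-disk log only ever grows as chunks are appended, so once the acknowledged portion of the file exceeds COMPACT_AT the wrapper rewrites it, keeping only chunks past the commit point. Illustrative arithmetic for the trigger condition (the figures are made up, not from the commit):

    // Suppose 3 MB have been journaled (fSize) and all but 512 KB is acked
    // (bytesOutstanding = highestSentOffset - l). Then 2.5 MB of the file
    // is dead weight, which exceeds COMPACT_AT (1 MB), so committed()
    // rewrites the log to hold only the 512 KB of uncommitted chunks.
    long fSize = 3L * 1024 * 1024;
    long bytesOutstanding = 512L * 1024;
    boolean shouldCompact = fSize - bytesOutstanding > 1024L * 1024;  // true
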
Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java?rev=900924&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java Tue Jan 19 19:39:31 2010
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import static org.apache.hadoop.chukwa.util.TempFileUtil.makeTestFile;
+import java.io.File;
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+import junit.framework.TestCase;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
+import org.apache.hadoop.conf.Configuration;
+
+public class TestBufferingWrappers extends TestCase {
+
+  Configuration conf = new Configuration();
+  static File baseDir;
+  ChunkCatcherConnector chunks;
+
+  public TestBufferingWrappers() throws IOException {
+    baseDir = new File(System.getProperty("test.build.data", "/tmp"));
+    conf.setInt("chukwaAgent.control.port", 0);
+    conf.set("chukwaAgent.checkpoint.dir", baseDir.getCanonicalPath());
+    conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
+    conf.setInt("chukwaAgent.adaptor.fileadaptor.timeoutperiod", 100);
+    conf.setInt("chukwaAgent.adaptor.context.switch.time", 100);
+
+    chunks = new ChunkCatcherConnector();
+    chunks.start();
+  }
+
+  public void testMBResendAfterStop() throws Exception{
+    resendAfterStop("MemBuffered");
+  }
+
+  public void testWriteaheadResendAfterStop() throws Exception{
+    resendAfterStop("WriteaheadBuffered");
+  }
+
+
+  //Start a wrapped UDPAdaptor and push a chunk. Stop the adaptor with
+  //RESTARTING and re-add it. The chunk hasn't been acked, so the wrapper
+  //should push the cached chunk again on each pass through the loop.
+  public void resendAfterStop(String adaptor) throws IOException,
+      ChukwaAgent.AlreadyRunningException, InterruptedException {
+
+    ChukwaAgent agent = new ChukwaAgent(conf);
+    String ADAPTORID = "adaptor_test" + System.currentTimeMillis();
+    String STR = "test data";
+    int PORTNO = 9878;
+    DatagramSocket send = new DatagramSocket();
+    byte[] buf = STR.getBytes();
+    DatagramPacket p = new DatagramPacket(buf, buf.length);
+    p.setSocketAddress(new InetSocketAddress("127.0.0.1", PORTNO));
+
+    assertEquals(0, agent.adaptorCount());
+    String name = agent.processAddCommand("add " + ADAPTORID + " = " + adaptor + " UDPAdaptor raw " + PORTNO + " 0");
+    assertEquals(name, ADAPTORID);
+    Thread.sleep(500);
+    send.send(p);
+
+    for(int i=0; i < 5; ++i) {
+      Chunk c = chunks.waitForAChunk(5000);
+      System.out.println("received " + i);
+      assertNotNull(c);
+      String dat = new String(c.getData());
+      assertTrue(dat.equals(STR));
+      assertTrue(c.getDataType().equals("raw"));
+      assertEquals(c.getSeqID(), STR.length());
+
+      agent.stopAdaptor(name, AdaptorShutdownPolicy.RESTARTING);
+      name = agent.processAddCommand("add " + ADAPTORID + " = " + adaptor + " UDPAdaptor raw " + PORTNO + " 0");
+      assertEquals(name, ADAPTORID);
+    }
+    Chunk c = chunks.waitForAChunk(5000);
+
+    Thread.sleep(500);
+
+    buf = "different data".getBytes();
+    p = new DatagramPacket(buf, buf.length);
+    p.setSocketAddress(new InetSocketAddress("127.0.0.1", PORTNO));
+    send.send(p);
+    c = chunks.waitForAChunk(5000);
+    assertNotNull(c);
+    assertEquals(buf.length + STR.length(), c.getSeqID());
+
+    agent.stopAdaptor(name, true);
+    assertEquals(0, agent.adaptorCount());
+    Thread.sleep(500); //before re-binding
+    agent.shutdown();
+  }
+
+}
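
A closing note on the offsets the test asserts: as the assertions imply, UDPAdaptor sequence IDs are cumulative byte counts, so the first "test data" packet (9 bytes) yields seqID 9 on every replay, and after also sending "different data" (14 bytes) the cumulative seqID is 9 + 14 = 23, matching assertEquals(buf.length + STR.length(), c.getSeqID()). To exercise both wrappers stand-alone, one option (assuming a JUnit 3.8 jar on the classpath, to match the TestCase base class above):

    // Sketch: drive the new test with the JUnit 3 text runner.
    junit.textui.TestRunner.run(TestBufferingWrappers.class);
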