chukwa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asrab...@apache.org
Subject svn commit: r900924 - in /hadoop/chukwa/trunk: ./ src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ src/test/org/apache/hadoop/chukwa/datacollection/adaptor/
Date Tue, 19 Jan 2010 19:39:32 GMT
Author: asrabkin
Date: Tue Jan 19 19:39:31 2010
New Revision: 900924

URL: http://svn.apache.org/viewvc?rev=900924&view=rev
Log:
CHUKWA-448. Write-ahead buffering for arbitrary adaptors.

Added:
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/WriteaheadBuffered.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java
Removed:
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestMemBuffer.java
Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=900924&r1=900923&r2=900924&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Tue Jan 19 19:39:31 2010
@@ -4,6 +4,8 @@
 
   NEW FEATURES
 
+    CHUKWA-448. Write-ahead buffering for arbitrary adaptors. (asrabkin)
+
     CHUKWA-441. Added real time Hadoop activity monitor. (Eric Yang)
 
     CHUKWA-433. File-per-post writer for benchmark purposes. (asrabkin)

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java?rev=900924&r1=900923&r2=900924&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java Tue Jan 19 19:39:31 2010
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.chukwa.datacollection.adaptor;
 
+import static org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy.RESTARTING;
 import java.util.*;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
@@ -27,6 +28,9 @@
   static final String BUF_SIZE_OPT = "adaptor.memBufWrapper.size";
   static final int DEFAULT_BUF_SIZE = 1024*1024; //1 MB
   
+  //true by default. If you were willing to discard data, you didn't need Mem Buffers
+  static boolean BLOCK_WHEN_FULL = true;
+  
   static class MemBuf {
     long dataSizeBytes;
     final long maxDataSize;
@@ -40,8 +44,11 @@
     
     synchronized void add(Chunk c) throws InterruptedException{
       int len = c.getData().length;
-      while(len + dataSizeBytes > maxDataSize)
-        wait();
+      if(BLOCK_WHEN_FULL)
+        while(len + dataSizeBytes > maxDataSize)
+          wait();
+      else
+        chunks.remove();
       dataSizeBytes += len;
       chunks.add(c);
     }
@@ -83,6 +90,7 @@
       ChunkReceiver dest) throws AdaptorException {
     try {
       String dummyAdaptorID = adaptorID;
+      this.adaptorID = adaptorID;
       this.dest = dest;
       
       long bufSize = manager.getConfiguration().getInt(BUF_SIZE_OPT, DEFAULT_BUF_SIZE);
@@ -95,10 +103,15 @@
       }
 
       //Drain buffer into output queue
-      for(Chunk c:myBuffer.chunks)
+      long offsetToStartAt = offset;
+      for(Chunk c:myBuffer.chunks) {
         dest.add(c);
+        long seq = c.getSeqID();
+        if(seq > offsetToStartAt)
+          offsetToStartAt = seq;
+      }
       
-      inner.start(dummyAdaptorID, innerType, offset, this);
+      inner.start(dummyAdaptorID, innerType, offsetToStartAt, this);
     } catch(InterruptedException e) {
      throw new AdaptorException(e);
     }
@@ -108,5 +121,13 @@
   public void committed(long l) {
     myBuffer.removeUpTo(l);
   }
+  
+  @Override
+  public long shutdown(AdaptorShutdownPolicy p) throws AdaptorException {
+    if(p != RESTARTING)
+      buffers.remove(adaptorID);    
+    return inner.shutdown(p);
+  }
+
 
 }

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/WriteaheadBuffered.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/WriteaheadBuffered.java?rev=900924&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/WriteaheadBuffered.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/WriteaheadBuffered.java Tue Jan 19 19:39:31 2010
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import java.util.*;
+import java.io.*;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
+import org.apache.log4j.Logger;
+import static org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy.*;
+
+public class WriteaheadBuffered extends AbstractWrapper {
+  Logger log = Logger.getLogger(WriteaheadBuffered.class);
+  static final String BUF_DIR_OPT = "adaptor.writeaheadWrapper.dir";
+  static  String BUF_DIR = "/tmp"; //default directory for write-ahead buffer files
+  static long COMPACT_AT = 1024 * 1024; //compact when it can free at least this much storage
+  
+  File outBuf;
+  DataOutputStream outToDisk;
+  long fSize, highestSentOffset;
+  
+  
+  @Override
+  public synchronized void add(Chunk event) throws InterruptedException {
+    try {
+      event.write(outToDisk);
+      outToDisk.flush();
+      fSize += event.getData().length;
+      long seq = event.getSeqID();
+      if(seq > highestSentOffset)
+        highestSentOffset = seq;
+    } catch(IOException e) {
+      log.error(e);
+    }
+    dest.add(event);
+  }
+  
+  @Override
+  public void start(String adaptorID, String type, long offset,
+      ChunkReceiver dest) throws AdaptorException {
+    try {
+      String dummyAdaptorID = adaptorID;
+      this.dest = dest;
+      
+      outBuf = new File(BUF_DIR, adaptorID);
+      long newOffset = offset;
+      if(outBuf.length() > 0) {
+        DataInputStream dis = new DataInputStream(new FileInputStream(outBuf));
+        while(dis.available() > 0) {
+          Chunk c = ChunkImpl.read(dis);
+          fSize += c.getData().length;
+          long seq = c.getSeqID();
+          if(seq >offset) {
+            dest.add(c);
+            newOffset = seq;
+          }
+        }
+        //send chunks that are outstanding        
+        dis.close();
+      }
+      outToDisk = new DataOutputStream(new FileOutputStream(outBuf, true));
+      
+      inner.start(dummyAdaptorID, innerType, newOffset, this);
+    } catch(IOException e) {
+      throw new AdaptorException(e);
+    } catch(InterruptedException e) {
+      throw new AdaptorException(e);
+    }
+  }
+  
+  @Override
+  public synchronized void committed(long l) {
+
+    try {
+      long bytesOutstanding = highestSentOffset - l;
+      if(fSize - bytesOutstanding > COMPACT_AT) {
+        fSize = 0;
+        outToDisk.close();
+        File outBufTmp = new File(outBuf.getAbsoluteFile(), outBuf.getName() + ".tmp");
+        outBuf.renameTo(outBufTmp);
+        outToDisk = new DataOutputStream(new FileOutputStream(outBuf, false));
+        DataInputStream dis = new DataInputStream(new FileInputStream(outBufTmp));
+        while(dis.available() > 0) {
+          Chunk c = ChunkImpl.read(dis);
+          if(c.getSeqID() > l) { //not yet committed
+            c.write(outToDisk);
+            fSize += c.getData().length;
+          }
+        }
+        dis.close();
+        outBufTmp.delete();
+      }
+    } catch(IOException e) {
+      log.error(e);
+      //should this be fatal?
+    }
+  }
+  
+  @Override
+  public long shutdown(AdaptorShutdownPolicy p) throws AdaptorException {
+    if(p != RESTARTING)
+      outBuf.delete();    
+    return inner.shutdown(p);
+  }
+
+}

Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java?rev=900924&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestBufferingWrappers.java Tue Jan 19 19:39:31 2010
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import static org.apache.hadoop.chukwa.util.TempFileUtil.makeTestFile;
+import java.io.File;
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetSocketAddress;
+import junit.framework.TestCase;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
+import org.apache.hadoop.conf.Configuration;
+
+public class TestBufferingWrappers extends TestCase {
+
+  Configuration conf = new Configuration();
+  static File baseDir;
+  ChunkCatcherConnector chunks;
+  
+  public TestBufferingWrappers() throws IOException {
+    baseDir = new File(System.getProperty("test.build.data", "/tmp"));
+    conf.setInt("chukwaAgent.control.port", 0);
+    conf.set("chukwaAgent.checkpoint.dir", baseDir.getCanonicalPath());
+    conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
+    conf.setInt("chukwaAgent.adaptor.fileadaptor.timeoutperiod", 100);
+    conf.setInt("chukwaAgent.adaptor.context.switch.time", 100);
+
+    chunks = new ChunkCatcherConnector();
+    chunks.start();
+  }
+  
+  public void testMBResendAfterStop() throws Exception{
+    resendAfterStop("MemBuffered");
+  }
+
+  public void testWriteaheadResendAfterStop() throws Exception{
+    resendAfterStop("WriteaheadBuffered");
+  }
+
+  
+  //start a wrapped FileAdaptor. Pushes a chunk. Stop it and restart.
+  //chunk hasn't been acked, so should get pushed again.
+  //we delete the file and also change the data type each time through the loop
+  //to make sure we get the cached chunk.
+  public void resendAfterStop(String adaptor)  throws IOException,
+  ChukwaAgent.AlreadyRunningException, InterruptedException {
+    
+    ChukwaAgent agent = new ChukwaAgent(conf);
+    String ADAPTORID = "adaptor_test" + System.currentTimeMillis(); 
+    String STR = "test data";
+    int PORTNO = 9878;
+    DatagramSocket send = new DatagramSocket();
+    byte[] buf = STR.getBytes();
+    DatagramPacket p = new DatagramPacket(buf, buf.length);
+    p.setSocketAddress(new InetSocketAddress("127.0.0.1",PORTNO));
+    
+    assertEquals(0, agent.adaptorCount());
+    String name =agent.processAddCommand("add "+ ADAPTORID + " = "+adaptor+" UDPAdaptor raw "+PORTNO+ " 0");
+    assertEquals(name, ADAPTORID);
+    Thread.sleep(500);
+    send.send(p);
+    
+    for(int i=0; i< 5; ++i) {
+      Chunk c = chunks.waitForAChunk(5000);
+      System.out.println("received " + i);
+      assertNotNull(c);
+      String dat = new String(c.getData());
+      assertTrue(dat.equals(STR));
+      assertTrue(c.getDataType().equals("raw"));
+      assertEquals(c.getSeqID(), STR.length());
+      
+      agent.stopAdaptor(name, AdaptorShutdownPolicy.RESTARTING);
+      name =agent.processAddCommand("add "+ADAPTORID + " = "+adaptor+" UDPAdaptor raw "+PORTNO + " 0");
+      assertEquals(name, ADAPTORID);
+    }
+    Chunk c = chunks.waitForAChunk(5000);
+
+    Thread.sleep(500);
+    
+    buf = "different data".getBytes();
+    p = new DatagramPacket(buf, buf.length);   
+    p.setSocketAddress(new InetSocketAddress("127.0.0.1",PORTNO));
+    send.send(p);
+    c = chunks.waitForAChunk(5000);
+    assertNotNull(c);
+    assertEquals(buf.length + STR.length(), c.getSeqID());
+    
+    agent.stopAdaptor(name, true);
+    assertEquals(0, agent.adaptorCount());
+    Thread.sleep(500);//before re-binding
+    agent.shutdown();
+  }
+
+}



Mime
View raw message