hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r742797 - in /hadoop/core/trunk: ./ src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/ src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ src/contrib/chukwa/src/java/org/apache...
Date Tue, 10 Feb 2009 00:15:35 GMT
Author: cdouglas
Date: Tue Feb 10 00:15:34 2009
New Revision: 742797

URL: http://svn.apache.org/viewvc?rev=742797&view=rev
Log:
HADOOP-5018. Add pipelined writers to Chukwa. Contributed by Ari Rabkin

Added:
    hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java
    hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineableWriter.java
    hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/collector/
    hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/collector/CaptureWriter.java
    hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/collector/CollectorTest.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java
    hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
    hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java
    hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ChukwaWriter.java
    hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java
    hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java
    hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
    hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=742797&r1=742796&r2=742797&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Feb 10 00:15:34 2009
@@ -38,6 +38,8 @@
     HADOOP-3741. Add a web ui to the SecondaryNameNode for showing its status.
     (szetszwo)
 
+    HADOOP-5018. Add pipelined writers to Chukwa. (Ari Rabkin via cdouglas)
+
   IMPROVEMENTS
 
     HADOOP-4565. Added CombineFileInputFormat to use data locality information

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java?rev=742797&r1=742796&r2=742797&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java Tue Feb 10 00:15:34 2009
@@ -48,27 +48,26 @@
         portNum = Integer.parseInt(args[0]);
       
         //pick a writer.
+      ChukwaWriter w = null;
       if(args.length > 1) {
         if(args[1].equals("pretend"))
-          ServletCollector.setWriter(new ConsoleWriter(true));
+          w= new ConsoleWriter(true);
         else if(args[1].equals("pretend-quietly"))
-          ServletCollector.setWriter(new ConsoleWriter(false));
+          w = new ConsoleWriter(false);
         else if(args[1].equals("-classname")) {
           if(args.length < 3)
             System.err.println("need to specify a writer class");
           else {
-            Class<?> writerClass = Class.forName(args[2]);
-            if(writerClass != null &&
-                ChukwaWriter.class.isAssignableFrom(writerClass))
-              ServletCollector.setWriter(
-                  (ChukwaWriter) writerClass.newInstance());
-            else
-              System.err.println(args[2]+ "is not a ChukwaWriter");
+            conf.set("chukwaCollector.writerClass", args[2]);
           }
         }
         else
           System.out.println("WARNING: unknown command line arg "+ args[1]);
       }
+      if(w != null) {
+        w.init(conf);
+        ServletCollector.setWriter(w);
+      }
       
         //set up jetty connector
       SelectChannelConnector jettyConnector = new SelectChannelConnector();
@@ -85,7 +84,7 @@
       jettyServer.setThreadPool(pool);
         //and add the servlet to it
       Context root = new Context(jettyServer,"/",Context.SESSIONS);
-      root.addServlet(new ServletHolder(new ServletCollector()), "/*");
+      root.addServlet(new ServletHolder(new ServletCollector(conf)), "/*");
       jettyServer.start();
       jettyServer.setStopAtShutdown(false);
      

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java?rev=742797&r1=742796&r2=742797&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java Tue Feb 10 00:15:34 2009
@@ -21,10 +21,7 @@
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.PrintStream;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.*;
 
 import javax.servlet.ServletConfig;
 import javax.servlet.ServletException;
@@ -33,10 +30,11 @@
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.ChunkImpl;
-import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
-import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
+import org.apache.hadoop.chukwa.datacollection.writer.*;
 import org.apache.log4j.Logger;
 
 public class ServletCollector extends HttpServlet
@@ -51,12 +49,18 @@
 	public static void setWriter(org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter
w) throws WriterException
 	{
 	  writer = w;
-	  w.init();
 	}
 	static long statTime = 0L;
 	static int numberHTTPConnection = 0;
 	static int numberchunks = 0;
 	
+	Configuration conf;
+  
+  public ServletCollector(Configuration c) {
+    conf =c;
+  }
+
+	
 	public void init(ServletConfig servletConf) throws ServletException
 	{
 	  
@@ -66,7 +70,6 @@
 			return;
 		}
 		
-		
 		Timer statTimer = new Timer();
 		statTimer.schedule(new TimerTask()
 		{
@@ -80,31 +83,26 @@
 			}
 		}, (1000), (60*1000));
 		
-		try
-		{
-			// read the application->pipeline settings from a config file in the format:
-			// appliation_name: PipelinedWriter1, PipelinedWriter2, Writer
-			// use reflection to set up the pipeline after reading in the list of writers from the
config file
-			
-			/*
-			String strPipelines = "HadoopLogs:HdfsWriter\nApplication2:SameerWriter:HdfsWriter";
-			String[] pipelines = strPipelines.split("\n");
-			// split into pipes
-			for (String pipe : pipelines){
-				String[] tmp = pipe.split(":");
-				String app = tmp[0];
-				String[] stages = tmp[1].split(",");
-			
-				//loop through pipes, creating linked list of stages per pipe, one at a time 
-				for (String stage : stages){
-					Class curr = ClassLoader.loadClass(stage);
-				}
-			}
-			*/
-		      //FIXME: seems weird to initialize a static object here
-			if (writer == null)
-				writer =  new SeqFileWriter();
-
+		if(writer != null) {
+		  log.info("writer set up statically, no need for Collector.init() to do it");
+		  return;
+		}
+		
+		try {
+	   String writerClassName = conf.get("chukwaCollector.writerClass", 
+	          SeqFileWriter.class.getCanonicalName());
+	    Class<?> writerClass = Class.forName(writerClassName);
+	    if(writerClass != null &&ChukwaWriter.class.isAssignableFrom(writerClass))
+	        writer = (ChukwaWriter) writerClass.newInstance();
+		} catch(Exception e) {
+		  log.warn("failed to use user-chosen writer class, defaulting to SeqFileWriter", e);
+		}
+      
+    //We default to here if the pipeline construction failed or didn't happen.
+    try{ 
+      if(writer == null)
+        writer =  new SeqFileWriter();//default to SeqFileWriter
+      writer.init(conf);
 		} catch (WriterException e) {
 			throw new ServletException("Problem init-ing servlet", e);
 		}		

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java?rev=742797&r1=742796&r2=742797&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java Tue Feb 10 00:15:34 2009
@@ -73,7 +73,7 @@
       Context root = new Context(server,"/",Context.SESSIONS);
   
       ServletCollector.setWriter(new ConsoleWriter(true));
-      root.addServlet(new ServletHolder(new ServletCollector()), "/*");
+      root.addServlet(new ServletHolder(new ServletCollector(new ChukwaConfiguration(true))),
"/*");
       server.start();
       server.setStopAtShutdown(false);
   

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ChukwaWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ChukwaWriter.java?rev=742797&r1=742796&r2=742797&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ChukwaWriter.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ChukwaWriter.java Tue Feb 10 00:15:34 2009
@@ -21,10 +21,11 @@
 import java.util.List;
 
 import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.conf.Configuration;
 
 public interface ChukwaWriter
 {
-	public void init() throws WriterException;
+	public void init(Configuration c) throws WriterException;
 	public void add(Chunk data) throws WriterException;
 	public void add(List<Chunk> chunks) throws WriterException;
 	public void close() throws WriterException;;

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java?rev=742797&r1=742796&r2=742797&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java Tue Feb 10 00:15:34 2009
@@ -23,6 +23,7 @@
 import java.util.TimerTask;
 
 import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.conf.Configuration;
 
 public class ConsoleWriter implements ChukwaWriter {
 
@@ -48,6 +49,10 @@
     }
   };
   
+
+  public ConsoleWriter() {
+    this(true);
+  }
   
   public ConsoleWriter(boolean printData) {
     this.printData = printData;
@@ -59,7 +64,7 @@
     statTimer.cancel();
   }
 
-  public void init() throws WriterException
+  public void init(Configuration conf) throws WriterException
   {
      System.out.println("----  DUMMY HDFS WRITER IN USE ---");
 
@@ -73,7 +78,7 @@
     dataSize += data.getData().length;
     if(printData) {
       System.out.println(data.getData().length + " bytes of data in chunk");
-      
+
       for(int offset: data.getRecordOffsets()) {
         System.out.print(data.getStreamName());
         System.out.print(" ");

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java?rev=742797&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java Tue Feb 10 00:15:34 2009
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.writer;
+
+import java.util.ArrayDeque;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Pipeline stage that suppresses duplicate chunks before passing them on.
+ * A chunk is a duplicate if a chunk with the same stream name and
+ * sequence ID is among the most recently recorded
+ * chukwaCollector.chunkSuppressBufferSize distinct chunks (default 0, which
+ * disables suppression).
+ */
+public class Dedup implements PipelineableWriter {
+  
+  /** Identity of a chunk for duplicate detection: stream name plus sequence ID. */
+  static final class DedupKey {
+    String name;
+    long val; //sequence number
+    
+    public DedupKey(String n, long p) {
+      name = n;
+      val = p;
+    }
+
+    public int hashCode() {
+      return (int) (name.hashCode() ^ val ^ (val >> 32));
+    }
+
+    public boolean equals(Object dk) {
+      if(dk instanceof DedupKey)
+        return name.equals(((DedupKey)dk).name) && val == ((DedupKey)dk).val;
+      else return false;
+    }
+  }
+  
+  /**
+   * Set of the most recently recorded entries, holding at most maxSize of them.
+   * When full, the oldest entries are evicted to make room for a new one.
+   * A maxSize of 0 disables the cache entirely.
+   */
+  static class FixedSizeCache<EntryType> {
+    final HashSet<EntryType> hs;
+    final Queue<EntryType> toDrop; //same entries as hs, oldest first
+    final int maxSize;
+    volatile long dupchunks =0;
+    public FixedSizeCache(int size) {
+      maxSize = Math.max(0, size); //a negative size also disables the cache
+      hs = new HashSet<EntryType>(maxSize);
+      toDrop = new ArrayDeque<EntryType>(maxSize);
+    }
+    
+    /** Records t, evicting the oldest entries if needed; no-op if t is already cached. */
+    public synchronized void add(EntryType t) {
+      if(maxSize == 0 || hs.contains(t))
+        return;
+      insert(t);
+    }
+    
+    /**
+     * Returns true (and counts a duplicate) if t is already cached; otherwise
+     * records t, evicting the oldest entries if needed, and returns false.
+     */
+    private synchronized boolean addAndCheck(EntryType t) {
+      if(maxSize == 0)
+        return false;
+      
+      if(hs.contains(t)) {
+        dupchunks++;
+        return true;
+      }
+      insert(t);
+      return false;
+    }
+    
+    /**
+     * Evicts oldest entries until there is room, then records t.
+     * Caller must hold this object's lock and ensure t is not already cached,
+     * so that hs and toDrop always hold the same entries.
+     */
+    private void insert(EntryType t) {
+      while(hs.size() >= maxSize)
+        hs.remove(toDrop.remove());
+      hs.add(t);
+      toDrop.add(t);
+    }
+    
+    private long dupCount() {
+      return dupchunks;
+    }
+  }
+  
+
+  FixedSizeCache<DedupKey> cache;
+  ChukwaWriter next;
+
+  @Override
+  public void setNextStage(ChukwaWriter next) {
+    this.next = next;
+  }
+
+  /** Forwards data to the next stage unless it duplicates a recently recorded chunk. */
+  @Override
+  public void add(Chunk data) throws WriterException {
+    if(! cache.addAndCheck(new DedupKey(data.getStreamName(), data.getSeqID())))
+        next.add(data);
+  }
+
+  @Override
+  public void add(List<Chunk> chunks) throws WriterException {
+    for(Chunk c: chunks)
+      add(c);
+
+  }
+
+  @Override
+  public void close() throws WriterException {
+    next.close();
+  }
+
+  /** Sizes the duplicate-suppression cache from chukwaCollector.chunkSuppressBufferSize. */
+  @Override
+  public void init(Configuration c) throws WriterException {
+    int csize = c.getInt("chukwaCollector.chunkSuppressBufferSize", 0);
+    cache = new FixedSizeCache<DedupKey>(csize);
+  }
+
+}

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java?rev=742797&r1=742796&r2=742797&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java Tue Feb 10 00:15:34 2009
@@ -22,6 +22,7 @@
 
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.conf.Configuration;
 
 public class InMemoryWriter implements ChukwaWriter {
 
@@ -31,7 +32,7 @@
     buf.reset();
   }
 
-  public void init() throws WriterException {
+  public void init(Configuration conf) throws WriterException {
     buf = new ByteArrayOutputStream();
   }
 

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java?rev=742797&r1=742796&r2=742797&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java Tue Feb 10 00:15:34 2009
@@ -18,6 +18,105 @@
 
 package org.apache.hadoop.chukwa.datacollection.writer;
 
-public interface PipelineStageWriter extends ChukwaWriter{
-	public void setNextStage(ChukwaWriter next);
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+
+/**
+ * ChukwaWriter that feeds data through a configurable chain of writers.
+ * The chain is read from chukwaCollector.pipeline, a comma-separated list of
+ * class names: every class but the last must implement PipelineableWriter,
+ * and the last must implement ChukwaWriter. Calls to add() and close() are
+ * delegated to the head of the chain.
+ */
+public class PipelineStageWriter implements ChukwaWriter {
+  static final Logger log = Logger.getLogger(PipelineStageWriter.class);
+
+  ChukwaWriter writer; //head of pipeline
+
+  @Override
+  public void add(Chunk data) throws WriterException {
+    writer.add(data);
+  }
+
+  @Override
+  public void add(List<Chunk> chunks) throws WriterException {
+    writer.add(chunks);
+  }
+
+  @Override
+  public void close() throws WriterException {
+    writer.close();
+  }
+
+  /**
+   * Builds the pipeline. Each stage is instantiated once, initialized with
+   * conf (the final stage included), and linked to its successor.
+   * Whitespace around the class names is ignored.
+   *
+   * @param conf configuration holding chukwaCollector.pipeline; also passed
+   *        to every stage's init()
+   * @throws WriterException if chukwaCollector.pipeline is unset, or if any
+   *         stage cannot be loaded, is of the wrong type, or fails to init
+   */
+  @Override
+  public void init(Configuration conf) throws WriterException {
+    String pipeline = conf.get("chukwaCollector.pipeline");
+    if (pipeline == null)
+      throw new WriterException("must set chukwaCollector.pipeline");
+
+    String[] classes = pipeline.split(",");
+    try {
+      PipelineableWriter lastWriter = null;
+      //every class but the last is an intermediate stage
+      for (int i = 0; i < classes.length - 1; ++i) {
+        PipelineableWriter stage = (PipelineableWriter) newStage(conf, classes[i],
+            PipelineableWriter.class, "in processing pipeline isn't a pipeline stage");
+        stage.init(conf);
+        if (lastWriter == null)
+          writer = stage; //head of pipeline
+        else
+          lastWriter.setNextStage(stage);
+        lastWriter = stage;
+      }
+
+      //the last class terminates the pipeline; it needs init() like the others
+      ChukwaWriter sink = (ChukwaWriter) newStage(conf, classes[classes.length - 1],
+          ChukwaWriter.class, "at end of processing pipeline isn't a ChukwaWriter");
+      sink.init(conf);
+      if (lastWriter == null)
+        writer = sink; //one stage pipeline
+      else
+        lastWriter.setNextStage(sink);
+    } catch (WriterException e) {
+      throw e;
+    } catch (Exception e) {
+      //missing class, failed instantiation, empty pipeline, etc.
+      log.error("failed to set up pipeline", e);
+      WriterException we = new WriterException("bad pipeline");
+      we.initCause(e);
+      throw we;
+    }
+  }
+
+  /**
+   * Loads className (trimmed) via conf, checks that it is a subtype of
+   * required, and returns a new instance of it.
+   *
+   * @throws WriterException if the class is not a subtype of required
+   */
+  private static Object newStage(Configuration conf, String className,
+      Class<?> required, String problem) throws Exception {
+    String name = className.trim();
+    Class<?> stageClass = conf.getClassByName(name);
+    if (!required.isAssignableFrom(stageClass)) {
+      log.error("class " + name + " " + problem);
+      throw new WriterException("bad pipeline");
+    }
+    return stageClass.newInstance();
+  }
+
 }

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineableWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineableWriter.java?rev=742797&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineableWriter.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineableWriter.java Tue Feb 10 00:15:34 2009
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.writer;
+
+/**
+ * A ChukwaWriter that can be placed in front of another writer in a
+ * pipeline; setNextStage() names the writer that follows it.
+ */
+public interface PipelineableWriter extends ChukwaWriter {
+  /**
+   * Links this stage to the writer that follows it in the pipeline.
+   *
+   * @param next the downstream writer
+   */
+  void setNextStage(ChukwaWriter next);
+}

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java?rev=742797&r1=742796&r2=742797&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java Tue Feb 10 00:15:34 2009
@@ -29,7 +29,7 @@
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.ChunkImpl;
-import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -51,7 +51,7 @@
     static Logger log = Logger.getLogger(SeqFileWriter.class);
   
 	private FileSystem fs = null;
-	private ChukwaConfiguration conf = null;
+	private Configuration conf = null;
 
 	private String outputDir = null;
 	private Calendar calendar = Calendar.getInstance();
@@ -78,11 +78,9 @@
 	
 	public SeqFileWriter() throws WriterException
 	{
-		conf = new ChukwaConfiguration(true);
-		init();
 	}
 
-	public void init() throws WriterException
+	public void init(Configuration conf) throws WriterException
 	{
 		outputDir = conf.get("chukwaCollector.outputDir", "/chukwa");
 

Added: hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/collector/CaptureWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/collector/CaptureWriter.java?rev=742797&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/collector/CaptureWriter.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/collector/CaptureWriter.java Tue Feb 10 00:15:34 2009
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.collector;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
+import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Test-only ChukwaWriter that appends every chunk it receives to the shared
+ * static list outputs, synchronizing on that list while appending.
+ */
+public class CaptureWriter implements ChukwaWriter {
+  static ArrayList<Chunk> outputs = new ArrayList<Chunk>();
+
+  /** No configuration is needed. */
+  @Override
+  public void init(Configuration c) throws WriterException {
+  }
+
+  @Override
+  public void add(Chunk data) throws WriterException {
+    synchronized (outputs) {
+      outputs.add(data);
+    }
+  }
+
+  /** Captures each chunk in turn, in list order. */
+  @Override
+  public void add(List<Chunk> chunks) throws WriterException {
+    for (Chunk chunk : chunks) {
+      add(chunk);
+    }
+  }
+
+  /** Nothing to release. */
+  @Override
+  public void close() throws WriterException {
+  }
+}
+

Added: hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/collector/CollectorTest.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/collector/CollectorTest.java?rev=742797&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/collector/CollectorTest.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/collector/CollectorTest.java Tue Feb 10 00:15:34 2009
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.collector;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.*;
+import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
+import org.apache.hadoop.chukwa.datacollection.sender.*;
+import org.apache.hadoop.chukwa.datacollection.writer.*;
+
+import java.util.*;
+
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+
+/**
+ * End-to-end check that a collector configured with a Dedup -> CaptureWriter
+ * pipeline forwards a chunk that is sent twice only once.
+ */
+public class CollectorTest extends TestCase {
+  
+  
+  public void testCollector() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set("chukwaCollector.chunkSuppressBufferSize", "10");
+    conf.set("chukwaCollector.pipeline", 
+        "org.apache.hadoop.chukwa.datacollection.writer.Dedup,"//note comma
+         + "org.apache.hadoop.chukwa.datacollection.collector.CaptureWriter");
+    conf.set("chukwaCollector.writerClass", PipelineStageWriter.class.getCanonicalName());
+
+    //ServletCollector keeps its writer in a static field and skips building one
+    //from conf if it is already set, so clear any writer left by another test.
+    ServletCollector.setWriter(null);
+    //CaptureWriter.outputs is static too; start from an empty capture.
+    synchronized(CaptureWriter.outputs) {
+      CaptureWriter.outputs.clear();
+    }
+
+    ChukwaHttpSender sender = new ChukwaHttpSender(conf);
+    ArrayList<String> collectorList = new ArrayList<String>();
+    collectorList.add("http://localhost:9990/chukwa");
+    sender.setCollectors(new RetryListOfCollectors(collectorList, 50));
+    Server server = new Server(9990);
+    Context root = new Context(server,"/",Context.SESSIONS);
+
+    root.addServlet(new ServletHolder(new ServletCollector(conf)), "/*");
+    server.start();
+    try {
+      server.setStopAtShutdown(false);
+      Thread.sleep(1000);
+
+      Chunk c = new ChunkImpl("data", "stream", 0, "testing -- this should appear once".getBytes(), null);
+      ArrayList<Chunk> toSend = new ArrayList<Chunk>();
+      toSend.add(c);
+      toSend.add(c);
+      sender.send(toSend);
+      Thread.sleep(1000);
+      synchronized(CaptureWriter.outputs) {
+        assertEquals(1, CaptureWriter.outputs.size());
+      }
+    } finally {
+      //release port 9990 so later tests can bind it
+      server.stop();
+    }
+  }
+
+}



Mime
View raw message