chukwa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asrab...@apache.org
Subject svn commit: r900859 - in /hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection: adaptor/ adaptor/filetailer/ agent/
Date Tue, 19 Jan 2010 17:34:38 GMT
Author: asrabkin
Date: Tue Jan 19 17:34:37 2010
New Revision: 900859

URL: http://svn.apache.org/viewvc?rev=900859&view=rev
Log:
CHUKWA-420. Add reset method

Modified:
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AdaptorShutdownPolicy.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/UDPAdaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java?rev=900859&r1=900858&r2=900859&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java	(original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java	Tue Jan 19 17:34:37 2010
@@ -31,7 +31,7 @@
   String innerType;
   ChunkReceiver dest;
   AdaptorManager manager;
-
+  String adaptorID;
   @Override
   public String getCurrentStatus() {
     return innerClassName + " " + inner.getCurrentStatus();
@@ -77,6 +77,7 @@
       ChunkReceiver dest) throws AdaptorException {
     String dummyAdaptorID = adaptorID;
     this.dest = dest;
+    this.adaptorID = adaptorID;
     inner.start(dummyAdaptorID, type, offset, this);
   }
 

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AdaptorShutdownPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AdaptorShutdownPolicy.java?rev=900859&r1=900858&r2=900859&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AdaptorShutdownPolicy.java	(original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AdaptorShutdownPolicy.java	Tue Jan 19 17:34:37 2010
@@ -18,7 +18,7 @@
 package org.apache.hadoop.chukwa.datacollection.adaptor;
 
 public enum AdaptorShutdownPolicy {
-  HARD_STOP,GRACEFULLY,WAIT_TILL_FINISHED;
+  HARD_STOP,GRACEFULLY,WAIT_TILL_FINISHED,RESTARTING;
   
   public String toString() {
     if(this.equals(GRACEFULLY))
@@ -27,6 +27,8 @@
       return "Abruptly";
     else if(this.equals(WAIT_TILL_FINISHED))
       return "Once finished";
+    else if(this.equals(RESTARTING))
+      return "Prepare to restart";
     else
         return "unknown mode";
   }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java?rev=900859&r1=900858&r2=900859&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java	(original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java	Tue Jan 19 17:34:37 2010
@@ -214,9 +214,6 @@
   public long shutdown(AdaptorShutdownPolicy shutdownPolicy) {
     log.info("Enter Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
     switch(shutdownPolicy) {
-      case HARD_STOP :
-        cleanUp();
-        break;
       case GRACEFULLY : {
         int retry = 0;
         while (!finished && retry < 60) {
@@ -245,6 +242,9 @@
       }
 
       break;
+      default :
+        cleanUp();
+        break;
     }
     log.info("Exist Shutdown:" + shutdownPolicy.name()+ " - ObjectId:" + this);
     return fileReadOffset + offsetOfFirstByte;

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/UDPAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/UDPAdaptor.java?rev=900859&r1=900858&r2=900859&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/UDPAdaptor.java	(original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/UDPAdaptor.java	Tue Jan 19 17:34:37 2010
@@ -35,18 +35,21 @@
   
   class ListenThread extends Thread {
     public void run() {
+      log.info("UDP adaptor " + adaptorID + " started on port " + portno + " offset =" + bytesReceived);
       byte[] buf = new byte[1024];
       DatagramPacket dp = new DatagramPacket(buf, buf.length);
       try {
         while(running) {
           ds.receive(dp);
+          log.info("got a UDP message");
           byte[] trimmedBuf =  Arrays.copyOf(buf, dp.getLength());
           bytesReceived += trimmedBuf.length;
           Chunk c = new ChunkImpl(type, source, bytesReceived, trimmedBuf, UDPAdaptor.this);
           dest.add(c);
         }
       } catch(Exception e) {
-        log.error("can't read UDP messages in " + adaptorID, e);
+        if(running)
+          log.error("can't read UDP messages in " + adaptorID, e);
       }
     }
   }
@@ -82,6 +85,7 @@
       throws AdaptorException {
     try {
       running = false;
+      ds.close();
       if(shutdownPolicy == AdaptorShutdownPolicy.GRACEFULLY)
         lt.join();
     } catch(InterruptedException e) {}

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java?rev=900859&r1=900858&r2=900859&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java	(original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java	Tue Jan 19 17:34:37 2010
@@ -99,7 +99,7 @@
       }
       break;
       
-      case HARD_STOP :
+      case HARD_STOP:
       default:
         tailer.stopWatchingFile(this);
         try {

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java?rev=900859&r1=900858&r2=900859&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java	(original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java	Tue Jan 19 17:34:37 2010
@@ -21,6 +21,7 @@
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.chukwa.datacollection.DataFactory;
 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
+import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
 import org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet;
 import org.apache.hadoop.chukwa.datacollection.sender.AsyncAckSender;
 import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
@@ -93,7 +94,7 @@
       status.remove(a); //it'll get added again when adaptor resumes, if it does
       ChukwaAgent.Offset off = agent.offset(a);
       if(off != null) {
-        agent.stopAdaptor(off.id, false);
+        agent.stopAdaptor(off.id, AdaptorShutdownPolicy.RESTARTING);
         
         String a_status = a.getCurrentStatus();
         agent.processAddCommand("add " + off.id + "= " + a.getClass().getCanonicalName()



Mime
View raw message