chukwa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ey...@apache.org
Subject svn commit: r788235 - in /hadoop/chukwa/trunk: ./ src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/ src/java/org/apache/hadoop/chukwa/datacollection/agent/ src/java/org/apache...
Date Thu, 25 Jun 2009 01:29:16 GMT
Author: eyang
Date: Thu Jun 25 01:29:15 2009
New Revision: 788235

URL: http://svn.apache.org/viewvc?rev=788235&view=rev
Log:
CHUKWA-330. Make separation between fixed adaptor parameters and optional adaptor parameters
for generating adaptor id. (Ari Rabkin via Eric Yang)

Added:
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/AdaptorNamingUtils.java
Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ChukwaTestAdaptor.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptors.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/agent/TestAgentConfig.java

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=788235&r1=788234&r2=788235&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Thu Jun 25 01:29:15 2009
@@ -76,6 +76,8 @@
 
   BUG FIXES
 
+    CHUKWA-330. Make separation between fixed adaptor parameters and optional adaptor parameters
for generating adaptor id. (Ari Rabkin via Eric Yang)
+
     CHUKWA-37. Remove stale and unused bin scripts. (asrabkin)
 
     CHUKWA-331. Fixed regular expression for down sampling base on record type. (Cheng Zhang
via Eric Yang)

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java?rev=788235&r1=788234&r2=788235&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java
(original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java
Thu Jun 25 01:29:15 2009
@@ -34,16 +34,16 @@
   }
 
   @Override
-  public final void start(String adaptorID, String type, String status, long offset,
+  public final void start(String adaptorID, String type, long offset,
       ChunkReceiver dest, AdaptorManager c) throws AdaptorException {
     this.adaptorID = adaptorID;
     this.type = type;
     this.dest=dest;
     control = c;
-    start(status, offset);
+    start(offset);
   }
   
-  public abstract void start(String status, long offset) throws AdaptorException;
+  public abstract void start(long offset) throws AdaptorException;
 
   public void deregisterAndStop(boolean gracefully) {
     control.stopAdaptor(adaptorID, gracefully);

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java?rev=788235&r1=788234&r2=788235&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java
(original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java
Thu Jun 25 01:29:15 2009
@@ -47,7 +47,7 @@
    * @param offset the stream offset of the first byte sent by this adaptor
    * @throws AdaptorException
    */
-  public void start(String adaptorID, String type, String status, long offset,
+  public void start(String adaptorID, String type, long offset,
       ChunkReceiver dest, AdaptorManager c) throws AdaptorException;
 
   /**
@@ -62,11 +62,15 @@
   public String getType();
 
   /**
-   * Return the stream name
+   * Parse args, return stream name.  Do not start running.
    * 
-   * @return Stream name as a string
+   * Return the stream name, given params.
+   * The stream name is the part of the Adaptor status that's used to 
+   * determine uniqueness. 
+   * 
+   * @return Stream name as a string, null if params are malformed
    */
-  public String getStreamName();
+  public String parseArgs(String params);
 
   /**
    * Signals this adaptor to come to an orderly stop. The adaptor ought to push

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java?rev=788235&r1=788234&r2=788235&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java
(original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java
Thu Jun 25 01:29:15 2009
@@ -48,16 +48,9 @@
 
   static Pattern cmd = Pattern.compile("(.+)\\s+(\\S+)");
   @Override
-  public void start(String status, long offset) throws AdaptorException {
+  public void start(long offset) throws AdaptorException {
     scanInterval = control.getConfiguration().getInt("adaptor.dirscan.intervalMs", 10000);
-    Matcher m = cmd.matcher(status);
-    if(!m.matches() )
-      throw new AdaptorException("bad syntax for DirTailer");
-    else if (m.groupCount() < 2)
-      throw new AdaptorException("bad syntax for DirTailer");
-    baseDir = new File(m.group(1));
-    adaptorName = m.group(2);
-    
+      
     scanThread.start();
     lastSweepStartTime = offset;
   }
@@ -106,8 +99,16 @@
   }
 
   @Override
-  public String getStreamName() {
-    return "dir scan of " + baseDir;
+  public String parseArgs(String status) {
+    Matcher m = cmd.matcher(status);
+    if(!m.matches() ) {
+      log.warn("bad syntax in DirTailingAdaptor args");
+      return null;
+    }
+    baseDir = new File(m.group(1));
+    adaptorName = m.group(2);
+    return baseDir + " " + adaptorName; //both params mandatory
+
   }
 
 

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java?rev=788235&r1=788234&r2=788235&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java
(original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java
Thu Jun 25 01:29:15 2009
@@ -62,11 +62,13 @@
   
   class RunToolTask extends TimerTask {
     public void run() {
+      log.info("calling exec");
       JSONObject o = exec.execute();
       try {
 
         if (o.getInt("status") == exec.statusKO)
-          hardStop();
+          deregisterAndStop(false);
+
 
         // FIXME: downstream customers would like timestamps here.
         // Doing that efficiently probably means cutting out all the
@@ -78,6 +80,8 @@
           result.append(dateFormat.format(new java.util.Date()));
           result.append(" INFO org.apache.hadoop.chukwa.");
           result.append(type);
+          result.append("= ");
+          result.append(o.getString("exitValue"));
           result.append(": ");
           result.append(o.getString("stdout"));
           data = result.toString().getBytes();
@@ -101,12 +105,10 @@
 
         dest.add(c);
       } catch (JSONException e) {
-        // FIXME: log this somewhere
+        log.warn(e);
       } catch (InterruptedException e) {
-        // TODO Auto-generated catch block
-      } catch (AdaptorException e) {
-        // FIXME: log this somewhere
-      }
+        ;
+      } 
     }
   };
 
@@ -124,9 +126,6 @@
     return type + " " + period + " " + cmd + " " + sendOffset;
   }
 
-  public String getStreamName() {
-    return cmd;
-  }
 
   @Override
   @Deprecated
@@ -156,12 +155,6 @@
        exec.stop();
        break;
      case GRACEFULLY :
-       try {
-         timer.cancel();
-         exec.waitFor();
-       } catch (InterruptedException e) {
-       }
-       break;
      case WAIT_TILL_FINISHED :
        try {
          timer.cancel();
@@ -175,8 +168,19 @@
   }
 
   @Override
-  public void start(String status, long offset) throws AdaptorException {
+  public void start(long offset) throws AdaptorException {
+
+
+    this.sendOffset = offset;
+
+    this.exec = new EmbeddedExec(cmd);
+    TimerTask execTimer = new RunToolTask();
+    timer.schedule(execTimer, 0L, period);
+  }
 
+
+  @Override
+  public String parseArgs(String status) { 
     int spOffset = status.indexOf(' ');
     if (spOffset > 0) {
       try {
@@ -189,13 +193,9 @@
       }
     } else
       cmd = status;
-    this.sendOffset = offset;
-
-    this.exec = new EmbeddedExec(cmd);
-    TimerTask execTimer = new RunToolTask();
-    timer.schedule(execTimer, 0L, period);
+    
+    return cmd;
   }
 
 
-
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java?rev=788235&r1=788234&r2=788235&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java
(original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java
Thu Jun 25 01:29:15 2009
@@ -81,7 +81,7 @@
         
         long startTime = System.currentTimeMillis();
         for (FileAdaptor adaptor: adaptors) {
-          log.info("calling this adaptor:" + adaptor.getStreamName());
+          log.info("calling sendFile for " + adaptor.toWatch.getCanonicalPath());
           adaptor.sendFile(); 
         }
         
@@ -136,7 +136,7 @@
   private long timeOut = 0;
   
   protected volatile boolean finished = false;
-  protected File toWatch;
+  File toWatch;
   protected RandomAccessFile reader = null;
   protected long fileReadOffset;
   protected boolean shutdownCalled = false;
@@ -146,21 +146,14 @@
    */
   private long offsetOfFirstByte = 0;
 
-  public void start(String params, long bytes) {
+  public void start( long bytes) {
     // in this case params = filename
     log.info("adaptor id: " + adaptorID + " started file adaptor on file "
-        + params);
+        + toWatch);
     this.startTime = System.currentTimeMillis();
     this.timeOut = startTime + TIMEOUT_PERIOD;
     
 
-    String[] words = params.split(" ");
-    if (words.length > 1) {
-      offsetOfFirstByte = Long.parseLong(words[0]);
-      toWatch = new File(params.substring(words[0].length() + 1));
-    } else {
-      toWatch = new File(params);
-    }
     
     tailer.addFileAdaptor(this);
   }
@@ -283,8 +276,16 @@
     return fileReadOffset + offsetOfFirstByte;
   }
   
-  public String getStreamName() {
-    return toWatch.getPath();
+  public String parseArgs(String params) {
+
+    String[] words = params.split(" ");
+    if (words.length > 1) {
+      offsetOfFirstByte = Long.parseLong(words[0]);
+      toWatch = new File(params.substring(words[0].length() + 1));
+    } else {
+      toWatch = new File(params);
+    }
+    return toWatch.getAbsolutePath();
   }
 
   /**

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java?rev=788235&r1=788234&r2=788235&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
(original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
Thu Jun 25 01:29:15 2009
@@ -74,10 +74,19 @@
     log = Logger.getLogger(FileTailingAdaptor.class);
   }
 
-  public void start(String params, long bytes) {
-    // in this case params = filename
+  public void start(long bytes) {
     this.attempts = 0;
 
+   
+    log.info("started file tailer on file " + toWatch
+        + " with first byte at offset " + offsetOfFirstByte);
+
+    this.fileReadOffset = bytes;
+    tailer.startWatchingFile(this);
+  }
+  
+  @Override
+  public String parseArgs(String params) { 
     Pattern cmd = Pattern.compile("(\\d+)\\s+(.+)\\s?");
     Matcher m = cmd.matcher(params);
     if (m.matches()) {
@@ -86,11 +95,7 @@
     } else {
       toWatch = new File(params.trim());
     }
-    log.info("started file tailer on file " + toWatch
-        + " with first byte at offset " + offsetOfFirstByte);
-
-    this.fileReadOffset = bytes;
-    tailer.startWatchingFile(this);
+    return toWatch.getAbsolutePath();
   }
 
   /**

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=788235&r1=788234&r2=788235&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
(original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
Thu Jun 25 01:29:15 2009
@@ -44,6 +44,7 @@
 import org.apache.hadoop.chukwa.datacollection.connector.Connector;
 import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
 import org.apache.hadoop.chukwa.datacollection.test.ConsoleOutConnector;
+import org.apache.hadoop.chukwa.util.AdaptorNamingUtils;
 import org.apache.hadoop.chukwa.util.DaemonWatcher;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -315,31 +316,36 @@
       String params = m.group(4);
       if (params == null)
         params = "";
-
-      if(adaptorID == null)
-        adaptorID = synthesizeAdaptorID(adaptorClassName, dataType, params);
       
       Adaptor adaptor = AdaptorFactory.createAdaptor(adaptorClassName);
       if (adaptor == null) {
         log.warn("Error creating adaptor of class " + adaptorClassName);
         return null;
       }
-
+      String coreParams = adaptor.parseArgs(params);
+      if(coreParams == null) {
+        log.warn("invalid params for adaptor: " + params);
+        return null;
+      }
+      
+      if(adaptorID == null) { //user didn't specify, so synthesize
+        try {
+         adaptorID = AdaptorNamingUtils.synthesizeAdaptorID(adaptorClassName, dataType, coreParams);
+        } catch(NoSuchAlgorithmException e) {
+          log.fatal("MD5 apparently doesn't work on your machine; bailing", e);
+          shutdown(true);
+        }
+      }
+      
       synchronized (adaptorsByName) {
         
-        /*for (Map.Entry<Long, Adaptor> a : adaptorsByName.entrySet()) {
-          if (params.indexOf(a.getValue().getStreamName())!=-1) {
-            log.warn(params + " already exist, skipping.");
-            return null;
-          }
-        }*/
         if(adaptorsByName.containsKey(adaptorID))
           return adaptorID;
         adaptorsByName.put(adaptorID, adaptor);
         adaptorPositions.put(adaptor, new Offset(offset, adaptorID));
         needNewCheckpoint = true;
         try {
-          adaptor.start(adaptorID, dataType, params, offset, DataFactory
+          adaptor.start(adaptorID, dataType, offset, DataFactory
               .getInstance().getEventQueue(), this);
           log.info("started a new adaptor, id = " + adaptorID);
           ChukwaAgent.agentMetrics.adaptorCount.set(adaptorsByName.size());
@@ -358,29 +364,7 @@
     return null;
   }
 
-  private String synthesizeAdaptorID(String adaptorClassName, String dataType,
-      String params) {
-    MessageDigest md;
-    try {
-      md = MessageDigest.getInstance("MD5");
 
-      md.update(adaptorClassName.getBytes());
-      md.update(dataType.getBytes());
-      md.update(params.getBytes());
-      StringBuilder sb = new StringBuilder();
-      byte[] bytes = md.digest();
-      for(int i=0; i < bytes.length; ++i) {
-        if( (bytes[i] & 0xF0) == 0)
-          sb.append('0');
-        sb.append( Integer.toHexString(0xFF & bytes[i]) );
-      }
-      return sb.toString();
-    } catch (NoSuchAlgorithmException e) {
-      log.fatal("MD5 apparently doesn't work on your machine; bailing", e);
-      shutdown(true);//abort agent
-    }
-    return null;
-  }
 
   /**
    * Tries to restore from a checkpoint file in checkpointDir. There should

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java?rev=788235&r1=788234&r2=788235&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java
(original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java
Thu Jun 25 01:29:15 2009
@@ -71,7 +71,7 @@
       try {
         log.info("Trying to resend the add command [" + adaptorName + "]["
             + offset + "][" + params + "] [" + numRetries + "]");
-        add(adaptorName, type, params, offset, numRetries, retryInterval);
+        addByName(null, adaptorName, type, params, offset, numRetries, retryInterval);
       } catch (Exception e) {
         log.warn("Exception in AddAdaptorTask.run", e);
         e.printStackTrace();
@@ -129,7 +129,10 @@
       }
       PrintWriter bw = new PrintWriter(new OutputStreamWriter(s
           .getOutputStream()));
-      bw.println("ADD " + className + " " + appType + " " + params + " " + offset);
+      if(id != null)
+        bw.println("ADD " + id + " = " + className + " " + appType + " " + params + " " +
offset);
+      else
+        bw.println("ADD " + className + " " + appType + " " + params + " " + offset);
       bw.flush();
       BufferedReader br = new BufferedReader(new InputStreamReader(s
           .getInputStream()));
@@ -231,7 +234,7 @@
    * @return the id number of the adaptor, generated by the agent
    */
   public String add(String adaptorName, String type, String params, long offset) {
-    return add(adaptorName, type, params, offset, 20, 15 * 1000);// retry for
+    return addByName(null, adaptorName, type, params, offset, 20, 15 * 1000);// retry for
                                                                  // five
                                                                  // minutes,
                                                                  // every
@@ -246,11 +249,11 @@
    * 
    * @return the id number of the adaptor, generated by the agent
    */
-  public String add(String adaptorName, String type, String params, long offset,
+  public String addByName(String adaptorID, String adaptorName, String type, String params,
long offset,
       long numRetries, long retryInterval) {
     ChukwaAgentController.Adaptor adaptor = new ChukwaAgentController.Adaptor(
         adaptorName, type, params, offset);
-    String adaptorID = null;
+    adaptor.id = adaptorID;
     if (numRetries >= 0) {
       try {
         adaptorID = adaptor.register();
@@ -417,7 +420,7 @@
       }
     }
     if (!isDuplicate) {
-      return add(DEFAULT_FILE_TAILER, appType, 0L + " " + filename, 0L,
+      return addByName(null, DEFAULT_FILE_TAILER, appType, 0L + " " + filename, 0L,
           numRetries, retryInterval);
     } else {
       System.out.println("An adaptor for filename \"" + filename

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java?rev=788235&r1=788234&r2=788235&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java
(original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java
Thu Jun 25 01:29:15 2009
@@ -22,8 +22,10 @@
 import java.util.TimeZone;
 import java.util.regex.Pattern;
 
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
 import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
 import org.apache.hadoop.chukwa.datacollection.controller.ClientFinalizer;
+import org.apache.hadoop.chukwa.util.AdaptorNamingUtils;
 import org.apache.hadoop.chukwa.util.RecordConstants;
 import org.apache.log4j.FileAppender;
 import org.apache.log4j.Layout;
@@ -541,8 +543,10 @@
               numRetries = 48;
             }
 
-
-            String adaptorID = chukwaClient.add(ChukwaAgentController.CharFileTailUTF8NewLineEscaped,
+            String name = AdaptorNamingUtils.synthesizeAdaptorID
+              (ChukwaAgentController.CharFileTailUTF8NewLineEscaped, recordType, log4jFileName);
+            
+            String adaptorID = chukwaClient.addByName(name, ChukwaAgentController.CharFileTailUTF8NewLineEscaped,
                 recordType,currentLength + " " + log4jFileName, currentLength,
                 numRetries, retryInterval);
 

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java?rev=788235&r1=788234&r2=788235&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java
(original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java
Thu Jun 25 01:29:15 2009
@@ -54,8 +54,8 @@
     File file = new File(logFile);
     connector.start();
     Adaptor adaptor = AdaptorFactory.createAdaptor(adaptorName);
-    adaptor.start("", recordType, "0 " +file.getAbsolutePath(),
-        0l,queue, AdaptorManager.NULL );
+    adaptor.parseArgs( "0 " +file.getAbsolutePath());
+    adaptor.start("", recordType,  0l,queue, AdaptorManager.NULL );
     adaptor.shutdown(AdaptorShutdownPolicy.WAIT_TILL_FINISHED);
     connector.shutdown();
     file.renameTo(new File(logFile + ".sav"));

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/AdaptorNamingUtils.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/AdaptorNamingUtils.java?rev=788235&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/AdaptorNamingUtils.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/AdaptorNamingUtils.java Thu
Jun 25 01:29:15 2009
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.util;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+public class AdaptorNamingUtils {
+  
+  public static String synthesizeAdaptorID(String adaptorClassName, String dataType,
+      String params) throws NoSuchAlgorithmException {
+    MessageDigest md;
+   md = MessageDigest.getInstance("MD5");
+
+    md.update(adaptorClassName.getBytes());
+    md.update(dataType.getBytes());
+    md.update(params.getBytes());
+    StringBuilder sb = new StringBuilder();
+    byte[] bytes = md.digest();
+    for(int i=0; i < bytes.length; ++i) {
+      if( (bytes[i] & 0xF0) == 0)
+        sb.append('0');
+      sb.append( Integer.toHexString(0xFF & bytes[i]) );
+    }
+    return sb.toString();
+   
+  }
+
+}

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java?rev=788235&r1=788234&r2=788235&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java Thu Jun
25 01:29:15 2009
@@ -43,14 +43,9 @@
     return type.trim() + " " + bytesPerSec + " " + offset;
   }
 
-  public void start(String adaptorID, String type, String bytesPerSecParam,
+  public void start(String adaptorID, String type, 
       long offset, ChunkReceiver dest, AdaptorManager c) throws AdaptorException {
-    try {
-      bytesPerSec = Integer.parseInt(bytesPerSecParam.trim());
-    } catch (NumberFormatException e) {
-      throw new AdaptorException("bad argument to const rate adaptor: ["
-          + bytesPerSecParam + "]");
-    }
+
     this.adaptorID = adaptorID;
     this.offset = offset;
     this.type = type;
@@ -59,7 +54,13 @@
     super.start(); // this is a Thread.start
   }
 
-  public String getStreamName() {
+  public String parseArgs(String bytesPerSecParam) {
+    try {
+      bytesPerSec = Integer.parseInt(bytesPerSecParam.trim());
+    } catch (NumberFormatException e) {
+      //("bad argument to const rate adaptor: ["  + bytesPerSecParam + "]");
+      return null;
+    }
     return this.type;
   }
 

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java?rev=788235&r1=788234&r2=788235&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java Thu Jun
25 01:29:15 2009
@@ -40,7 +40,7 @@
     return "";
   }
 
-  public void start(String adaptorID, String type, String status, long offset,
+  public void start(String adaptorID, String type, long offset,
       ChunkReceiver dest, AdaptorManager c) throws AdaptorException {
     this.setName("MaxRateSender adaptor");
     this.adaptorID = adaptorID;
@@ -50,8 +50,9 @@
     super.start(); // this is a Thread.start
   }
 
-  public String getStreamName() {
-    return type;
+  @Override
+  public String parseArgs(String s) {
+    return s;
   }
 
   public void run() {

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ChukwaTestAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ChukwaTestAdaptor.java?rev=788235&r1=788234&r2=788235&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ChukwaTestAdaptor.java
(original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ChukwaTestAdaptor.java
Thu Jun 25 01:29:15 2009
@@ -34,9 +34,9 @@
   }
 
   @Override
-  public String getStreamName() {
-    // TODO Auto-generated method stub
-    return "";
+  public String parseArgs(String s) {
+    params = s;
+    return s;
   }
 
   @Override
@@ -52,8 +52,7 @@
   }
 
   @Override
-  public void start(String params, long offset) throws AdaptorException {
-    this.params = params;
+  public void start(long offset) throws AdaptorException {
     this.startOffset = offset;
     System.out.println("adaptorId [" + adaptorID + "]");
     System.out.println("type [" + type + "]");

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptors.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptors.java?rev=788235&r1=788234&r2=788235&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptors.java
(original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptors.java
Thu Jun 25 01:29:15 2009
@@ -84,7 +84,7 @@
   
   public void testRepeatedlyOnBigFile() throws IOException,
   ChukwaAgent.AlreadyRunningException, InterruptedException {
-    int tests = 1000; //SHOULD SET HIGHER AND WATCH WITH lsof to find leaks
+    int tests = 10; //SHOULD SET HIGHER AND WATCH WITH lsof to find leaks
 
     ChukwaAgent agent = new ChukwaAgent(conf);
     for(int i=0; i < tests; ++i) {
@@ -119,6 +119,20 @@
     pw.close();
     return tmpOutput;
   }
+  
+  public void testOffsetInAdaptorName() throws IOException, ChukwaAgent.AlreadyRunningException,
+  InterruptedException{
+    File testFile = makeTestFile("foo", 120);
+    ChukwaAgent agent = new ChukwaAgent(conf);
+    assertEquals(0, agent.adaptorCount());
+    agent.processAddCommand("add test = filetailer.FileTailingAdaptor raw " +testFile.getCanonicalPath()
+ " 0");
+    assertEquals(1, agent.adaptorCount());
+    Thread.sleep(2000);
+    agent.processAddCommand("add test = filetailer.FileTailingAdaptor raw " +testFile.getCanonicalPath()
+ " 0");
+    assertEquals(1, agent.adaptorCount());
+    chunks.clear();
+    agent.shutdown();
+  }
 
   public static void main(String[] args) {
     try {

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/agent/TestAgentConfig.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/agent/TestAgentConfig.java?rev=788235&r1=788234&r2=788235&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/agent/TestAgentConfig.java
(original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/agent/TestAgentConfig.java
Thu Jun 25 01:29:15 2009
@@ -46,7 +46,7 @@
       assertEquals(1, agent.adaptorCount());// check that we processed initial
                                             // adaptors
       assertNotNull(agent.getAdaptor("testAdaptor"));
-      assertTrue(agent.getAdaptor("testAdaptor").getStreamName().contains("foo"));
+      assertTrue(agent.getAdaptor("testAdaptor").getCurrentStatus().contains("foo"));
 
       System.out
           .println("---------------------done with first run, now stopping");
@@ -70,7 +70,7 @@
       assertEquals(1, agent.adaptorCount());// check that we processed initial
                                             // adaptors
       assertNotNull(agent.getAdaptor("testAdaptor"));
-      assertTrue(agent.getAdaptor("testAdaptor").getStreamName().contains("foo"));
+      assertTrue(agent.getAdaptor("testAdaptor").getCurrentStatus().contains("foo"));
       agent.shutdown();
       Thread.sleep(2000);
       System.out.println("---------------------done");



Mime
View raw message