hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r686893 - in /hadoop/core/trunk: ./ src/c++/pipes/api/hadoop/ src/c++/pipes/impl/ src/examples/pipes/impl/ src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapred/pipes/ src/test/org/apache/hadoop/mapred/pipes/
Date Mon, 18 Aug 2008 23:16:37 GMT
Author: omalley
Date: Mon Aug 18 16:16:36 2008
New Revision: 686893

URL: http://svn.apache.org/viewvc?rev=686893&view=rev
Log:
HADOOP-1480. Add counters to the C++ Pipes API. (acmurthy via omalley)

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/c++/pipes/api/hadoop/Pipes.hh
    hadoop/core/trunk/src/c++/pipes/impl/HadoopPipes.cc
    hadoop/core/trunk/src/examples/pipes/impl/wordcount-nopipe.cc
    hadoop/core/trunk/src/examples/pipes/impl/wordcount-part.cc
    hadoop/core/trunk/src/examples/pipes/impl/wordcount-simple.cc
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Reporter.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/BinaryProtocol.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/UpwardProtocol.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=686893&r1=686892&r2=686893&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Aug 18 16:16:36 2008
@@ -92,6 +92,8 @@
     HADOOP-3585. FailMon package for hardware failure monitoring and 
     analysis of anomalies. (Ioannis Koltsidas via dhruba)
 
+    HADOOP-1480. Add counters to the C++ Pipes API. (acmurthy via omalley)
+
   IMPROVEMENTS
 
     HADOOP-3732. Delay intialization of datanode block verification till

Modified: hadoop/core/trunk/src/c++/pipes/api/hadoop/Pipes.hh
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/c%2B%2B/pipes/api/hadoop/Pipes.hh?rev=686893&r1=686892&r2=686893&view=diff
==============================================================================
--- hadoop/core/trunk/src/c++/pipes/api/hadoop/Pipes.hh (original)
+++ hadoop/core/trunk/src/c++/pipes/api/hadoop/Pipes.hh Mon Aug 18 16:16:36 2008
@@ -57,6 +57,19 @@
 class TaskContext {
 public:
   /**
+   * Counter to keep track of a property and its value.
+   */
+  class Counter {
+  private:
+    int id;
+  public:
+    Counter(int counterId) : id(counterId) {}
+    Counter(const Counter& counter) : id(counter.id) {}
+
+    int getId() const { return id; }
+  };
+  
+  /**
    * Get the JobConf for the current task.
    */
   virtual const JobConf* getJobConf() = 0;
@@ -89,6 +102,17 @@
    */
   virtual void setStatus(const std::string& status) = 0;
 
+  /**
+   * Register a counter with the given group and name.
+   */
+  virtual Counter* 
+    getCounter(const std::string& group, const std::string& name) = 0;
+
+  /**
+   * Increment the value of the counter with the given amount.
+   */
+  virtual void incrementCounter(const Counter* counter, uint64_t amount) = 0;
+  
   virtual ~TaskContext() {}
 };
 

Modified: hadoop/core/trunk/src/c++/pipes/impl/HadoopPipes.cc
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/c%2B%2B/pipes/impl/HadoopPipes.cc?rev=686893&r1=686892&r2=686893&view=diff
==============================================================================
--- hadoop/core/trunk/src/c++/pipes/impl/HadoopPipes.cc (original)
+++ hadoop/core/trunk/src/c++/pipes/impl/HadoopPipes.cc Mon Aug 18 16:16:36 2008
@@ -99,6 +99,10 @@
     virtual void status(const string& message) = 0;
     virtual void progress(float progress) = 0;
     virtual void done() = 0;
+    virtual void registerCounter(int id, const string& group, 
+                                 const string& name) = 0;
+    virtual void 
+      incrementCounter(const TaskContext::Counter* counter, uint64_t amount) = 0;
     virtual ~UpwardProtocol() {}
   };
 
@@ -150,6 +154,19 @@
               lineSeparator);
     }
 
+    virtual void registerCounter(int id, const string& group, 
+                                 const string& name) {
+      fprintf(stream, "registerCounter%c%d%c%s%c%s%c", fieldSeparator, id,
+              fieldSeparator, group.c_str(), fieldSeparator, name.c_str(), 
+              lineSeparator);
+    }
+
+    virtual void incrementCounter(const TaskContext::Counter* counter, 
+                                  uint64_t amount) {
+      fprintf(stream, "incrCounter%c%d%c%ld%c", fieldSeparator, counter->getId(), 
+              fieldSeparator, (long)amount, lineSeparator);
+    }
+    
     virtual void done() {
       fprintf(stream, "done%c", lineSeparator);
     }
@@ -272,8 +289,9 @@
 
   enum MESSAGE_TYPE {START_MESSAGE, SET_JOB_CONF, SET_INPUT_TYPES, RUN_MAP, 
                      MAP_ITEM, RUN_REDUCE, REDUCE_KEY, REDUCE_VALUE, 
-                     CLOSE, ABORT,
-                     OUTPUT=50, PARTITIONED_OUTPUT, STATUS, PROGRESS, DONE};
+                     CLOSE, ABORT, 
+                     OUTPUT=50, PARTITIONED_OUTPUT, STATUS, PROGRESS, DONE,
+                     REGISTER_COUNTER, INCREMENT_COUNTER};
 
   class BinaryUpwardProtocol: public UpwardProtocol {
   private:
@@ -313,6 +331,21 @@
       serializeInt(DONE, *stream);
     }
 
+    virtual void registerCounter(int id, const string& group, 
+                                 const string& name) {
+      serializeInt(REGISTER_COUNTER, *stream);
+      serializeInt(id, *stream);
+      serializeString(group, *stream);
+      serializeString(name, *stream);
+    }
+
+    virtual void incrementCounter(const TaskContext::Counter* counter, 
+                                  uint64_t amount) {
+      serializeInt(INCREMENT_COUNTER, *stream);
+      serializeInt(counter->getId(), *stream);
+      serializeLong(amount, *stream);
+    }
+    
     ~BinaryUpwardProtocol() {
       delete stream;
     }
@@ -505,6 +538,14 @@
       return valueItr != endValueItr;
     }
     
+    virtual Counter* getCounter(const std::string& group, 
+                               const std::string& name) {
+      return baseContext->getCounter(group, name);
+    }
+
+    virtual void incrementCounter(const Counter* counter, uint64_t amount) {
+      baseContext->incrementCounter(counter, amount);
+    }
   };
 
   /**
@@ -586,6 +627,7 @@
     int numReduces;
     const Factory* factory;
     pthread_mutex_t mutexDone;
+    std::vector<int> registeredCounterIds;
 
   public:
 
@@ -838,6 +880,24 @@
       }
     }
 
+    /**
+     * Register a counter with the given group and name.
+     */
+    virtual Counter* getCounter(const std::string& group, 
+                               const std::string& name) {
+      int id = registeredCounterIds.size();
+      registeredCounterIds.push_back(id);
+      uplink->registerCounter(id, group, name);
+      return new Counter(id);
+    }
+
+    /**
+     * Increment the value of the counter with the given amount.
+     */
+    virtual void incrementCounter(const Counter* counter, uint64_t amount) {
+      uplink->incrementCounter(counter, amount); 
+    }
+
     void closeAll() {
       if (reader) {
         reader->close();

Modified: hadoop/core/trunk/src/examples/pipes/impl/wordcount-nopipe.cc
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/pipes/impl/wordcount-nopipe.cc?rev=686893&r1=686892&r2=686893&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/pipes/impl/wordcount-nopipe.cc (original)
+++ hadoop/core/trunk/src/examples/pipes/impl/wordcount-nopipe.cc Mon Aug 18 16:16:36 2008
@@ -24,27 +24,43 @@
 #include <sys/types.h>
 #include <sys/stat.h>
 
+const std::string WORDCOUNT = "WORDCOUNT";
+const std::string INPUT_WORDS = "INPUT_WORDS";
+const std::string OUTPUT_WORDS = "OUTPUT_WORDS";
+
 class WordCountMap: public HadoopPipes::Mapper {
 public:
-  WordCountMap(HadoopPipes::MapContext& context){}
+  HadoopPipes::TaskContext::Counter* inputWords;
+  
+  WordCountMap(HadoopPipes::TaskContext& context) {
+    inputWords = context.getCounter(WORDCOUNT, INPUT_WORDS);
+  }
+  
   void map(HadoopPipes::MapContext& context) {
     std::vector<std::string> words = 
       HadoopUtils::splitString(context.getInputValue(), " ");
     for(unsigned int i=0; i < words.size(); ++i) {
       context.emit(words[i], "1");
     }
+    context.incrementCounter(inputWords, words.size());
   }
 };
 
 class WordCountReduce: public HadoopPipes::Reducer {
 public:
-  WordCountReduce(HadoopPipes::ReduceContext& context){}
+  HadoopPipes::TaskContext::Counter* outputWords;
+
+  WordCountReduce(HadoopPipes::TaskContext& context) {
+    outputWords = context.getCounter(WORDCOUNT, OUTPUT_WORDS);
+  }
+
   void reduce(HadoopPipes::ReduceContext& context) {
     int sum = 0;
     while (context.nextValue()) {
       sum += HadoopUtils::toInt(context.getInputValue());
     }
     context.emit(context.getInputKey(), HadoopUtils::toString(sum));
+    context.incrementCounter(outputWords, 1); 
   }
 };
 

Modified: hadoop/core/trunk/src/examples/pipes/impl/wordcount-part.cc
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/pipes/impl/wordcount-part.cc?rev=686893&r1=686892&r2=686893&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/pipes/impl/wordcount-part.cc (original)
+++ hadoop/core/trunk/src/examples/pipes/impl/wordcount-part.cc Mon Aug 18 16:16:36 2008
@@ -20,27 +20,43 @@
 #include "hadoop/TemplateFactory.hh"
 #include "hadoop/StringUtils.hh"
 
+const std::string WORDCOUNT = "WORDCOUNT";
+const std::string INPUT_WORDS = "INPUT_WORDS";
+const std::string OUTPUT_WORDS = "OUTPUT_WORDS";
+
 class WordCountMap: public HadoopPipes::Mapper {
 public:
-  WordCountMap(HadoopPipes::TaskContext& context){}
+  HadoopPipes::TaskContext::Counter* inputWords;
+  
+  WordCountMap(HadoopPipes::TaskContext& context) {
+    inputWords = context.getCounter(WORDCOUNT, INPUT_WORDS);
+  }
+  
   void map(HadoopPipes::MapContext& context) {
     std::vector<std::string> words = 
       HadoopUtils::splitString(context.getInputValue(), " ");
     for(unsigned int i=0; i < words.size(); ++i) {
       context.emit(words[i], "1");
     }
+    context.incrementCounter(inputWords, words.size());
   }
 };
 
 class WordCountReduce: public HadoopPipes::Reducer {
 public:
-  WordCountReduce(HadoopPipes::TaskContext& context){}
+  HadoopPipes::TaskContext::Counter* outputWords;
+
+  WordCountReduce(HadoopPipes::TaskContext& context) {
+    outputWords = context.getCounter(WORDCOUNT, OUTPUT_WORDS);
+  }
+
   void reduce(HadoopPipes::ReduceContext& context) {
     int sum = 0;
     while (context.nextValue()) {
       sum += HadoopUtils::toInt(context.getInputValue());
     }
     context.emit(context.getInputKey(), HadoopUtils::toString(sum));
+    context.incrementCounter(outputWords, 1); 
   }
 };
 

Modified: hadoop/core/trunk/src/examples/pipes/impl/wordcount-simple.cc
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/pipes/impl/wordcount-simple.cc?rev=686893&r1=686892&r2=686893&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/pipes/impl/wordcount-simple.cc (original)
+++ hadoop/core/trunk/src/examples/pipes/impl/wordcount-simple.cc Mon Aug 18 16:16:36 2008
@@ -20,27 +20,43 @@
 #include "hadoop/TemplateFactory.hh"
 #include "hadoop/StringUtils.hh"
 
+const std::string WORDCOUNT = "WORDCOUNT";
+const std::string INPUT_WORDS = "INPUT_WORDS";
+const std::string OUTPUT_WORDS = "OUTPUT_WORDS";
+
 class WordCountMap: public HadoopPipes::Mapper {
 public:
-  WordCountMap(HadoopPipes::TaskContext& context){}
+  HadoopPipes::TaskContext::Counter* inputWords;
+  
+  WordCountMap(HadoopPipes::TaskContext& context) {
+    inputWords = context.getCounter(WORDCOUNT, INPUT_WORDS);
+  }
+  
   void map(HadoopPipes::MapContext& context) {
     std::vector<std::string> words = 
       HadoopUtils::splitString(context.getInputValue(), " ");
     for(unsigned int i=0; i < words.size(); ++i) {
       context.emit(words[i], "1");
     }
+    context.incrementCounter(inputWords, words.size());
   }
 };
 
 class WordCountReduce: public HadoopPipes::Reducer {
 public:
-  WordCountReduce(HadoopPipes::TaskContext& context){}
+  HadoopPipes::TaskContext::Counter* outputWords;
+
+  WordCountReduce(HadoopPipes::TaskContext& context) {
+    outputWords = context.getCounter(WORDCOUNT, OUTPUT_WORDS);
+  }
+
   void reduce(HadoopPipes::ReduceContext& context) {
     int sum = 0;
     while (context.nextValue()) {
       sum += HadoopUtils::toInt(context.getInputValue());
     }
     context.emit(context.getInputKey(), HadoopUtils::toString(sum));
+    context.incrementCounter(outputWords, 1); 
   }
 };
 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java?rev=686893&r1=686892&r2=686893&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Counters.java Mon Aug 18 16:16:36
2008
@@ -319,6 +319,16 @@
   }
 
   /**
+   * Find a counter given the group and the name.
+   * @param group the name of the group
+   * @param name the internal name of the counter
+   * @return the counter for that name
+   */
+  public synchronized Counter findCounter(String group, String name) {
+    return getGroup(group).getCounterForName(name);
+  }
+
+  /**
    * Find a counter by using strings
    * @param group the name of the group
    * @param id the id of the counter within the group (0 to N-1)

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Reporter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Reporter.java?rev=686893&r1=686892&r2=686893&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Reporter.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Reporter.java Mon Aug 18 16:16:36
2008
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapred;
 
+import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.util.Progressable;
 
 /** 
@@ -46,6 +47,9 @@
       }
       public void progress() {
       }
+      public Counter getCounter(String group, String name) {
+        return null;
+      }
       public void incrCounter(Enum key, long amount) {
       }
       public void incrCounter(String group, String counter, long amount) {
@@ -63,6 +67,15 @@
   public abstract void setStatus(String status);
   
   /**
+   * Get the {@link Counter} of the given group with the given name.
+   * 
+   * @param group counter group
+   * @param name counter name
+   * @return the <code>Counter</code> of the given group/name.
+   */
+  public abstract Counter getCounter(String group, String name);
+  
+  /**
    * Increments the counter identified by the key, which can be of
    * any {@link Enum} type, by the specified amount.
    * 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java?rev=686893&r1=686892&r2=686893&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java Mon Aug 18 16:16:36 2008
@@ -48,6 +48,7 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Progress;
@@ -408,6 +409,13 @@
           // indicate that progress update needs to be sent
           setProgressFlag();
         }
+        public Counters.Counter getCounter(String group, String name) {
+          Counters.Counter counter = null;
+          if (counters != null) {
+            counter = counters.findCounter(group, name);
+          }
+          return counter;
+        }
         public void incrCounter(Enum key, long amount) {
           if (counters != null) {
             counters.incrCounter(key, amount);

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/BinaryProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/BinaryProtocol.java?rev=686893&r1=686892&r2=686893&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/BinaryProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/BinaryProtocol.java Mon Aug
18 16:16:36 2008
@@ -73,7 +73,9 @@
                                     PARTITIONED_OUTPUT(51),
                                     STATUS(52),
                                     PROGRESS(53),
-                                    DONE(54);
+                                    DONE(54),
+                                    REGISTER_COUNTER(55),
+                                    INCREMENT_COUNTER(56);
     final int code;
     MessageType(int code) {
       this.code = code;
@@ -124,6 +126,15 @@
             handler.status(Text.readString(inStream));
           } else if (cmd == MessageType.PROGRESS.code) {
             handler.progress(inStream.readFloat());
+          } else if (cmd == MessageType.REGISTER_COUNTER.code) {
+            int id = WritableUtils.readVInt(inStream);
+            String group = Text.readString(inStream);
+            String name = Text.readString(inStream);
+            handler.registerCounter(id, group, name);
+          } else if (cmd == MessageType.INCREMENT_COUNTER.code) {
+            int id = WritableUtils.readVInt(inStream);
+            long amount = WritableUtils.readVLong(inStream);
+            handler.incrementCounter(id, amount);
           } else if (cmd == MessageType.DONE.code) {
             LOG.debug("Pipe child done");
             handler.done();

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java?rev=686893&r1=686892&r2=686893&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/OutputHandler.java Mon Aug
18 16:16:36 2008
@@ -19,9 +19,14 @@
 package org.apache.hadoop.mapred.pipes;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
@@ -37,7 +42,9 @@
   private float progressValue = 0.0f;
   private boolean done = false;
   private Throwable exception = null;
-  
+  private Map<Integer, Counters.Counter> registeredCounters = 
+    new HashMap<Integer, Counters.Counter>();
+
   /**
    * Create a handler that will handle any records output from the application.
    * @param collector the "real" collector that takes the output
@@ -121,4 +128,19 @@
     }
     return done;
   }
+
+  public void registerCounter(int id, String group, String name) throws IOException {
+    Counters.Counter counter = reporter.getCounter(group, name);
+    registeredCounters.put(id, counter);
+  }
+
+  public void incrementCounter(int id, long amount) throws IOException {
+    if (id < registeredCounters.size()) {
+      Counters.Counter counter = registeredCounters.get(id);
+      counter.increment(amount);
+    } else {
+      throw new IOException("Invalid counter with id: " + id);
+    }
+  }
+
 }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/UpwardProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/UpwardProtocol.java?rev=686893&r1=686892&r2=686893&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/UpwardProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/UpwardProtocol.java Mon Aug
18 16:16:36 2008
@@ -72,4 +72,20 @@
    * @param e
    */
   void failed(Throwable e);
+  
+  /**
+   * Register a counter with the given id and group/name.
+   * @param group counter group
+   * @param name counter name
+   * @throws IOException
+   */
+  void registerCounter(int id, String group, String name) throws IOException;
+  
+  /**
+   * Increment the value of a registered counter.
+   * @param id counter id of the registered counter
+   * @param amount increment for the counter value
+   * @throws IOException
+   */
+  void incrementCounter(int id, long amount) throws IOException;
 }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java?rev=686893&r1=686892&r2=686893&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java Mon Aug 18 16:16:36
2008
@@ -30,6 +30,7 @@
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
@@ -37,6 +38,7 @@
 import org.apache.hadoop.mapred.OutputLogFilter;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TestMiniMRWithDFS;
+import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.util.StringUtils;
 
 import junit.framework.TestCase;
@@ -164,6 +166,15 @@
         rJob = Submitter.runJob(job);
       }
       assertTrue("pipes job failed", rJob.isSuccessful());
+      
+      Counters counters = rJob.getCounters();
+      Counters.Group wordCountCounters = counters.getGroup("WORDCOUNT");
+      int numCounters = 0;
+      for (Counter c : wordCountCounters) {
+        System.out.println(c);
+        ++numCounters;
+      }
+      assertTrue("No counters found!", (numCounters > 0));
     }
 
     List<String> results = new ArrayList<String>();



Mime
View raw message