chukwa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ey...@apache.org
Subject svn commit: r1515060 - in /incubator/chukwa/trunk: ./ conf/ src/main/java/org/apache/hadoop/chukwa/extraction/demux/ src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ src/test/java/org/apache/hadoop/chukwa/extraction/demux/
Date Sun, 18 Aug 2013 02:46:52 GMT
Author: eyang
Date: Sun Aug 18 02:46:51 2013
New Revision: 1515060

URL: http://svn.apache.org/r1515060
Log:
CHUKWA-581. Added support for custom reducer package name. (Ivy Tang via Eric Yang)

Modified:
    incubator/chukwa/trunk/CHANGES.txt
    incubator/chukwa/trunk/conf/chukwa-demux-conf.xml
    incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java
    incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java
    incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/extraction/demux/TestDemuxMapperConfigs.java
    incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/extraction/demux/TestDemuxReducerConfigs.java

Modified: incubator/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/CHANGES.txt?rev=1515060&r1=1515059&r2=1515060&view=diff
==============================================================================
--- incubator/chukwa/trunk/CHANGES.txt (original)
+++ incubator/chukwa/trunk/CHANGES.txt Sun Aug 18 02:46:51 2013
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
 
   NEW FEATURES
 
+    CHUKWA-581. Added support for custom reducer package name. (Ivy Tang via Eric Yang)
+
     CHUKWA-669. JMX Adaptor.  (shreyas subramanya via asrabkin)
 
     CHUKWA-671. Json processors for processing JMX data from Hadoop, HBase and Zookeeper. (shreyas subramanya via asrabkin)

Modified: incubator/chukwa/trunk/conf/chukwa-demux-conf.xml
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/conf/chukwa-demux-conf.xml?rev=1515060&r1=1515059&r2=1515060&view=diff
==============================================================================
--- incubator/chukwa/trunk/conf/chukwa-demux-conf.xml (original)
+++ incubator/chukwa/trunk/conf/chukwa-demux-conf.xml Sun Aug 18 02:46:51 2013
@@ -105,7 +105,7 @@
 
 <!-- -->
 
-<!-- Demux configs -->
+<!-- Demux configs for mapper-->
 
   <property>
     <name>demux.max.error.count.before.shutdown</name>
@@ -128,49 +128,49 @@
   <property>
     <name>SysLog</name>
     <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.SysLog</value>
-    <description>Parser class for </description>
+    <description>Parser class for SysLog</description>
   </property>
 
   <property>
     <name>Df</name>
     <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.Df</value>
-    <description>Parser class for </description>
+    <description>Parser class for Df</description>
   </property>
 
   <property>
     <name>HadoopLog</name>
     <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.HadoopLogProcessor</value>
-    <description>Parser class for </description>
+    <description>Parser class for HadoopLog</description>
   </property>
 
   <property>
     <name>Iostat</name>
     <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.Iostat</value>
-    <description>Parser class for </description>
+    <description>Parser class for Iostat</description>
   </property>
  
    <property>
     <name>PbsNodes</name>
     <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.PbsNodes</value>
-    <description>Parser class for </description>
+    <description>Parser class for PbsNodes</description>
   </property>
  
    <property>
     <name>Sar</name>
     <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.Sar</value>
-    <description>Parser class for </description>
+    <description>Parser class for Sar</description>
   </property>
 
    <property>
     <name>TsProcessor</name>
     <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.TsProcessor</value>
-    <description>Parser class for </description>
+    <description>Parser class for TsProcessor</description>
    </property>
   
    <property>
     <name>Top</name>
     <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.Top</value>
-    <description>Parser class for </description>
+    <description>Parser class for Top</description>
    </property>
 
    <property>
@@ -182,7 +182,7 @@
    <property>
     <name>DbLoader</name>
     <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.TsProcessor</value>
-    <description>Parser class for </description>
+    <description>Parser class for DbLoader</description>
    </property>
 
    <property>
@@ -255,5 +255,29 @@
     <name>HBaseRegionServerProcessor</name>
     <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.HBaseRegionServerProcessor</value>
    </property>
+
+    <!-- Demux configs for reducer-->
+    <property>
+        <name>JobLogHistoryReduceProcessor</name>
+        <value>,org.apache.hadoop.chukwa.extraction.demux.processor.reducer.JobLogHistoryReduceProcessor</value>
+        <description> Reducer class for Reduce Type JobLogHistoryReduceProcessor </description>
+    </property>
+
+    <property>
+        <name>SystemMetrics</name>
+        <value>,org.apache.hadoop.chukwa.extraction.demux.processor.reducer.SystemMetrics</value>
+        <description> Reducer class for Reduce Type SystemMetrics </description>
+    </property>
+
+    <property>
+        <name>MRJobReduceProcessor</name>
+        <value>,org.apache.hadoop.chukwa.extraction.demux.processor.reducer.MRJobReduceProcessor</value>
+        <description> Reducer class for Reduce Type MRJobReduceProcessor </description>
+    </property>
+
+    <property>
+        <name>ClientTrace</name>
+        <value>,org.apache.hadoop.chukwa.extraction.demux.processor.reducer.ClientTrace</value>
+        <description> Reducer class for Reduce Type ClientTrace </description>
+    </property>
 </configuration>
- 

Modified: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java?rev=1515060&r1=1515059&r2=1515060&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java (original)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java Sun Aug 18 02:46:51 2013
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
+
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
@@ -64,53 +65,54 @@ public class Demux extends Configured im
   public static Configuration jobConf = null;
 
   public static class MapClass extends MapReduceBase implements
-      Mapper<ChukwaArchiveKey, ChunkImpl, ChukwaRecordKey, ChukwaRecord> {
+          Mapper<ChukwaArchiveKey, ChunkImpl, ChukwaRecordKey, ChukwaRecord> {
 
     @Override
     public void configure(JobConf jobConf) {
       super.configure(jobConf);
-      Demux.jobConf= jobConf;
+      Demux.jobConf = jobConf;
     }
 
     public void map(ChukwaArchiveKey key, ChunkImpl chunk,
-        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
-        throws IOException {
+                    OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+            throws IOException {
 
       ChukwaOutputCollector chukwaOutputCollector = new ChukwaOutputCollector(
-          "DemuxMapOutput", output, reporter);
+              "DemuxMapOutput", output, reporter);
       try {
         long duration = System.currentTimeMillis();
         if (log.isDebugEnabled()) {
           log.debug("Entry: [" + chunk.getData() + "] EventType: ["
-              + chunk.getDataType() + "]");
+                  + chunk.getDataType() + "]");
         }
 
         String defaultProcessor = Demux.jobConf.get(
-            "chukwa.demux.mapper.default.processor",
-            "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
+                "chukwa.demux.mapper.default.processor",
+                "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
 
-        String processorClass = Demux.jobConf.get(chunk.getDataType(),
+        String processorClass_pri = Demux.jobConf.get(chunk.getDataType(),
                 defaultProcessor);
 
+        String processorClass = processorClass_pri.split(",")[0];
         if (!processorClass.equalsIgnoreCase("Drop")) {
           reporter.incrCounter("DemuxMapInput", "total chunks", 1);
           reporter.incrCounter("DemuxMapInput",
-              chunk.getDataType() + " chunks", 1);
+                  chunk.getDataType() + " chunks", 1);
 
           MapProcessor processor = MapProcessorFactory
-              .getProcessor(processorClass);
+                  .getProcessor(processorClass);
           processor.process(key, chunk, chukwaOutputCollector, reporter);
           if (log.isDebugEnabled()) {
             duration = System.currentTimeMillis() - duration;
             log.debug("Demux:Map dataType:" + chunk.getDataType()
-                + " duration:" + duration + " processor:" + processorClass
-                + " recordCount:" + chunk.getRecordOffsets().length);
+                    + " duration:" + duration + " processor:" + processorClass
+                    + " recordCount:" + chunk.getRecordOffsets().length);
           }
 
         } else {
           log.info("action:Demux, dataType:" + chunk.getDataType()
-              + " duration:0 processor:Drop recordCount:"
-              + chunk.getRecordOffsets().length);
+                  + " duration:0 processor:Drop recordCount:"
+                  + chunk.getRecordOffsets().length);
         }
 
       } catch (Exception e) {
@@ -121,7 +123,7 @@ public class Demux extends Configured im
   }
 
   public static class ReduceClass extends MapReduceBase implements
-      Reducer<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, ChukwaRecord> {
+          Reducer<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, ChukwaRecord> {
 
     public void configure(JobConf jobConf) {
       super.configure(jobConf);
@@ -129,27 +131,37 @@ public class Demux extends Configured im
     }
 
     public void reduce(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
-        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
-        throws IOException {
+                       OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+            throws IOException {
       ChukwaOutputCollector chukwaOutputCollector = new ChukwaOutputCollector(
-          "DemuxReduceOutput", output, reporter);
+              "DemuxReduceOutput", output, reporter);
       try {
         long duration = System.currentTimeMillis();
         reporter.incrCounter("DemuxReduceInput", "total distinct keys", 1);
         reporter.incrCounter("DemuxReduceInput", key.getReduceType()
-            + " total distinct keys", 1);
+                + " total distinct keys", 1);
 
-        String defaultProcessor = Demux.jobConf.get(
-            "chukwa.demux.reducer.default.processor", null);
-        ReduceProcessor processor = ReduceProcessorFactory.getProcessor(
-            key.getReduceType(), defaultProcessor);
+        String defaultProcessor_classname = "org.apache.hadoop.chukwa.extraction.demux.processor.reducer" +
+                ".IdentityReducer";
+        String defaultProcessor = Demux.jobConf.get("chukwa.demux.reducer.default.processor",
+                "," + defaultProcessor_classname);
+
+        String processClass_pri = Demux.jobConf.get(key.getReduceType(), defaultProcessor);
+        String[] processClass_tmps = processClass_pri.split(",");
+        String processClass = null;
+        if (processClass_tmps.length != 2)
+          processClass = defaultProcessor_classname;
+        else
+          processClass = processClass_tmps[1];
 
+        ReduceProcessor processor = ReduceProcessorFactory.getProcessor(processClass);
+        System.out.println(processor.getClass().getName());
         processor.process(key, values, chukwaOutputCollector, reporter);
 
         if (log.isDebugEnabled()) {
           duration = System.currentTimeMillis() - duration;
           log.debug("Demux:Reduce, dataType:" + key.getReduceType()
-              + " duration:" + duration);
+                  + " duration:" + duration);
         }
 
       } catch (Exception e) {
@@ -166,14 +178,14 @@ public class Demux extends Configured im
   }
 
   public static void addParsers(Configuration conf) {
-    String parserPath = conf.get("chukwa.data.dir")+File.separator+"demux";
+    String parserPath = conf.get("chukwa.data.dir") + File.separator + "demux";
     try {
       FileSystem fs = FileSystem.get(new Configuration());
       FileStatus[] fstatus = fs.listStatus(new Path(parserPath));
-      if(fstatus!=null) {
+      if (fstatus != null) {
         String hdfsUrlPrefix = conf.get("fs.default.name");
 
-        for(FileStatus parser : fstatus) {
+        for (FileStatus parser : fstatus) {
           Path jarPath = new Path(parser.getPath().toString().replace(hdfsUrlPrefix, ""));
           log.debug("Adding parser JAR path " + jarPath);
           DistributedCache.addFileToClassPath(jarPath, conf);
@@ -183,10 +195,10 @@ public class Demux extends Configured im
       log.error(ExceptionUtil.getStackTrace(e));
     }
   }
-  
+
   public int run(String[] args) throws Exception {
     JobConf conf = new JobConf(new ChukwaConfiguration(), Demux.class);
-    
+
 
     conf.setJobName("Chukwa-Demux_" + day.format(new Date()));
     conf.setInputFormat(SequenceFileInputFormat.class);
@@ -199,7 +211,7 @@ public class Demux extends Configured im
     conf.setOutputFormat(ChukwaRecordOutputFormat.class);
     conf.setJobPriority(JobPriority.VERY_HIGH);
     addParsers(conf);
-    
+
     List<String> other_args = new ArrayList<String>();
     for (int i = 0; i < args.length; ++i) {
       try {
@@ -215,14 +227,14 @@ public class Demux extends Configured im
         return printUsage();
       } catch (ArrayIndexOutOfBoundsException except) {
         System.out.println("ERROR: Required parameter missing from "
-            + args[i - 1]);
+                + args[i - 1]);
         return printUsage();
       }
     }
     // Make sure there are exactly 2 parameters left.
     if (other_args.size() != 2) {
       System.out.println("ERROR: Wrong number of parameters: "
-          + other_args.size() + " instead of 2.");
+              + other_args.size() + " instead of 2.");
       return printUsage();
     }
 

Modified: incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java?rev=1515060&r1=1515059&r2=1515060&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java (original)
+++ incubator/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ReduceProcessorFactory.java Sun Aug 18 02:46:51 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.chukwa.extract
 
 
 import java.util.HashMap;
+
 import org.apache.log4j.Logger;
 
 public class ReduceProcessorFactory {
@@ -41,52 +42,39 @@ public class ReduceProcessorFactory {
   private ReduceProcessorFactory() {
   }
 
-  public static ReduceProcessor getProcessor(String reduceType, String defaultProcessor)
-      throws UnknownReduceTypeException {
-    String path = "org.apache.hadoop.chukwa.extraction.demux.processor.reducer."
-        + reduceType;
+  /**
+   * Register a specific parser for a {@link ReduceProcessor} implementation.
+   */
+  public static synchronized void register(String reduceType,
+                                           ReduceProcessor processor) {
+    log.info("register " + processor.getClass().getName()
+            + " for this recordType :" + reduceType);
     if (processors.containsKey(reduceType)) {
-      return processors.get(reduceType);
+      throw new DuplicateReduceProcessorException(
+              "Duplicate processor for recordType:" + reduceType);
+    }
+    ReduceProcessorFactory.processors.put(reduceType, processor);
+  }
+
+  public static ReduceProcessor getProcessor(String processorClass) throws UnknownReduceTypeException {
+    if (processors.containsKey(processorClass)) {
+      return processors.get(processorClass);
     } else {
       ReduceProcessor processor = null;
       try {
-        processor = (ReduceProcessor) Class.forName(path).getConstructor()
-            .newInstance();
+        processor = (ReduceProcessor) Class.forName(processorClass).newInstance();
       } catch (ClassNotFoundException e) {
-        // ******** WARNING ********
-        // If the ReduceProcessor is not there see if there is a configured
-        // default processor. If not, fall back on the Identity instead
-        if(defaultProcessor != null) {
-          processor = getProcessor(defaultProcessor, null);
-        }
-        else {
-          processor = getProcessor("IdentityReducer", null);
-        }
-        register(reduceType, processor);
-        return processor;
+        throw new UnknownReduceTypeException("Unknown reducer class for:" + processorClass, e);
       } catch (Exception e) {
         throw new UnknownReduceTypeException("error constructing processor", e);
       }
-
       // TODO using a ThreadSafe/reuse flag to actually decide if we want
       // to reuse the same processor again and again
-      register(reduceType, processor);
+      register(processorClass, processor);
       return processor;
     }
-  }
 
-  /**
-   * Register a specific parser for a {@link ReduceProcessor} implementation.
-   */
-  public static synchronized void register(String reduceType,
-      ReduceProcessor processor) {
-    log.info("register " + processor.getClass().getName()
-        + " for this recordType :" + reduceType);
-    if (processors.containsKey(reduceType)) {
-      throw new DuplicateReduceProcessorException(
-          "Duplicate processor for recordType:" + reduceType);
-    }
-    ReduceProcessorFactory.processors.put(reduceType, processor);
   }
 
+
 }

Modified: incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/extraction/demux/TestDemuxMapperConfigs.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/extraction/demux/TestDemuxMapperConfigs.java?rev=1515060&r1=1515059&r2=1515060&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/extraction/demux/TestDemuxMapperConfigs.java (original)
+++ incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/extraction/demux/TestDemuxMapperConfigs.java Sun Aug 18 02:46:51 2013
@@ -44,7 +44,7 @@ public class TestDemuxMapperConfigs exte
 
     JobConf conf = new JobConf();
     conf.set("chukwa.demux.mapper.default.processor",
-             "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MockMapProcessor");
+             "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MockMapProcessor,");
     mapper.configure(conf);
 
     ChunkBuilder cb = new ChunkBuilder();
@@ -60,4 +60,31 @@ public class TestDemuxMapperConfigs exte
     assertEquals("MockMapProcessor never invoked - no records found", 1, output.data.size());
     assertNotNull("MockMapProcessor never invoked", output.data.get(recordKey));
   }
+
+  public void testSetCustomeMapProcessor() throws IOException {
+    Mapper<ChukwaArchiveKey, ChunkImpl, ChukwaRecordKey, ChukwaRecord> mapper =
+            new Demux.MapClass();
+    String custom_DataType = "cus_dt";
+
+    JobConf conf = new JobConf();
+    conf.set(custom_DataType,
+            "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MockMapProcessor,");
+    mapper.configure(conf);
+
+    ChunkBuilder cb = new ChunkBuilder();
+    cb.addRecord(SAMPLE_RECORD_DATA.getBytes());
+    ChunkImpl chunk = (ChunkImpl)cb.getChunk();
+    chunk.setDataType(custom_DataType);
+
+    ChukwaTestOutputCollector<ChukwaRecordKey, ChukwaRecord> output =
+            new ChukwaTestOutputCollector<ChukwaRecordKey, ChukwaRecord>();
+
+    mapper.map(new ChukwaArchiveKey(), chunk, output, Reporter.NULL);
+    ChukwaRecordKey recordKey = new ChukwaRecordKey("someReduceType", SAMPLE_RECORD_DATA);
+
+    assertEquals("MockMapProcessor never invoked - no records found", 1, output.data.size());
+    assertNotNull("MockMapProcessor never invoked", output.data.get(recordKey));
+  }
+
+
 }

Modified: incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/extraction/demux/TestDemuxReducerConfigs.java
URL: http://svn.apache.org/viewvc/incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/extraction/demux/TestDemuxReducerConfigs.java?rev=1515060&r1=1515059&r2=1515060&view=diff
==============================================================================
--- incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/extraction/demux/TestDemuxReducerConfigs.java (original)
+++ incubator/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/extraction/demux/TestDemuxReducerConfigs.java Sun Aug 18 02:46:51 2013
@@ -39,7 +39,8 @@ public class TestDemuxReducerConfigs ext
             new Demux.ReduceClass();
 
     JobConf conf = new JobConf();
-    conf.set("chukwa.demux.reducer.default.processor", "MockReduceProcessor");
+    conf.set("chukwa.demux.reducer.default.processor", ",org.apache.hadoop.chukwa.extraction.demux.processor.reducer" +
+            ".MockReduceProcessor");
     reducer.configure(conf);
 
     ChukwaRecordKey key = new ChukwaRecordKey("someReduceType", "someKey");
@@ -54,4 +55,27 @@ public class TestDemuxReducerConfigs ext
             "MockReduceProcessorValue",
             output.data.get(key).getValue("MockReduceProcessorKey"));
   }
+
+  public void testSetCustomReducerProcessor() throws IOException {
+    Reducer<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, ChukwaRecord> reducer =
+            new Demux.ReduceClass();
+
+    JobConf conf = new JobConf();
+    String cus_reduceType = "someReduceType";
+    conf.set(cus_reduceType, ",org.apache.hadoop.chukwa.extraction.demux.processor.reducer" +
+            ".MockReduceProcessor");
+    reducer.configure(conf);
+
+    ChukwaRecordKey key = new ChukwaRecordKey(cus_reduceType, "someKey");
+    ChukwaTestOutputCollector<ChukwaRecordKey, ChukwaRecord> output =
+            new ChukwaTestOutputCollector<ChukwaRecordKey, ChukwaRecord>();
+
+    reducer.reduce(key, null, output, Reporter.NULL);
+
+    assertEquals("MockReduceProcessor never invoked - no records found", 1, output.data.size());
+    assertNotNull("MockReduceProcessor never invoked", output.data.get(key));
+    assertEquals("MockReduceProcessor never invoked - key value incorrect",
+            "MockReduceProcessorValue",
+            output.data.get(key).getValue("MockReduceProcessorKey"));
+  }
 }
\ No newline at end of file



Mime
View raw message