crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-330: Add option for disabling named output counters.
Date Thu, 23 Jan 2014 21:55:14 GMT
Updated Branches:
  refs/heads/apache-crunch-0.8 834260caa -> deca72853


CRUNCH-330: Add option for disabling named output counters.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/deca7285
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/deca7285
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/deca7285

Branch: refs/heads/apache-crunch-0.8
Commit: deca728532a18999c36348daafa9b15a49b73c81
Parents: 834260c
Author: Josh Wills <jwills@apache.org>
Authored: Thu Jan 23 12:46:01 2014 -0800
Committer: Josh Wills <jwills@apache.org>
Committed: Thu Jan 23 13:53:43 2014 -0800

----------------------------------------------------------------------
 .../org/apache/crunch/MultipleOutputIT.java     | 34 ++++++++++++++++++++
 .../org/apache/crunch/io/CrunchOutputs.java     | 19 +++++++----
 2 files changed, 46 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/deca7285/crunch-core/src/it/java/org/apache/crunch/MultipleOutputIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/MultipleOutputIT.java b/crunch-core/src/it/java/org/apache/crunch/MultipleOutputIT.java
index 96971f8..ffc09c3 100644
--- a/crunch-core/src/it/java/org/apache/crunch/MultipleOutputIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/MultipleOutputIT.java
@@ -19,6 +19,7 @@ package org.apache.crunch;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 
 import java.io.File;
 import java.io.FilenameFilter;
@@ -27,9 +28,12 @@ import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.crunch.PipelineResult.StageResult;
 import org.apache.crunch.fn.Aggregators;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.At;
+import org.apache.crunch.io.CrunchOutputs;
 import org.apache.crunch.io.To;
 import org.apache.crunch.test.StringWrapper;
 import org.apache.crunch.test.TemporaryPath;
@@ -98,6 +102,36 @@ public class MultipleOutputIT {
     assertEquals("parallel Dos not fused into a single job", 1, result.getStageResults().size());
   }
 
+  @Test
+  public void testCountersEnabled() throws IOException {
+    PipelineResult result = run(new MRPipeline(MultipleOutputIT.class, tmpDir.getDefaultConfiguration()),
+        WritableTypeFamily.getInstance());
+    
+    assertEquals(1, result.getStageResults().size());
+    StageResult stageResult = result.getStageResults().get(0);
+
+    String counterGroup = CrunchOutputs.class.getName();
+    assertEquals(3, stageResult.getCounterNames().get(counterGroup).size());
+    assertEquals(1l, stageResult.getCounterValue(counterGroup, "out1"));
+    assertEquals(1l, stageResult.getCounterValue(counterGroup, "out2"));
+    assertEquals(0l, stageResult.getCounterValue(counterGroup, "out3"));
+  }
+  
+  @Test
+  public void testCountersDisabled() throws IOException {
+    Configuration configuration = tmpDir.getDefaultConfiguration();
+    configuration.setBoolean(CrunchOutputs.CRUNCH_DISABLE_OUTPUT_COUNTERS, true);
+    
+    PipelineResult result = run(new MRPipeline(MultipleOutputIT.class, configuration),
+        WritableTypeFamily.getInstance());
+    
+    assertEquals(1, result.getStageResults().size());
+    StageResult stageResult = result.getStageResults().get(0);
+    
+    assertFalse(stageResult.getCounterNames().containsKey(CrunchOutputs.CRUNCH_OUTPUTS));
+  }
+  
+  
   public PipelineResult run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException {
     String inputPath = tmpDir.copyResourceFileName("letters.txt");
     String outputPathEven = tmpDir.getFileName("even");

http://git-wip-us.apache.org/repos/asf/crunch/blob/deca7285/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
index cd1ebce..55fcc89 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
@@ -42,7 +42,8 @@ import java.util.Map;
  */
 public class CrunchOutputs<K, V> {
   public static final String CRUNCH_OUTPUTS = "crunch.outputs.dir";
-  
+  public static final String CRUNCH_DISABLE_OUTPUT_COUNTERS = "crunch.disable.output.counters";
+
   private static final char RECORD_SEP = ',';
   private static final char FIELD_SEP = ';';
   private static final Joiner JOINER = Joiner.on(FIELD_SEP);
@@ -98,11 +99,12 @@ public class CrunchOutputs<K, V> {
   private static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
   private static final String COUNTERS_GROUP = CrunchOutputs.class.getName();
 
-  private TaskInputOutputContext<?, ?, K, V> baseContext;
-  private Map<String, OutputConfig> namedOutputs;
-  private Map<String, RecordWriter<K, V>> recordWriters;
-  private Map<String, TaskAttemptContext> taskContextCache;
-  
+  private final TaskInputOutputContext<?, ?, K, V> baseContext;
+  private final Map<String, OutputConfig> namedOutputs;
+  private final Map<String, RecordWriter<K, V>> recordWriters;
+  private final Map<String, TaskAttemptContext> taskContextCache;
+  private final boolean disableOutputCounters;
+
   /**
    * Creates and initializes multiple outputs support,
    * it should be instantiated in the Mapper/Reducer setup method.
@@ -114,6 +116,7 @@ public class CrunchOutputs<K, V> {
     namedOutputs = getNamedOutputs(context);
     recordWriters = Maps.newHashMap();
     taskContextCache = Maps.newHashMap();
+    this.disableOutputCounters = context.getConfiguration().getBoolean(CRUNCH_DISABLE_OUTPUT_COUNTERS, false);
   }
   
   @SuppressWarnings("unchecked")
@@ -124,7 +127,9 @@ public class CrunchOutputs<K, V> {
         namedOutput + "'");
     }
     TaskAttemptContext taskContext = getContext(namedOutput);
-    baseContext.getCounter(COUNTERS_GROUP, namedOutput).increment(1);
+    if (!disableOutputCounters) {
+      baseContext.getCounter(COUNTERS_GROUP, namedOutput).increment(1);
+    }
     getRecordWriter(taskContext, namedOutput).write(key, value);
   }
   


Mime
View raw message