crunch-commits mailing list archives

Subject svn commit: r1559821 - /crunch/site/trunk/content/user-guide.mdtext
Date Mon, 20 Jan 2014 20:08:12 GMT
Author: jwills
Date: Mon Jan 20 20:08:12 2014
New Revision: 1559821

Add section on unit testing


Modified: crunch/site/trunk/content/user-guide.mdtext
--- crunch/site/trunk/content/user-guide.mdtext (original)
+++ crunch/site/trunk/content/user-guide.mdtext Mon Jan 20 20:08:12 2014
@@ -61,6 +61,7 @@ Notice:   Licensed to the Apache Softwar
     1. [MRPipeline](#mrpipeline)
     1. [SparkPipeline](#sparkpipeline)
     1. [MemPipeline](#mempipeline)
+1. [Unit Testing Pipelines](#testing)
 <a name="intro"></a>
 ## Introduction to Crunch
@@ -1494,3 +1495,105 @@ on the read side. Often the best way to 
 `materialize()` method to get a reference to the contents of the in-memory collection and then verify them directly,
 without writing them out to disk.
+<a name="testing"></a>
+## Unit Testing Pipelines
+For production data pipelines, unit tests are an absolute must. The [MemPipeline](#mempipeline) implementation of the Pipeline
+interface has several tools to help developers create effective unit tests, which will be detailed in this section.
+### Unit Testing DoFns
+Many DoFn implementations, such as `MapFn` and `FilterFn`, are easy to test, since they accept a single input
+and return a single output. For general-purpose DoFns, we need an instance of the [Emitter](apidocs/0.9.0/org/apache/crunch/Emitter.html)
+interface that we can pass to the DoFn's `process` method, so that we can read back the values the function writes. Support
+for this pattern is provided by the [InMemoryEmitter](apidocs/0.9.0/org/apache/crunch/impl/mem/emit/InMemoryEmitter.html) class, which
+has a `List<T> getOutput()` method that returns the values a DoFn passed to the Emitter instance:
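+	import static org.junit.Assert.assertEquals;
+	import com.google.common.collect.ImmutableList;
+	import org.apache.crunch.impl.mem.emit.InMemoryEmitter;
+	import org.junit.Test;
+	// ToUpperCaseFn is not shown here; assume it is a DoFn<String, String> that emits input.toUpperCase().
+	@Test
+	public void testToUpperCaseFn() {
+	  InMemoryEmitter<String> emitter = new InMemoryEmitter<String>();
+	  new ToUpperCaseFn().process("input", emitter);
+	  assertEquals(ImmutableList.of("INPUT"), emitter.getOutput());
+	}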
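The InMemoryEmitter pattern can also be sketched without any Crunch dependencies. In the plain-Java example below, `SketchEmitter`, `ListEmitter`, and `UpperCaseTokensFn` are hypothetical stand-ins, not Crunch classes: a list-backed emitter records every value passed to `emit`, so a test can assert on the complete output of a function that may emit zero or more values per input. In a real Crunch test you would use `InMemoryEmitter` and a `DoFn` instead.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Dependency-free sketch of the InMemoryEmitter pattern. SketchEmitter,
// ListEmitter and UpperCaseTokensFn are hypothetical, not Crunch classes.
final class EmitterSketch {

  // Minimal emitter contract: a function pushes zero or more outputs into it.
  interface SketchEmitter<T> {
    void emit(T value);
    void flush();
  }

  // List-backed emitter: remembers every emitted value so a test can inspect them.
  static final class ListEmitter<T> implements SketchEmitter<T> {
    private final List<T> output = new ArrayList<>();

    @Override
    public void emit(T value) {
      output.add(value);
    }

    @Override
    public void flush() {
      // Nothing is buffered, so there is nothing to flush.
    }

    List<T> getOutput() {
      return Collections.unmodifiableList(output);
    }
  }

  // A "general purpose" function that can emit several values per input:
  // it splits on whitespace and emits each token upper-cased.
  static final class UpperCaseTokensFn {
    void process(String input, SketchEmitter<String> emitter) {
      for (String token : input.trim().split("\\s+")) {
        if (!token.isEmpty()) {
          emitter.emit(token.toUpperCase());
        }
      }
    }
  }

  // Runs the function against a fresh emitter and returns what it captured.
  static List<String> run(String input) {
    ListEmitter<String> emitter = new ListEmitter<>();
    new UpperCaseTokensFn().process(input, emitter);
    return emitter.getOutput();
  }

  public static void main(String[] args) {
    System.out.println(run("hello crunch")); // [HELLO, CRUNCH]
  }
}
```

Because the emitter simply collects values, one assertion on `getOutput()` covers both how many values were emitted and what they were.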
+### Testing Complex DoFns and Pipelines
+Many of the DoFns we write involve more complex processing that requires our DoFn to be initialized and cleaned up, or that
+defines Counters that we use to track the inputs we receive. To ensure that our DoFns work properly across
+their entire lifecycle, it's best to use the [MemPipeline](#mempipeline) implementation to create in-memory instances of
+PCollections and PTables that contain a small amount of test data, and then apply our DoFns to those PCollections to test their
+functionality. We can easily retrieve the contents of any in-memory PCollection by calling its `Iterable<T> materialize()`
+method, which will return immediately. We can also track the values of any Counters that were incremented as the DoFns were
+executed against the test data by calling the static `getCounters()` method on the MemPipeline class, and reset
+those Counters between test runs by calling the static `clearCounters()` method:
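+	import static org.junit.Assert.assertEquals;
+	import com.google.common.collect.ImmutableList;
+	import com.google.common.collect.Lists;
+	import org.apache.crunch.DoFn;
+	import org.apache.crunch.Emitter;
+	import org.apache.crunch.PCollection;
+	import org.apache.crunch.impl.mem.MemPipeline;
+	import org.apache.crunch.types.writable.Writables;
+	import org.junit.Before;
+	import org.junit.Test;
+	// Upper-cases each input and counts how many inputs were actually changed.
+	public static class UpperCaseWithCounterFn extends DoFn<String, String> {
+	  @Override
+	  public void process(String input, Emitter<String> emitter) {
+	    String upper = input.toUpperCase();
+	    if (!upper.equals(input)) {
+	      increment("UpperCase", "modified");
+	    }
+	    emitter.emit(upper);
+	  }
+	}
+	@Before
+	public void setUp() throws Exception {
+	  // MemPipeline's counters are static, so reset them before every test.
+	  MemPipeline.clearCounters();
+	}
+	@Test
+	public void testToUpperCase_WithPipeline() {
+	  PCollection<String> inputStrings = MemPipeline.collectionOf("a", "B", "c");
+	  PCollection<String> upperCaseStrings = inputStrings.parallelDo(new UpperCaseWithCounterFn(), Writables.strings());
+	  assertEquals(ImmutableList.of("A", "B", "C"), Lists.newArrayList(upperCaseStrings.materialize()));
+	  // "a" and "c" were changed by toUpperCase(); "B" was not.
+	  assertEquals(2L, MemPipeline.getCounters().findCounter("UpperCase", "modified").getValue());
+	}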
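Because `getCounters()` and `clearCounters()` are static, counter values are state that outlives any single test, which is why the `@Before` method resets them. The plain-Java sketch below illustrates that effect with a hypothetical `StaticCounters` registry standing in for MemPipeline's counters; it is not Crunch code and uses only `java.util`. `upperCaseAll` is likewise hypothetical and mirrors the counting logic of `UpperCaseWithCounterFn`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of why static counters must be reset between tests.
// StaticCounters and upperCaseAll are hypothetical stand-ins, not Crunch APIs.
final class CounterResetSketch {

  // Process-wide counter store keyed by "group:name", shared by every "test".
  static final class StaticCounters {
    private static final Map<String, Long> COUNTS = new HashMap<>();

    static void increment(String group, String name) {
      COUNTS.merge(group + ":" + name, 1L, Long::sum);
    }

    static long get(String group, String name) {
      return COUNTS.getOrDefault(group + ":" + name, 0L);
    }

    static void clear() {
      COUNTS.clear();
    }
  }

  // Counts the inputs that toUpperCase() actually changed.
  static void upperCaseAll(List<String> inputs) {
    for (String input : inputs) {
      String upper = input.toUpperCase();
      if (!upper.equals(input)) {
        StaticCounters.increment("UpperCase", "modified");
      }
    }
  }

  public static void main(String[] args) {
    List<String> data = Arrays.asList("a", "B", "c");

    upperCaseAll(data);
    System.out.println(StaticCounters.get("UpperCase", "modified")); // 2

    // A second "test" without a reset also sees the first test's increments.
    upperCaseAll(data);
    System.out.println(StaticCounters.get("UpperCase", "modified")); // 4

    // Resetting first (the job of a @Before method) restores isolation.
    StaticCounters.clear();
    upperCaseAll(data);
    System.out.println(StaticCounters.get("UpperCase", "modified")); // 2
  }
}
```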
+### Designing Testable Data Pipelines
+In the same way that we try to [write testable code](, we want to ensure that
+our data pipelines are written in a way that makes them easy to test. In general, you should try to break up complex pipelines
+into a number of function calls that perform a small set of operations on input PCollections and return one or more PCollections
+as a result. This makes it easy to swap in different PCollection implementations for testing and production runs.
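As a minimal sketch of this style, the example below uses plain `java.util` collections as a stand-in for PCollections and PTables; the word-count stages `tokenize`, `countWords`, and `wordCount` are hypothetical and not part of Crunch. Each stage is a static method from one collection to another, so a test can exercise any single stage, or their composition, on a few literal values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of "small functions over collections"; java.util types
// stand in for PCollection/PTable, and the stage names are hypothetical.
final class PipelineStagesSketch {

  // Stage 1: split lines into lower-cased words.
  static List<String> tokenize(List<String> lines) {
    List<String> words = new ArrayList<>();
    for (String line : lines) {
      for (String w : line.toLowerCase().split("\\W+")) {
        if (!w.isEmpty()) {
          words.add(w);
        }
      }
    }
    return words;
  }

  // Stage 2: count occurrences of each word (TreeMap keeps output sorted).
  static Map<String, Long> countWords(List<String> words) {
    Map<String, Long> counts = new TreeMap<>();
    for (String w : words) {
      counts.merge(w, 1L, Long::sum);
    }
    return counts;
  }

  // The full pipeline is just the composition of the stages.
  static Map<String, Long> wordCount(List<String> lines) {
    return countWords(tokenize(lines));
  }

  public static void main(String[] args) {
    List<String> lines = Arrays.asList("The cat", "the hat!");
    System.out.println(tokenize(lines));  // [the, cat, the, hat]
    System.out.println(wordCount(lines)); // {cat=1, hat=1, the=2}
  }
}
```

In a Crunch pipeline the same decomposition would take and return PCollections, so the test feeds each stage MemPipeline data while production runs pass in MRPipeline or SparkPipeline collections.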
+Let's look at an example that computes one iteration of the [PageRank]( algorithm that
+is taken from one of Crunch's integration tests:
+	import com.google.common.collect.Iterables;
+	import java.util.Collection;
+	import org.apache.crunch.DoFn;
+	import org.apache.crunch.Emitter;
+	import org.apache.crunch.MapFn;
+	import org.apache.crunch.PTable;
+	import org.apache.crunch.Pair;
+	import org.apache.crunch.types.PTypeFamily;
+	// Each entry in the PTable represents a URL and its associated data for PageRank computations.
+	// PageRankData is a helper class from the integration test; it exposes the page's outbound
+	// `urls` and its `propagatedScore()`.
+	public static PTable<String, PageRankData> pageRank(PTable<String, PageRankData> input, final float d) {
+	  PTypeFamily ptf = input.getTypeFamily();
+	  // Compute the outbound page rank from each of the input pages.
+	  PTable<String, Float> outbound = input.parallelDo(new DoFn<Pair<String, PageRankData>, Pair<String, Float>>() {
+	    @Override
+	    public void process(Pair<String, PageRankData> input, Emitter<Pair<String, Float>> emitter) {
+	      PageRankData prd = input.second();
+	      for (String link : prd.urls) {
+	        emitter.emit(Pair.of(link, prd.propagatedScore()));
+	      }
+	    }
+	  }, ptf.tableOf(ptf.strings(), ptf.floats()));
+	  // Update the PageRank for each URL.
+	  return input.cogroup(outbound).mapValues(
+	      new MapFn<Pair<Collection<PageRankData>, Collection<Float>>, PageRankData>() {
+	        @Override
+	        public PageRankData map(Pair<Collection<PageRankData>, Collection<Float>> input) {
+	          PageRankData prd = Iterables.getOnlyElement(input.first());
+	          Collection<Float> propagatedScores = input.second();
+	          float sum = 0.0f;
+	          for (Float s : propagatedScores) {
+	            sum += s;
+	          }
+	          // Assumes PageRankData.next(float) builds the next iteration's data from the new, damped score.
+	          return prd.next(d + (1.0f - d) * sum);
+	        }
+	      }, input.getValueType());
+	}
+By embedding our business logic inside a static method that operates on PTables, we can easily unit test our PageRank
+computations, which combine custom DoFns with Crunch's built-in `cogroup` operation, by using the [MemPipeline](#mempipeline)
+implementation to create small test data sets whose results we can verify by hand. The same logic can then be executed on
+a distributed data set using either the [MRPipeline](#mrpipeline) or [SparkPipeline](#sparkpipeline) implementation.
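To see what verifying by hand can look like, here is a plain-Java sketch of one iteration with the same two phases as `pageRank()`: every page sends `score / urls.size()` to each page it links to, and each input page then combines what it received. It does not use Crunch; it assumes the damped update `d + (1 - d) * sum` and a hypothetical three-page graph (`sampleLinks`, `uniformScores`, and `iterate` are illustrative names). With every score starting at 1.0 and `d = 0.5`, A receives 1.0 from C, B receives 0.5 from A, and C receives 0.5 from A plus 1.0 from B, giving A = 1.0, B = 0.75, and C = 1.25.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of one PageRank iteration, structured like pageRank():
// phase 1 propagates score / outDegree along each link, phase 2 applies an
// assumed damped update d + (1 - d) * sum. All names here are hypothetical.
final class PageRankSketch {

  static Map<String, Float> iterate(Map<String, List<String>> links,
                                    Map<String, Float> scores, float d) {
    // Phase 1: like the "outbound" table, but already summed per destination.
    Map<String, Float> received = new HashMap<>();
    for (Map.Entry<String, List<String>> e : links.entrySet()) {
      List<String> urls = e.getValue();
      float share = scores.get(e.getKey()) / urls.size();
      for (String link : urls) {
        received.merge(link, share, Float::sum);
      }
    }
    // Phase 2: like cogroup + mapValues, keyed by the input pages only.
    Map<String, Float> next = new TreeMap<>();
    for (String page : links.keySet()) {
      float sum = received.getOrDefault(page, 0.0f);
      next.put(page, d + (1.0f - d) * sum);
    }
    return next;
  }

  // Hypothetical graph: A -> {B, C}, B -> {C}, C -> {A}.
  static Map<String, List<String>> sampleLinks() {
    Map<String, List<String>> links = new HashMap<>();
    links.put("A", Arrays.asList("B", "C"));
    links.put("B", Arrays.asList("C"));
    links.put("C", Arrays.asList("A"));
    return links;
  }

  // Gives every page in the graph the same starting score.
  static Map<String, Float> uniformScores(Map<String, List<String>> links, float score) {
    Map<String, Float> scores = new HashMap<>();
    for (String page : links.keySet()) {
      scores.put(page, score);
    }
    return scores;
  }

  public static void main(String[] args) {
    Map<String, List<String>> links = sampleLinks();
    System.out.println(iterate(links, uniformScores(links, 1.0f), 0.5f)); // {A=1.0, B=0.75, C=1.25}
  }
}
```

The inputs are chosen so every intermediate value is exactly representable as a float, which keeps exact equality assertions safe; with arbitrary data, a test would compare scores against a small tolerance instead.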
