crunch-commits mailing list archives

From gr...@apache.org
Subject svn commit: r1559795 - /crunch/site/trunk/content/user-guide.mdtext
Date Mon, 20 Jan 2014 18:17:21 GMT
Author: greid
Date: Mon Jan 20 18:17:20 2014
New Revision: 1559795

URL: http://svn.apache.org/r1559795
Log:
CRUNCH 323 Add section on PType.getDetachedValue

Modified:
    crunch/site/trunk/content/user-guide.mdtext

Modified: crunch/site/trunk/content/user-guide.mdtext
URL: http://svn.apache.org/viewvc/crunch/site/trunk/content/user-guide.mdtext?rev=1559795&r1=1559794&r2=1559795&view=diff
==============================================================================
--- crunch/site/trunk/content/user-guide.mdtext (original)
+++ crunch/site/trunk/content/user-guide.mdtext Mon Jan 20 18:17:20 2014
@@ -54,6 +54,7 @@ Notice:   Licensed to the Apache Softwar
         1. [Sampling](#sampling)
         1. [Set Operations](#sets)
         1. [Splits](#splits)
+    1. [Retaining objects within DoFns](#objectreuse)
 1. [Crunch for HBase](#hbase)
 1. [Managing Pipeline Execution](#exec)
 1. [The Different Pipeline Implementations (Properties and Configuration options)](#pipelines)
@@ -1253,6 +1254,66 @@ you to split an input PCollection of Pai
   split.second().write(badOutputs);
 </pre>
 
+<a name="objectreuse"></a>
+### Retaining objects within DoFns
+
+For reasons of efficiency, Hadoop MapReduce repeatedly passes the [same references as keys and values to Mappers and Reducers](https://issues.apache.org/jira/browse/HADOOP-2399) instead of passing in new objects for each call. 
+The state of these reused key and value objects is updated between calls 
+to `Mapper.map()` and `Reducer.reduce()`, and also between calls to 
+`Iterator.next()` while iterating over the Iterable within a Reducer.
+
+The result of this optimization in MapReduce is that a reference to an object 
+received within a map or reduce call cannot be held on to past the scope of 
+that single method invocation, as its value will change between 
+invocations. In some (but not all) situations, the 
+consequences of this optimization affect DoFns as well, meaning that you can't 
+simply retain a reference that is passed in to `DoFn.process` past the lifetime 
+of that method call.
+
+A convenience method called `getDetachedValue` is specified in the `PType` 
+interface to get around this limitation. Implementations of this method 
+perform a deep copy of values of their configured type if needed, and return 
+the value that has been "detached" from the ownership of the MapReduce 
+framework.
+
+In order to make use of the `getDetachedValue` method in a PType, you need to 
+have an initialized instance of the PType within the DoFn. Note that the 
+initialization of the PType should be performed in the `initialize()` method of 
+the DoFn.
+
+The following DoFn uses `getDetachedValue` to correctly emit 
+the maximum value encountered:
+
+    public class FindMax<T extends Comparable<T>> extends DoFn<T, T> {
+      
+      private PType<T> ptype;
+      private T maxValue;
+    
+      public FindMax(PType<T> ptype) {
+        this.ptype = ptype;
+      }
+
+      public void initialize() {
+        this.ptype.initialize(getConfiguration());
+      }
+
+      public void process(T input, Emitter<T> emitter) {
+        if (maxValue == null || maxValue.compareTo(input) < 0) {
+          // We need to call getDetachedValue here, otherwise the internal
+          // state of maxValue might change with each call to process()
+          // and we won't hold on to the max value
+          maxValue = ptype.getDetachedValue(input);
+        }
+      }
+
+      public void cleanup(Emitter<T> emitter) {
+        if (maxValue != null) {
+          emitter.emit(maxValue);
+        }
+      }
+    }
+
+
 <a name="hbase"></a>
 ## Crunch for HBase
 

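The object reuse that the new section describes can be reproduced without Hadoop or Crunch. The sketch below is illustrative only: `ObjectReuseDemo`, `MutableInt`, and its `copy()` method are hypothetical stand-ins (not part of Crunch or of this commit) for a framework-owned value and for the deep copy that `PType.getDetachedValue` performs. It assumes a non-empty input.

```java
class ObjectReuseDemo {

  /** Hypothetical stand-in for a mutable, framework-owned value object. */
  static final class MutableInt {
    int value;

    /** Plays the role of a deep copy, as getDetachedValue would make. */
    MutableInt copy() {
      MutableInt c = new MutableInt();
      c.value = value;
      return c;
    }
  }

  /**
   * Pushes every input through ONE reused instance, the way the framework
   * reuses key and value objects, and tracks the maximum in two ways.
   * Returns { value seen via retained reference, value seen via copy }.
   * Assumes inputs is non-empty.
   */
  static int[] findMax(int[] inputs) {
    MutableInt reused = new MutableInt();
    MutableInt retained = null; // reference kept without copying (the bug)
    MutableInt detached = null; // detached copy (the fix)
    for (int v : inputs) {
      reused.value = v; // the "framework" overwrites the same object
      if (retained == null || retained.value < reused.value) {
        retained = reused; // aliases the reused object
      }
      if (detached == null || detached.value < reused.value) {
        detached = reused.copy(); // snapshot that later updates cannot touch
      }
    }
    return new int[] { retained.value, detached.value };
  }

  public static void main(String[] args) {
    int[] result = findMax(new int[] { 3, 9, 4 });
    System.out.println("retained=" + result[0] + " detached=" + result[1]);
  }
}
```

With the inputs 3, 9, 4 the retained reference reports 4, the last value written into the reused object, while the detached copy reports 9.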

