pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Apache Wiki <wikidi...@apache.org>
Subject [Pig Wiki] Update of "PigExecutionModel" by AntonioMagnaghi
Date Wed, 30 Jan 2008 23:56:28 GMT
Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change notification.

The following page has been changed by AntonioMagnaghi:
http://wiki.apache.org/pig/PigExecutionModel

------------------------------------------------------------------------------
  
  The Package just takes each (key, list of values) pair and puts it in the format required
by the co-group. So let's say we have (1,R),(2,G) in A and (1,B),(2,Y) in B. If there are
two reducers, the Global Rearrange catering to reducer 1 will produce {1,{(1,R),(1,B)}} as the (key,
list of values) pair, which should be converted into an output tuple for the co-group based on the tagged
index of the tuples in the list. So this would be converted to {1,{(1,R)},{(1,B)}}. Similarly,
{2,{(2,G),(2,Y)}} will be converted to {2,{(2,G)},{(2,Y)}} by reducer 2.
  
+ === Logical to Physical Stubs and APIs ===
+ 
+ Below are code samples that define possible APIs for the physical pipeline implementation.
The fragments aim to validate how the pull model and splitting can be used in generating
a physical plan from a logical plan.
+ 
+ The sample code does not yet take advantage of type information. The actual implementation
would use the type information made available by the logical plan.
+ 
+ The example focuses on the FILTER operator, the pull model (via iterators), and
splitting.
+ 
+ 
+ {{{
+ package org.apache.pig.optimization;
+ 
+ import java.util.Iterator;
+ import java.util.Vector;
+ 
+ /**
+  * Base class of every operator in the physical pipeline.
+  * Values are pulled from an operator through {@link #iterator()}.
+  */
+ public abstract class PhysicalOperator {
+ 
+ 	/** Upstream operators; subclasses (e.g. POSplit) add their inputs here. */
+ 	public Vector<PhysicalOperator> inputs = new Vector<PhysicalOperator>();
+ 
+ 	public PhysicalOperator() {
+ 		// nothing else to set up: inputs is initialized at its declaration
+ 	}
+ 
+ 	/**
+ 	 * Returns an iterator over the values produced by this operator.
+ 	 */
+ 	public abstract Iterator<Object> iterator();
+ }
+ }}}
+ 
+ {{{
+ package org.apache.pig.optimization;
+ 
+ import java.util.Iterator;
+ 
+ public class POFilter extends PhysicalOperator {
+ 
+ 	public PhysicalOperator input;
+ 	public PhysicalOperator condition;
+ 	
+ 	public POFilter(PhysicalOperator input,
+                        PhysicalOperator condition) {
+ 		this.input = input;
+ 		this.condition = condition;
+ 	}
+ 	
+ 	private class POFilterIterator implements Iterator<Object> {
+ 		private Iterator<Object> inputIter;
+ 		private Iterator<Object> condIter;
+ 		
+ 		public POFilterIterator(Iterator<Object> inputIter,
+ 					     Iterator<Object> condIter) {
+ 			this.inputIter = inputIter;
+ 			this.condIter = condIter;
+ 		}
+ 		
+ 		public Object next() {
+ 			Object nextVal = null;
+ 			
+ 			while(inputIter.hasNext()) {
+ 				if (((Boolean)condIter.next()).booleanValue() == true) {
+ 					nextVal = inputIter.next();
+ 					break;
+ 				}
+ 				else {
+ 					// skip val
+ 					inputIter.next();
+ 				}
+ 			}
+ 			
+ 			return nextVal;
+ 		}
+ 		
+ 		public boolean hasNext() {			
+ 			assert(inputIter.hasNext() ==
+ 				   condIter.hasNext());
+ 			
+ 			return inputIter.hasNext();
+ 		}
+ 		
+ 		public void remove() {
+ 			throw new RuntimeException("Not supported");
+ 		}
+ 	}
+ 	
+ 	public Iterator<Object> iterator() {
+ 		return new POFilterIterator(input.iterator(),
+ 						  condition.iterator());
+ 	}
+ }
+ }}}
+ 
+ {{{
+ package org.apache.pig.optimization;
+ 
+ import java.util.Iterator;
+ import java.util.Vector;
+ import java.util.Map;
+ import java.util.HashMap;
+ 
+ /**
+  * Fans one physical input out to several consumers ("split readers").
+  * <p>
+  * The input is iterated once. Every value pulled from it is appended to
+  * {@code buffer}, and each reader keeps its own position in that buffer, so all
+  * readers observe the same sequence without recomputing the input.
+  * Most naive implementation: nothing is ever evicted from the buffer.
+  */
+ public class POSplit extends PhysicalOperator {
+ 
+ 	// every value pulled so far from the shared input, in order
+ 	Vector<Object> buffer;
+ 	
+ 	// key = split reader
+ 	// val = index in buffer of the last value that reader consumed (-1 = none yet)
+ 	//
+ 	Map<POSplitReader, Integer> readerMap;
+ 	
+ 	// the single iterator over inputs.get(0); opened on first use
+ 	Iterator<Object> iter;
+ 	
+ 	public POSplit(PhysicalOperator input) {
+ 		super();
+ 		inputs.add(input);
+ 		buffer = new Vector<Object>();
+ 		readerMap = new HashMap<POSplitReader, Integer>();
+ 		iter = null;
+ 	}
+ 	
+ 	/**
+ 	 * Creates {@code num} new readers of this split. Each reader starts at the
+ 	 * first buffered value.
+ 	 */
+ 	public PhysicalOperator[] addReaders(int num) {
+ 		PhysicalOperator[] result = new PhysicalOperator[num];
+ 		for (int i = 0; i < num; ++i) {
+ 			POSplitReader reader = new POSplitReader(this);
+ 			readerMap.put(reader, Integer.valueOf(-1));
+ 			result[i] = reader;
+ 		}
+ 		return result;
+ 	}
+ 	
+ 	private class POSplitIterator implements Iterator<Object> {
+ 	
+ 		private POSplitReader reader;
+ 		private POSplit source;
+ 		
+ 		public POSplitIterator(POSplitReader reader,
+ 					  	       POSplit source) {
+ 			this.reader = reader;
+ 			this.source = source;
+ 		}
+ 		
+ 		public boolean hasNext() {
+ 			int readerPos = source.readerMap.get(reader).intValue();
+ 			// available if already buffered, or the shared input can supply one
+ 			return readerPos + 1 < source.buffer.size() || source.iter.hasNext();
+ 		}
+ 		
+ 		public Object next() {
+ 			int nextPos = source.readerMap.get(reader).intValue() + 1;
+ 			
+ 			if (nextPos == source.buffer.size()) {
+ 				// This reader is furthest ahead: pull one more value from the input.
+ 				// Every value (null included) is buffered so all readers see the
+ 				// same sequence; per the Iterator contract, iter.next() throws
+ 				// NoSuchElementException once the input is exhausted.
+ 				source.buffer.add(source.iter.next());
+ 			}
+ 
+ 			source.readerMap.put(reader, Integer.valueOf(nextPos));
+ 				
+ 			return source.buffer.elementAt(nextPos);
+ 		}
+ 		
+ 		public void remove() {
+ 			throw new UnsupportedOperationException("Unsupported");
+ 		}
+ 	}
+ 	
+ 	/**
+ 	 * Returns an iterator that reads this split on behalf of {@code reader}.
+ 	 */
+ 	public Iterator<Object> iterator(POSplitReader reader) {
+ 		// Open the shared input only once: re-opening it for every reader would
+ 		// restart the input and append duplicate values to the buffer.
+ 		if (iter == null) {
+ 			iter = inputs.get(0).iterator();
+ 		}
+ 		return new POSplitIterator(reader, this);
+ 	}
+ 	
+ 	public Iterator<Object> iterator() {
+ 		throw new UnsupportedOperationException("Do not Iterate directly but Use a Split Reader");
+ 	}
+ }
+ }}}
+ 
+ {{{
+ package org.apache.pig.optimization;
+ 
+ import java.util.Iterator;
+ 
+ public class POSplitReader extends PhysicalOperator {
+ 
+ 	public POSplit source;
+ 	
+ 	public POSplitReader(POSplit source) {
+ 		super();
+ 		this.source = source;
+ 	}
+ 	
+ 	public Iterator<Object> iterator() {
+ 		return source.iterator(this);
+ 	}
+ }
+ }}}
+ 
+ {{{
+ package org.apache.pig.optimization;
+ 
+ /**
+  * Entry point that turns a logical plan into a physical pipeline.
+  */
+ public class PhysicalCompiler {
+ 
+ 	/**
+ 	 * Compiles the logical plan rooted at {@code lo}.
+ 	 *
+ 	 * @param lo root of the logical plan
+ 	 * @return root of the corresponding physical plan
+ 	 */
+ 	public static PhysicalOperator compile(LogicalOperator lo) {
+ 		return new IR(lo).compile();
+ 	}
+ }
+ 
+ ---
+ 
+ package org.apache.pig.optimization;
+ 
+ import java.util.HashMap;
+ import java.util.Map;
+ import java.util.Vector;
+ 
+ /**
+  * In some cases we may need some pre-processing...
+  *
+  */
+ class IR {
+ 	LogicalOperator root;
+ 	
+ 	// key = logical node 
+ 	// val = vector of nodes under root that match key
+ 	//
+ 	Map<LogicalOperator, Vector<LogicalOperator>> nodeMap;
+ 	
+ 	// key = logical node
+ 	// val = vector of physical operators to use in the translation
+ 	//       process
+ 	//
+ 	Map<LogicalOperator, Vector<PhysicalOperator>> translationMap;
+ 	
+ 	IR(LogicalOperator root) {
+ 		this.root = root;
+ 		this.nodeMap = new HashMap<LogicalOperator,
+ 						 Vector<LogicalOperator>>();
+ 		this.translationMap = new HashMap<LogicalOperator,
+ 							  Vector<PhysicalOperator>>();
+ 	}	
+ 	
+ 	private void doMapNode(LogicalOperator root,
+ 						   LogicalOperator nodeToMap) {
+ 		if (root.equals(nodeToMap)) {
+ 			Vector<LogicalOperator> map = nodeMap.get(nodeToMap);
+ 			
+ 			map.add(root);
+ 		}
+ 		else {
+ 			for (LogicalOperator newRoot : root.inputs) {
+ 				doMapNode(newRoot, nodeToMap);
+ 			}
+ 		}
+ 	}
+ 	
+ 	public void mapNode(LogicalOperator nodeToMap) {
+ 		if (nodeMap.get(nodeToMap) == null) {
+ 			nodeMap.put(nodeToMap, new Vector<LogicalOperator>());
+ 			
+ 			doMapNode(root, nodeToMap);
+ 		}
+ 	}
+ 	
+ 	public int getNodeMapOccurence(LogicalOperator mapNode) {
+ 		mapNode(mapNode);
+ 		
+ 		Vector<LogicalOperator> map = nodeMap.get(mapNode);
+ 		
+ 		if (map != null) {
+ 			return map.size();
+ 		}
+ 		else {
+ 			return 0;
+ 		}
+ 	}
+ 	
+ 	public void translateNodeTo(LogicalOperator nodeToTranslate,
+ 		    PhysicalOperator[] translation) {
+ 		Vector<PhysicalOperator> vec = new Vector<PhysicalOperator>();
+ 		
+ 		for (PhysicalOperator po : translation) {
+ 			vec.add(po);
+ 		}
+ 		
+ 		translateNodeTo(nodeToTranslate,
+ 				   vec);
+ 	}
+ 
+ 	
+ 	public void translateNodeTo(LogicalOperator nodeToTranslate,
+ 				         Vector<PhysicalOperator> translation) {
+ 		mapNode(nodeToTranslate);
+ 		
+ 		Vector<LogicalOperator> map = nodeMap.get(nodeToTranslate);
+ 		
+ 		if (map.size() != translation.size()) {
+ 			throw new RuntimeException("Mismatch!");
+ 		}
+ 		
+ 		translationMap.put(nodeToTranslate, translation);
+ 	}
+ 	
+ 	private POFilter compileLOFilter(LOFilter filter) {
+ 		assert(filter.condition.type == LogicalOperator.PIG_TYPE.BOOLEAN);
+ 		
+ 		PhysicalOperator physicalInput = (new IR(filter.input)).compile();
+ 		
+ 		IR booleanCondIR = new IR(filter.condition);
+ 		
+ 		POSplit split = new POSplit(physicalInput);
+ 		
+ 		PhysicalOperator splitReads[] = 
+ 				split.addReaders(1 +
+ 						    booleanCondIR.getNodeMapOccurence(filter.input));
+ 		
+ 		booleanCondIR.translateNodeTo(filter.input, splitReads);
+ 		
+ 		PhysicalOperator booleanCond = booleanCondIR.compile();
+ 		
+ 		POFilter result = new POFilter(physicalInput,
+ 						     booleanCond);
+ 		
+ 		return result;
+ 	}
+ 
+ 	public PhysicalOperator compile() {
+ 		if (root instanceof LOFilter) {
+ 			return compileLOFilter((LOFilter) root);
+ 		}
+ 		else {
+ 			throw new RuntimeException("Unsupported Logical Operator");
+ 		}
+ 	}
+ }
+ }}}
+ 

Mime
View raw message