pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From o...@apache.org
Subject svn commit: r589866 [5/11] - in /incubator/pig/trunk: ./ lib-src/ lib-src/bzip2/ lib-src/bzip2/org/ lib-src/bzip2/org/apache/ lib-src/bzip2/org/apache/tools/ lib-src/bzip2/org/apache/tools/bzip2r/ lib-src/shock/ lib-src/shock/org/ lib-src/shock/org/apa...
Date Mon, 29 Oct 2007 21:44:57 GMT
Added: incubator/pig/trunk/src/org/apache/pig/impl/builtin/SUBTRACT.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/builtin/SUBTRACT.java?rev=589866&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/builtin/SUBTRACT.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/builtin/SUBTRACT.java Mon Oct 29 14:44:47 2007
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.builtin;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataAtom;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.AtomSchema;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+
+public class SUBTRACT extends EvalFunc<DataAtom> {
+
+    @Override
+    public void exec(Tuple input, DataAtom output) throws IOException {
+        double v1 = input.getAtomField(0).numval();
+        double v2 = input.getAtomField(1).numval();
+        
+        output.setValue(v1-v2);
+    }
+    @Override
+    public Schema outputSchema(Schema input) {
+        return new AtomSchema("difference");
+    }
+
+}

Added: incubator/pig/trunk/src/org/apache/pig/impl/builtin/ShellBagEvalFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/builtin/ShellBagEvalFunc.java?rev=589866&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/builtin/ShellBagEvalFunc.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/builtin/ShellBagEvalFunc.java Mon Oct 29 14:44:47 2007
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.builtin;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Datum;
+import org.apache.pig.data.Tuple;
+
+
+/**
+ * Eval function that pipes input tuples to an external command and fills the
+ * output bags from what the command prints.
+ *
+ * Each call to {@link #exec(Tuple, DataBag)} writes one delimited record
+ * followed by a group delimiter to the command's stdin and returns without
+ * waiting for the answer. A background thread reads the command's stdout and
+ * adds the parsed tuples to the queued output bags, in submission order.
+ * The function therefore reports itself as asynchronous.
+ */
+public class ShellBagEvalFunc extends EvalFunc<DataBag> {
+	byte groupDelim = '\n';
+	byte recordDelim = '\n';
+	byte fieldDelim = '\t';
+	String fieldDelimString = "\t";
+	OutputStream os;
+	InputStream is;
+	InputStream es;
+	String cmd;
+	Thread processThread;
+	
+	// Output bags awaiting results, in the order their inputs were written to the command.
+	LinkedBlockingQueue<DataBag> bags = new LinkedBlockingQueue<DataBag>();
+	
+	
+	/**
+	 * @param cmd command line to run; the process is started lazily on the first exec() call
+	 */
+	public ShellBagEvalFunc(String cmd) {
+		this.cmd = cmd;
+	}
+
+	/** Sentinel placed on the queue to tell the reader thread to stop. */
+	private class EndOfQueue extends DataBag{
+		public void add(Datum d){}
+	}
+	
+	/**
+	 * Launches the command, starts a thread that copies the command's stderr to
+	 * System.err, and starts the reader thread that fills queued bags from the
+	 * command's stdout.
+	 *
+	 * @throws IOException if the process cannot be started
+	 */
+	private void startProcess() throws IOException {
+		Process p = Runtime.getRuntime().exec(cmd);
+		is = p.getInputStream();
+		os = p.getOutputStream();
+		es = p.getErrorStream();
+		
+		
+		// Forward the child's stderr to our own stderr.
+		new Thread() {
+			@Override
+			public void run() {
+				byte b[] = new byte[256];
+				int rc;
+				try {
+					while((rc = es.read(b)) > 0) {
+						System.err.write(b, 0, rc);
+					}
+				} catch(Exception e) {
+					e.printStackTrace();
+				}
+			}
+		}.start();
+		
+		
+		// Reader thread: take bags off the queue one at a time and fill each
+		// from the child's stdout until the EndOfQueue sentinel arrives.
+		processThread = new Thread() {
+			@Override
+			public void run() {
+				while(true){
+					DataBag bag;
+					try{
+						bag = bags.take();
+					}catch(InterruptedException e){
+						// Keep waiting; only the sentinel terminates this loop.
+						continue;
+					}
+					if (bag instanceof EndOfQueue)
+						break;
+					try {
+						readBag(bag);
+					} catch (IOException e) {
+						e.printStackTrace();
+					}
+				}
+			}
+		};
+		
+		processThread.start();
+	}
+	
+	/**
+	 * Sends {@code input} to the command and queues {@code output} to be filled
+	 * asynchronously by the reader thread.
+	 *
+	 * @param input  tuple written to the command as one delimited record
+	 * @param output bag that will receive the command's response; it is marked
+	 *               stale until the reader thread has finished filling it
+	 * @throws IOException if the process cannot be started or written to
+	 */
+	@Override
+	public void exec(Tuple input, DataBag output) throws IOException {
+		if (os == null) {
+			startProcess();
+		}
+		os.write(input.toDelimitedString(fieldDelimString).getBytes());
+		os.write(recordDelim);
+		os.write(groupDelim);
+		os.flush();
+		
+		// Mark the bag stale BEFORE handing it to the reader thread. If it were
+		// marked afterwards, the reader could finish the bag and clear the stale
+		// flag first, leaving the bag stale forever.
+		output.markStale(true);
+		
+		// The queue is unbounded, so add() never blocks and cannot be
+		// interrupted; the bag can no longer be silently dropped.
+		bags.add(output);
+	}
+	
+	/**
+	 * Closes the command's stdin, tells the reader thread to stop and waits for
+	 * it to drain all pending bags. Does nothing if the process was never started.
+	 */
+	@Override
+	public void finish(){
+		// exec() was never called: there is no stream to close and no thread to join.
+		if (os == null)
+			return;
+		
+		try{
+			os.close();
+		}catch(IOException e){
+			e.printStackTrace();
+		}
+		
+		// Always enqueue the sentinel, even when close() failed; otherwise the
+		// reader thread never exits and the join below would block forever.
+		bags.add(new EndOfQueue());
+		
+		if (processThread == null)
+			return;
+		while(true){
+			try{
+				processThread.join();
+				break;
+			}catch (InterruptedException e){
+				// Retry: we must not return before the reader has finished.
+			}
+		}
+	}
+
+	/** Results are delivered by a background thread after exec() returns. */
+	@Override
+	public boolean isAsynchronous() {
+		return true;
+	}
+	
+	/**
+	 * Reads records from the command's stdout into {@code output} until a group
+	 * delimiter is seen at the start of a record, then clears the bag's stale mark.
+	 * If the stream ends first, the method returns without clearing the mark.
+	 *
+	 * @param output bag to fill
+	 * @throws IOException if reading from the command fails
+	 */
+	private void readBag(DataBag output) throws IOException {
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		boolean inRecord = false;
+		int c;
+		while((c = is.read()) != -1) {
+			// NOTE(review): echoes every byte of the command's output to stdout; looks like leftover debugging - confirm before removing.
+			System.out.print(((char)c));
+			if ((inRecord == false) && (c == groupDelim)) {
+				// Empty record: the group for this bag is complete.
+				output.markStale(false);
+				return;
+			}
+			inRecord = true;
+			if (c == recordDelim) {
+				inRecord = false;
+				Tuple t = new Tuple(baos.toString(), fieldDelimString);
+				output.add(t);
+				baos = new ByteArrayOutputStream();
+				continue;
+			}
+			baos.write(c);
+		}
+	}
+}

Added: incubator/pig/trunk/src/org/apache/pig/impl/eval/BinCondSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/BinCondSpec.java?rev=589866&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/BinCondSpec.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/BinCondSpec.java Mon Oct 29 14:44:47 2007
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.eval;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.pig.data.Datum;
+import org.apache.pig.impl.FunctionInstantiator;
+import org.apache.pig.impl.eval.collector.DataCollector;
+import org.apache.pig.impl.eval.cond.Cond;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
+
+
+
+/**
+ * Eval spec for a binary conditional ({@code cond ? ifTrue : ifFalse}): for each
+ * input datum the condition is evaluated and the result of the matching branch
+ * is passed down the pipe.
+ */
+public class BinCondSpec extends EvalSpec {
+    private static final long serialVersionUID = 1L;
+
+    protected Cond cond;
+    protected EvalSpec ifTrue;
+    protected EvalSpec ifFalse;
+
+
+    /**
+     * @param cond    condition deciding which branch is evaluated
+     * @param ifTrue  spec evaluated when the condition holds
+     * @param ifFalse spec evaluated when the condition does not hold
+     * @throws IOException if either branch is asynchronous
+     */
+    public BinCondSpec(Cond cond, EvalSpec ifTrue, EvalSpec ifFalse) throws IOException{
+        this.cond = cond;
+        this.ifTrue = ifTrue;
+        this.ifFalse = ifFalse;
+
+        if (ifTrue.isAsynchronous() || ifFalse.isAsynchronous())
+            throw new IOException("Can't use the output of an asynchronous function as one of the branches of a bincond");
+
+    }
+
+    /**
+     * Returns the functions used by the condition followed by those of both branches.
+     */
+    @Override
+    public List<String> getFuncs() {
+        // Copy before appending: the list handed back by the condition is not
+        // ours to modify (it may be the condition's own list, or unmodifiable).
+        // Fully qualified so that no additional import is required.
+        List<String> funcs = new java.util.ArrayList<String>(cond.getFuncs());
+        funcs.addAll(ifTrue.getFuncs());
+        funcs.addAll(ifFalse.getFuncs());
+        return funcs;
+    }
+
+    /** Instantiates the functions of the condition and of both branches. */
+    @Override
+    public void instantiateFunc(FunctionInstantiator fInstantiaor) throws IOException{
+        cond.instantiateFunc(fInstantiaor);
+        ifTrue.instantiateFunc(fInstantiaor);
+        ifFalse.instantiateFunc(fInstantiaor);
+    }
+
+    @Override
+    public boolean amenableToCombiner() {
+        return false;
+    }
+
+    /** Always reports an empty tuple schema, regardless of the input schema. */
+    @Override
+    protected Schema mapInputSchema(Schema schema) {
+        return new TupleSchema();
+    }
+
+    /**
+     * Builds a collector that evaluates the condition on each datum and forwards
+     * the result of the selected branch to {@code endOfPipe}.
+     */
+    @Override
+    protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+        return new DataCollector(endOfPipe){
+            @Override
+            public void add(Datum d) {
+                if (cond.eval(d)){
+                    addToSuccessor(ifTrue.simpleEval(d));
+                }else{
+                    addToSuccessor(ifFalse.simpleEval(d));
+                }
+            }
+
+            @Override
+            protected void finish(){
+                cond.finish();
+                ifTrue.finish();
+                ifFalse.finish();
+            }
+        };
+    }
+
+    /** Renders the spec as {@code [(cond ? ifTrue : ifFalse)]}. */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("[(");
+        sb.append(cond);
+        sb.append(" ? ");
+        sb.append(ifTrue);
+        sb.append(" : ");
+        sb.append(ifFalse);
+        sb.append(")]");
+        return sb.toString();
+    }
+
+}

Added: incubator/pig/trunk/src/org/apache/pig/impl/eval/CompositeEvalSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/CompositeEvalSpec.java?rev=589866&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/CompositeEvalSpec.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/CompositeEvalSpec.java Mon Oct 29 14:44:47 2007
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.eval;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.impl.FunctionInstantiator;
+import org.apache.pig.impl.eval.collector.DataCollector;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+
+
+/**
+ * Follows the composite design pattern
+ * @author utkarsh
+ *
+ */
+
+public class CompositeEvalSpec extends EvalSpec {
+	private static final long serialVersionUID = 1L;
+
+	// Member specs in pipeline order: the output of each one feeds the next.
+	private List<EvalSpec> specs = new ArrayList<EvalSpec>();
+
+	CompositeEvalSpec(EvalSpec spec){
+		specs.add(spec);
+	}
+
+	/**
+	 * Chains the member pipes back to front so that the collector returned is
+	 * the input of the first spec and the last spec writes to {@code endOfPipe}.
+	 */
+	@Override
+	protected DataCollector setupDefaultPipe(DataCollector endOfPipe){
+		DataCollector head = endOfPipe;
+		int idx = specs.size();
+		while (idx-- > 0){
+			head = specs.get(idx).setupDefaultPipe(head);
+		}
+		return head;
+	}
+
+	/** Collects the functions of every member spec, in pipeline order. */
+	@Override
+	public List<String> getFuncs(){
+		List<String> collected = new ArrayList<String>();
+		for (int idx = 0; idx < specs.size(); idx++){
+			collected.addAll(specs.get(idx).getFuncs());
+		}
+		return collected;
+	}
+
+	/** Appends {@code spec} to this composite and returns this composite. */
+	@Override
+	public EvalSpec addSpec(EvalSpec spec){
+		specs.add(spec);
+		return this;
+	}
+
+	/** Joins the string forms of the member specs with {@code "->"}. */
+	@Override
+	public String toString() {
+		StringBuilder text = new StringBuilder();
+		for (int idx = 0; idx < specs.size(); idx++){
+			if (idx > 0)
+				text.append("->");
+			text.append(specs.get(idx).toString());
+		}
+		return text.toString();
+	}
+
+
+	/**
+	 * Process the pipe and determine if the pipe is a candidate for
+	 * algebraic evaluation. Currently always answers false: the unconditional
+	 * return below switches the per-spec check off.
+	 * @return false
+	 */
+	@Override
+	public boolean amenableToCombiner() {
+		if (true) return false;
+		boolean allAmenable = true;
+		for (int idx = 0; allAmenable && idx < specs.size(); idx++){
+			allAmenable = specs.get(idx).amenableToCombiner();
+		}
+		return allAmenable;
+	}
+
+	/** True as soon as one member spec is asynchronous. */
+	@Override
+	public boolean isAsynchronous() {
+		for (int idx = 0; idx < specs.size(); idx++){
+			if (specs.get(idx).isAsynchronous())
+				return true;
+		}
+		return false;
+	}
+
+	/** Threads the schema through each member spec in pipeline order. */
+	@Override
+	protected Schema mapInputSchema(Schema schema) {
+		Schema current = schema;
+		for (int idx = 0; idx < specs.size(); idx++){
+			current = specs.get(idx).mapInputSchema(current);
+		}
+		return current;
+	}
+
+
+	/** Instantiates the functions of every member spec. */
+	@Override
+	public void instantiateFunc(FunctionInstantiator fInstantiaor)
+			throws IOException {
+		for (int idx = 0; idx < specs.size(); idx++){
+			specs.get(idx).instantiateFunc(fInstantiaor);
+		}
+	}
+
+	/** Returns the live list of member specs (not a copy). */
+	public List<EvalSpec> getSpecs() {
+		return specs;
+	}
+
+}

Added: incubator/pig/trunk/src/org/apache/pig/impl/eval/ConstSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/ConstSpec.java?rev=589866&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/ConstSpec.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/ConstSpec.java Mon Oct 29 14:44:47 2007
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.eval;
+
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.data.DataAtom;
+import org.apache.pig.data.Datum;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
+
+
+
+/**
+ * Eval spec that ignores its input and always yields the same constant atom.
+ */
+public class ConstSpec extends SimpleEvalSpec {
+	private static final long serialVersionUID = 1L;
+	public String constant;
+	// Not serialized; rebuilt from 'constant' by init() after construction and deserialization.
+	transient public DataAtom atom;
+
+
+	/** Creates a spec for the given string constant. */
+	public ConstSpec(String constant){
+		this.constant = constant;
+		init();
+	}
+
+	/** Creates a spec for the string form of the given integer constant. */
+	public ConstSpec(Integer constant){
+		this(constant.toString());
+	}
+
+	/** (Re)builds the atom from the stored constant string. */
+	private void init(){
+		atom = new DataAtom(constant);
+	}
+
+	/**
+	 * Runs default deserialization and then recreates the transient atom.
+	 * @param in stream to read this object from
+	 * @throws IOException
+	 * @throws ClassNotFoundException
+	 */
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException{
+		in.defaultReadObject();
+		init();
+	}
+
+
+	@Override
+	public boolean amenableToCombiner() {
+		return true;
+	}
+
+	/** A constant uses no functions; returns a fresh empty list. */
+	@Override
+	public List<String> getFuncs() {
+		List<String> none = new ArrayList<String>();
+		return none;
+	}
+
+	/** Always reports an empty tuple schema, regardless of the input schema. */
+	@Override
+	protected Schema mapInputSchema(Schema schema) {
+		Schema empty = new TupleSchema();
+		return empty;
+	}
+
+	/** Returns the constant atom whatever the input is. */
+	@Override
+	protected Datum eval(Datum d) {
+		return atom;
+	}
+
+	/** Renders the spec as {@code ['constant']}. */
+	@Override
+	public String toString() {
+		return "['" + constant + "']";
+	}
+
+}

Added: incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java?rev=589866&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java Mon Oct 29 14:44:47 2007
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.eval;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.pig.data.Datum;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.FunctionInstantiator;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.eval.collector.DataCollector;
+import org.apache.pig.impl.eval.collector.FlattenCollector;
+import org.apache.pig.impl.eval.collector.UnflattenCollector;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.DataBuffer;
+import org.apache.pig.impl.util.ObjectSerializer;
+
+
+/**
+ * Serializable base class for the specs that make up an eval pipeline. A spec
+ * knows how to build its stage of the pipe, which functions it needs and how it
+ * maps an input schema to an output schema.
+ */
+public abstract class EvalSpec implements Serializable{
+	boolean isFlattened; 
+	Schema schema;
+	transient DataBuffer simpleEvalOutput;
+	transient DataCollector simpleEvalInput; 
+	protected boolean inner = false; //used only if this generate spec is used in a group by
+
+
+	/*
+	 * Builds the pipe used by simpleEval()/finish(): a buffer that captures the
+	 * output and the collector feeding it. Simple evals go through the same code
+	 * path as regular pipes.
+	 */
+	private void init(){
+		DataBuffer sink = new DataBuffer();
+		simpleEvalOutput = sink;
+		simpleEvalInput = setupPipe(sink);
+	}
+
+	/**
+	 * Hook for specs that need to instantiate functions; does nothing by default.
+	 */
+	public void instantiateFunc(FunctionInstantiator instantiaor) throws IOException{
+	}
+
+	/**
+	 * Set up the data processing pipe for this spec, without any
+	 * flattening/unflattening stage at the end.
+	 * @param endOfPipe the collector where output is desired
+	 * @return the collector where input tuples should be put
+	 */
+	protected abstract DataCollector setupDefaultPipe(DataCollector endOfPipe);
+
+
+	/**
+	 * Set up the data processing pipe with a flattening or unflattening stage
+	 * at the end, chosen by the isFlattened field.
+	 *
+	 * @param endOfPipe where the output is desired
+	 * @return the collector where input tuples should be put
+	 */
+	public DataCollector setupPipe(DataCollector endOfPipe){
+		// A FlattenCollector terminates the pipe when this spec is flattened,
+		// an UnflattenCollector otherwise.
+		DataCollector tail = isFlattened
+				? new FlattenCollector(endOfPipe)
+				: new UnflattenCollector(endOfPipe);
+		return setupDefaultPipe(tail);
+	}
+
+
+	/**
+	 * Set the successor of this spec.
+	 * @param spec the new successor
+	 * @return a composite spec consisting of this spec followed by {@code spec}
+	 */
+	public EvalSpec addSpec(EvalSpec spec){
+		CompositeEvalSpec composite = new CompositeEvalSpec(this);
+		composite.addSpec(spec);
+		return composite;
+	}
+
+	/**
+	 * Get the functions required by this spec.
+	 * @return the function names
+	 */
+	public abstract List<String> getFuncs(); 
+
+
+	/**
+	 * Returns the schema set through setSchema() if there is one, otherwise the
+	 * schema derived from {@code input} by mapInputSchema().
+	 */
+	public Schema getOutputSchemaForPipe(Schema input){
+		return (schema != null) ? schema : mapInputSchema(input);
+	}
+
+	/**
+	 * Given an input schema, determine the output schema of this spec
+	 * as it operates on input tuples with the input schema.
+	 * @param schema the input schema
+	 * @return the output schema
+	 */
+	protected abstract Schema mapInputSchema(Schema schema);
+
+	/**
+	 * Finishes the simple-eval pipe of this spec, building that pipe first if
+	 * it does not exist yet.
+	 */
+	public void finish(){
+		if (simpleEvalInput == null){
+			init();
+		}
+		simpleEvalInput.finishPipe();
+	}
+
+	/**
+	 * Some specs may be asynchronous, i.e., they return before completing the processing fully. 
+	 * The default value is false, may be overridden to return true
+	 */
+	public boolean isAsynchronous(){
+		return false;
+	}
+
+	/**
+	 * To determine if this spec is a candidate for 
+	 * algebraic evaluation. 
+	 * @return whether the spec may be evaluated algebraically
+	 */
+	public abstract boolean amenableToCombiner();
+
+
+	/**
+	 * Compare 2 tuples according to this spec. This is used while sorting by arbitrary (even generated) fields.
+	 * @return a comparator ordering tuples by the result of evaluating this spec on them
+	 */
+	public Comparator<Tuple> getComparator() {
+		return new Comparator<Tuple>() {
+
+			public int compare(Tuple t1, Tuple t2) {
+				Datum left = simpleEval(t1);
+				Datum right = simpleEval(t2);
+				return left.compareTo(right);
+			}
+		};
+	}
+
+	public void setFlatten(boolean isFlattened){
+		this.isFlattened = isFlattened;
+	}
+
+	public boolean isFlattened(){
+		return this.isFlattened;
+	}
+
+	/**
+	 * If the spec is such that it produces exactly one datum per input datum, we can use simple
+	 * eval as a shortcut to the whole process of setting the pipe etc. However, the code path is
+	 * still the same in both cases.
+	 * @param input the datum to evaluate
+	 * @return the single datum produced for {@code input}
+	 */
+	public Datum simpleEval(Datum input){
+		if (simpleEvalInput == null){
+			init();
+		}
+		simpleEvalInput.add(input);
+		Datum result = simpleEvalOutput.removeFirstAndAssertEmpty();
+		return result;
+	}
+
+	public EvalSpec getCombiner(){
+		//TODO
+		return null;
+	}
+
+	/**
+	 * Copies this spec through a serialize/deserialize round trip and then
+	 * instantiates the functions of the copy with {@code pigContext}.
+	 * An IOException along the way is rethrown wrapped in a RuntimeException.
+	 */
+	public EvalSpec copy(PigContext pigContext){
+		try{
+			String serialized = ObjectSerializer.serialize(this);
+			EvalSpec duplicate = (EvalSpec) ObjectSerializer.deserialize(serialized);
+			duplicate.instantiateFunc(pigContext);
+			return duplicate;
+		}catch(IOException e){
+			throw new RuntimeException(e);
+		}
+	}
+
+	public void setSchema(Schema schema){
+		this.schema = schema;
+	}
+
+
+	public boolean isInner() {
+		return this.inner;
+	}
+
+	public void setInner(boolean inner) {
+		this.inner = inner;
+	}
+
+
+
+}

Added: incubator/pig/trunk/src/org/apache/pig/impl/eval/FilterSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/FilterSpec.java?rev=589866&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/FilterSpec.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/FilterSpec.java Mon Oct 29 14:44:47 2007
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.eval;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.pig.data.Datum;
+import org.apache.pig.impl.FunctionInstantiator;
+import org.apache.pig.impl.eval.collector.DataCollector;
+import org.apache.pig.impl.eval.cond.Cond;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+
+
+public class FilterSpec extends EvalSpec {
+	private static final long serialVersionUID = 1L;
+	
+	public Cond cond;
+
+	public FilterSpec(Cond cond) {
+        this.cond = cond;
+    }
+    
+    @Override
+    public boolean amenableToCombiner() {
+    	return false;
+    }
+    
+    @Override
+    public List<String> getFuncs() {
+    	return cond.getFuncs();
+    }
+    
+    @Override
+	protected Schema mapInputSchema(Schema schema) {
+    	return schema;
+    }
+    
+    @Override
+	protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+        return new DataCollector(endOfPipe) {
+
+            @Override
+			public void add(Datum d){
+            	if (checkDelimiter(d))
+            		addToSuccessor(d);
+            	else if (cond.eval(d)) 
+            		addToSuccessor(d);
+            }
+            
+            @Override
+            protected boolean needFlatteningLocally() {
+	            return true;
+            }
+            
+            @Override
+            protected void finish() {
+            	cond.finish();
+            }
+            
+        };
+    }
+    
+    @Override
+    public String toString() {
+    	StringBuilder sb = new StringBuilder();
+    	sb.append("[FILTER BY ");
+    	sb.append(cond.toString());
+    	sb.append("]");
+    	return sb.toString();
+    }
+
+	@Override
+	public void instantiateFunc(FunctionInstantiator instantiaor)
+			throws IOException {
+		cond.instantiateFunc(instantiaor);		
+	}
+    
+   
+    
+}

Added: incubator/pig/trunk/src/org/apache/pig/impl/eval/FuncEvalSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/FuncEvalSpec.java?rev=589866&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/FuncEvalSpec.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/FuncEvalSpec.java Mon Oct 29 14:44:47 2007
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.eval;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataAtom;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataMap;
+import org.apache.pig.data.Datum;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.FunctionInstantiator;
+import org.apache.pig.impl.eval.collector.DataCollector;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
+
+
+
+/**
+ * Eval spec that applies an eval function to the result of evaluating an
+ * (optional) argument spec on each input datum.
+ */
+public class FuncEvalSpec extends EvalSpec {
+	private static final long serialVersionUID = 1L;
+	
+	String funcName;
+	// Spec producing the argument tuple; may be null, in which case the function receives null.
+	EvalSpec args;
+	// Not serialized; set again by instantiateFunc().
+	transient EvalFunc func;
+
+	/**
+	 * @param fInstantiaor used to create the function from its alias; may be null,
+	 *                     in which case the function is left uninstantiated
+	 * @param funcName     alias of the function to apply
+	 * @param args         spec producing the function's argument tuple; may be null
+	 * @throws IOException if {@code args} is asynchronous or instantiation fails
+	 */
+	public FuncEvalSpec(FunctionInstantiator fInstantiaor, String funcName, EvalSpec args) throws IOException{		
+		this.funcName = funcName;
+		this.args = args;
+		
+		if (args!=null && args.isAsynchronous())
+			throw new IOException("Can't have the output of an asynchronous function as the argument to an eval function");
+		instantiateFunc(fInstantiaor);
+	}
+	
+	/**
+	 * Instantiates this spec's function (when an instantiator is given) and the
+	 * functions of the argument spec (when there is one).
+	 */
+	@Override
+	public void instantiateFunc(FunctionInstantiator instantiaor) throws IOException{
+		if(instantiaor != null)
+			func = (EvalFunc) instantiaor.instantiateFuncFromAlias(funcName);
+		// args is optional everywhere else in this class; guard it here too
+		// instead of failing with a NullPointerException.
+		if (args != null)
+			args.instantiateFunc(instantiaor);
+	}
+	
+	@Override
+	public boolean amenableToCombiner() {
+		// TODO Turn on algebraic
+		return false;
+	}
+
+	/**
+	 * Returns this spec's function name followed by the functions required by
+	 * the argument spec, if any.
+	 */
+	@Override
+	public List<String> getFuncs() {
+		List<String> funcs = new ArrayList<String>();
+		funcs.add(funcName);
+		// Functions nested inside the arguments are required as well, as in
+		// the other specs that aggregate the functions of their children.
+		if (args != null)
+			funcs.addAll(args.getFuncs());
+		return funcs;
+	}
+
+	/**
+	 * Maps the input schema through the argument spec (or uses an empty tuple
+	 * schema when there are no arguments) and asks the function for its output schema.
+	 */
+	@Override
+	protected Schema mapInputSchema(Schema schema) {
+		Schema inputToFunction;
+		if (args!=null){
+			inputToFunction = args.mapInputSchema(schema);
+		}else{
+			inputToFunction = new TupleSchema();
+		}
+		
+		return func.outputSchema(inputToFunction);
+	}
+
+	/**
+	 * Builds a collector that evaluates the arguments on each datum, runs the
+	 * function on them and forwards the function's output to {@code endOfPipe}.
+	 */
+	@Override
+	protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+		return new DataCollector(endOfPipe){
+			// Creates an empty datum of the function's return type for the function to fill in.
+			private Datum getPlaceHolderForFuncOutput(){
+				Type returnType = func.getReturnType();
+				if (returnType == DataAtom.class)
+					return new DataAtom();
+				else if (returnType == Tuple.class)
+					return new Tuple();
+				else if (returnType == DataBag.class)
+					return new FakeDataBag(successor);
+				else if (returnType == DataMap.class)
+					return new DataMap();
+				else throw new RuntimeException("Internal error: Unknown return type of eval function");
+			}
+			
+			@Override
+			public void add(Datum d) {
+				if (checkDelimiter(d)) {
+					// Delimiters are only passed along; the function must not
+					// be run on them.
+					addToSuccessor(d);
+					return;
+				}
+				
+				Datum argsValue = null;
+				if (args!=null)
+					argsValue = args.simpleEval(d);
+				
+				if (argsValue!=null && !(argsValue instanceof Tuple))
+					throw new RuntimeException("Internal error: Non-tuple returned on evaluation of arguments.");
+				
+				Datum placeHolderForFuncOutput = getPlaceHolderForFuncOutput();
+				try{
+					func.exec((Tuple)argsValue, placeHolderForFuncOutput);
+				}catch (IOException e){
+					RuntimeException re = new RuntimeException(e);
+					re.setStackTrace(e.getStackTrace());
+					throw re;
+				}
+				
+				if (placeHolderForFuncOutput instanceof FakeDataBag){
+					// Bag output has already been streamed to the successor by
+					// the FakeDataBag. Close the bag here unless it is still
+					// stale; in that case markStale(false) adds the delimiters later.
+					FakeDataBag fBag = (FakeDataBag)placeHolderForFuncOutput;
+					synchronized(fBag){
+						if (!fBag.isStale())
+							fBag.addDelimiters();
+					}
+				}else{
+					addToSuccessor(placeHolderForFuncOutput);
+				}
+			}
+			
+			@Override
+			protected void finish() {
+				if (args!=null) 
+					args.finish();
+				func.finish();
+			}			
+		};
+	}
+
+	/** Renders the spec as {@code [funcName(args)]}. */
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder();
+		sb.append("[");
+		sb.append(funcName);
+		sb.append("(");
+		sb.append(args);
+		sb.append(")");
+		sb.append("]");
+		return sb.toString();
+	}
+	
+	
+
+	/**
+	 * Bag handed to bag-returning functions. Tuples added to it are forwarded
+	 * straight to the successor collector, surrounded by start/end bag delimiters.
+	 */
+	private class FakeDataBag extends DataBag{
+		// Number of markStale(true) calls not yet matched by markStale(false).
+		int staleCount = 0;
+		DataCollector successor;
+		boolean startAdded = false, endAdded = false;
+		
+		public FakeDataBag(DataCollector successor){
+			this.successor = successor;
+		}
+		
+		void addStart(){
+			successor.add(DataBag.startBag);
+			startAdded = true;	
+		}
+		
+		void addEnd(){
+			successor.add(DataBag.endBag);
+			endAdded = true;
+		}
+		
+		// Emits whichever of the start/end delimiters has not been sent yet.
+		void addDelimiters(){
+			if (!startAdded)
+				addStart();
+			if (!endAdded)
+				addEnd();	
+		}
+		
+		@Override
+		public void add(Tuple t) {
+			synchronized(this){
+				if (!startAdded)
+					addStart();
+			}
+			successor.add(t);
+		}
+		
+		@Override
+		public void markStale(boolean stale) {
+			synchronized (this){
+				if (stale)
+					staleCount++;
+				else{
+					// Un-staling a stale bag closes it off with its delimiters.
+					if (staleCount>0){
+						addDelimiters();
+						staleCount--;
+					}
+				}
+				super.markStale(stale);
+			}
+		}
+		
+		public boolean isStale(){
+			synchronized(this){
+				return staleCount > 0;
+			}
+		}
+	}
+	
+	
+	/**
+     * Extend the default deserialization
+     * @param in
+     * @throws IOException
+     * @throws ClassNotFoundException
+     */
+	/*
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException{
+    	in.defaultReadObject();
+    	instantiateFunc();
+    }
+	*/
+	public EvalFunc getFunc() {
+		return func;
+	}
+	
+	public Type getReturnType(){
+		return func.getReturnType();
+	}
+	
+	/** Asynchronous exactly when the underlying function is. */
+	@Override
+	public boolean isAsynchronous() {
+		return func.isAsynchronous();
+	}
+	
+}

Added: incubator/pig/trunk/src/org/apache/pig/impl/eval/GenerateSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/GenerateSpec.java?rev=589866&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/GenerateSpec.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/GenerateSpec.java Mon Oct 29 14:44:47 2007
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.eval;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Datum;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.FunctionInstantiator;
+import org.apache.pig.impl.eval.collector.DataCollector;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
+
+
+/**
+ * Eval spec that applies a list of nested specs to each input datum and emits
+ * the cross product of their outputs, one output tuple per combination.
+ * <p>
+ * One nested spec, the "driver" (see {@code selectDriver()}), is streamed; the
+ * output of every other nested spec is first materialized into a bag.
+ */
+public class GenerateSpec extends EvalSpec {
+    private static final long serialVersionUID = 1L;
+
+    /** Nested specs, one per generated item. */
+    protected List<EvalSpec> specs = new ArrayList<EvalSpec>();
+
+    /** Index into {@code specs} of the spec that drives the cross product. */
+    protected int driver;
+
+    /**
+     * Creates a spec over the given list (used as-is, not copied) and selects
+     * the driver.
+     */
+    public GenerateSpec(List<EvalSpec> specs){
+        this.specs = specs;
+        selectDriver();
+    }
+
+    /** Creates a spec with a single item; {@code driver} keeps its default of 0. */
+    public GenerateSpec(EvalSpec col){
+        specs.add(col);
+    }
+
+    /**
+     * Appends a star spec carrying an empty {@code TupleSchema} to this spec's
+     * list and returns a new GenerateSpec built over that same list.
+     */
+    public GenerateSpec getGroupBySpec(){
+        // Adding a new star spec to get the group by spec. The new star spec that
+        // we are adding should not contain a schema since that can cause conflicts.
+        // NOTE(review): this mutates the receiver's own list, which the returned
+        // spec then shares - confirm callers expect that.
+        StarSpec ss = new StarSpec();
+        ss.setSchema(new TupleSchema());
+        specs.add(ss);
+        return new GenerateSpec(specs);
+    }
+
+    /**
+     * Builds the collector for this spec. Each incoming datum becomes a
+     * {@link CrossProductItem}; items are executed in arrival order as soon as
+     * the one at the head of the queue is ready.
+     */
+    @Override
+    protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+        return new DataCollector(endOfPipe){
+            // items whose non-driver inputs may still be stale, in arrival order
+            LinkedList<CrossProductItem> pendingCrossProducts = new LinkedList<CrossProductItem>();
+
+            @Override
+            public void add(Datum d) {
+
+                if (checkDelimiter(d))
+                    throw new RuntimeException("Internal error: not expecting a delimiter tuple");
+
+                // general case (use driver method):
+                CrossProductItem cpi = new CrossProductItem(d, successor);
+
+                pendingCrossProducts.addLast(cpi);
+
+                // Since potentially can return without filling output, mark output as stale;
+                // the exec method of CrossProductItem will mark output as not stale
+                successor.markStale(true);
+                while (!pendingCrossProducts.isEmpty() && pendingCrossProducts.peek().isReady()){
+                    pendingCrossProducts.remove().exec();
+                }
+            }
+
+            @Override
+            protected void finish() {
+                // finish the non-driver specs first so their bags can become ready
+                for (EvalSpec spec: specs){
+                    if (specs.get(driver) != spec)
+                        spec.finish();
+                }
+
+                // drain whatever is still queued, blocking until each item is ready
+                while (!pendingCrossProducts.isEmpty()){
+                    CrossProductItem cpi = pendingCrossProducts.remove();
+                    cpi.waitToBeReady();
+                    cpi.exec();
+                }
+
+                specs.get(driver).finish();
+            }
+
+        };
+    }
+
+    /**
+     * Terminal collector that stores every datum it receives in a bag (each
+     * wrapped via {@code new Tuple(d)}) and can replay them through
+     * {@link #content()}.
+     */
+    private class DatumBag extends DataCollector{
+        DataBag bag;
+
+        public DatumBag(){
+            super(null);
+            try{
+                bag = BagFactory.getInstance().getNewBag();
+            }catch(IOException e){
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public void add(Datum d){
+            bag.add(new Tuple(d));
+        }
+
+        /**
+         * Returns a fresh read-only iterator over the stored datums: field 0
+         * of each tuple in the bag.
+         */
+        public Iterator<Datum> content(){
+            return new Iterator<Datum>(){
+                Iterator<Tuple> iter;
+                {
+                    iter = bag.content();
+                }
+                public boolean hasNext() {
+                    return iter.hasNext();
+                }
+                public Datum next() {
+                    try{
+                        return iter.next().getField(0);
+                    }catch(IOException e){
+                        throw new RuntimeException(e);
+                    }
+                }
+                public void remove() {
+                    throw new RuntimeException("Can't remove from read-only iterator");
+                }
+            };
+        }
+
+    }
+
+    /**
+     * The cross product for one input datum. Construction feeds the datum to
+     * every non-driver spec, collecting each output in its own
+     * {@link DatumBag}; {@link #exec()} then streams the driver spec's output
+     * through {@link #add(Datum)}, which emits the combinations.
+     */
+    private class CrossProductItem extends DataCollector{
+        // one bag per spec; the slot at index 'driver' stays null
+        DatumBag[] toBeCrossed;
+        Datum cpiInput;
+
+        public CrossProductItem(Datum driverInput, DataCollector successor){
+            super(successor);
+            this.cpiInput = driverInput;
+
+            // materialize data for all to-be-crossed items
+            // (except driver, which is done in streaming fashion)
+            toBeCrossed = new DatumBag[specs.size()];
+            for (int i = 0; i < specs.size(); i++) {
+                if (i == driver)
+                    continue;
+                toBeCrossed[i] = new DatumBag();
+
+                specs.get(i).setupPipe(toBeCrossed[i]).add(cpiInput);
+            }
+        }
+
+        /**
+         * Receives one datum produced by the driver spec and sends to the
+         * successor one tuple for every combination of that datum with the
+         * materialized outputs of the other specs. Emits nothing if any of
+         * those outputs is empty.
+         */
+        @Override
+        public void add(Datum d){
+            if (checkDelimiter(d))
+                throw new RuntimeException("Internal error: not expecting a delimiter tuple");
+            int numItems = specs.size();
+
+            // create one iterator per to-be-crossed bag
+            // (generic array creation is unavoidable here; only Iterator<Datum> is ever stored)
+            @SuppressWarnings("unchecked")
+            Iterator<Datum>[] its = new Iterator[numItems];
+            for (int i = 0; i < numItems; i++) {
+                if (i != driver){
+                    its[i] = toBeCrossed[i].content();
+                    if (!its[i].hasNext())
+                        return; // one of inputs is empty, so cross-prod yields empty result
+                }
+            }
+
+            Datum[] lastOutput = null;
+            Datum[] outData = new Datum[numItems];
+
+            boolean done = false;
+            while (!done) {
+                if (lastOutput == null) { // we're generating our first output
+                    for (int i = 0; i < numItems; i++) {
+                        if (i == driver)
+                            outData[i] = d;
+                        else
+                            outData[i] = its[i].next();
+                    }
+                } else {
+                    // odometer-style step: advance the first input that can move,
+                    // rewinding every exhausted input passed on the way
+                    boolean needToAdvance = true;
+
+                    for (int i = 0; i < numItems; i++) {
+                        if (i != driver && needToAdvance) {
+                            if (its[i].hasNext()) {
+                                outData[i] = its[i].next();
+                                needToAdvance = false;
+                            } else {
+                                its[i] = toBeCrossed[i].content(); // rewind iterator
+                                outData[i] = its[i].next();
+                                // still need to advance some other input..
+                            }
+                        } else {
+                            outData[i] = lastOutput[i]; // use same value as last time
+                        }
+                    }
+                }
+
+                // check for completion: done once every non-driver iterator is exhausted
+                done = true;
+
+                for (int i = 0; i < numItems; i++) {
+                    if (i != driver && its[i].hasNext()) {
+                        done = false;
+                        break;
+                    }
+                }
+
+                Tuple outTuple = new Tuple();
+
+                for (int i = 0; i < numItems; i++){
+                    if (specs.get(i).isFlattened() && outData[i] instanceof Tuple){
+                        // flattened tuple: splice its fields directly into the output
+                        Tuple t = (Tuple)outData[i];
+                        try{
+                            for (int j = 0; j < t.arity(); j++){
+                                outTuple.appendField(t.getField(j));
+                            }
+                        }catch (IOException e){
+                            throw new RuntimeException(e);
+                        }
+                    }else{
+                        outTuple.appendField(outData[i]);
+                    }
+                }
+                successor.add(outTuple);
+
+                lastOutput = outData;
+            }
+        }
+
+        /** Returns true when none of the non-driver bags is currently stale. */
+        public boolean isReady(){
+            for (int i = 0; i < toBeCrossed.length; i++){
+                if (i != driver && toBeCrossed[i].isStale())
+                    return false;
+            }
+            return true;
+        }
+
+        /**
+         * Blocks until none of the non-driver bags is stale. An interrupt does
+         * not abort the wait, but it is no longer lost: the thread's interrupt
+         * status is restored before this method returns.
+         */
+        public void waitToBeReady(){
+            boolean interrupted = false;
+            try{
+                for (int i = 0; i < toBeCrossed.length; i++){
+                    if (i == driver)
+                        continue;
+                    synchronized(toBeCrossed[i]){
+                        while (toBeCrossed[i].isStale()){
+                            try{
+                                toBeCrossed[i].wait();
+                            }catch (InterruptedException e){
+                                // keep waiting; re-asserting the flag right here would
+                                // make the next wait() throw immediately and spin
+                                interrupted = true;
+                            }
+                        }
+                    }
+                }
+            }finally{
+                if (interrupted)
+                    Thread.currentThread().interrupt();
+            }
+        }
+
+        /**
+         * Runs the driver spec on this item's input with this item as the
+         * collector, then marks the successor as not stale.
+         */
+        public void exec(){
+            specs.get(driver).setupPipe(this).add(cpiInput);
+            //System.err.println(Thread.currentThread().getName() + ": Executing driver on " + cpiInput);
+            successor.markStale(false);
+        }
+
+    }
+
+    /**
+     * Driver is the column that will be used to drive the cross product. The results
+     * for all other columns will be materialized into a data bag, while the results of
+     * the driver column will be streamed through to form the cross product.
+     * <p>
+     * Defaults to column 0. Among flattened or asynchronous specs the last one
+     * wins, unless one is (or, for a composite, starts with) a function whose
+     * return type is {@code DataBag}, in which case that one is chosen at once.
+     */
+    private void selectDriver() {
+        driver = 0;
+
+        for (int i = 0; i < specs.size(); i++) {
+            EvalSpec spec = specs.get(i);
+            if (spec.isFlattened() || spec.isAsynchronous()){
+                // This is just a heuristic that if it's a flattened bag eval function, it should
+                // be chosen as the driver
+                if (spec instanceof CompositeEvalSpec)
+                    spec = ((CompositeEvalSpec)spec).getSpecs().get(0);
+                if (spec instanceof FuncEvalSpec && ((FuncEvalSpec)spec).getFunc().getReturnType() == DataBag.class) { // trumps 'em all
+                    driver = i;
+                    return;
+                }
+                driver = i; // we'll use this as the driver, unless something better comes along
+            }
+        }
+    }
+
+    /**
+     * Whether this spec is a candidate for algebraic (combiner) evaluation.
+     * That would mean it contains an algebraic function and does not contain
+     * repeated references to the column used by that function.
+     *
+     * @return currently always false; the check is not implemented yet
+     */
+    @Override
+    public boolean amenableToCombiner() {
+        //TODO
+        return false;
+    }
+
+    /** Renders as {@code GENERATE {spec,spec,...}}. */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("GENERATE ");
+        sb.append("{");
+        boolean first = true;
+        for (EvalSpec spec: specs){
+            if (!first)
+                sb.append(",");
+            else
+                first = false;
+            sb.append(spec);
+        }
+        sb.append("}");
+        return sb.toString();
+    }
+
+    /** Returns the concatenation of {@code getFuncs()} of all nested specs, in order. */
+    @Override
+    public List<String> getFuncs() {
+        List<String> funcs = new ArrayList<String>();
+        for (EvalSpec spec: specs)
+            funcs.addAll(spec.getFuncs());
+        return funcs;
+    }
+
+    /**
+     * Builds the output schema from the nested specs' output schemas. A
+     * flattened spec contributes each element of its flattened schema (or one
+     * empty tuple schema if there are none); any other spec contributes its
+     * schema as a single entry.
+     */
+    @Override
+    protected TupleSchema mapInputSchema(Schema input) {
+        TupleSchema output = new TupleSchema();
+
+        for (EvalSpec spec: specs) {
+            Schema schema = spec.getOutputSchemaForPipe(input).copy();
+
+            if (spec.isFlattened()){
+                List<Schema> flattenedSchema = schema.flatten();
+                if (flattenedSchema.size() == 0){
+                    output.add(new TupleSchema(),true);
+                    continue;
+                }
+                for (Schema flattenedItem: flattenedSchema){
+                    output.add(flattenedItem,true);
+                }
+            }else{
+                output.add(schema,false);
+            }
+        }
+        return output;
+    }
+
+    /** Returns true if at least one nested spec is asynchronous. */
+    @Override
+    public boolean isAsynchronous() {
+        for (EvalSpec es: specs)
+            if (es.isAsynchronous())
+                return true;
+        return false;
+    }
+
+    /** Passes the instantiator on to every nested spec. */
+    @Override
+    public void instantiateFunc(FunctionInstantiator instantiaor)
+            throws IOException {
+        for (EvalSpec es: specs)
+            es.instantiateFunc(instantiaor);
+    }
+
+    /** Returns the live list of nested specs (not a copy). */
+    public List<EvalSpec> getSpecs() {
+        return specs;
+    }
+
+}

Added: incubator/pig/trunk/src/org/apache/pig/impl/eval/MapLookupSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/MapLookupSpec.java?rev=589866&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/MapLookupSpec.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/MapLookupSpec.java Mon Oct 29 14:44:47 2007
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.eval;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.data.DataMap;
+import org.apache.pig.data.Datum;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
+
+
+/**
+ * Eval spec that looks up one fixed key in a {@code DataMap} datum.
+ */
+public class MapLookupSpec extends SimpleEvalSpec {
+	
+	private static final long serialVersionUID = 1L;
+	
+	/** Key that is looked up in every map passed to {@link #eval(Datum)}. */
+	protected String keyToLookup;
+	
+	public MapLookupSpec(String keyToLookup){
+		this.keyToLookup = keyToLookup;
+	}
+
+	/**
+	 * Returns the result of {@code get(keyToLookup)} on the given map.
+	 *
+	 * @param d the datum to look the key up in; must be a {@code DataMap}
+	 * @throws RuntimeException if {@code d} is not a {@code DataMap}. A null
+	 *         datum is reported the same way, as type "null", rather than
+	 *         failing with a bare NullPointerException.
+	 */
+	@Override
+	protected Datum eval(Datum d) {
+		if (!(d instanceof DataMap)) {
+			// d.getClass() would itself throw for a null datum, hiding the real problem
+			String actualType = (d == null) ? "null" : d.getClass().getName();
+			throw new RuntimeException("Attempt to lookup on data of type " + actualType);
+		}
+		return ((DataMap)d).get(keyToLookup);
+	}
+	
+	@Override
+	public boolean amenableToCombiner() {
+		return true;
+	}
+	
+	/** A map lookup uses no functions, so this is always a new empty list. */
+	@Override
+	public List<String> getFuncs() {
+		return new ArrayList<String>();
+	}
+	
+	/** Always returns an empty tuple schema, whatever the input schema. */
+	@Override
+	protected Schema mapInputSchema(Schema schema) {
+		//TODO: until we have map schemas
+		return new TupleSchema();
+	}
+	
+	/** Renders as {@code [#'key']}. */
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder();
+		sb.append("[");
+		sb.append("#'");
+		sb.append(keyToLookup);
+		sb.append("'");
+		sb.append("]");
+		return sb.toString();
+	}
+	
+}

Added: incubator/pig/trunk/src/org/apache/pig/impl/eval/ProjectSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/ProjectSpec.java?rev=589866&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/ProjectSpec.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/ProjectSpec.java Mon Oct 29 14:44:47 2007
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.eval;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.data.Datum;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
+
+
+public class ProjectSpec extends SimpleEvalSpec {
+	private static final long serialVersionUID = 1L;
+
+	protected List<Integer> cols;
+	protected boolean wrapInTuple;
+	
+
+	public List<Integer> getCols() {
+		return cols;
+	}
+
+	public ProjectSpec(List<Integer> cols){		
+		this.cols = cols;
+	}
+	
+	public ProjectSpec(int col){		
+		cols = new ArrayList<Integer>();
+		cols.add(col);
+	}
+		
+	@Override
+	public boolean amenableToCombiner() {
+		return true;
+	}
+
+	@Override
+	public List<String> getFuncs() {
+		return new ArrayList<String>();
+	}
+
+	@Override
+	protected Schema mapInputSchema(Schema schema) {
+		if (!wrapInTuple && cols.size()==1){
+			return maskNullSchema(schema.schemaFor(cols.get(0)));
+		}else{
+			TupleSchema output = new TupleSchema();
+			for (int i: cols){
+				output.add(maskNullSchema(schema.schemaFor(i)));
+			}
+			return output;
+		}
+	}
+	
+	private Schema maskNullSchema(Schema s){
+		if (s == null)
+			return new TupleSchema();
+		else
+			return s;
+		
+	}
+	
+	@Override
+	protected Datum eval(Datum d){
+		if (!(d instanceof Tuple)){
+			throw new RuntimeException("Project operator expected a Tuple, found a " + d.getClass().getSimpleName());
+		}
+		Tuple t = (Tuple)d;
+		
+		try{
+			if (!wrapInTuple && cols.size() == 1){
+				return t.getField(cols.get(0));
+			}else{
+				Tuple out = new Tuple();
+				for (int i: cols){
+					out.appendField(t.getField(i));
+				}
+				return out;
+			}
+		}catch (IOException e){
+			//TODO: Based on a strictness level, insert null values here
+				throw new RuntimeException(e);		
+		}
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder();
+		sb.append("[");
+		sb.append("PROJECT ");
+		boolean first = true;
+		for (int i: cols){
+			if (!first)
+				sb.append(",");
+			else
+				first = false;
+			sb.append("$");
+			sb.append(i);
+		}
+		sb.append("]");
+		return sb.toString();
+	}
+    
+    public int numCols() {
+        return cols.size();
+    }
+    
+    public int getCol(int i) {
+        if (i < 0 || i >= cols.size()) 
+            throw new RuntimeException("Internal error: improper use of getColumn in " + ProjectSpec.class.getName());
+        else return cols.get(i);
+    }
+	
+	public int getCol(){
+		if (cols.size()!=1)
+			throw new RuntimeException("Internal error: improper use of getColumn in " + ProjectSpec.class.getName());
+		return cols.get(0);
+	}
+
+	public void setWrapInTuple(boolean wrapInTuple) {
+		this.wrapInTuple = wrapInTuple;
+	}
+
+}

Added: incubator/pig/trunk/src/org/apache/pig/impl/eval/SimpleEvalSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/SimpleEvalSpec.java?rev=589866&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/SimpleEvalSpec.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/SimpleEvalSpec.java Mon Oct 29 14:44:47 2007
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.eval;
+
+
+import org.apache.pig.data.Datum;
+import org.apache.pig.impl.eval.collector.DataCollector;
+
+
+public abstract class SimpleEvalSpec extends EvalSpec {
+
+	@Override
+	protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+		return new DataCollector(endOfPipe){
+			@Override
+			public void add(Datum d) {
+				if (checkDelimiter(d))
+					addToSuccessor(d);
+				else
+					addToSuccessor(eval(d));
+			}
+			
+			@Override
+			protected boolean needFlatteningLocally() {
+				return true;
+			}
+		};
+	}
+	
+	protected abstract Datum eval(Datum d);
+
+
+}

Added: incubator/pig/trunk/src/org/apache/pig/impl/eval/SortDistinctSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/SortDistinctSpec.java?rev=589866&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/SortDistinctSpec.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/SortDistinctSpec.java Mon Oct 29 14:44:47 2007
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.eval;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Datum;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.FunctionInstantiator;
+import org.apache.pig.impl.eval.collector.DataCollector;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+
+/**
+ * Eval spec that gathers the tuples of a flattened bag into a new bag set up
+ * either for duplicate elimination or for sorting by {@code sortSpec}, and
+ * passes that bag on when the end-of-bag delimiter arrives.
+ */
+public class SortDistinctSpec extends EvalSpec {
+	private static final long serialVersionUID = 1L;
+	// bag currently being filled; replaced at every start-of-bag delimiter
+	transient DataBag bag;
+	protected EvalSpec sortSpec;
+	protected boolean eliminateDuplicates;
+
+	public SortDistinctSpec(boolean eliminateDuplicates, EvalSpec sortSpec){
+		this.eliminateDuplicates = eliminateDuplicates;
+		this.sortSpec = sortSpec;
+	}
+
+	@Override
+	public boolean amenableToCombiner() {
+		//Combiner may potentially be useful if we are eliminating duplicates
+		return eliminateDuplicates;
+	}
+
+	/** Returns the sort spec's functions, or a new empty list when there is no sort spec. */
+	@Override
+	public List<String> getFuncs() {
+		if (sortSpec == null) {
+			return new ArrayList<String>();
+		}
+		return sortSpec.getFuncs();
+	}
+
+	/** The schema is unchanged by sorting or duplicate elimination. */
+	@Override
+	protected Schema mapInputSchema(Schema schema) {
+		return schema;
+	}
+
+	@Override
+	protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+		return new DataCollector(endOfPipe){
+
+			@Override
+			protected boolean needFlatteningLocally() {
+				return true;
+			}
+
+			/**
+			 * Outside a bag, data passes straight through and a delimiter
+			 * opens a fresh bag. Inside a bag, data is added to the bag
+			 * (wrapped in a tuple if it is not one) and a delimiter sends
+			 * the bag to the successor.
+			 */
+			@Override
+			public void add(Datum d) {
+				// checkDelimiter() flips inTheMiddleOfBag, so sample the state first
+				final boolean wasInsideBag = inTheMiddleOfBag;
+				final boolean isDelimiter = checkDelimiter(d);
+
+				if (!isDelimiter) {
+					if (wasInsideBag) {
+						bag.add(d instanceof Tuple ? (Tuple)d : new Tuple(d));
+					} else {
+						addToSuccessor(d);
+					}
+					return;
+				}
+
+				if (wasInsideBag) {
+					// end of bag: hand over what was collected
+					addToSuccessor(bag);
+					return;
+				}
+
+				//Bag must have started now
+				try {
+					bag = BagFactory.getInstance().getNewBag();
+					if (eliminateDuplicates) {
+						bag.distinct();
+					} else {
+						bag.sort(sortSpec);
+					}
+				} catch (IOException e) {
+					throw new RuntimeException(e);
+				}
+			}
+
+			@Override
+			protected void finish() {
+				/*
+				 * To clear the temporary files if it was a big bag
+				 */
+				if (bag != null) {
+					bag.clear();
+				}
+			}
+		};
+	}
+
+	/** Renders as {@code [DISTINCT ...]} or {@code [SORT ...]}, followed by the sort spec if any. */
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder("[");
+		if (eliminateDuplicates) {
+			sb.append("DISTINCT ");
+		} else {
+			sb.append("SORT ");
+		}
+		if (sortSpec != null) {
+			sb.append(sortSpec.toString());
+		}
+		return sb.append("]").toString();
+	}
+
+	/** Passes the instantiator on to the sort spec, if there is one. */
+	@Override
+	public void instantiateFunc(FunctionInstantiator instantiaor)
+			throws IOException {
+		if (sortSpec == null) {
+			return;
+		}
+		sortSpec.instantiateFunc(instantiaor);
+	}
+
+}

Added: incubator/pig/trunk/src/org/apache/pig/impl/eval/StarSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/StarSpec.java?rev=589866&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/StarSpec.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/StarSpec.java Mon Oct 29 14:44:47 2007
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.eval;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.impl.eval.collector.DataCollector;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+
+/**
+ * The {@code *} eval spec: data and schema pass through untouched.
+ */
+public final class StarSpec extends EvalSpec {
+	private static final long serialVersionUID = 1L;
+
+	/** Adds no stage of its own: the pipe is just the given end of pipe. */
+	@Override
+	protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+		return endOfPipe;
+	}
+
+	/** The output schema is the input schema itself. */
+	@Override
+	protected Schema mapInputSchema(Schema input) {
+		return input;
+	}
+
+	/** Star uses no functions, so this is always a new empty list. */
+	@Override
+	public List<String> getFuncs() {
+		List<String> none = new ArrayList<String>();
+		return none;
+	}
+
+	@Override
+	public boolean amenableToCombiner() {
+		return true;
+	}
+
+	@Override
+	public String toString() {
+		return "[*]";
+	}
+
+}

Added: incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java?rev=589866&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java Mon Oct 29 14:44:47 2007
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.eval.collector;
+
+import java.util.Iterator;
+
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Datum;
+import org.apache.pig.data.Tuple;
+
+
+
+/**
+ * This interface is used to represent tuple collectors.
+ * It may be an in memory DataBag, an input to another function,
+ * or a file.
+ */
+/**
+ * Base class for tuple collectors, the stages of an evaluation pipeline.
+ * A collector may be an in memory DataBag, an input to another function,
+ * or a file; each one optionally hands its output to a successor.
+ * <p>
+ * Staleness is tracked with a counter guarded by this collector's own
+ * monitor, so a thread may wait on the collector until {@link #isStale()}
+ * turns false.
+ */
+public abstract class DataCollector {
+	// Outstanding markStale(true) calls; guarded by 'this'. Deliberately a
+	// primitive: the previous boxed Integer was also used as the lock, but
+	// every ++/-- swapped in a different (and JVM-wide shared) object.
+	int staleCount = 0;
+	/** True between a start-of-bag and the matching end-of-bag delimiter. */
+	protected boolean inTheMiddleOfBag = false;
+	
+	/** Next stage of the pipe; may be null. */
+	protected DataCollector successor = null;
+	
+	public DataCollector(DataCollector successor){
+		this.successor = successor;
+	}
+	
+	/**
+	 * Add a datum to the collector.
+	 */
+	public abstract void add(Datum d);
+	
+	/** True if this collector or any collector after it needs flattened bags. */
+	private boolean needsFlattening(){
+		return needFlatteningLocally() || (successor!=null && successor.needsFlattening());
+	}
+	
+	/*
+	 * Whether this collector needs flattened bags to operate
+	 */
+	protected boolean needFlatteningLocally(){
+		return false;
+	}
+	
+	
+	/**
+	 * Recognizes bag delimiter tuples and keeps {@code inTheMiddleOfBag} in
+	 * step with them.
+	 *
+	 * @return true if {@code d} is a bag delimiter, false for ordinary data
+	 * @throws RuntimeException on a start delimiter inside a bag, an end
+	 *         delimiter outside one, or a delimiter of unknown type
+	 */
+	protected boolean checkDelimiter(Datum d){
+		if (d instanceof DataBag.BagDelimiterTuple){
+			if (d instanceof DataBag.StartBag){
+				if (inTheMiddleOfBag)
+					throw new RuntimeException("Internal error: Found a flattened bag inside another");
+				else
+					inTheMiddleOfBag = true;
+			}else{
+				if (!(d instanceof DataBag.EndBag))
+					throw new RuntimeException("Internal error: Unknown bag delimiter type");
+				if (!inTheMiddleOfBag)
+					throw new RuntimeException("Internal error: Improper nesting of bag delimiter tuples");
+				inTheMiddleOfBag = false;
+			}
+			return true;
+		}
+		return false;
+	}
+
+	/**
+	 * Hands a datum to the successor. A whole bag arriving outside a
+	 * flattened bag is sent tuple by tuple between start/end delimiters if
+	 * the rest of the pipe needs flattening; everything else is passed as is.
+	 */
+	protected void addToSuccessor(Datum d){
+		if (d instanceof DataBag && !inTheMiddleOfBag && successor!=null && successor.needsFlattening()){
+			DataBag bag = (DataBag)d;
+			//flatten the bag and send it through the pipeline
+			successor.add(DataBag.startBag);
+			Iterator<Tuple> iter = bag.content();
+			while(iter.hasNext())
+				successor.add(iter.next());
+			successor.add(DataBag.endBag);
+		}else{
+			//simply add the datum
+			successor.add(d);
+		}
+	}
+	
+	/**
+	 * Marks this collector, and then its successors, as stale or not stale.
+	 * Marking stale increments the counter; marking not-stale decrements it
+	 * if positive and wakes every thread waiting on this collector.
+	 */
+	public void markStale(boolean stale){
+		// One lock for both the counter and the notification: the same monitor
+		// that waiters hold while checking isStale(), so there is no second
+		// lock to acquire in the opposite order.
+		synchronized(this){
+			if (stale){
+				staleCount++;
+			}else if (staleCount > 0){
+				staleCount--;
+				notifyAll();
+			}
+		}
+		// propagated outside the lock, as before
+		if (successor!=null)
+			successor.markStale(stale);
+	}
+	
+	public void setSuccessor(DataCollector output){
+		this.successor = output;
+	}
+	
+	/** Returns true while the stale counter is positive. */
+	public boolean isStale(){
+		synchronized(this){
+			return staleCount>0;
+		}
+	}
+	
+	/** Calls {@link #finish()} on this collector and then on each successor in turn. */
+	public void finishPipe(){
+		finish();
+		if (successor!=null)
+			successor.finishPipe();
+	}
+	
+	/** Hook run by {@link #finishPipe()}; does nothing by default. */
+	protected void finish(){}
+	
+}
+

Added: incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/FlattenCollector.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/FlattenCollector.java?rev=589866&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/FlattenCollector.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/FlattenCollector.java Mon Oct 29 14:44:47 2007
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.eval.collector;
+
+import org.apache.pig.data.Datum;
+
+public class FlattenCollector extends DataCollector {
+
+	public FlattenCollector(DataCollector successor){
+		super(successor);
+	}
+	
+	@Override
+	public void add(Datum d) {
+		if (checkDelimiter(d))
+			return;
+		else
+			successor.add(d);
+	}
+
+	@Override
+	protected boolean needFlatteningLocally() {
+		return true;
+	}
+	
+
+}

Added: incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/UnflattenCollector.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/UnflattenCollector.java?rev=589866&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/UnflattenCollector.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/UnflattenCollector.java Mon Oct 29 14:44:47 2007
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.eval.collector;
+
+import java.io.IOException;
+
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Datum;
+import org.apache.pig.data.Tuple;
+
+
+public class UnflattenCollector extends DataCollector {
+	DataBag bag;
+	
+	public UnflattenCollector(DataCollector successor){
+		super(successor);
+	}
+	
+	@Override
+	public void add(Datum d) {
+		if (inTheMiddleOfBag){
+			if (checkDelimiter(d)){
+				successor.add(bag);
+			}else{
+				if (d instanceof Tuple){
+					bag.add((Tuple)d);
+				}else{
+					bag.add(new Tuple(d));
+				}
+			}
+		}else{
+			if (checkDelimiter(d)){
+				//Bag must have started now
+				try{
+					bag = BagFactory.getInstance().getNewBag();
+				}catch(IOException e){
+					throw new RuntimeException(e);
+				}
+			}else{
+				successor.add(d);
+			}
+		}
+	}
+	
+	
+
+}

Added: incubator/pig/trunk/src/org/apache/pig/impl/eval/cond/AndCond.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/cond/AndCond.java?rev=589866&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/cond/AndCond.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/cond/AndCond.java Mon Oct 29 14:44:47 2007
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.eval.cond;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.data.Datum;
+import org.apache.pig.impl.FunctionInstantiator;
+
+
+
+/**
+ * Conjunction of conditions: holds only when every member condition holds.
+ */
+public class AndCond extends Cond {
+	private static final long serialVersionUID = 1L;
+
+	/** The conditions that are AND-ed together. */
+	public List<Cond> cList;
+
+	/**
+	 * @param cList the conditions to combine; the list is stored as given
+	 */
+	public AndCond(List<Cond> cList) {
+		this.cList = cList;
+	}
+
+	/**
+	 * @return the concatenation of the function lists of all member
+	 *         conditions, in list order
+	 */
+	@Override
+	public List<String> getFuncs() {
+		List<String> collected = new ArrayList<String>();
+		for (Cond member : cList) {
+			collected.addAll(member.getFuncs());
+		}
+		return collected;
+	}
+
+	/**
+	 * Evaluates the members in list order and stops at the first one that
+	 * fails.
+	 *
+	 * @return {@code true} when no member evaluates to {@code false}
+	 */
+	@Override
+	public boolean eval(Datum input){
+		for (Cond member : cList) {
+			if (!member.eval(input)) {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	/**
+	 * @return the members joined by {@code " AND "} inside parentheses
+	 */
+	@Override
+	public String toString() {
+		StringBuilder text = new StringBuilder("(");
+		String separator = "";
+		for (Cond member : cList) {
+			text.append(separator).append(member);
+			separator = " AND ";
+		}
+		return text.append(")").toString();
+	}
+
+	/** Calls {@code finish()} on every member condition. */
+	@Override
+	public void finish() {
+		for (Cond member : cList) {
+			member.finish();
+		}
+	}
+
+	/** Passes the instantiator on to every member condition. */
+	@Override
+	public void instantiateFunc(FunctionInstantiator instantiaor)
+			throws IOException {
+		for (Cond member : cList) {
+			member.instantiateFunc(instantiaor);
+		}
+	}
+}

Added: incubator/pig/trunk/src/org/apache/pig/impl/eval/cond/CompCond.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/cond/CompCond.java?rev=589866&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/cond/CompCond.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/cond/CompCond.java Mon Oct 29 14:44:47 2007
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.eval.cond;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.data.DataAtom;
+import org.apache.pig.data.Datum;
+import org.apache.pig.impl.FunctionInstantiator;
+import org.apache.pig.impl.eval.EvalSpec;
+
+
+
+/**
+ * Condition that compares the atomic results of two eval specs.
+ * <p>
+ * The symbolic operators ({@code ==}, {@code !=}, {@code <}, {@code <=},
+ * {@code >}, {@code >=}) compare the operands through {@code numval()};
+ * the word operators ({@code eq}, {@code neq}, {@code lt}, {@code lte},
+ * {@code gt}, {@code gte}) compare the atoms themselves.
+ */
+public class CompCond extends Cond {
+	private static final long serialVersionUID = 1L;
+
+	/** Lower-cased operator, one of "<", ">", "==", "eq", etc. */
+	public String op;
+	/** Specs producing the two operands. */
+	public EvalSpec left, right;
+
+	/**
+	 * @param left  spec producing the left operand
+	 * @param op    the comparison operator; stored lower-cased
+	 * @param right spec producing the right operand
+	 * @throws IOException if either spec is asynchronous
+	 */
+	public CompCond(EvalSpec left, String op, EvalSpec right) throws IOException{
+		this.op = op.toLowerCase();
+		this.left = left;
+		this.right = right;
+
+		if (left.isAsynchronous() || right.isAsynchronous()){
+			throw new IOException("Can't compare the output of an asynchronous function");
+		}
+	}
+
+	/**
+	 * @return always a new, empty list
+	 */
+	@Override
+	public List<String> getFuncs() {
+		return new ArrayList<String>();
+	}
+
+	/**
+	 * Evaluates both operands against {@code input} and applies the operator.
+	 * <p>
+	 * The operator is matched exactly. Malformed operators such as
+	 * {@code "<>"} or {@code "eqx"} are rejected instead of being silently
+	 * treated as a similar-looking valid operator.
+	 *
+	 * @param input the datum both specs are evaluated on
+	 * @return the outcome of the comparison
+	 * @throws RuntimeException if an operand is not a {@code DataAtom} or
+	 *         the operator is not one of the supported ones
+	 */
+	@Override
+	public boolean eval(Datum input) {
+		Datum d1 = left.simpleEval(input);
+		Datum d2 = right.simpleEval(input);
+
+		if (!(d1 instanceof DataAtom) || !(d2 instanceof DataAtom)){
+			throw new RuntimeException("Builtin functions cannot be used to compare non-atomic values. Use a filter function instead.");
+		}
+
+		DataAtom da1 = (DataAtom)d1;
+		DataAtom da2 = (DataAtom)d2;
+
+		// numeric ops first; numval() is only touched for these operators
+		if ("==".equals(op)) {
+			return da1.numval().equals(da2.numval());
+		}
+		if ("!=".equals(op)) {
+			return !da1.numval().equals(da2.numval());
+		}
+		if ("<".equals(op)) {
+			return da1.numval().compareTo(da2.numval()) < 0;
+		}
+		if ("<=".equals(op)) {
+			return da1.numval().compareTo(da2.numval()) <= 0;
+		}
+		if (">".equals(op)) {
+			return da1.numval().compareTo(da2.numval()) > 0;
+		}
+		if (">=".equals(op)) {
+			return da1.numval().compareTo(da2.numval()) >= 0;
+		}
+
+		// now string ops
+		if ("eq".equals(op)) {
+			return da1.equals(da2);
+		}
+		if ("neq".equals(op)) {
+			return !da1.equals(da2);
+		}
+		if ("lt".equals(op)) {
+			return da1.compareTo(da2) < 0;
+		}
+		if ("lte".equals(op)) {
+			return da1.compareTo(da2) <= 0;
+		}
+		if ("gt".equals(op)) {
+			return da1.compareTo(da2) > 0;
+		}
+		if ("gte".equals(op)) {
+			return da1.compareTo(da2) >= 0;
+		}
+
+		throw new RuntimeException("Internal error: Invalid filter operator: " + op);
+	}
+
+	/**
+	 * @return {@code "(left op right)"}
+	 */
+	@Override
+	public String toString() {
+		return "(" + left + " " + op + " " + right + ")";
+	}
+
+	/** Calls {@code finish()} on both operand specs. */
+	@Override
+	public void finish() {
+		left.finish();
+		right.finish();
+	}
+
+	/** Passes the instantiator on to both operand specs. */
+	@Override
+	public void instantiateFunc(FunctionInstantiator instantiaor)
+			throws IOException {
+		left.instantiateFunc(instantiaor);
+		right.instantiateFunc(instantiaor);
+	}
+}

Added: incubator/pig/trunk/src/org/apache/pig/impl/eval/cond/Cond.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/cond/Cond.java?rev=589866&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/cond/Cond.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/cond/Cond.java Mon Oct 29 14:44:47 2007
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.eval.cond;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.pig.data.Datum;
+import org.apache.pig.impl.FunctionInstantiator;
+
+
+
+/**
+ * Base class of the serializable boolean conditions that are evaluated
+ * against a {@code Datum}.
+ */
+public abstract class Cond implements Serializable {
+
+	/**
+	 * @return the function names this condition reports, as defined by the
+	 *         concrete subclass
+	 */
+	public abstract List<String> getFuncs();
+
+	/**
+	 * @param input the datum to test
+	 * @return whether the condition holds for {@code input}
+	 */
+	public abstract boolean eval(Datum input);
+
+	/** Lets the condition finish whatever it holds on to. */
+	public abstract void finish();
+
+	/**
+	 * @param instantiaor the instantiator handed to the condition
+	 * @throws IOException if instantiation fails
+	 */
+	public abstract void instantiateFunc(FunctionInstantiator instantiaor) throws IOException;
+
+}

Added: incubator/pig/trunk/src/org/apache/pig/impl/eval/cond/FalseCond.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/cond/FalseCond.java?rev=589866&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/cond/FalseCond.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/cond/FalseCond.java Mon Oct 29 14:44:47 2007
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.eval.cond;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.data.Datum;
+import org.apache.pig.impl.FunctionInstantiator;
+
+
+/**
+ * Condition that never holds.
+ */
+public class FalseCond extends Cond {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * @return always {@code false}, whatever the input
+	 */
+	@Override
+	public boolean eval(Datum input) {
+		return false;
+	}
+
+	/** Nothing to finish. */
+	@Override
+	public void finish() {
+		// intentionally empty
+	}
+
+	/**
+	 * @return always a new, empty list
+	 */
+	@Override
+	public List<String> getFuncs() {
+		List<String> none = new ArrayList<String>();
+		return none;
+	}
+
+	/** Nothing to instantiate. */
+	@Override
+	public void instantiateFunc(FunctionInstantiator instantiaor)
+			throws IOException {
+		// intentionally empty
+	}
+
+}

Added: incubator/pig/trunk/src/org/apache/pig/impl/eval/cond/FuncCond.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/cond/FuncCond.java?rev=589866&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/cond/FuncCond.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/cond/FuncCond.java Mon Oct 29 14:44:47 2007
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.eval.cond;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.FilterFunc;
+import org.apache.pig.data.Datum;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.FunctionInstantiator;
+import org.apache.pig.impl.eval.EvalSpec;
+
+
+
+/**
+ * Condition whose outcome is computed by a {@code FilterFunc} looked up by
+ * name, applied to the tuple produced by an optional argument spec.
+ */
+public class FuncCond extends Cond {
+
+	private static final long serialVersionUID = 1L;
+
+	/** Alias under which the filter function is looked up. */
+	public String funcName;
+	/**
+	 * The instantiated filter function. Transient, and left {@code null}
+	 * until {@link #instantiateFunc(FunctionInstantiator)} is called with a
+	 * non-null instantiator.
+	 */
+	transient public FilterFunc func;
+	/** Spec producing the argument tuple; may be {@code null}. */
+	public EvalSpec args;
+
+	/**
+	 * @param fInstantiaor instantiator used to create the function; when
+	 *                     {@code null} the function is left uninstantiated
+	 * @param funcName     alias of the filter function
+	 * @param args         spec producing the arguments, or {@code null}
+	 * @throws IOException if {@code args} is asynchronous or instantiation
+	 *                     fails
+	 */
+	public FuncCond(FunctionInstantiator fInstantiaor, String funcName, EvalSpec args) throws IOException{
+		this.funcName = funcName;
+		this.args = args;
+		if (args!=null && args.isAsynchronous())
+			throw new IOException("Can't use the output of an asynchronous function as an argument");
+		instantiateFunc(fInstantiaor);
+	}
+
+	/**
+	 * Creates the filter function from {@code funcName}; does nothing when
+	 * the instantiator is {@code null}.
+	 */
+	@Override
+	public void instantiateFunc(FunctionInstantiator instantiaor) throws IOException{
+		if(instantiaor != null)
+			func = (FilterFunc)instantiaor.instantiateFuncFromAlias(funcName);
+	}
+
+	/**
+	 * @return a new list holding only {@code funcName}
+	 */
+	@Override
+	public List<String> getFuncs() {
+		List<String> funcs = new ArrayList<String>();
+		funcs.add(funcName);
+		return funcs;
+	}
+
+	/**
+	 * Evaluates the arguments on {@code input} and runs the filter function
+	 * on the resulting tuple ({@code null} when there is no argument spec).
+	 *
+	 * @return the function's verdict, or {@code false} when the function
+	 *         fails with an {@link IOException}
+	 * @throws RuntimeException if the arguments evaluate to a non-tuple
+	 */
+	@Override
+	public boolean eval(Datum input){
+		try {
+			Datum d = null;
+			if (args!=null)
+				d = args.simpleEval(input);
+
+			if (d!=null && !(d instanceof Tuple))
+				throw new RuntimeException("Internal error: Non-tuple returned on evaluation of arguments.");
+
+			return func.exec((Tuple)d);
+		} catch (IOException e) {
+			// deliberate best effort: a failing filter rejects the input
+			System.out.println("Warning: filter function " + funcName + " failed. Substituting default value \'false\'.");
+			return false;
+		}
+	}
+
+	/**
+	 * @return {@code funcName(args)}
+	 */
+	@Override
+	public String toString() {
+		return funcName + "(" + args + ")";
+	}
+
+	/**
+	 * Finishes the argument spec and the filter function, skipping
+	 * whichever of the two is absent.
+	 */
+	@Override
+	public void finish() {
+		if (args!=null)
+			args.finish();
+		// func is transient and stays null when no instantiator was supplied;
+		// in that case there is nothing to finish
+		if (func!=null)
+			func.finish();
+	}
+}

Added: incubator/pig/trunk/src/org/apache/pig/impl/eval/cond/NotCond.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/cond/NotCond.java?rev=589866&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/cond/NotCond.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/cond/NotCond.java Mon Oct 29 14:44:47 2007
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.eval.cond;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.pig.data.Datum;
+import org.apache.pig.impl.FunctionInstantiator;
+
+
+
+public class NotCond extends Cond {
+	private static final long serialVersionUID = 1L;
+	
+	public Cond cond;
+
+	public NotCond(Cond cond) {
+        this.cond = cond;
+    }
+
+    @Override
+	public List<String> getFuncs() {
+        return cond.getFuncs();
+    }
+
+    @Override
+	public boolean eval(Datum input){
+        return !cond.eval(input);
+    }
+    
+    @Override
+	public String toString() {
+        return "(NOT " + cond + ")";
+    }
+    
+    @Override
+    public void finish() {
+    	cond.finish();
+    }
+
+	@Override
+	public void instantiateFunc(FunctionInstantiator instantiaor)
+			throws IOException {
+		cond.instantiateFunc(instantiaor);
+		
+	}
+}



Mime
View raw message