pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ga...@apache.org
Subject svn commit: r614325 [3/6] - in /incubator/pig/branches/types: ./ lib/ scripts/ src/org/apache/pig/ src/org/apache/pig/builtin/ src/org/apache/pig/data/ src/org/apache/pig/impl/ src/org/apache/pig/impl/builtin/ src/org/apache/pig/impl/eval/ src/org/apac...
Date Tue, 22 Jan 2008 21:17:22 GMT
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFAny.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFAny.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFAny.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFAny.java Tue Jan 22 13:17:12 2008
@@ -21,27 +21,26 @@
 import java.util.Random;
 
 import org.apache.pig.EvalFunc;
-import org.apache.pig.data.DataAtom;
 import org.apache.pig.data.Tuple;
 
 
 /**
  * built-in grouping function; permits system to choose any grouping.
  */
-public class GFAny extends EvalFunc<DataAtom> {
-	public static final int defaultNumGroups = 1000;
-	
-	int numGroups = defaultNumGroups;
-	Random r = new Random();
-	
-	public GFAny(){}
-	
-	public GFAny(int numGroups){
-		this.numGroups = numGroups;
-	}
-	
-	@Override
-	public void exec(Tuple input, DataAtom output) throws IOException{
-		output.setValue(r.nextInt(numGroups));
-	}
+public class GFAny extends EvalFunc<Integer> {
+    public static final int defaultNumGroups = 1000;
+    
+    int numGroups = defaultNumGroups;
+    Random r = new Random();
+    
+    public GFAny(){}
+    
+    public GFAny(int numGroups){
+        this.numGroups = numGroups;
+    }
+    
+    @Override
+    public Integer exec(Tuple input) throws IOException{
+        return r.nextInt(numGroups);
+    }
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFCross.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFCross.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFCross.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFCross.java Tue Jan 22 13:17:12 2008
@@ -21,62 +21,67 @@
 import java.util.Random;
 
 import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 
 
 public class GFCross extends EvalFunc<DataBag> {
-	int numInputs, myNumber, numGroupsPerInput;
-	
-	public static int DEFAULT_PARALLELISM = 96;
+    int numInputs, myNumber, numGroupsPerInput;
+    BagFactory mBagFactory = BagFactory.getInstance();
+    TupleFactory mTupleFactory = TupleFactory.getInstance();
+    
+    public static int DEFAULT_PARALLELISM = 96;
 
-	@Override
-	public void exec(Tuple input, DataBag output) throws IOException {;
-			numInputs = Integer.parseInt(input.getAtomField(0).strval());
-			myNumber = Integer.parseInt(input.getAtomField(1).strval());
-		
-		
-			numGroupsPerInput = (int) Math.ceil(Math.pow(DEFAULT_PARALLELISM, 1.0/numInputs));
-			int numGroupsGoingTo = (int) Math.pow(numGroupsPerInput,numInputs - 1);
-			
-			int[] digits = new int[numInputs];
-			for (int i=0; i<numInputs; i++){
-				if (i == myNumber){
-					Random r = new Random(System.currentTimeMillis());
-					digits[i] = r.nextInt(numGroupsPerInput);
-				}else{
-					digits[i] = 0;
-				}
-			}
-			
-			for (int i=0; i<numGroupsGoingTo; i++){
-				output.add(toTuple(digits));
-				next(digits);
-			}			
+    @Override
+    public DataBag exec(Tuple input) throws IOException {;
+        numInputs = (Integer)input.get(0);
+        myNumber = (Integer)input.get(1);
+        DataBag output = mBagFactory.newDefaultBag();
+        
+        numGroupsPerInput = (int) Math.ceil(Math.pow(DEFAULT_PARALLELISM, 1.0/numInputs));
+        int numGroupsGoingTo = (int) Math.pow(numGroupsPerInput,numInputs - 1);
+            
+        int[] digits = new int[numInputs];
+        for (int i=0; i<numInputs; i++){
+            if (i == myNumber){
+                Random r = new Random(System.currentTimeMillis());
+                digits[i] = r.nextInt(numGroupsPerInput);
+            }else{
+                digits[i] = 0;
+            }
+        }
+            
+        for (int i=0; i<numGroupsGoingTo; i++){
+            output.add(toTuple(digits));
+            next(digits);
+        }            
 
-	}
-	
-	private Tuple toTuple(int[] digits) throws IOException{
-		Tuple t = new Tuple(numInputs);
-		for (int i=0; i<numInputs; i++){
-			t.setField(i, digits[i]);
-		}
-		return t;
-	}
-	
-	private void next(int[] digits){
-		for (int i=0; i<numInputs; i++){
-			if (i== myNumber)
-				continue;
-			else{
-				if (digits[i] == numGroupsPerInput - 1){
-					digits[i] = 0;
-				}else{
-					digits[i]++;
-					break;
-				}
-			}
-		}
-	}
+        return output;
+    }
+    
+    private Tuple toTuple(int[] digits) throws IOException{
+        Tuple t = mTupleFactory.newTuple(numInputs);
+        for (int i=0; i<numInputs; i++){
+            t.set(i, digits[i]);
+        }
+        return t;
+    }
+    
+    private void next(int[] digits){
+        for (int i=0; i<numInputs; i++){
+            if (i== myNumber)
+                continue;
+            else{
+                if (digits[i] == numGroupsPerInput - 1){
+                    digits[i] = 0;
+                }else{
+                    digits[i]++;
+                    break;
+                }
+            }
+        }
+    }
 
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFReplicate.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFReplicate.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFReplicate.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFReplicate.java Tue Jan 22 13:17:12 2008
@@ -15,32 +15,36 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.pig.impl.builtin;
-
+package org.apache.pig.impl.builtin;
+
 import java.io.IOException;
 
 import org.apache.pig.EvalFunc;
-import org.apache.pig.data.DataAtom;
+import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Datum;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+
+public class GFReplicate extends EvalFunc<DataBag> {
+    TupleFactory mTupleFactory = TupleFactory.getInstance();
+    BagFactory mBagFactory = BagFactory.getInstance();
+
+    int numGroups = GFAny.defaultNumGroups;
+    
+    public GFReplicate(){}
+    
+    public GFReplicate(int numGroups) {
+        this.numGroups = numGroups;
+    }
+    
+    @Override
+    public DataBag exec(Tuple input) throws IOException { // input is ignored; builds one tuple per group
+        DataBag b = mBagFactory.newDefaultBag();
+        for (int i = 0; i < numGroups; i++) {
+            b.add(mTupleFactory.newTuple(Integer.valueOf(i))); // group index i (0..numGroups-1), not a constant
+        }
+        return b;
+    }
 
-
-public class GFReplicate extends EvalFunc<DataBag> {
-
-	int numGroups = GFAny.defaultNumGroups;
-	
-	public GFReplicate(){}
-	
-	public GFReplicate(int numGroups){
-		this.numGroups = numGroups;
-	}
-	
-	@Override
-	public void exec(Tuple input, DataBag output) throws IOException{
-		for (int i=0; i<numGroups; i++){
-			output.add(new Tuple(new DataAtom(i)));
-		}
-	}
-
-}
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/builtin/MULTIPLY.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/builtin/MULTIPLY.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/builtin/MULTIPLY.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/builtin/MULTIPLY.java Tue Jan 22 13:17:12 2008
@@ -20,21 +20,21 @@
 import java.io.IOException;
 
 import org.apache.pig.EvalFunc;
-import org.apache.pig.data.DataAtom;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.schema.AtomSchema;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 
 
 
-public class MULTIPLY extends EvalFunc<DataAtom> {
+public class MULTIPLY extends EvalFunc<Double> {
 
     @Override
-    public void exec(Tuple input, DataAtom output) throws IOException {
-        double v1 = input.getAtomField(0).numval();
-        double v2 = input.getAtomField(1).numval();
-        output.setValue(v1*v2);
+    public Double exec(Tuple input) throws IOException {
+        double v1 = (Double)input.get(0);
+        double v2 = (Double)input.get(1);
+        return new Double(v1 * v2);
     }
+
     @Override
     public Schema outputSchema(Schema input) {
         return new AtomSchema("product");

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/builtin/SUBTRACT.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/builtin/SUBTRACT.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/builtin/SUBTRACT.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/builtin/SUBTRACT.java Tue Jan 22 13:17:12 2008
@@ -20,21 +20,21 @@
 import java.io.IOException;
 
 import org.apache.pig.EvalFunc;
-import org.apache.pig.data.DataAtom;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.schema.AtomSchema;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 
 
-public class SUBTRACT extends EvalFunc<DataAtom> {
+public class SUBTRACT extends EvalFunc<Double> {
 
     @Override
-    public void exec(Tuple input, DataAtom output) throws IOException {
-        double v1 = input.getAtomField(0).numval();
-        double v2 = input.getAtomField(1).numval();
+    public Double exec(Tuple input) throws IOException {
+        double v1 = (Double)input.get(0);
+        double v2 = (Double)input.get(1);
         
-        output.setValue(v1-v2);
+        return new Double(v1 - v2);
     }
+
     @Override
     public Schema outputSchema(Schema input) {
         return new AtomSchema("difference");

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/builtin/ShellBagEvalFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/builtin/ShellBagEvalFunc.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/builtin/ShellBagEvalFunc.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/builtin/ShellBagEvalFunc.java Tue Jan 22 13:17:12 2008
@@ -22,144 +22,177 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.ArrayList;
+import java.util.Iterator;
 
 import org.apache.pig.EvalFunc;
+import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Datum;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DefaultAbstractBag;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 
 
 public class ShellBagEvalFunc extends EvalFunc<DataBag> {
-	byte groupDelim = '\n';
-	byte recordDelim = '\n';
-	byte fieldDelim = '\t';
-	String fieldDelimString = "\t";
-	OutputStream os;
-	InputStream is;
-	InputStream es;
-	String cmd;
-	Thread processThread;
-	
-	LinkedBlockingQueue<DataBag> bags = new LinkedBlockingQueue<DataBag>();
-	
-	
-	public ShellBagEvalFunc(String cmd) {
-		this.cmd = cmd;
-	}
-
-	private class EndOfQueue extends DataBag{
-		EndOfQueue() { super(Datum.DataType.TUPLE); }
-		public void add(Datum d){}
-	}
-	
-	private void startProcess() throws IOException {
-		Process p = Runtime.getRuntime().exec(cmd);
-		is = p.getInputStream();
-		os = p.getOutputStream();
-		es = p.getErrorStream();
-		
-		
-		new Thread() {
-			@Override
-			public void run() {
-				byte b[] = new byte[256];
-				int rc;
-				try {
-					while((rc = es.read(b)) > 0) {
-						System.err.write(b, 0, rc);
-					}
-				} catch(Exception e) {
-					e.printStackTrace();
-				}
-			}
-		}.start();
-		
-		
-		processThread = new Thread() {
-			@Override
-			public void run() {
-				while(true){
-					DataBag bag;
-					try{
-						bag = bags.take();
-					}catch(InterruptedException e){
-						continue;
-					}
-					if (bag instanceof EndOfQueue)
-						break;
-					try {
-						readBag(bag);
-					} catch (IOException e) {
-						e.printStackTrace();
-					}
-				}
-			}
-		};
-		
-		processThread.start();
-	}
-	
-	@Override
-	public void exec(Tuple input, DataBag output) throws IOException {
-		if (os == null) {
-			startProcess();
-		}
-		os.write(input.toDelimitedString(fieldDelimString).getBytes());
-		os.write(recordDelim);
-		os.write(groupDelim);
-		os.flush();
-		try{
-			bags.put(output);
-		}catch(InterruptedException e){}
-		
-		//Since returning before ensuring that output is present
-		output.markStale(true);
-		
-	}
-	
-	@Override
-	public void finish(){
-		try{
-			os.close();
-			try{
-				bags.put(new EndOfQueue());
-			}catch(InterruptedException e){}
-		}catch(IOException e){
-			e.printStackTrace();
-		}
-		while(true){
-			try{
-				processThread.join();
-				break;
-			}catch (InterruptedException e){}
-		}
-	}
-
-	@Override
-	public boolean isAsynchronous() {
-		return true;
-	}
-	
-	private void readBag(DataBag output) throws IOException {
-		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		boolean inRecord = false;
-		int c;
-		while((c = is.read()) != -1) {
-			System.out.print(((char)c));
-			if ((inRecord == false) && (c == groupDelim)) {
-				output.markStale(false);
-				return;
-			}
-			inRecord = true;
-			if (c == recordDelim) {
-				inRecord = false;
-				Tuple t = new Tuple(baos.toString(), fieldDelimString);
-				// System.err.println(Thread.currentThread().getName() + ": Adding tuple " + t + " to collector " + output);
-				output.add(t);
-				baos = new ByteArrayOutputStream();
-				continue;
-			}
-			baos.write(c);
-		}
-	}
+    byte groupDelim = '\n';
+    byte recordDelim = '\n';
+    byte fieldDelim = '\t';
+    String fieldDelimString = "\t";
+    OutputStream os;
+    InputStream is;
+    InputStream es;
+    String cmd;
+    Thread processThread;
+    BagFactory mBagFactory = BagFactory.getInstance();
+    TupleFactory mTupleFactory = TupleFactory.getInstance();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    private ArrayList<Object> mProtoTuple = new ArrayList<Object>();
+    
+    LinkedBlockingQueue<DataBag> bags = new LinkedBlockingQueue<DataBag>();
+    
+    
+    public ShellBagEvalFunc(String cmd) {
+        this.cmd = cmd;
+    }
+
+    private class EndOfQueue extends DefaultAbstractBag {
+        @Override
+        public void add(Tuple t){}
+
+        // To satisfy abstract functions in DataBag.
+        public boolean isSorted() { return false; }
+        public boolean isDistinct() { return false; }
+        public Iterator<Tuple> iterator() { return null; }
+        public long spill() { return 0; }
+    }
+    
+    private void startProcess() throws IOException {
+        Process p = Runtime.getRuntime().exec(cmd);
+        is = p.getInputStream();
+        os = p.getOutputStream();
+        es = p.getErrorStream();
+        
+        
+        new Thread() {
+            @Override
+            public void run() {
+                byte b[] = new byte[256];
+                int rc;
+                try {
+                    while((rc = es.read(b)) > 0) {
+                        System.err.write(b, 0, rc);
+                    }
+                } catch(Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }.start();
+        
+        
+        processThread = new Thread() {
+            @Override
+            public void run() {
+                while(true){
+                    DataBag bag;
+                    try{
+                        bag = bags.take();
+                    }catch(InterruptedException e){
+                        continue;
+                    }
+                    if (bag instanceof EndOfQueue)
+                        break;
+                    try {
+                        readBag(bag);
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+        };
+        
+        processThread.start();
+    }
+    
+    @Override
+    public DataBag exec(Tuple input) throws IOException {
+        DataBag output = mBagFactory.newDefaultBag();
+        // Mark the bag stale BEFORE handing it to the reader thread: readBag()
+        // clears the flag once the group is complete, and setting it after the
+        // put could overwrite that and leave the bag stale forever.
+        output.markStale(true);
+        if (os == null) {
+            startProcess();
+        }
+        os.write(input.toDelimitedString(fieldDelimString).getBytes());
+        os.write(recordDelim);
+        os.write(groupDelim);
+        os.flush();
+        try{
+            bags.put(output);
+        }catch(InterruptedException e){}
+
+        return output;
+    }
+    
+    @Override
+    public void finish(){
+        try{
+            os.close();
+            try{
+                bags.put(new EndOfQueue());
+            }catch(InterruptedException e){}
+        }catch(IOException e){
+            e.printStackTrace();
+        }
+        while(true){
+            try{
+                processThread.join();
+                break;
+            }catch (InterruptedException e){}
+        }
+    }
+
+    @Override
+    public boolean isAsynchronous() {
+        return true;
+    }
+    
+    private void readBag(DataBag output) throws IOException { // reads one group of records from the process output into 'output'
+        baos.reset();
+        boolean inRecord = false;
+        int c;
+        while((c = is.read()) != -1) {
+            System.out.print(((char)c));
+            if ((inRecord == false) && (c == groupDelim)) {
+                output.markStale(false); // group delimiter seen outside a record: the bag is complete
+                return;
+            }
+            inRecord = true;
+            if (c == recordDelim) {
+                inRecord = false;
+                readField(); // flush the last field of this record
+                Tuple t =  mTupleFactory.newTuple(mProtoTuple);
+                mProtoTuple.clear();
+                output.add(t);
+            } else if (c == fieldDelim) {
+                readField(); // the delimiter ends the field and must not be buffered into the next one
+            } else {
+                baos.write(c);
+            }
+        }
+    }
+
+    private void readField() {
+        if (baos.size() == 0) {
+            // NULL value
+            mProtoTuple.add(null);
+        } else {
+            // TODO, once this can take schemas, we need to figure out
+            // if the user requested this to be viewed as a certain
+            // type, and if so, then construct it appropriately.
+            mProtoTuple.add(new DataByteArray(baos.toByteArray()));
+        }
+        baos.reset();
+    }
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/BinCondSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/BinCondSpec.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/BinCondSpec.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/BinCondSpec.java Tue Jan 22 13:17:12 2008
@@ -20,7 +20,6 @@
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.pig.data.Datum;
 import org.apache.pig.impl.FunctionInstantiator;
 import org.apache.pig.impl.eval.collector.DataCollector;
 import org.apache.pig.impl.eval.cond.Cond;
@@ -61,10 +60,6 @@
     	ifTrue.instantiateFunc(fInstantiaor);
     	ifFalse.instantiateFunc(fInstantiaor);
     };
-    @Override
-    public boolean amenableToCombiner() {
-    	return false;
-    }
 
     @Override
 	protected Schema mapInputSchema(Schema schema) {
@@ -75,7 +70,7 @@
 	protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
     	return new DataCollector(endOfPipe){
     		@Override
-    		public void add(Datum d) {
+    		public void add(Object d) {
     			if (cond.eval(d)){
     				addToSuccessor(ifTrue.simpleEval(d));
     			}else{
@@ -104,5 +99,13 @@
         sb.append(")]");
         return sb.toString();
     }
+
+	public EvalSpec ifTrue() { return ifTrue; }
+	public EvalSpec ifFalse() { return ifFalse; }
+    
+	@Override
+	public void visit(EvalSpecVisitor v) {
+		v.visitBinCond(this);
+	}
     
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/CompositeEvalSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/CompositeEvalSpec.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/CompositeEvalSpec.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/CompositeEvalSpec.java Tue Jan 22 13:17:12 2008
@@ -38,7 +38,7 @@
 	
 	private List<EvalSpec> specs = new ArrayList<EvalSpec>();
 	
-	CompositeEvalSpec(EvalSpec spec){
+	public CompositeEvalSpec(EvalSpec spec){
 		specs.add(spec);
 	}
 		
@@ -79,21 +79,6 @@
     }
 
     
-    /**
-     * Process the pipe and determine if the pipe is a candidate for 
-     * algebraic evaluation. 
-     * @return
-     */
-    @Override
-	public boolean amenableToCombiner() {
-        if (true) return false;
-    	for (EvalSpec spec: specs){
-    		if (!spec.amenableToCombiner())
-    			return false;
-    	}
-    	return true;
-    }
-    
     @Override
     public boolean isAsynchronous() {
     	for (EvalSpec spec: specs)
@@ -122,4 +107,9 @@
 		return specs;
 	}
     
+	@Override
+	public void visit(EvalSpecVisitor v) {
+		v.visitCompositeEval(this);
+	}
+    
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/ConstSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/ConstSpec.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/ConstSpec.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/ConstSpec.java Tue Jan 22 13:17:12 2008
@@ -15,82 +15,73 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.pig.impl.eval;
-
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.util.ArrayList;
-import java.util.List;
+package org.apache.pig.impl.eval;
+
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.List;
 
-import org.apache.pig.data.DataAtom;
-import org.apache.pig.data.Datum;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
-
-
-
-public class ConstSpec extends SimpleEvalSpec {
-	private static final long serialVersionUID = 1L;
-	public String constant;
-	transient public DataAtom atom;
-	
-	
-	public ConstSpec(String constant){
-		this.constant = constant;
-		init();
-	}
-	
-	public ConstSpec(Integer constant){
-		this.constant = constant.toString();
-		init();
-	}
-	
-	private void init(){
-		atom = new DataAtom(constant);
-	}
-	
-	/**
-     * Extend the default deserialization
-     * @param in
-     * @throws IOException
-     * @throws ClassNotFoundException
-     */
-    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException{
-    	in.defaultReadObject();
-    	init();
-    }
-	
-	
-	@Override
-	public boolean amenableToCombiner() {
-		return true;
-	}
-
-	@Override
-	public List<String> getFuncs() {
-		return new ArrayList<String>();
-	}
-
-	@Override
-	protected Schema mapInputSchema(Schema schema) {
-		return new TupleSchema();
-	}
-
-	@Override
-	protected Datum eval(Datum d) {
-		return atom;
-	}
-	
-	@Override
-	public String toString() {
-		StringBuilder sb = new StringBuilder();
-		sb.append("[");
-		sb.append("'");
-		sb.append(constant);
-		sb.append("'");
-		sb.append("]");
-		return sb.toString();
-	}
-
-}
+
+
+
+public class ConstSpec extends SimpleEvalSpec {
+	private static final long serialVersionUID = 1L;
+	public Object mConst;
+	
+	
+	public ConstSpec(Object constant){
+		mConst = constant;
+	}
+	
+	/**
+     * Extend the default deserialization
+     * @param in
+     * @throws IOException
+     * @throws ClassNotFoundException
+     */
+     /* Why do I need this?
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException{
+    	in.defaultReadObject();
+    	init();
+    }
+    */
+	
+	@Override
+	public List<String> getFuncs() {
+		return new ArrayList<String>();
+	}
+
+	@Override
+	protected Schema mapInputSchema(Schema schema) {
+		return new TupleSchema();
+	}
+
+	@Override
+	protected Object eval(Object d) {
+		return mConst;
+	}
+	
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder();
+		sb.append("[");
+		sb.append("'");
+		sb.append(mConst);
+		sb.append("'");
+		sb.append("]");
+		return sb.toString();
+	}
+
+	@Override
+	public void visit(EvalSpecVisitor v) {
+		v.visitConst(this);
+	}
+
+	public String value() { return mConst == null ? null : mConst.toString(); } // string form; mConst may be any Object, so a blind (String) cast can throw
+    
+
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/EvalSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/EvalSpec.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/EvalSpec.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/EvalSpec.java Tue Jan 22 13:17:12 2008
@@ -22,8 +22,10 @@
 import java.util.Comparator;
 import java.util.List;
 
-import org.apache.pig.data.Datum;
+import org.apache.pig.ComparisonFunc;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.FunctionInstantiator;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.eval.collector.DataCollector;
@@ -41,6 +43,8 @@
 	transient DataCollector simpleEvalInput; 
 	protected boolean inner = false; //used only if this generate spec is used in a group by
 
+	private String comparatorFuncName;
+	private transient Comparator<Tuple> comparator;
 	
 	/*
 	 * Keep a precomputed pipeline ready if we do simple evals
@@ -51,7 +55,38 @@
 		simpleEvalInput = setupPipe(simpleEvalOutput);
 	}
 	
-	public void instantiateFunc(FunctionInstantiator instantiaor) throws IOException{};
+	public class UserComparator implements Comparator<Tuple> {
+		Comparator<Tuple> nested;
+        TupleFactory mTupleFactory = TupleFactory.getInstance();
+		
+		UserComparator(Comparator<Tuple> nested) {
+			this.nested = nested;
+		}
+    	public int compare(Tuple t1, Tuple t2) {
+    		Object d1 = simpleEval(t1);
+    		Object d2 = simpleEval(t2);
+    		if (d1 instanceof Tuple) {
+    			return nested.compare((Tuple)d1, (Tuple)d2);
+    		} else {
+    			return nested.compare(mTupleFactory.newTuple(d1),
+                    mTupleFactory.newTuple(d2));
+    		}
+        }
+	}
+	
+	public void instantiateFunc(FunctionInstantiator instantiaor) throws IOException{
+		if (comparatorFuncName != null) {
+			Comparator<Tuple> userComparator = 
+				(ComparisonFunc)instantiaor.instantiateFuncFromAlias(comparatorFuncName);
+			comparator = new UserComparator(userComparator);
+		} else {
+			comparator = new Comparator<Tuple>() {
+		    	public int compare(Tuple t1, Tuple t2) {
+                    return DataType.compare(simpleEval(t1), simpleEval(t2));
+		        }
+		    };
+		}
+	};
 	
     /**
      * set up a default data processing pipe for processing by this spec
@@ -137,25 +172,30 @@
     	return false;
     }
     
-    /**
-     * To determine if this spec is a candidate for 
-     * algebraic evaluation. 
-     * @return
-     */
-    public abstract boolean amenableToCombiner();
-
+    public void setComparatorName(String name) {
+        comparatorFuncName = name;
+    }
+    
+    public String getComparatorName() {
+        return comparatorFuncName;
+    }
     
     /**
      * Compare 2 tuples according to this spec. This is used while sorting by arbitrary (even generated) fields.
      * @return
      */
-    public Comparator<Datum> getComparator() {
-        return new Comparator<Datum>() {
-        	
-        	public int compare(Datum t1, Datum t2) {
-    			return (simpleEval(t1).compareTo(simpleEval(t2)));
-            }
-        };
+    public Comparator<Tuple> getComparator() {
+		if (comparator != null)
+            return comparator;
+		else
+        {
+            comparator = new Comparator<Tuple>() {
+                public int compare(Tuple t1, Tuple t2) {
+                    return DataType.compare(simpleEval(t1), simpleEval(t2));
+                }
+            };
+            return comparator;
+        }
     }
     
     public void setFlatten(boolean isFlattened){
@@ -173,7 +213,7 @@
      * @param input
      * @return
      */
-    public Datum simpleEval(Datum input){
+    public Object simpleEval(Object input){
     	if (simpleEvalInput == null)
     		init();
     	simpleEvalInput.add(input);
@@ -208,6 +248,6 @@
 		this.inner = inner;
 	}
 
-
+	public abstract void visit(EvalSpecVisitor v);
     
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/FilterSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/FilterSpec.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/FilterSpec.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/FilterSpec.java Tue Jan 22 13:17:12 2008
@@ -20,7 +20,6 @@
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.pig.data.Datum;
 import org.apache.pig.impl.FunctionInstantiator;
 import org.apache.pig.impl.eval.collector.DataCollector;
 import org.apache.pig.impl.eval.cond.Cond;
@@ -38,11 +37,6 @@
     }
     
     @Override
-    public boolean amenableToCombiner() {
-    	return false;
-    }
-    
-    @Override
     public List<String> getFuncs() {
     	return cond.getFuncs();
     }
@@ -57,7 +51,7 @@
         return new DataCollector(endOfPipe) {
 
             @Override
-			public void add(Datum d){
+			public void add(Object d){
             	if (checkDelimiter(d))
             		addToSuccessor(d);
             	else if (cond.eval(d)) 
@@ -90,6 +84,11 @@
 	public void instantiateFunc(FunctionInstantiator instantiaor)
 			throws IOException {
 		cond.instantiateFunc(instantiaor);		
+	}
+
+	@Override
+	public void visit(EvalSpecVisitor v) {
+		v.visitFilter(this);
 	}
     
    

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/FuncEvalSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/FuncEvalSpec.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/FuncEvalSpec.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/FuncEvalSpec.java Tue Jan 22 13:17:12 2008
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 package org.apache.pig.impl.eval;
+import java.util.Iterator;
 
 import java.io.IOException;
 import java.lang.reflect.Type;
@@ -23,10 +24,8 @@
 import java.util.List;
 
 import org.apache.pig.EvalFunc;
-import org.apache.pig.data.DataAtom;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.DataMap;
-import org.apache.pig.data.Datum;
+import org.apache.pig.Algebraic;
+import org.apache.pig.data.DefaultAbstractBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.FunctionInstantiator;
 import org.apache.pig.impl.eval.collector.DataCollector;
@@ -53,18 +52,13 @@
 	
 	@Override
 	public void instantiateFunc(FunctionInstantiator instantiaor) throws IOException{
-		if(instantiaor != null)
+		if(instantiaor != null) {
 			func = (EvalFunc) instantiaor.instantiateFuncFromAlias(funcName);
+        }
 		args.instantiateFunc(instantiaor);
 	}
 	
 	@Override
-	public boolean amenableToCombiner() {
-		// TODO Turn on algebraic
-		return false;
-	}
-
-	@Override
 	public List<String> getFuncs() {
 		List<String> funcs = new ArrayList<String>();
 		funcs.add(funcName);
@@ -86,7 +80,8 @@
 	@Override
 	protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
 		return new DataCollector(endOfPipe){
-			private Datum getPlaceHolderForFuncOutput(){
+            /*
+			private Object getPlaceHolderForFuncOutput(){
 				Type returnType = func.getReturnType();
 				if (returnType == DataAtom.class)
 					return new DataAtom();
@@ -98,36 +93,37 @@
 					return new DataMap();
 				else throw new RuntimeException("Internal error: Unknown return type of eval function");
 			}
-			
+            */
+
 			@Override
-			public void add(Datum d) {
+			public void add(Object d) {
 				if (checkDelimiter(d))
 					addToSuccessor(d);
 				
-				Datum argsValue = null;
+				Object argsValue = null;
 				if (args!=null)
 					argsValue = args.simpleEval(d);
 				
 				if (argsValue!=null && !(argsValue instanceof Tuple))
 	        		throw new RuntimeException("Internal error: Non-tuple returned on evaluation of arguments.");
 	            
-				Datum placeHolderForFuncOutput = getPlaceHolderForFuncOutput();
+				Object funcOutput;
 				try{
-					func.exec((Tuple)argsValue, placeHolderForFuncOutput);
+					funcOutput = func.exec((Tuple)argsValue);
 				}catch (IOException e){
 					RuntimeException re = new RuntimeException(e);
 					re.setStackTrace(e.getStackTrace());
 					throw re;
 				}
 				
-				if (placeHolderForFuncOutput instanceof FakeDataBag){
-					FakeDataBag fBag = (FakeDataBag)placeHolderForFuncOutput;
+				if (funcOutput instanceof FakeDataBag){
+					FakeDataBag fBag = (FakeDataBag)funcOutput;
 					synchronized(fBag){
 						if (!fBag.isStale())
 							fBag.addDelimiters();
 					}
 				}else{
-					addToSuccessor(placeHolderForFuncOutput);
+					addToSuccessor(funcOutput);
 				}
 			}
 			
@@ -154,23 +150,29 @@
 	
 	
 
-	private class FakeDataBag extends DataBag{
+	private class FakeDataBag extends DefaultAbstractBag {
 		int staleCount = 0;
 		DataCollector successor;
 		boolean startAdded = false, endAdded = false;
 		
 		public FakeDataBag(DataCollector successor){
-			super(Datum.DataType.TUPLE);
 			this.successor = successor;
 		}
+
+        // To satisfy abstract functions in DataBag.
+        public boolean isSorted() { return false; }
+        public boolean isDistinct() { return false; }
+        public Iterator<Tuple> iterator() { return null; }
+        public long spill() { return 0; }
+
 		
 		void addStart(){
-			successor.add(DataBag.startBag);
+			successor.add(DefaultAbstractBag.startBag);
 			startAdded = true;	
 		}
 		
 		void addEnd(){
-			successor.add(DataBag.endBag);
+			successor.add(DefaultAbstractBag.endBag);
 			endAdded = true;
 		}
 		
@@ -182,12 +184,12 @@
 		}
 		
 		@Override
-		public void add(Datum d) {
+		public void add(Tuple t) {
 			synchronized(this){
 				if (!startAdded)
 					addStart();
 			}
-			successor.add(d);
+			successor.add(t);
 		}
 		
 		@Override
@@ -237,5 +239,66 @@
 	public boolean isAsynchronous() {
 		return func.isAsynchronous();
 	}
+
+	@Override
+	public void visit(EvalSpecVisitor v) {
+		v.visitFuncEval(this);
+	}
+
+	public String getFuncName() { return funcName; }
+
+	public EvalSpec getArgs() { return args; }
+
+    public void setArgs(EvalSpec a) { args = a; }
+
+    /**
+     * This will replace the function to be called by this spec to be the
+     * initial instance instead of the general instance.  This should only
+     * be called if the function is algebraic.  It will only change the
+     * funcName variable, not the func variable itself.
+     */
+    public void resetFuncToInitial() {
+        if (!combinable()) {
+            throw new AssertionError(
+                "Can't convert non-algebraic function to inital.");
+        }
+        funcName = ((Algebraic)func).getInitial();
+    }
+
+    /**
+     * This will replace the function to be called by this spec to be the
+     * intermediate instance instead of the general instance.  This should only
+     * be called if the function is algebraic.  It will only change the
+     * funcName variable, not the func variable itself.
+     */
+    public void resetFuncToIntermediate() {
+        if (!combinable()) {
+            throw new AssertionError(
+                "Can't convert non-algebraic function to intermediate.");
+        }
+        funcName = ((Algebraic)func).getIntermed();
+    }
+
+    /**
+     * This will replace the function to be called by this spec to be the
+     * final instance instead of the general instance.  This should only
+     * be called if the function is algebraic.  It will only change the
+     * funcName variable, not the func variable itself.
+     * No position argument is taken; the final function name is read
+     * from the Algebraic function via getFinal().
+     */
+    public void resetFuncToFinal() {
+        if (!combinable()) {
+            throw new AssertionError(
+                "Can't convert non-algebraic function to final.");
+        }
+        funcName = ((Algebraic)func).getFinal();
+    }
+
+    public boolean combinable() {
+        // func should already have been instantiated by instantiateFunc
+        if (func != null) return (func instanceof Algebraic);
+        else return false;
+    }
 	
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/GenerateSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/GenerateSpec.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/GenerateSpec.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/GenerateSpec.java Tue Jan 22 13:17:12 2008
@@ -25,8 +25,8 @@
 
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Datum;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.FunctionInstantiator;
 import org.apache.pig.impl.eval.collector.DataCollector;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -40,6 +40,7 @@
 
 	protected int driver;
 
+    private TupleFactory mTupleFactory = TupleFactory.getInstance();
 
 
     public GenerateSpec(List<EvalSpec> specs){
@@ -68,7 +69,7 @@
     		LinkedList<CrossProductItem> pendingCrossProducts = new LinkedList<CrossProductItem>();
     		
     		@Override
-    		public void add(Datum d) {
+    		public void add(Object d) {
     			
     			if (checkDelimiter(d))
         			throw new RuntimeException("Internal error: not expecting a delimiter tuple");
@@ -110,30 +111,30 @@
     	DataBag bag;
     	public DatumBag(){
     		super(null);
-    		try{
-    			bag = BagFactory.getInstance().getNewBag(Datum.DataType.TUPLE);
-    		}catch(IOException e){
-    			throw new RuntimeException(e);
-    		}
+    		bag = BagFactory.getInstance().newDefaultBag();
     	}
     	
     	@Override
-		public void add(Datum d){
-    		bag.add(new Tuple(d));
+		public void add(Object d){
+    		bag.add(mTupleFactory.newTuple(d));
     	}
     	
-    	public Iterator<Datum> content(){
-    		return new Iterator<Datum>(){
-    			Iterator<Datum> iter;
+    	public Iterator<Object> content(){
+    		return new Iterator<Object>(){
+    			Iterator<Tuple> iter;
     			{
-    				iter = bag.content();
+    				iter = bag.iterator();
     			}
     			public boolean hasNext() {
     				return iter.hasNext();
     			}
-    			public Datum next() {
-    				return ((Tuple)iter.next()).getField(0);
-    			}
+                public Object next() {
+                    try {
+                        return iter.next().get(0);
+                    } catch(IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
     			public void remove() {
     				throw new RuntimeException("Can't remove from read-only iterator");
     			}
@@ -144,9 +145,9 @@
 
     private class CrossProductItem extends DataCollector{
     	DatumBag[] toBeCrossed;
-    	Datum cpiInput;    	
+    	Object cpiInput;    	
     	
-    	public CrossProductItem(Datum driverInput, DataCollector successor){
+    	public CrossProductItem(Object driverInput, DataCollector successor){
     		super(successor);
     		this.cpiInput = driverInput;
     		
@@ -163,13 +164,13 @@
     	}
     	
     	@Override
-    	public void add(Datum d){
+    	public void add(Object d){
     		if (checkDelimiter(d))
     			throw new RuntimeException("Internal error: not expecting a delimiter tuple");
     	   int numItems = specs.size();
     	   
            // create one iterator per to-be-crossed bag
-           Iterator<Datum>[] its = new Iterator[numItems];
+           Iterator<Object>[] its = new Iterator[numItems];
            for (int i = 0; i < numItems; i++) {
         	   if (i != driver){
         		   its[i] = toBeCrossed[i].content();
@@ -178,8 +179,8 @@
         	   }
            }
 
-           Datum[] lastOutput = null;
-           Datum[] outData = new Datum[numItems];
+           Object[] lastOutput = null;
+           Object[] outData = new Object[numItems];
 
            boolean done = false;
            while (!done) {
@@ -219,16 +220,20 @@
                    }
                }
 
-               Tuple outTuple = new Tuple();
+               Tuple outTuple = mTupleFactory.newTuple();
                
                for (int i=0; i< numItems; i++){
             	   if (specs.get(i).isFlattened() && outData[i] instanceof Tuple){
         				Tuple t = (Tuple)outData[i];
-						for (int j=0; j < t.arity(); j++){
-		    			   outTuple.appendField(t.getField(j));
-		    			}
+                        try {
+						    for (int j=0; j < t.size(); j++){
+		    			        outTuple.append(t.get(j));
+		    			    }
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
         		   }else{
-            		   outTuple.appendField(outData[i]);
+            		   outTuple.append(outData[i]);
             	   }
                }
                successor.add(outTuple);
@@ -294,18 +299,6 @@
         }
     }
  
-    /**
-     * Determine if this instance of EvalItems is a candiate for algebraic
-     * evaluation. This means it contains an Algebraic Function, and does not
-     * contain repeated references to column used by the algebraic function.
-     * @return
-     */
-    @Override
-	public boolean amenableToCombiner() {
-    	//TODO
-        return false;
-    }
-
     @Override
 	public String toString() {
         StringBuffer sb = new StringBuffer();
@@ -373,4 +366,9 @@
 		return specs;
 	}
 
+	@Override
+	public void visit(EvalSpecVisitor v) {
+		v.visitGenerate(this);
+	}
+    
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/MapLookupSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/MapLookupSpec.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/MapLookupSpec.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/MapLookupSpec.java Tue Jan 22 13:17:12 2008
@@ -19,10 +19,9 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
-import org.apache.pig.data.DataMap;
-import org.apache.pig.data.Datum;
-import org.apache.pig.data.AtomicDatum;
+import org.apache.pig.data.DataType;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
 
@@ -33,22 +32,18 @@
 	 * 
 	 */
 	private static final long serialVersionUID = 1L;
-	protected AtomicDatum keyToLookup;
+	protected String keyToLookup;
 	
-	public MapLookupSpec(AtomicDatum keyToLookup){
+	public MapLookupSpec(String keyToLookup){
 		this.keyToLookup = keyToLookup;
 	}
 
 	@Override
-	protected Datum eval(Datum d) {
-		if (!(d instanceof DataMap))
-			throw new RuntimeException("Attempt to lookup on data of type " + d.getClass().getName());
-		return ((DataMap)d).get(keyToLookup);
-	}
-	
-	@Override
-	public boolean amenableToCombiner() {
-		return true;
+	protected Object eval(Object d) {
+		if (!(d instanceof Map))
+			throw new RuntimeException("Attempt to lookup on data of type "
+                + DataType.findTypeName(d));
+		return ((Map<Object, Object>)d).get(keyToLookup);
 	}
 	
 	@Override
@@ -67,10 +62,18 @@
 		StringBuilder sb = new StringBuilder();
 		sb.append("[");
 		sb.append("#'");
-		sb.append(keyToLookup.toString());
+		sb.append(keyToLookup);
 		sb.append("'");
 		sb.append("]");
 		return sb.toString();
 	}
+
+	@Override
+	public void visit(EvalSpecVisitor v) {
+		v.visitMapLookup(this);
+	}
+
+	public String key() { return keyToLookup; }
+    
 	
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/ProjectSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/ProjectSpec.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/ProjectSpec.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/ProjectSpec.java Tue Jan 22 13:17:12 2008
@@ -21,8 +21,8 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.pig.data.Datum;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
 
@@ -32,6 +32,8 @@
 
 	protected List<Integer> cols;
 	protected boolean wrapInTuple;
+
+    private TupleFactory mTupleFactory = TupleFactory.getInstance();
 	
 
 	public List<Integer> getCols() {
@@ -48,11 +50,6 @@
 	}
 		
 	@Override
-	public boolean amenableToCombiner() {
-		return true;
-	}
-
-	@Override
 	public List<String> getFuncs() {
 		return new ArrayList<String>();
 	}
@@ -79,20 +76,25 @@
 	}
 	
 	@Override
-	protected Datum eval(Datum d){
+	protected Object eval(Object d){
 		if (!(d instanceof Tuple)){
 			throw new RuntimeException("Project operator expected a Tuple, found a " + d.getClass().getSimpleName());
 		}
 		Tuple t = (Tuple)d;
 		
-		if (!wrapInTuple && cols.size() == 1){
-			return t.getField(cols.get(0));
-		}else{
-			Tuple out = new Tuple();
-			for (int i: cols){
-				out.appendField(t.getField(i));
+		try{
+			if (!wrapInTuple && cols.size() == 1){
+				return t.get(cols.get(0));
+			}else{
+				Tuple out = mTupleFactory.newTuple(); // filled by append so output order follows cols
+				for (int i: cols){
+					out.append(t.get(i)); // i is a source column number, not an output position
+				}
+				return out;
 			}
-			return out;
+		}catch (IOException e){
+			//TODO: Based on a strictness level, insert null values here
+				throw new RuntimeException(e);		
 		}
 	}
 
@@ -100,6 +102,8 @@
 	public String toString() {
 		StringBuilder sb = new StringBuilder();
 		sb.append("[");
+		if (isFlattened)
+			sb.append("FLATTEN ");
 		sb.append("PROJECT ");
 		boolean first = true;
 		for (int i: cols){
@@ -133,5 +137,11 @@
 	public void setWrapInTuple(boolean wrapInTuple) {
 		this.wrapInTuple = wrapInTuple;
 	}
+
+	@Override
+	public void visit(EvalSpecVisitor v) {
+		v.visitProject(this);
+	}
+    
 
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/SimpleEvalSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/SimpleEvalSpec.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/SimpleEvalSpec.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/SimpleEvalSpec.java Tue Jan 22 13:17:12 2008
@@ -15,34 +15,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.pig.impl.eval;
-
-
-import org.apache.pig.data.Datum;
+package org.apache.pig.impl.eval;
+
+
 import org.apache.pig.impl.eval.collector.DataCollector;
 
-
-public abstract class SimpleEvalSpec extends EvalSpec {
-
-	@Override
-	protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
-		return new DataCollector(endOfPipe){
-			@Override
-			public void add(Datum d) {
-				if (checkDelimiter(d))
-					addToSuccessor(d);
-				else
-					addToSuccessor(eval(d));
-			}
-			
-			@Override
-			protected boolean needFlatteningLocally() {
-				return true;
-			}
-		};
-	}
-	
-	protected abstract Datum eval(Datum d);
-
-
-}
+
+public abstract class SimpleEvalSpec extends EvalSpec {
+
+	@Override
+	protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+		return new DataCollector(endOfPipe){
+			@Override
+			public void add(Object d) {
+				if (checkDelimiter(d))
+					addToSuccessor(d);
+				else
+					addToSuccessor(eval(d));
+			}
+			
+			@Override
+			protected boolean needFlatteningLocally() {
+				return true;
+			}
+		};
+	}
+	
+	protected abstract Object eval(Object d);
+
+
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/SortDistinctSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/SortDistinctSpec.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/SortDistinctSpec.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/SortDistinctSpec.java Tue Jan 22 13:17:12 2008
@@ -23,115 +23,115 @@
 
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Datum;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.FunctionInstantiator;
 import org.apache.pig.impl.eval.collector.DataCollector;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 
 
 public class SortDistinctSpec extends EvalSpec {
-	private static final long serialVersionUID = 1L;
-	transient DataBag bag;
-	protected EvalSpec sortSpec;
-	protected boolean eliminateDuplicates;
-	
-	
-	public SortDistinctSpec(boolean eliminateDuplicates, EvalSpec sortSpec){
-		this.eliminateDuplicates = eliminateDuplicates;
-		this.sortSpec = sortSpec;
-	}
-		
-	@Override
-	public boolean amenableToCombiner() {
-		//Combiner may potentially be useful if we are eliminating duplicates
-		return eliminateDuplicates;
-	}
-
-	@Override
-	public List<String> getFuncs() {
-		if (sortSpec!=null)
-			return sortSpec.getFuncs();
-		else
-			return new ArrayList<String>();
-	}
-
-	@Override
-	protected Schema mapInputSchema(Schema schema) {
-		return schema;
-	}
-
-	@Override
-	protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
-		return new DataCollector(endOfPipe){
-			
-			@Override
-			public void add(Datum d) {
-				if (inTheMiddleOfBag){
-					if (checkDelimiter(d)){
-						addToSuccessor(bag);
-					}else{
-						if (d instanceof Tuple){
-							bag.add((Tuple)d);
-						}else{
-							bag.add(new Tuple(d));
-						}
-					}
-				}else{
-					if (checkDelimiter(d)){
-						//Bag must have started now
-						try{
-							bag = BagFactory.getInstance().getNewBag(Datum.DataType.TUPLE);
-							if (eliminateDuplicates)
-								bag.distinct();
-							else
-								bag.sort(sortSpec);
-							
-						}catch(IOException e){
-							throw new RuntimeException(e);
-						}
-					}else{
-						addToSuccessor(d);
-					}
-				}
-			}
-			
-			@Override
-			protected boolean needFlatteningLocally() {
-				return true;
-			}
-			
-			
-			@Override
-			protected void finish() {
-			
-				/*
-				 * To clear the temporary files if it was a big bag
-				 */
-				if (bag!=null)
-					bag.clear();
-			
-			}
-		};
-	}
-
-	@Override
-	public String toString() {
-		StringBuilder sb = new StringBuilder();
-		sb.append("[");
-		sb.append(eliminateDuplicates?"DISTINCT ":"SORT ");
-		if (sortSpec!=null)
-			sb.append(sortSpec.toString());
-		sb.append("]");
-		return sb.toString();
-	}
-
-	@Override
-	public void instantiateFunc(FunctionInstantiator instantiaor)
-			throws IOException {
-		if (sortSpec!=null)
-			sortSpec.instantiateFunc(instantiaor);		
-	}
-
-	
+    private static final long serialVersionUID = 1L;
+    transient DataBag bag;
+    protected EvalSpec sortSpec;
+    protected boolean eliminateDuplicates;
+    
+    
+    public SortDistinctSpec(boolean eliminateDuplicates, EvalSpec sortSpec){
+        this.eliminateDuplicates = eliminateDuplicates;
+        this.sortSpec = sortSpec;
+    }
+        
+    @Override
+    public List<String> getFuncs() {
+        if (sortSpec!=null)
+            return sortSpec.getFuncs();
+        else
+            return new ArrayList<String>();
+    }
+
+    @Override
+    protected Schema mapInputSchema(Schema schema) {
+        return schema;
+    }
+
+    @Override
+    protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+        return new DataCollector(endOfPipe){
+
+            TupleFactory tfact = TupleFactory.getInstance();
+            
+            @Override
+            public void add(Object d) {
+                if (inTheMiddleOfBag){
+                    if (checkDelimiter(d)){
+                        addToSuccessor(bag);
+                    }else{
+                        if (d instanceof Tuple){
+                            bag.add((Tuple)d);
+                        }else{
+                            bag.add(tfact.newTuple(d));
+                        }
+                    }
+                }else{
+                    if (checkDelimiter(d)){
+                        //Bag must have started now
+                        if (eliminateDuplicates) {
+                            bag = BagFactory.getInstance().newDistinctBag();
+                        } else {
+                            bag = BagFactory.getInstance().newSortedBag(sortSpec);
+                        }
+                    }else{
+                        addToSuccessor(d);
+                    }
+                }
+            }
+            
+            @Override
+            protected boolean needFlatteningLocally() {
+                return true;
+            }
+            
+            
+            @Override
+            protected void finish() {
+            
+                /*
+                 * To clear the temporary files if it was a big bag
+                 */
+                if (bag!=null)
+                    bag.clear();
+            
+            }
+        };
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("[");
+        sb.append(eliminateDuplicates?"DISTINCT ":"SORT ");
+        if (sortSpec!=null)
+            sb.append(sortSpec.toString());
+        sb.append("]");
+        return sb.toString();
+    }
+
+    @Override
+    public void instantiateFunc(FunctionInstantiator instantiaor)
+            throws IOException {
+        if (sortSpec!=null)
+            sortSpec.instantiateFunc(instantiaor);        
+    }
+
+    @Override
+    public void visit(EvalSpecVisitor v) {
+        v.visitSortDistinct(this);
+    }
+
+    public EvalSpec getSortSpec() { return sortSpec; }
+
+    public boolean distinct() { return eliminateDuplicates; }
+    
+    
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/StarSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/StarSpec.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/StarSpec.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/StarSpec.java Tue Jan 22 13:17:12 2008
@@ -47,10 +47,10 @@
     	return input;
     }
 
-    @Override
-    public boolean amenableToCombiner() {
-    	return true;
-    }
+	@Override
+	public void visit(EvalSpecVisitor v) {
+		v.visitStar(this);
+	}
     
     
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/collector/DataCollector.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/collector/DataCollector.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/collector/DataCollector.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/collector/DataCollector.java Tue Jan 22 13:17:12 2008
@@ -20,7 +20,7 @@
 import java.util.Iterator;
 
 import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Datum;
+import org.apache.pig.data.DefaultAbstractBag;
 import org.apache.pig.data.Tuple;
 
 
@@ -43,7 +43,7 @@
 	/**
      * Add a tuple to the collector.
 	 */
-	public abstract void add(Datum d);
+	public abstract void add(Object d);
 	
 	private boolean needsFlattening(){
 		if (needFlatteningLocally() || (successor!=null && successor.needsFlattening()))
@@ -60,15 +60,15 @@
 	}
 	
 	
-	protected boolean checkDelimiter(Datum d){
-		if (d instanceof DataBag.BagDelimiterTuple){
-			if (d instanceof DataBag.StartBag){
+	protected boolean checkDelimiter(Object d){
+		if (d instanceof DefaultAbstractBag.BagDelimiterTuple){
+			if (d instanceof DefaultAbstractBag.StartBag){
 				if (inTheMiddleOfBag)
 					throw new RuntimeException("Internal error: Found a flattened bag inside another");
 				else
 					inTheMiddleOfBag = true;
 			}else{
-				if (!(d instanceof DataBag.EndBag))
+				if (!(d instanceof DefaultAbstractBag.EndBag))
 					throw new RuntimeException("Internal error: Unknown bag delimiter type");
 				if (!inTheMiddleOfBag)
 					throw new RuntimeException("Internal error: Improper nesting of bag delimiter tuples");
@@ -79,15 +79,15 @@
 		return false;
 	}
 
-	protected void addToSuccessor(Datum d){
+	protected void addToSuccessor(Object d){
 		if (d instanceof DataBag && !inTheMiddleOfBag && successor!=null && successor.needsFlattening()){
 			DataBag bag = (DataBag)d;
 			//flatten the bag and send it through the pipeline
-			successor.add(DataBag.startBag);
-		    Iterator<Datum> iter = bag.content();
+			successor.add(DefaultAbstractBag.startBag);
+		    Iterator<Tuple> iter = bag.iterator();
 	    	while(iter.hasNext())
 	    		successor.add(iter.next());
-	    	successor.add(DataBag.endBag);
+	    	successor.add(DefaultAbstractBag.endBag);
 		}else{
 			//simply add the datum
 			successor.add(d);

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/collector/FlattenCollector.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/collector/FlattenCollector.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/collector/FlattenCollector.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/collector/FlattenCollector.java Tue Jan 22 13:17:12 2008
@@ -15,28 +15,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.pig.impl.eval.collector;
-
-import org.apache.pig.data.Datum;
-
-public class FlattenCollector extends DataCollector {
-
-	public FlattenCollector(DataCollector successor){
-		super(successor);
-	}
-	
-	@Override
-	public void add(Datum d) {
-		if (checkDelimiter(d))
-			return;
-		else
-			successor.add(d);
-	}
-
-	@Override
-	protected boolean needFlatteningLocally() {
-		return true;
-	}
-	
-
-}
+package org.apache.pig.impl.eval.collector;
+
+public class FlattenCollector extends DataCollector {
+
+    public FlattenCollector(DataCollector successor){
+        super(successor);
+    }
+    
+    @Override
+    public void add(Object d) {
+        if (checkDelimiter(d))
+            return;
+        else
+            successor.add(d);
+    }
+
+    @Override
+    protected boolean needFlatteningLocally() {
+        return true;
+    }
+    
+
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/collector/UnflattenCollector.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/collector/UnflattenCollector.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/collector/UnflattenCollector.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/collector/UnflattenCollector.java Tue Jan 22 13:17:12 2008
@@ -21,43 +21,41 @@
 
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Datum;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 
 
 public class UnflattenCollector extends DataCollector {
-	DataBag bag;
-	
-	public UnflattenCollector(DataCollector successor){
-		super(successor);
-	}
-	
-	@Override
-	public void add(Datum d) {
-		if (inTheMiddleOfBag){
-			if (checkDelimiter(d)){
-				successor.add(bag);
-			}else{
-				if (d instanceof Tuple){
-					bag.add((Tuple)d);
-				}else{
-					bag.add(new Tuple(d));
-				}
-			}
-		}else{
-			if (checkDelimiter(d)){
-				//Bag must have started now
-				try{
-					bag = BagFactory.getInstance().getNewBag(Datum.DataType.TUPLE);
-				}catch(IOException e){
-					throw new RuntimeException(e);
-				}
-			}else{
-				successor.add(d);
-			}
-		}
-	}
-	
-	
+    DataBag bag;
+    TupleFactory mTupleFactory;
+    
+    public UnflattenCollector(DataCollector successor){
+        super(successor);
+        mTupleFactory = TupleFactory.getInstance();
+    }
+    
+    @Override
+    public void add(Object d) {
+        if (inTheMiddleOfBag){
+            if (checkDelimiter(d)){
+                successor.add(bag);
+            }else{
+                if (d instanceof Tuple){
+                    bag.add((Tuple)d);
+                }else{
+                    bag.add(mTupleFactory.newTuple(d));
+                }
+            }
+        }else{
+            if (checkDelimiter(d)){
+                //Bag must have started now
+                bag = BagFactory.getInstance().newDefaultBag();
+            }else{
+                successor.add(d);
+            }
+        }
+    }
+    
+    
 
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/AndCond.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/AndCond.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/AndCond.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/AndCond.java Tue Jan 22 13:17:12 2008
@@ -22,21 +22,20 @@
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.pig.data.Datum;
 import org.apache.pig.impl.FunctionInstantiator;
 
 
 
 public class AndCond extends Cond {
-	private static final long serialVersionUID = 1L;
-	public List<Cond> cList;
-	
-	public AndCond(List<Cond> cList) {
-		this.cList = cList;
-	}
-	
+    private static final long serialVersionUID = 1L;
+    public List<Cond> cList;
+    
+    public AndCond(List<Cond> cList) {
+        this.cList = cList;
+    }
+    
     @Override
-	public List<String> getFuncs() {
+    public List<String> getFuncs() {
         List<String> funcs = new ArrayList<String>();
         for (Iterator<Cond> it = cList.iterator(); it.hasNext(); ) {
             funcs.addAll(it.next().getFuncs());
@@ -45,7 +44,7 @@
     }
 
     @Override
-	public boolean eval(Datum input){
+    public boolean eval(Object input){
         for (Iterator<Cond> it = cList.iterator(); it.hasNext(); ) {
             if (it.next().eval(input) == false) return false;
         }
@@ -53,7 +52,7 @@
     }
     
     @Override
-	public String toString() {
+    public String toString() {
         StringBuffer sb = new StringBuffer();
         sb.append("(");
         for (Iterator<Cond> it = cList.iterator(); it.hasNext(); ) {
@@ -66,15 +65,15 @@
     
     @Override
     public void finish() {
-    	for (Cond c: cList)
-    		c.finish();
+        for (Cond c: cList)
+            c.finish();
     }
 
-	@Override
-	public void instantiateFunc(FunctionInstantiator instantiaor)
-			throws IOException {
-		for (Cond c: cList)
-    		c.instantiateFunc(instantiaor);
-		
-	}
+    @Override
+    public void instantiateFunc(FunctionInstantiator instantiaor)
+            throws IOException {
+        for (Cond c: cList)
+            c.instantiateFunc(instantiaor);
+        
+    }
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/CompCond.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/CompCond.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/CompCond.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/CompCond.java Tue Jan 22 13:17:12 2008
@@ -21,29 +21,27 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.pig.data.DataAtom;
-import org.apache.pig.data.Datum;
 import org.apache.pig.impl.FunctionInstantiator;
 import org.apache.pig.impl.eval.EvalSpec;
 
 
 
 public class CompCond extends Cond {
-	private static final long serialVersionUID = 1L;
-	
+    private static final long serialVersionUID = 1L;
+    
     public String op;   // one of "<", ">", "==", etc.
     public EvalSpec left, right;
     
     
 
 
-	public CompCond(EvalSpec left, String op, EvalSpec right) throws IOException{
+    public CompCond(EvalSpec left, String op, EvalSpec right) throws IOException{
         this.op = op.toLowerCase();
         this.left = left;
         this.right = right;
         
         if (left.isAsynchronous() || right.isAsynchronous()){
-        	throw new IOException("Can't compare the output of an asynchronous function");
+            throw new IOException("Can't compare the output of an asynchronous function");
         }
     } 
    
@@ -53,19 +51,23 @@
     }
 
     @Override
-    public boolean eval(Datum input) {
-    	
-    	Datum d1 = left.simpleEval(input);
-    	Datum d2 = right.simpleEval(input);
-    	
-    	if (!(d1 instanceof DataAtom) || !(d2 instanceof DataAtom)){
-    		throw new RuntimeException("Builtin functions cannot be used to compare non-atomic values. Use a filter function instead.");
-    	}
-    	
-    	DataAtom da1 = (DataAtom)d1;
-    	DataAtom da2 = (DataAtom)d2;
-    	
-    	
+    public boolean eval(Object input) {
+
+        // This is going to be totally rewritten, so don't mess with it.
+        return false;
+        
+        /*
+        Object d1 = left.simpleEval(input);
+        Object d2 = right.simpleEval(input);
+        
+        if (DataType.isCompex(d1) || DataType.isCompex(d2)) {
+            throw new RuntimeException("Builtin functions cannot be used to compare non-atomic values. Use a filter function instead.");
+        }
+        
+        DataAtom da1 = (DataAtom)d1;
+        DataAtom da2 = (DataAtom)d2;
+        
+        
         char op1 = op.charAt(0);
         char op2 = op.length() >= 2 ? op.charAt(1) : '0';
         char op3 = op.length() == 3 ? op.charAt(2) : '0';
@@ -124,24 +126,25 @@
         default:
             throw new RuntimeException("Internal error: Invalid filter operator: " + op);
         }
+        */
     }
         
     @Override
-	public String toString() {
+    public String toString() {
         return "(" + left + " " + op + " " + right + ")";
     }
     
     @Override
     public void finish() {
-    	left.finish();
-    	right.finish();
+        left.finish();
+        right.finish();
     }
 
 
-	@Override
-	public void instantiateFunc(FunctionInstantiator instantiaor)
-			throws IOException {
-    	left.instantiateFunc(instantiaor);
-    	right.instantiateFunc(instantiaor);		
-	}
+    @Override
+    public void instantiateFunc(FunctionInstantiator instantiaor)
+            throws IOException {
+        left.instantiateFunc(instantiaor);
+        right.instantiateFunc(instantiaor);        
+    }
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/Cond.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/Cond.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/Cond.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/Cond.java Tue Jan 22 13:17:12 2008
@@ -21,7 +21,6 @@
 import java.io.Serializable;
 import java.util.List;
 
-import org.apache.pig.data.Datum;
 import org.apache.pig.impl.FunctionInstantiator;
 
 
@@ -30,7 +29,7 @@
 
 	public abstract List<String> getFuncs();
     
-    public abstract boolean eval(Datum input);
+    public abstract boolean eval(Object input);
     
     public abstract void finish();
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/FalseCond.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/FalseCond.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/FalseCond.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/FalseCond.java Tue Jan 22 13:17:12 2008
@@ -21,7 +21,6 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.pig.data.Datum;
 import org.apache.pig.impl.FunctionInstantiator;
 
 
@@ -30,7 +29,7 @@
 	private static final long serialVersionUID = 1L;
 
     @Override
-    public boolean eval(Datum input) {
+    public boolean eval(Object input) {
         return false;
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/FuncCond.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/FuncCond.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/FuncCond.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/FuncCond.java Tue Jan 22 13:17:12 2008
@@ -22,7 +22,6 @@
 import java.util.List;
 
 import org.apache.pig.FilterFunc;
-import org.apache.pig.data.Datum;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.FunctionInstantiator;
 import org.apache.pig.impl.eval.EvalSpec;
@@ -60,10 +59,10 @@
     }
 
     @Override
-    public boolean eval(Datum input){
+    public boolean eval(Object input){
     	try {
         	
-        	Datum d = null;
+        	Object d = null;
         	if (args!=null)
         		d = args.simpleEval(input);
         	

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/NotCond.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/NotCond.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/NotCond.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/NotCond.java Tue Jan 22 13:17:12 2008
@@ -20,7 +20,6 @@
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.pig.data.Datum;
 import org.apache.pig.impl.FunctionInstantiator;
 
 
@@ -40,7 +39,7 @@
     }
 
     @Override
-	public boolean eval(Datum input){
+	public boolean eval(Object input){
         return !cond.eval(input);
     }
     

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/OrCond.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/OrCond.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/OrCond.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/OrCond.java Tue Jan 22 13:17:12 2008
@@ -22,7 +22,6 @@
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.pig.data.Datum;
 import org.apache.pig.impl.FunctionInstantiator;
 
 
@@ -47,7 +46,7 @@
     }
     
     @Override
-	public boolean eval(Datum input){
+	public boolean eval(Object input){
         for (Iterator<Cond> it = cList.iterator(); it.hasNext(); ) {
             if (it.next().eval(input) == true) return true;
         }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/RegexpCond.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/RegexpCond.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/RegexpCond.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/RegexpCond.java Tue Jan 22 13:17:12 2008
@@ -21,8 +21,6 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.pig.data.DataAtom;
-import org.apache.pig.data.Datum;
 import org.apache.pig.impl.FunctionInstantiator;
 import org.apache.pig.impl.eval.EvalSpec;
 
@@ -48,14 +46,17 @@
     }
 
     @Override
-    public boolean eval(Datum input){
+    public boolean eval(Object input){
     	
+        return false;
+        /*
     	Datum d = left.simpleEval(input);
     	
     	if (!(d instanceof DataAtom))
     		throw new RuntimeException("Cannot match non-atomic value against a regular expression. Use a filter function instead.");
 
     	return ((DataAtom)d).strval().matches(re);
+        */
     }
       
     @Override

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/TrueCond.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/TrueCond.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/TrueCond.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/cond/TrueCond.java Tue Jan 22 13:17:12 2008
@@ -21,7 +21,6 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.pig.data.Datum;
 import org.apache.pig.impl.FunctionInstantiator;
 
 
@@ -29,7 +28,7 @@
 	private static final long serialVersionUID = 1L;
 
     @Override
-    public boolean eval(Datum input) {
+    public boolean eval(Object input) {
         return true;
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/window/TimeWindowSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/window/TimeWindowSpec.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/window/TimeWindowSpec.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/window/TimeWindowSpec.java Tue Jan 22 13:17:12 2008
@@ -15,75 +15,79 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.pig.impl.eval.window;
-
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
+package org.apache.pig.impl.eval.window;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
 
-import org.apache.pig.data.Datum;
 import org.apache.pig.data.TimestampedTuple;
 import org.apache.pig.impl.eval.collector.DataCollector;
-
+import org.apache.pig.impl.eval.EvalSpecVisitor;
+
+
 
-
-public class TimeWindowSpec extends WindowSpec {
-	private static final long serialVersionUID = 1L;
-	double duration;  // duration in seconds
-	transient List<TimestampedTuple> window;
-        
-	public TimeWindowSpec(windowType type, double duration){
-		super(type);
-		this.duration = duration;
-        window = new LinkedList<TimestampedTuple>();
+public class TimeWindowSpec extends WindowSpec {
+    private static final long serialVersionUID = 1L;
+    double duration;  // duration in seconds
+    transient List<TimestampedTuple> window;
+        
+    public TimeWindowSpec(windowType type, double duration){
+        super(type);
+        this.duration = duration;
+        window = new LinkedList<TimestampedTuple>();
     }
-	
+    
     @Override
-	protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
-        return new DataCollector(endOfPipe) {
-
+    protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+        return new DataCollector(endOfPipe) {
+
             @Override
-			public void add(Datum d){
-                
-                boolean changed = false;
-                
-                if (!(((TimestampedTuple) d).isHeartBeat())) {
-                    window.add((TimestampedTuple) d);
-                    changed = true;
-                }
-
-                double expireTime = ((TimestampedTuple) d).getTimeStamp() - duration;
-                while (true) {
-                    TimestampedTuple tail = window.get(0);
-                    
-                    if (tail != null & tail.getTimeStamp()<= expireTime) {
-                        window.remove(0);
-                        changed = true;
-                    } else {
-                        break;
-                    }
-                }
-                
-                if (changed) {
-                    // emit entire window content to output collector
-                    for (Iterator<TimestampedTuple> it = window.iterator(); it.hasNext(); ) {
-                        successor.add(it.next());
-                    }
-                }
+            public void add(Object d){
+                
+                boolean changed = false;
+                
+                if (!(((TimestampedTuple) d).isHeartBeat())) {
+                    window.add((TimestampedTuple) d);
+                    changed = true;
+                }
+
+                double expireTime = ((TimestampedTuple) d).getTimeStamp() - duration;
+                while (true) {
+                    TimestampedTuple tail = window.get(0);
+                    
+                    if (tail != null & tail.getTimeStamp()<= expireTime) {
+                        window.remove(0);
+                        changed = true;
+                    } else {
+                        break;
+                    }
+                }
+                
+                if (changed) {
+                    // emit entire window content to output collector
+                    for (Iterator<TimestampedTuple> it = window.iterator(); it.hasNext(); ) {
+                        successor.add(it.next());
+                    }
+                }
             }
             
-            
-        };
+            
+        };
     }
     
     @Override
     public String toString() {
-    	StringBuilder sb = new StringBuilder();
-    	sb.append("[WINDOW ");
-    	sb.append(type);
-    	sb.append(" TIME ");
-    	sb.append(duration);
-    	sb.append("]");
-    	return sb.toString();
-    }
-}
+        StringBuilder sb = new StringBuilder();
+        sb.append("[WINDOW ");
+        sb.append(type);
+        sb.append(" TIME ");
+        sb.append(duration);
+        sb.append("]");
+        return sb.toString();
+    }
+
+    public void visit(EvalSpecVisitor v) {
+        v.visitTimeWindow(this);
+    }
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/window/TupleWindowSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/window/TupleWindowSpec.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/window/TupleWindowSpec.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/window/TupleWindowSpec.java Tue Jan 22 13:17:12 2008
@@ -15,52 +15,52 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.pig.impl.eval.window;
-
-import java.util.*;
+package org.apache.pig.impl.eval.window;
+
+import java.util.*;
 
-import org.apache.pig.data.Datum;
 import org.apache.pig.data.TimestampedTuple;
 import org.apache.pig.impl.eval.collector.DataCollector;
-
-public class TupleWindowSpec extends WindowSpec {
+import org.apache.pig.impl.eval.EvalSpecVisitor;
+
+public class TupleWindowSpec extends WindowSpec {
 	private static final long serialVersionUID = 1L;
 	   
-	int numTuples;
-    transient List<Datum> window;
-	
-	public TupleWindowSpec(windowType type, int numTuples){
-		super(type);
-		this.numTuples = numTuples;
-        window = new LinkedList<Datum>();
-	}
-	
+	int numTuples;
+    transient List<Object> window;
+	
+	public TupleWindowSpec(windowType type, int numTuples){
+		super(type);
+		this.numTuples = numTuples;
+        window = new LinkedList<Object>();
+	}
+	
 	@Override
 	protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
-	    return new DataCollector(endOfPipe) {
-
+	    return new DataCollector(endOfPipe) {
+
 	        @Override
-			public void add(Datum d) {
-	        	if (d!=null){
-	        	
-		        	if (d instanceof TimestampedTuple){
-		        		if (((TimestampedTuple) d).isHeartBeat()) return;
-		        	}
-		        	
-	                window.add(d);
-	                while (window.size() > numTuples) {
-	                    window.remove(0);
-	                }
-	        	}
-	        	
-	        	// emit entire window content to output collector
-                for (Iterator<Datum> it = window.iterator(); it.hasNext(); ) {
-                    addToSuccessor(it.next());
-                }
+			public void add(Object d) {
+	        	if (d!=null){
+	        	
+		        	if (d instanceof TimestampedTuple){
+		        		if (((TimestampedTuple) d).isHeartBeat()) return;
+		        	}
+		        	
+	                window.add(d);
+	                while (window.size() > numTuples) {
+	                    window.remove(0);
+	                }
+	        	}
+	        	
+	        	// emit entire window content to output collector
+                for (Iterator<Object> it = window.iterator(); it.hasNext(); ) {
+                    addToSuccessor(it.next());
+                }
 	        }
 	        
-	       
-	    };
+	       
+	    };
     }
 	
     @Override
@@ -73,5 +73,9 @@
     	sb.append("]");
     	return sb.toString();
     }
-
-}
+
+    public void visit(EvalSpecVisitor v) {
+        v.visitTupleWindow(this);
+    }
+
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/window/WindowSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/window/WindowSpec.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/window/WindowSpec.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/window/WindowSpec.java Tue Jan 22 13:17:12 2008
@@ -42,10 +42,5 @@
 		return input;
 	}
 	
-	@Override
-	public boolean amenableToCombiner() {
-		return false;
-	}
-	
 	
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java?rev=614325&r1=614324&r2=614325&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java Tue Jan 22 13:17:12 2008
@@ -42,6 +42,13 @@
         pos++;
         return c;
 	}
+
+    @Override
+    public int read(byte b[], int off, int len) throws IOException {
+        int read = in.read(b, off, len);
+        pos += read;
+        return read;
+    }
 	
     @Override
 	public long skip(long n) throws IOException {



Mime
View raw message