pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ga...@apache.org
Subject svn commit: r591143 [2/4] - in /incubator/pig/branches/types: src/org/apache/pig/ src/org/apache/pig/builtin/ src/org/apache/pig/data/ src/org/apache/pig/impl/ src/org/apache/pig/impl/builtin/ src/org/apache/pig/impl/eval/ src/org/apache/pig/impl/eval/...
Date Thu, 01 Nov 2007 20:48:22 GMT
Modified: incubator/pig/branches/types/src/org/apache/pig/data/Datum.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/Datum.java?rev=591143&r1=591142&r2=591143&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/Datum.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/Datum.java Thu Nov  1 13:48:16 2007
@@ -18,24 +18,181 @@
 package org.apache.pig.data;
 
 import java.io.DataOutput;
+import java.io.DataInput;
 import java.io.IOException;
 
 /**
  * A marker class for a basic data unit.
  */
-public abstract class Datum implements Comparable {
-	public static final byte ATOM   = 0x50;
-    public static final byte BAG    = 0x51;
-    public static final byte TUPLE  = 0x60;
-    public static final byte MAP  = 0x52;
-    public static final byte RECORD_1 = 0x21;
-    public static final byte RECORD_2 = 0x31;
-    public static final byte RECORD_3 = 0x41;
+public interface Datum extends Comparable {
 
+enum DataType {
+	BAG((byte)0x7f, (byte)1), // DEL
+	TUPLE((byte)0x7e, (byte)2), // ~
+	MAP((byte)0x7d, (byte)3), // }
+	INT((byte)0x7c, (byte)4), // |
+	LONG((byte)0x7b, (byte)5), // {
+	FLOAT((byte)0x7a, (byte)6), // z
+	DOUBLE((byte)0x79, (byte)7), // y
+	CHARARRAY((byte)0x78, (byte)8), // x
+	UNKNOWN((byte)0x77, (byte)9), // w
+	ATOM((byte)0x76, (byte)10); // v
+
+	DataType(byte m, byte v) { marker = m; val = v; }
+
+	private byte marker;
+	private byte val;
+
+	public byte getMarker() { return marker; }
+
+	static DataType markerToType(byte marker) {
+		switch (marker) {
+		case 0x7f: return BAG;
+		case 0x7e: return TUPLE;
+		case 0x7d: return MAP;
+		case 0x7c: return INT;
+		case 0x7b: return LONG;
+		case 0x7a: return FLOAT;
+		case 0x79: return DOUBLE;
+		case 0x78: return CHARARRAY;
+		case 0x77: return UNKNOWN;
+		case 0x76: return ATOM;
+		default: throw new AssertionError("Unknown datatype marker " + marker);
+		}
+	}
+};
+
+enum DataDimension { ATOMIC, COMPLEX, UNKNOWN };
+
+// Codes for writing out data.  Order is important here.
+/*
+public static final byte NULL      = 0x1;
+public static final byte BAG       = 0x51;
+public static final byte TUPLE     = 0x60;
+public static final byte MAP       = 0x52;
+public static final byte INT       = 0x53;
+public static final byte LONG      = 0x54;
+public static final byte FLOAT     = 0x55;
+public static final byte DOUBLE    = 0x56;
+public static final byte CHARARRAY = 0x57;
+public static final byte UNKNOWN   = 0x58;
+public static final byte RECORD_1  = 0x21;
+public static final byte RECORD_2  = 0x31;
+public static final byte RECORD_3  = 0x41;
+public static final byte ATOM      = CHARARRAY;
+*/
+
+
+/**
+ * Encode the integer so that the high bit is set on the last
+ * byte.
+ * @param os DataOutput to write to
+ * @param i int value to write
+ * @throws IOException
+ */
+/*
+static void encodeInt(DataOutput os, int i) throws IOException 
+{
+	if (i >> 28 != 0) os.write((i >> 28) & 0x7f);
+	if (i >> 21 != 0) os.write((i >> 21) & 0x7f);
+	if (i >> 14 != 0) os.write((i >> 14) & 0x7f);
+	if (i >> 7 != 0) os.write((i >> 7) & 0x7f);
+	os.write((i & 0x7f) | (1 << 7));
+}
+*/
+
+/**
+ * Decode an integer to be read out of a file
+ * @param is DataInput to read from
+ * @return decoded integer
+ */
+/*
+static int decodeInt(DataInput is) throws IOException
+{
+	int i = 0;
+	int c;
+	while (true) {
+		c = is.readUnsignedByte();
+		if (c == -1) break;
+		i <<= 7;
+		i += c & 0x7f;
+		if ((c & 0x80) != 0) break;
+	}
+	return i;
+}
+*/
+
+/**
+ * Read a datum from a file. This is used for reading intermediate
+ * results.  Public so it can be used in unit tests.
+ * @param in DataInput source.
+ * @return a new Datum.
+ * @throws IOException if one of the extensions' constructors throws it.
+ */
+/*
+public static Datum readDatum(DataInput in) throws IOException
+{
+	byte b = in.readByte();
+	DataType t = DataType.markerToType(b);
+	switch (t) {
+	case BAG: return DataBag.read(in);
+	case TUPLE: return Tuple.read(in);
+	case MAP: return DataMap.read(in);
+	case INT: return DataInteger.read(in);
+	case LONG: return DataLong.read(in);
+	case FLOAT: return DataFloat.read(in);
+	case DOUBLE: return DataDouble.read(in);
+	case CHARARRAY: return DataCharArray.read(in);
+	case UNKNOWN: return DataUnknown.read(in);
+	default:
+		throw new AssertionError("Invalid data type indicator " + b + " while reading Datum from binary file");
+	}
+}
+*/
+
+
+	/*
 	@Override
 	public abstract boolean equals(Object o);
+	*/
+
+/**
+ * Get the datatype of this datum.
+ * @return type of this datum.
+ */
+DataType getType();
 	
-	public abstract void write(DataOutput out) throws IOException;
+/**
+ * Find the size of this datum.
+ * @return size
+ */
+long size();
+
+/**
+ * Find out if this is an atomic or complex data type.  It can also be
+ * unknown.
+ * @return ATOMIC if this is int, long, float, double, or chararray,
+ * COMPLEX if it is a tuple, bag, or map, and UNKNOWN if it's unknown.
+ */
+DataDimension getDimension();
+
+/**
+ * Find out if this datum is null.
+ * @return true if this datum is null, false otherwise.
+ */
+boolean isNull();
+
+/**
+ * Set this datum as null or not null.
+ * @param isNull if true, datum will be set to null
+ */
+void setNull(boolean isNull);
+	
+/**
+ * Write contained data to an output.
+ * @param out DataOutput to write to.
+ * @throws IOException
+ */
+void write(DataOutput out) throws IOException;
 	
-	     
 }

Added: incubator/pig/branches/types/src/org/apache/pig/data/DatumImpl.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DatumImpl.java?rev=591143&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DatumImpl.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DatumImpl.java Thu Nov  1 13:48:16 2007
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.DataOutput;
+import java.io.DataInput;
+import java.io.IOException;
+
+/**
+ * Abstract base class holding the behavior shared by Datum implementations.
+ */
+public abstract class DatumImpl implements Datum {
+
+
+/**
+ * Read a datum from a file. This is used for reading intermediate
+ * results.  Public so it can be used in unit tests.
+ * @param in DataInput source.
+ * @return a new Datum.
+ * @throws IOException if one of the extensions' constructors throws it.
+ */
+public static Datum readDatum(DataInput in) throws IOException
+{
+	byte b = in.readByte();
+	DataType t = DataType.markerToType(b);
+	switch (t) {
+	case BAG: return DataBag.read(in);
+	case TUPLE: return Tuple.read(in);
+	case MAP: return DataMap.read(in);
+	case INT: return DataInteger.read(in);
+	case LONG: return DataLong.read(in);
+	case FLOAT: return DataFloat.read(in);
+	case DOUBLE: return DataDouble.read(in);
+	case CHARARRAY: return DataCharArray.read(in);
+	case UNKNOWN: return DataUnknown.read(in);
+	case ATOM: return DataAtom.read(in);
+	default:
+		throw new AssertionError("Invalid data type indicator " + b + " while reading Datum from binary file");
+	}
+}
+
+
+/**
+ * Find out if this datum is null.
+ * @return true if this datum is null, false otherwise.
+ */
+public final boolean isNull() { return mNull; }
+
+/**
+ * Set this datum as null or not null.
+ * @param isNull if true, datum will be set to null
+ */
+public final void setNull(boolean isNull) { mNull = isNull; }
+
+/**
+ * Handle intertype sorting, so that it is consistent.  It is assumed the two
+ * objects are not of the same type.
+ * @param other Datum to compare to.
+ * @return a negative value if this object's datatype sorts before other's
+ * datatype, a positive value if it sorts after other's datatype.
+ */
+protected int crossTypeCompare(Datum other)
+{
+	return getType().compareTo(other.getType());
+}
+	
+// Indicates whether this datum is null.
+private boolean mNull = false;
+	     
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java?rev=591143&r1=591142&r2=591143&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java Thu Nov  1 13:48:16 2007
@@ -32,7 +32,7 @@
 	}
 	
 	public IndexedTuple(Tuple t, int indexIn) {
-		fields = t.fields;
+		super(t);
 		index = indexIn;
 	}
 
@@ -45,17 +45,19 @@
 	@Override
 	public void write(DataOutput out) throws IOException {
 		super.write(out);
-		encodeInt(out, index);
+		out.writeInt(index);
+		//encodeInt(out, index);
 	}
 	@Override
 	public void readFields(DataInput in) throws IOException {
 		super.readFields(in);
-		index = decodeInt(in);
+		index = in.readInt();
+		//index = decodeInt(in);
 	}
 	
 	public Tuple toTuple(){
 		Tuple t = new Tuple();
-		t.fields = fields;
+		t.mFields = mFields;
 		return t;
 	}
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/data/TimestampedTuple.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/TimestampedTuple.java?rev=591143&r1=591142&r2=591143&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/TimestampedTuple.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/TimestampedTuple.java Thu Nov  1 13:48:16 2007
@@ -48,7 +48,7 @@
             delimiter = defaultDelimiter;
         }
         String[] splitString = textLine.split(delimiter, -1);
-        fields = new ArrayList<Datum>(splitString.length-1);
+        mFields = new ArrayList<Datum>(splitString.length-1);
         for (int i = 0; i < splitString.length; i++) {
         	if (i==timestampColumn){
         		try{
@@ -57,7 +57,7 @@
         			System.err.println("Could not parse timestamp " + splitString[i]);
         		}
         	}else{
-        		fields.add(new DataAtom(splitString[i]));
+        		mFields.add(new DataAtom(splitString[i]));
         	}
         }
     }

Modified: incubator/pig/branches/types/src/org/apache/pig/data/Tuple.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/Tuple.java?rev=591143&r1=591142&r2=591143&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/Tuple.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/Tuple.java Thu Nov  1 13:48:16 2007
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *	 http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -24,337 +24,561 @@
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.log4j.Logger;
+
 import org.apache.hadoop.io.WritableComparable;
 
+import org.apache.pig.impl.util.PigLogger;
+
+/**
+ * An ordered list of Datums.
+ */
+public class Tuple extends ComplexDatum implements WritableComparable
+{
+
+static String			  defaultDelimiter = "[,\t]";
+static String			  NULL = "__PIG_NULL__";
+
+public static final byte RECORD_1  = 0x21;
+public static final byte RECORD_2  = 0x31;
+public static final byte RECORD_3  = 0x41;
+
+/**
+ * Construct a tuple with no fields.
+ */
+public Tuple() { this(0); }
+
+/**
+ * Construct a tuple with a known number of fields.  The fields will be
+ * pre-populated with nulls.
+ * @param numFields Number of fields in the tuple.
+ */
+public Tuple(long numFields)
+{
+	this(numFields, true);
+}
+
+/**
+ * Construct a tuple with a known number of fields.
+ * @param numFields Number of fields in the tuple.
+ * @param prepopulate If true, prepopulate with nulls, otherwise leave the
+ * tuple empty.  Should be called with false only by Tuple.read()
+ */
+private Tuple(long numFields, boolean prepopulate)
+{
+	mFields = new ArrayList<Datum>((int)numFields);
+	if (prepopulate) {
+		for (int i = 0; i < numFields; i++) {
+			mFields.add(null);
+		}
+	}
+}
+
+/**
+ * Construct a tuple from an existing List.
+ */
+public Tuple(List<Datum> mFieldsIn)
+{
+	mFields = new ArrayList<Datum>(mFieldsIn.size());
+	mFields.addAll(mFieldsIn);
+}
+
 /**
- * an ordered list of Datums
+ * Construct a tuple from an existing tuple.  The fields are not copied,
+ * only referenced.
  */
-public class Tuple extends Datum implements WritableComparable {
-    protected ArrayList<Datum> fields;
-    static String              defaultDelimiter = "[,\t]";
-    static String              NULL = "__PIG_NULL__";
-
-    public Tuple() {
-        this(0);
-    }
-
-    public Tuple(int numFields) {
-        fields = new ArrayList<Datum>(numFields);
-        for (int i = 0; i < numFields; i++) {
-            fields.add(null);
-        }
-    }
-
-    public Tuple(List<Datum> fieldsIn) {
-        fields = new ArrayList<Datum>(fieldsIn.size());
-        fields.addAll(fieldsIn);
-    }
-    
-    /**
-     * shortcut, if tuple only has one field
-     */
-    public Tuple(Datum fieldIn) {
-        fields = new ArrayList<Datum>(1);
-        fields.add(fieldIn);
-    }
-
-    /**
-     * Creates a tuple from a delimited line of text
-     * 
-     * @param textLine
-     *            the line containing fields of data
-     * @param delimiter
-     *            a regular expression of the form specified by String.split(). If null, the default
-     *            delimiter "[,\t]" will be used.
-     */
-    public Tuple(String textLine, String delimiter) {
-        if (delimiter == null) {
-            delimiter = defaultDelimiter;
-        }
-        String[] splitString = textLine.split(delimiter, -1);
-        fields = new ArrayList<Datum>(splitString.length);
-        for (int i = 0; i < splitString.length; i++) {
-            fields.add(new DataAtom(splitString[i]));
-        }
-    }
-
-    /**
-     * Creates a tuple from a delimited line of text. This will invoke Tuple(textLine, null)
-     * 
-     * @param textLine
-     *            the line containing fields of data
-     */
-    public Tuple(String textLine) {
-        this(textLine, defaultDelimiter);
-    }
-
-    public Tuple(Tuple[] otherTs) {
-        fields = new ArrayList<Datum>(otherTs.length);
-        for (int i = 0; i < otherTs.length; i++) {
-                appendTuple(otherTs[i]);
-        }
-    }
-
-    public void copyFrom(Tuple otherT) {
-        this.fields = otherT.fields;
-    }
-
-    public int arity() {
-        return fields.size();
-    }
-
-    @Override
-	public String toString() {
-    	StringBuffer sb = new StringBuffer();
-        sb.append('(');
-        for (Iterator<Datum> it = fields.iterator(); it.hasNext();) {
-            Datum d = it.next();
-            if(d != null) {
-                sb.append(d.toString());
-            } else {
-                sb.append(NULL);
-            }
-            if (it.hasNext())
-                sb.append(", ");
-        }
-        sb.append(')');
-        String s = sb.toString();
-        return s;
-    }
-
-    public void setField(int i, Datum val) throws IOException {
-        getField(i); // throws exception if field doesn't exist
-
-        fields.set(i, val);
-    }
-
-    public void setField(int i, int val) throws IOException {
-        setField(i, new DataAtom(val));
-    }
-
-    public void setField(int i, double val) throws IOException {
-        setField(i, new DataAtom(val));
-    }
-
-    public void setField(int i, String val) throws IOException {
-        setField(i, new DataAtom(val));
-    }
-
-    public Datum getField(int i) throws IOException {
-        if (fields.size() >= i + 1)
-            return fields.get(i);
-        else
-            throw new IOException("Column number out of range: " + i + " -- " + toString());
-    }
-
-    // Get field i, if it is an Atom or can be coerced into an Atom
-    public DataAtom getAtomField(int i) throws IOException {
-        Datum field = getField(i); // throws exception if field doesn't exist
-
-        if (field instanceof DataAtom) {
-            return (DataAtom) field;
-        } else if (field instanceof Tuple) {
-            Tuple t = (Tuple) field;
-            if (t.arity() == 1) {
-            	System.err.println("Warning: Asked for an atom field but found a tuple with one field.");
-                return t.getAtomField(0);
-            }
-        } else if (field instanceof DataBag) {
-            DataBag b = (DataBag) field;
-            if (b.cardinality() == 1) {
-                Tuple t = b.content().next();
-                if (t.arity() == 1) {
-                    return t.getAtomField(0);
-                }
-            }
-        }
-
-        throw new IOException("Incompatible type for request getAtomField().");
-    }
-
-    // Get field i, if it is a Tuple or can be coerced into a Tuple
-    public Tuple getTupleField(int i) throws IOException {
-        Datum field = getField(i); // throws exception if field doesn't exist
-
-        if (field instanceof Tuple) {
-            return (Tuple) field;
-        } else if (field instanceof DataBag) {
-            DataBag b = (DataBag) field;
-            if (b.cardinality() == 1) {
-                return b.content().next();
-            }
-        }
-
-        throw new IOException("Incompatible type for request getTupleField().");
-    }
-
-    // Get field i, if it is a Bag or can be coerced into a Bag
-    public DataBag getBagField(int i) throws IOException {
-        Datum field = getField(i); // throws exception if field doesn't exist
-
-        if (field instanceof DataBag) {
-            return (DataBag) field;
-        }
-
-        throw new IOException("Incompatible type for request getBagField().");
-    }
-
-    public void appendTuple(Tuple other){
-        for (Iterator<Datum> it = other.fields.iterator(); it.hasNext();) {
-            this.fields.add(it.next());
-        }
-    }
-
-    public void appendField(Datum newField){
-        this.fields.add(newField);
-    }
-
-    public String toDelimitedString(String delim) throws IOException {
-        StringBuffer buf = new StringBuffer();
-        for (Iterator<Datum> it = fields.iterator(); it.hasNext();) {
-            Datum field = it.next();
-            if (!(field instanceof DataAtom)) {
-                throw new IOException("Unable to convert non-flat tuple to string.");
-            }
-
-            buf.append((DataAtom) field);
-            if (it.hasNext())
-                buf.append(delim);
-        }
-        return buf.toString();
-    }
-
-    public boolean lessThan(Tuple other) {
-        return (this.compareTo(other) < 0);
-    }
-
-    public boolean greaterThan(Tuple other) {
-        return (this.compareTo(other) > 0);
-    }
-    
-    @Override
-	public boolean equals(Object other){
-    		return compareTo(other)==0;
-    }	
-    
-    public int compareTo(Tuple other) {
-        if (other.fields.size() != this.fields.size())
-            return other.fields.size() < this.fields.size() ? 1 : -1;
-
-        for (int i = 0; i < this.fields.size(); i++) {
-            int c = this.fields.get(i).compareTo(other.fields.get(i));
-            if (c != 0)
-                return c;
-        }
-        return 0;
-    }
-
-    public int compareTo(Object other) {
-        if (other instanceof DataAtom)
-            return +1;
-        else if (other instanceof DataBag)
-            return +1;
-        else if (other instanceof Tuple)
-            return compareTo((Tuple) other);
-        else
-        	return -1;
-    }
-
-    @Override
-	public int hashCode() {
-        int hash = 1;
-        for (Iterator<Datum> it = fields.iterator(); it.hasNext();) {
-            hash = 31 * hash + it.next().hashCode();
-        }
-        return hash;
-    }
+public Tuple(Tuple t)
+{
+	mFields = new ArrayList<Datum>(t.mFields.size());
+	mFields.addAll(t.mFields);
+}
+	
+/**
+ * shortcut, if tuple only has one field
+ */
+public Tuple(Datum fieldIn)
+{
+	mFields = new ArrayList<Datum>(1);
+	mFields.add(fieldIn);
+}
 
-    // WritableComparable methods:
+/**
+ * Creates a tuple from a delimited line of text.  For now creates elements as
+ * DataAtoms.  This should change once we have expressions that can handle the
+ * new types.
+ * 
+ * @param textLine
+ *			the line containing fields of data
+ * @param delimiter
+ *			a regular expression of the form specified by String.split(). If null, the default
+ *			delimiter "[,\t]" will be used.
+ */
+public Tuple(String textLine, String delimiter)
+{
+	if (delimiter == null) {
+		delimiter = defaultDelimiter;
+	}
+	String[] splitString = textLine.split(delimiter, -1);
+	mFields = new ArrayList<Datum>(splitString.length);
+	for (int i = 0; i < splitString.length; i++) {
+		mFields.add(new DataAtom(splitString[i]));
+	}
+}
+
+/**
+ * Creates a tuple from a delimited line of text. This will invoke Tuple(textLine, defaultDelimiter)
+ * 
+ * @param textLine
+ *			the line containing fields of data
+ */
+public Tuple(String textLine) { this(textLine, defaultDelimiter); }
+
+public Tuple(Tuple[] otherTs)
+{
+	mFields = new ArrayList<Datum>(otherTs.length);
+	for (int i = 0; i < otherTs.length; i++) {
+			appendTuple(otherTs[i]);
+	}
+}
+
+public DataType getType() { return Datum.DataType.TUPLE; }
+
+public long size() { return mFields.size(); }
+
+public void copyFrom(Tuple otherT)
+{
+	this.mFields = otherT.mFields;
+}
+
+/**
+ * @deprecated Use size() instead.
+ */
+public int arity() { return (int)size(); }
+
+@Override
+public String toString()
+{
+	StringBuffer sb = new StringBuffer();
+	sb.append('(');
+	for (Iterator<Datum> it = mFields.iterator(); it.hasNext();) {
+		Datum d = it.next();
+		if(d != null) {
+			sb.append(d.toString());
+		} else {
+			sb.append(NULL);
+		}
+		if (it.hasNext())
+			sb.append(", ");
+	}
+	sb.append(')');
+	String s = sb.toString();
+	return s;
+}
+
+public final void setField(long i, Datum val) throws IOException
+{
+	if (i >= mFields.size()) {
+		throw new IOException("Column number out of range, tried to access " + i + " in a tuple of only " + mFields.size() + "columns");
+	}
+	mFields.set((int)i, val);
+}
+
+/**
+ * Set a field with an int value. Push it into a DataAtom for the moment,
+ * eventually we'll change this to a DataInteger.
+ * @param val integer value to set field to.
+ */
+public final void setField(int i, int val) throws IOException
+{
+	setField(i, new DataAtom(val));
+}
+
+/**
+ * Set a field with a double value. Push it into a DataAtom for the moment,
+ * eventually we'll change this to a DataDouble.
+ * @param val double value to set field to.
+ */
+public void setField(int i, double val) throws IOException
+{
+	setField(i, new DataAtom(val));
+}
+
+/**
+ * Set a field with a string value. Push it into a DataAtom for the moment,
+ * eventually we'll change this to a DataCharArrayUtf16.
+ * @param val string value to set field to.
+ */
+public void setField(int i, String val) throws IOException
+{
+	setField(i, new DataAtom(val));
+}
+
+/**
+ * Get a field from the tuple.
+ * @param i Field to get
+ * @return Field value, as Datum.
+ */
+public final Datum getField(long i)
+{
+	Datum d = null;
+	if ((i >= mFields.size()) || ((d = mFields.get((int)i)) == null)) {
+		d = new DataUnknown();
+		d.setNull(true);
+	}
+	return d;
+}
+
+/**
+ * @deprecated Use specific types instead of DataAtom.
+ * Get field i, if it is an Atom or can be coerced into an Atom.
+ * @param i field to get as an atom.
+ * @return contents of the field. If it's of type DataAtom then that will
+ * be returned.  If it's a tuple of one field, then that field will be
+ * returned.  If it's a bag of one element, then that element will be
+ * returned.  If it's one of the new atomic types (int, etc.) it will push
+ * that into a data atom and return that.
+ * @throws IOException if the field isn't an atom and it can't figure out
+ * how to do the coercion.
+ */
+public DataAtom getAtomField(int i) throws IOException
+{
+	Datum field = getField(i); // throws exception if field doesn't exist
+
+	// This shouldn't actually ever happen anymore.
+	if (field instanceof DataAtom) return (DataAtom) field;
+
+	switch (field.getType()) {
+	case INT: return new DataAtom(((DataInteger)field).get());
+	case LONG: return new DataAtom(((DataLong)field).get());
+	case FLOAT: return new DataAtom(((DataFloat)field).get());
+	case DOUBLE: return new DataAtom(((DataDouble)field).get());
+	case UNKNOWN: return new DataAtom(((DataUnknown)field).get());
+	case CHARARRAY: 
+		switch (((DataCharArray)field).getEncoding()) {
+		case UTF16: 
+			return new DataAtom(((DataCharArrayUtf16)field).get());
+		case NONE: 
+			return new DataAtom(((DataCharArrayNone)field).get());
+		default: throw new AssertionError("Unknown encoding");
+		}
+
+	// Can't use getFieldAsAtomic for tuple and bag because these need to
+	// recurse to getAtomField instead.
+	case TUPLE: {
+		Tuple t = (Tuple) field;
+		if (t.size() == 1) {
+			PigLogger.getLogger().warn("Asked for an atom field but found a tuple with one field.");
+			return t.getAtomField(0);
+		}
+		break;
+							   }
+
+	case BAG: {
+		DataBag b = (DataBag) field;
+		if (b.bagOf() == Datum.DataType.TUPLE && b.size() == 1) {
+			Tuple t = (Tuple)b.content().next();
+			if (t.size() == 1) {
+				PigLogger.getLogger().warn("Asked for an atom field but found a bag with a tuple with one field.");
+				return t.getAtomField(0);
+			}
+		}
+		break;
+							 }
+
+	default: break;
+	}
+	throw new IOException("Incompatible type for request getAtomField().");
+}
+
+/**
+ * Get field i, if it is an Atomic type or can be coerced into an Atomic
+ * type.
+ * @param i field to get as an atomic type.
+ * @return contents of the field. If it's an atomic type it will be
+ * returned as is.  If it is unknown, it will be converted to a char array
+ * none and returned as that.  If it's a tuple of one field, then
+ * that field will be returned.  If it's a bag of one element, then
+ * that element will be returned.
+ * @throws IOException if the field isn't an atom and it can't figure out
+ * how to do the coercion.
+ */
+public AtomicDatum getFieldAsAtomic(int i) throws IOException
+{
+	Datum field = getField(i); // throws exception if field doesn't exist
+
+	if (field.getDimension() == Datum.DataDimension.ATOMIC) {
+		return (AtomicDatum)field;
+	}
+
+	switch (field.getType()) {
+	case UNKNOWN:
+		return new DataCharArrayNone(((DataUnknown)field).get());
+
+	case TUPLE: {
+		Tuple t = (Tuple) field;
+		if (t.size() == 1) {
+			PigLogger.getLogger().warn("Warning: Asked for an atom field but found a tuple with one field.");
+			return t.getFieldAsAtomic(0);
+		}
+		break;
+				}
+
+	case BAG: {
+		DataBag b = (DataBag) field;
+		if (b.bagOf() == Datum.DataType.TUPLE && b.size() == 1) {
+			Tuple t = (Tuple)b.content().next();
+			if (t.size() == 1) {
+				PigLogger.getLogger().warn("Warning: Asked for an atom field but found a bag with a tuple with one field.");
+				return t.getFieldAsAtomic(0);
+			}
+		}
+		break;
+			  }
+
+	default: break;
+	}
+
+	throw new IOException("Incompatible type for request getAtomField().");
+}
+
+/**
+ * Attempt to fetch a field as a tuple.
+ * @param i field number to get.
+ * @return If the field is a tuple, return it.  If it's a bag of one tuple,
+ * return that tuple.  Otherwise an IOException is thrown.
+ * @throws IOException if the field is neither a tuple nor a bag of one
+ * tuple.
+ */
+public Tuple getTupleField(int i) throws IOException
+{
+	Datum field = getField(i); // throws exception if field doesn't exist
+
+	if (field.getType() == Datum.DataType.TUPLE) {
+		return (Tuple) field;
+	} else if (field.getType() == Datum.DataType.BAG) {
+		DataBag b = (DataBag) field;
+		if (b.bagOf() == Datum.DataType.TUPLE && b.size() == 1) {
+			return (Tuple)b.content().next();
+		}
+	}
+
+	throw new IOException("Incompatible type for request getTupleField().");
+}
+
+/**
+ * Attempt to fetch a field as a bag.
+ * @param i field number to get.
+ * @return If the field is a bag, return it. Otherwise an IOException is thrown.
+ * @throws IOException if the field is not a bag.
+ */
+public DataBag getBagField(int i) throws IOException
+{
+	Datum field = getField(i); // throws exception if field doesn't exist
+
+	if (field.getType() == Datum.DataType.BAG) return (DataBag) field;
+
+	throw new IOException("Incompatible type for request getBagField().");
+}
+
+public final void appendTuple(Tuple other)
+{
+	for (Iterator<Datum> it = other.mFields.iterator(); it.hasNext();) {
+		mFields.add(it.next());
+	}
+}
+
+public final void appendField(Datum newField) { mFields.add(newField); }
+
+public String toDelimitedString(String delim) throws IOException
+{
+	StringBuffer buf = new StringBuffer();
+	for (Iterator<Datum> it = mFields.iterator(); it.hasNext();) {
+		Datum field = it.next();
+		if (field.getDimension() == Datum.DataDimension.COMPLEX) {
+			throw new IOException("Unable to convert non-flat tuple to string.");
+		}
+
+		buf.append(field.toString());
+		if (it.hasNext()) buf.append(delim);
+	}
+	return buf.toString();
+}
+
+/*
+	public boolean lessThan(Tuple other) {
+		return (this.compareTo(other) < 0);
+	}
+
+	public boolean greaterThan(Tuple other) {
+		return (this.compareTo(other) > 0);
+	}
+	
+	*/
+
+/**
+ * See if two tuples are equal to each other.  Tuple equality is defined as being
+ * of the same size, and for each field f1...fn in t1 and fields g1...gn in t2,
+ * f1.equals(g1) ... fn.equals(gn) holds true.
+ * Don't make this use compareTo.  These functions are used in things like hashes
+ * and we want them to be as fast as possible.
+ */
+@Override
+public boolean equals(Object other)
+{
+	if (!(other instanceof Tuple)) return false;
+
+	Tuple t = (Tuple)other;
+
+	long sz = size();
+
+	if (t.size() != sz) return false;
+
+	for (long i = 0; i < sz; i++) {
+		if (!t.getField(i).equals(getField(i))) return false;
+	}
+
+	return true;
+}
+
+public int compareTo(Object other)
+{
+	if (!(other instanceof Datum)) return -1;
+
+	Datum od = (Datum)other;
+
+	if (od.getType() != Datum.DataType.TUPLE) return crossTypeCompare(od);
+
+	Tuple t = (Tuple)od;
+
+	long sz = size();
+	long tsz = t.size();
+	if (sz < tsz) return -1;
+	else if (sz > tsz) return 1;
+
+	for (long i = 0; i < sz; i++) {
+		int c = mFields.get((int)i).compareTo(t.mFields.get((int)i));
+		if (c != 0) return c;
+	}
+	return 0;
+}
+
+@Override
+public int hashCode()
+{
+	int hash = 1;
+	for (Iterator<Datum> it = mFields.iterator(); it.hasNext();) {
+		Datum f = it.next();
+		if (f == null) hash += 1;
+		else hash = 31 * hash + f.hashCode();
+	}
+	return hash;
+}
+
+// WritableComparable methods:
    
-    @Override
-	public void write(DataOutput out) throws IOException {
-        out.write(TUPLE);
-        int n = arity();
-        encodeInt(out, n);
-        for (int i = 0; i < n; i++) {
-        	Datum d = getField(i);
-        	if (d!=null){
-        		d.write(out);
-        	}else{
-        		throw new RuntimeException("Null field in tuple");
-        	}
-        }
-    }
-
-    //This method is invoked when the beginning 'TUPLE' is still on the stream
-    public void readFields(DataInput in) throws IOException {
-    	byte[] b = new byte[1];
-        in.readFully(b);
-        if (b[0]!=TUPLE)
-        	throw new IOException("Unexpected data while reading tuple from binary file");
-    	Tuple t = read(in);
-    	fields = t.fields;
-    }
-    
-    //This method is invoked when the beginning 'TUPLE' has been read off the stream
-    public static Tuple read(DataInput in) throws IOException {
-        // nuke the old contents of the tuple
-        Tuple ret = new Tuple();
-    	ret.fields = new ArrayList<Datum>();
-
-    	int size = decodeInt(in);
-        
-        for (int i = 0; i < size; i++) {
-            ret.appendField(readDatum(in));
-        }
-        
-        return ret;
-
-    }
-    
-    public static Datum readDatum(DataInput in) throws IOException{
-    	byte[] b = new byte[1];
-    	in.readFully(b);
-    	switch (b[0]) {
-	        case TUPLE:
-	            return Tuple.read(in);
-	        case BAG:
-	        	return DataBag.read(in);
-	        case MAP:
-	        	return DataMap.read(in);
-	        case ATOM:
-	            return DataAtom.read(in);
-	        default:
-	        	throw new IOException("Invalid data while reading Datum from binary file");
-    	}
-    }
-
-    //  Encode the integer so that the high bit is set on the last
-    // byte
-    static void encodeInt(DataOutput os, int i) throws IOException {
-        if (i >> 28 != 0)
-            os.write((i >> 28) & 0x7f);
-        if (i >> 21 != 0)
-            os.write((i >> 21) & 0x7f);
-        if (i >> 14 != 0)
-            os.write((i >> 14) & 0x7f);
-        if (i >> 7 != 0)
-            os.write((i >> 7) & 0x7f);
-        os.write((i & 0x7f) | (1 << 7));
-    }
-
-    static int decodeInt(DataInput is) throws IOException {
-        int i = 0;
-        int c;
-        while (true) {
-            c = is.readUnsignedByte();
-            if (c == -1)
-                break;
-            i <<= 7;
-            i += c & 0x7f;
-            if ((c & 0x80) != 0)
-                break;
-        }
-        return i;
-    }
+@Override
+public void write(DataOutput out) throws IOException
+{
+	out.write(Datum.DataType.TUPLE.getMarker());
+	long n = size();
+	out.writeLong(n);
+	for (long i = 0; i < n; i++) {
+		Datum d = getField((int)i);
+		if (d != null){
+			d.write(out);
+		} else {
+			throw new RuntimeException("Null field in tuple");
+		}
+	}
+}
+
+/**
+ * This method is invoked when the beginning 'TUPLE' is still on the stream.
+ * @param in DataInput to read from
+ * @throws IOException if the expected data isn't a tuple.
+ */
+public void readFields(DataInput in) throws IOException
+{
+	byte[] b = new byte[1];
+	in.readFully(b);
+	if (b[0] != Datum.DataType.TUPLE.getMarker())
+		throw new IOException("Unexpected data while reading tuple from binary file");
+	Tuple t = read(in);
+	mFields = t.mFields;
+}
+	
+//This method is invoked when the beginning 'TUPLE' has been read off the stream
+public static Tuple read(DataInput in) throws IOException
+{
+	long size = in.readLong();
+
+	// nuke the old contents of the tuple
+	Tuple ret = new Tuple(size, false);
+
+	for (int i = 0; i < size; i++) {
+		ret.appendField(DatumImpl.readDatum(in));
+	}
+		
+	return ret;
+}
+	
+	/*
+public static Datum readDatum(DataInput in) throws IOException
+{
+	byte[] b = new byte[1];
+	in.readFully(b);
+	switch (b[0]) {
+	case TUPLE:
+		return Tuple.read(in);
+	case BAG:
+		return DataBag.read(in);
+	case MAP:
+		return DataMap.read(in);
+	case INT:
+		return DataInt.read(in);
+	case LONG:
+		return DataLong.read(in);
+	case FLOAT:
+		return DataFloat.read(in);
+	case DOUBLE:
+		return DataDouble.read(in);
+	case UNKNOWN:
+		return DataUnknown.read(in);
+	default:
+		throw new AssertionError("Invalid data type indicator " + b[0] + " while reading Datum from binary file");
+	}
+}
+
+	//  Encode the integer so that the high bit is set on the last
+	// byte
+	static void encodeInt(DataOutput os, int i) throws IOException {
+		if (i >> 28 != 0)
+			os.write((i >> 28) & 0x7f);
+		if (i >> 21 != 0)
+			os.write((i >> 21) & 0x7f);
+		if (i >> 14 != 0)
+			os.write((i >> 14) & 0x7f);
+		if (i >> 7 != 0)
+			os.write((i >> 7) & 0x7f);
+		os.write((i & 0x7f) | (1 << 7));
+	}
+
+	static int decodeInt(DataInput is) throws IOException {
+		int i = 0;
+		int c;
+		while (true) {
+			c = is.readUnsignedByte();
+			if (c == -1)
+				break;
+			i <<= 7;
+			i += c & 0x7f;
+			if ((c & 0x80) != 0)
+				break;
+		}
+		return i;
+	}
+	*/
+
+protected ArrayList<Datum> mFields;
+
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java?rev=591143&r1=591142&r2=591143&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java Thu Nov  1 13:48:16 2007
@@ -54,6 +54,7 @@
 import org.apache.pig.impl.mapreduceExec.MapReduceLauncher;
 import org.apache.pig.impl.mapreduceExec.PigMapReduce;
 import org.apache.pig.impl.util.JarManager;
+import org.apache.pig.impl.util.PigLogger;
 import org.apache.pig.shock.SSHSocketImplFactory;
 
 
@@ -108,8 +109,11 @@
 	public PigContext(ExecType execType){
 		this.execType = execType;
 		
+		/*
 		mLogger = Logger.getLogger("org.apache.pig");
 		mLogger.setAdditivity(false);
+		*/
+		mLogger = PigLogger.getLogger();
 
     	initProperties();
     	
@@ -406,9 +410,11 @@
         return conf;
     }
 
+	/*
 	public Logger getLogger() {
 		return mLogger;
 	}
+	*/
 
     public void setJobtrackerLocation(String newLocation) {
         conf.set("mapred.job.tracker", newLocation);

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java?rev=591143&r1=591142&r2=591143&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java Thu Nov  1 13:48:16 2007
@@ -15,41 +15,42 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.pig.impl.builtin;
-
-import java.io.IOException;
-import java.util.Iterator;
+package org.apache.pig.impl.builtin;
+
+import java.io.IOException;
+import java.util.Iterator;
 
 import org.apache.pig.EvalFunc;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
-
-
-public class FindQuantiles extends EvalFunc<DataBag>{
-	
-	/**
-	 * first field in the input tuple is the number of quantiles to generate
-	 * second field is the *sorted* bag of samples
-	 */
-	
-	@Override
-	public void exec(Tuple input, DataBag output) throws IOException {
-		int numQuantiles = input.getAtomField(0).numval().intValue();
-		DataBag samples = input.getBagField(1);
-		
-		int numSamples = samples.cardinality();
-		
-		int toSkip = numSamples / numQuantiles;
-		
-		int i=0, nextQuantile = 0;
-		Iterator<Tuple> iter = samples.content();
-		while (iter.hasNext()){
-			Tuple t = iter.next();
-			if (i==nextQuantile){
-				output.add(t);
-				nextQuantile+=toSkip+1;
-			}
-			i++;
-		}
-	}
-}
+import org.apache.pig.data.Datum;
+
+
+public class FindQuantiles extends EvalFunc<DataBag>{
+	
+	/**
+	 * first field in the input tuple is the number of quantiles to generate
+	 * second field is the *sorted* bag of samples
+	 */
+	
+	@Override
+	public void exec(Tuple input, DataBag output) throws IOException {
+		int numQuantiles = input.getAtomField(0).numval().intValue();
+		DataBag samples = input.getBagField(1);
+		
+		int numSamples = samples.cardinality();
+		
+		int toSkip = numSamples / numQuantiles;
+		
+		int i=0, nextQuantile = 0;
+		Iterator<Datum> iter = samples.content();
+		while (iter.hasNext()){
+			Tuple t = (Tuple)iter.next();
+			if (i==nextQuantile){
+				output.add(t);
+				nextQuantile+=toSkip+1;
+			}
+			i++;
+		}
+	}
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFCross.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFCross.java?rev=591143&r1=591142&r2=591143&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFCross.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFCross.java Thu Nov  1 13:48:16 2007
@@ -15,68 +15,68 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.pig.impl.builtin;
-
-import java.io.IOException;
-import java.util.Random;
+package org.apache.pig.impl.builtin;
+
+import java.io.IOException;
+import java.util.Random;
 
 import org.apache.pig.EvalFunc;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
-
-
-public class GFCross extends EvalFunc<DataBag> {
-	int numInputs, myNumber, numGroupsPerInput;
-	
-	public static int DEFAULT_PARALLELISM = 96;
+
+
+public class GFCross extends EvalFunc<DataBag> {
+	int numInputs, myNumber, numGroupsPerInput;
+	
+	public static int DEFAULT_PARALLELISM = 96;
 
 	@Override
-	public void exec(Tuple input, DataBag output) throws IOException {;
-			numInputs = Integer.parseInt(input.getAtomField(0).strval());
-			myNumber = Integer.parseInt(input.getAtomField(1).strval());
-		
-		
-			numGroupsPerInput = (int) Math.ceil(Math.pow(DEFAULT_PARALLELISM, 1.0/numInputs));
-			int numGroupsGoingTo = (int) Math.pow(numGroupsPerInput,numInputs - 1);
-			
-			int[] digits = new int[numInputs];
-			for (int i=0; i<numInputs; i++){
-				if (i == myNumber){
-					Random r = new Random(System.currentTimeMillis());
-					digits[i] = r.nextInt(numGroupsPerInput);
-				}else{
-					digits[i] = 0;
-				}
-			}
-			
-			for (int i=0; i<numGroupsGoingTo; i++){
-				output.add(toTuple(digits));
-				next(digits);
-			}			
-
-	}
-	
-	private Tuple toTuple(int[] digits) throws IOException{
-		Tuple t = new Tuple(numInputs);
-		for (int i=0; i<numInputs; i++){
-			t.setField(i, digits[i]);
-		}
-		return t;
-	}
-	
-	private void next(int[] digits){
-		for (int i=0; i<numInputs; i++){
-			if (i== myNumber)
-				continue;
-			else{
-				if (digits[i] == numGroupsPerInput - 1){
-					digits[i] = 0;
-				}else{
-					digits[i]++;
-					break;
-				}
-			}
-		}
-	}
-
-}
+	public void exec(Tuple input, DataBag output) throws IOException {;
+			numInputs = Integer.parseInt(input.getAtomField(0).strval());
+			myNumber = Integer.parseInt(input.getAtomField(1).strval());
+		
+		
+			numGroupsPerInput = (int) Math.ceil(Math.pow(DEFAULT_PARALLELISM, 1.0/numInputs));
+			int numGroupsGoingTo = (int) Math.pow(numGroupsPerInput,numInputs - 1);
+			
+			int[] digits = new int[numInputs];
+			for (int i=0; i<numInputs; i++){
+				if (i == myNumber){
+					Random r = new Random(System.currentTimeMillis());
+					digits[i] = r.nextInt(numGroupsPerInput);
+				}else{
+					digits[i] = 0;
+				}
+			}
+			
+			for (int i=0; i<numGroupsGoingTo; i++){
+				output.add(toTuple(digits));
+				next(digits);
+			}			
+
+	}
+	
+	private Tuple toTuple(int[] digits) throws IOException{
+		Tuple t = new Tuple(numInputs);
+		for (int i=0; i<numInputs; i++){
+			t.setField(i, digits[i]);
+		}
+		return t;
+	}
+	
+	private void next(int[] digits){
+		for (int i=0; i<numInputs; i++){
+			if (i== myNumber)
+				continue;
+			else{
+				if (digits[i] == numGroupsPerInput - 1){
+					digits[i] = 0;
+				}else{
+					digits[i]++;
+					break;
+				}
+			}
+		}
+	}
+
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/builtin/ShellBagEvalFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/builtin/ShellBagEvalFunc.java?rev=591143&r1=591142&r2=591143&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/builtin/ShellBagEvalFunc.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/builtin/ShellBagEvalFunc.java Thu Nov  1 13:48:16 2007
@@ -48,6 +48,7 @@
 	}
 
 	private class EndOfQueue extends DataBag{
+		EndOfQueue() { super(Datum.DataType.TUPLE); }
 		public void add(Datum d){}
 	}
 	

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/EvalSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/EvalSpec.java?rev=591143&r1=591142&r2=591143&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/EvalSpec.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/EvalSpec.java Thu Nov  1 13:48:16 2007
@@ -149,10 +149,10 @@
      * Compare 2 tuples according to this spec. This is used while sorting by arbitrary (even generated) fields.
      * @return
      */
-    public Comparator<Tuple> getComparator() {
-        return new Comparator<Tuple>() {
+    public Comparator<Datum> getComparator() {
+        return new Comparator<Datum>() {
         	
-        	public int compare(Tuple t1, Tuple t2) {
+        	public int compare(Datum t1, Datum t2) {
     			return (simpleEval(t1).compareTo(simpleEval(t2)));
             }
         };

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/FuncEvalSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/FuncEvalSpec.java?rev=591143&r1=591142&r2=591143&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/FuncEvalSpec.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/FuncEvalSpec.java Thu Nov  1 13:48:16 2007
@@ -15,12 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.pig.impl.eval;
-
-import java.io.IOException;
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.List;
+package org.apache.pig.impl.eval;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.pig.EvalFunc;
 import org.apache.pig.data.DataAtom;
@@ -32,46 +32,46 @@
 import org.apache.pig.impl.eval.collector.DataCollector;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
-
-
-
-public class FuncEvalSpec extends EvalSpec {
-	private static final long serialVersionUID = 1L;
-	
-	String funcName;
-	EvalSpec args;
-	transient EvalFunc func;
-
-	public FuncEvalSpec(FunctionInstantiator fInstantiaor, String funcName, EvalSpec args) throws IOException{		
-		this.funcName = funcName;
-		this.args = args;
-		
-		if (args!=null && args.isAsynchronous())
-			throw new IOException("Can't have the output of an asynchronous function as the argument to an eval function");
-		instantiateFunc(fInstantiaor);
-	}
-	
-	@Override
-	public void instantiateFunc(FunctionInstantiator instantiaor) throws IOException{
-		if(instantiaor != null)
+
+
+
+public class FuncEvalSpec extends EvalSpec {
+	private static final long serialVersionUID = 1L;
+	
+	String funcName;
+	EvalSpec args;
+	transient EvalFunc func;
+
+	public FuncEvalSpec(FunctionInstantiator fInstantiaor, String funcName, EvalSpec args) throws IOException{		
+		this.funcName = funcName;
+		this.args = args;
+		
+		if (args!=null && args.isAsynchronous())
+			throw new IOException("Can't have the output of an asynchronous function as the argument to an eval function");
+		instantiateFunc(fInstantiaor);
+	}
+	
+	@Override
+	public void instantiateFunc(FunctionInstantiator instantiaor) throws IOException{
+		if(instantiaor != null)
 			func = (EvalFunc) instantiaor.instantiateFuncFromAlias(funcName);
-		args.instantiateFunc(instantiaor);
-	}
-	
-	@Override
-	public boolean amenableToCombiner() {
-		// TODO Turn on algebraic
-		return false;
-	}
-
-	@Override
-	public List<String> getFuncs() {
-		List<String> funcs = new ArrayList<String>();
-		funcs.add(funcName);
-		return funcs;
-	}
-
-	@Override
+		args.instantiateFunc(instantiaor);
+	}
+	
+	@Override
+	public boolean amenableToCombiner() {
+		// TODO Turn on algebraic
+		return false;
+	}
+
+	@Override
+	public List<String> getFuncs() {
+		List<String> funcs = new ArrayList<String>();
+		funcs.add(funcName);
+		return funcs;
+	}
+
+	@Override
 	protected Schema mapInputSchema(Schema schema) {
 		Schema inputToFunction;
 		if (args!=null){
@@ -79,162 +79,163 @@
 		}else{
 			inputToFunction = new TupleSchema();
 		}
-		
-		return func.outputSchema(inputToFunction);
-	}
-
-	@Override
-	protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
-		return new DataCollector(endOfPipe){
-			private Datum getPlaceHolderForFuncOutput(){
-				Type returnType = func.getReturnType();
-				if (returnType == DataAtom.class)
-					return new DataAtom();
-				else if (returnType == Tuple.class)
-					return new Tuple();
-				else if (returnType == DataBag.class)
-					return new FakeDataBag(successor);
-				else if (returnType == DataMap.class)
-					return new DataMap();
-				else throw new RuntimeException("Internal error: Unknown return type of eval function");
-			}
-			
-			@Override
-			public void add(Datum d) {
-				if (checkDelimiter(d))
-					addToSuccessor(d);
-				
-				Datum argsValue = null;
-				if (args!=null)
-					argsValue = args.simpleEval(d);
-				
-				if (argsValue!=null && !(argsValue instanceof Tuple))
-	        		throw new RuntimeException("Internal error: Non-tuple returned on evaluation of arguments.");
-	            
-				Datum placeHolderForFuncOutput = getPlaceHolderForFuncOutput();
-				try{
-					func.exec((Tuple)argsValue, placeHolderForFuncOutput);
-				}catch (IOException e){
+		
+		return func.outputSchema(inputToFunction);
+	}
+
+	@Override
+	protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+		return new DataCollector(endOfPipe){
+			private Datum getPlaceHolderForFuncOutput(){
+				Type returnType = func.getReturnType();
+				if (returnType == DataAtom.class)
+					return new DataAtom();
+				else if (returnType == Tuple.class)
+					return new Tuple();
+				else if (returnType == DataBag.class)
+					return new FakeDataBag(successor);
+				else if (returnType == DataMap.class)
+					return new DataMap();
+				else throw new RuntimeException("Internal error: Unknown return type of eval function");
+			}
+			
+			@Override
+			public void add(Datum d) {
+				if (checkDelimiter(d))
+					addToSuccessor(d);
+				
+				Datum argsValue = null;
+				if (args!=null)
+					argsValue = args.simpleEval(d);
+				
+				if (argsValue!=null && !(argsValue instanceof Tuple))
+	        		throw new RuntimeException("Internal error: Non-tuple returned on evaluation of arguments.");
+	            
+				Datum placeHolderForFuncOutput = getPlaceHolderForFuncOutput();
+				try{
+					func.exec((Tuple)argsValue, placeHolderForFuncOutput);
+				}catch (IOException e){
 					RuntimeException re = new RuntimeException(e);
 					re.setStackTrace(e.getStackTrace());
-					throw re;
-				}
-				
-				if (placeHolderForFuncOutput instanceof FakeDataBag){
-					FakeDataBag fBag = (FakeDataBag)placeHolderForFuncOutput;
-					synchronized(fBag){
-						if (!fBag.isStale())
-							fBag.addDelimiters();
-					}
-				}else{
-					addToSuccessor(placeHolderForFuncOutput);
-				}
-			}
-			
-			@Override
-			protected void finish() {
-				if (args!=null) 
-					args.finish();
-				func.finish();
-			}			
-		};
-	}
-
-	@Override
-	public String toString() {
-		StringBuilder sb = new StringBuilder();
-		sb.append("[");
-		sb.append(funcName);
-		sb.append("(");
-		sb.append(args);
-		sb.append(")");
-		sb.append("]");
-		return sb.toString();
-	}
-	
-	
-
-	private class FakeDataBag extends DataBag{
+					throw re;
+				}
+				
+				if (placeHolderForFuncOutput instanceof FakeDataBag){
+					FakeDataBag fBag = (FakeDataBag)placeHolderForFuncOutput;
+					synchronized(fBag){
+						if (!fBag.isStale())
+							fBag.addDelimiters();
+					}
+				}else{
+					addToSuccessor(placeHolderForFuncOutput);
+				}
+			}
+			
+			@Override
+			protected void finish() {
+				if (args!=null) 
+					args.finish();
+				func.finish();
+			}			
+		};
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder();
+		sb.append("[");
+		sb.append(funcName);
+		sb.append("(");
+		sb.append(args);
+		sb.append(")");
+		sb.append("]");
+		return sb.toString();
+	}
+	
+	
+
+	private class FakeDataBag extends DataBag{
 		int staleCount = 0;
-		DataCollector successor;
-		boolean startAdded = false, endAdded = false;
-		
-		public FakeDataBag(DataCollector successor){
-			this.successor = successor;
-		}
-		
-		void addStart(){
-			successor.add(DataBag.startBag);
-			startAdded = true;	
-		}
-		
-		void addEnd(){
-			successor.add(DataBag.endBag);
-			endAdded = true;
-		}
-		
-		void addDelimiters(){
-			if (!startAdded)
-				addStart();
-			if (!endAdded)
-				addEnd();	
-		}
-		
-		@Override
-		public void add(Tuple t) {
-			synchronized(this){
-				if (!startAdded)
-					addStart();
-			}
-			successor.add(t);
-		}
-		
-		@Override
-		public void markStale(boolean stale) {
-			synchronized (this){
-				if (stale)
-					staleCount++;
-				else{
-					if (staleCount>0){
-						addDelimiters();
-						staleCount--;
-					}
-				}
-				super.markStale(stale);
-			}
-		}
-		
-		public boolean isStale(){
-			synchronized(this){
-				return staleCount > 0;
-			}
-		}
-	}
-	
-	
-	/**
-     * Extend the default deserialization
-     * @param in
-     * @throws IOException
-     * @throws ClassNotFoundException
-     */
-	/*
-    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException{
-    	in.defaultReadObject();
-    	instantiateFunc();
-    }
-	*/
-	public EvalFunc getFunc() {
-		return func;
-	}
-	
-	public Type getReturnType(){
-		return func.getReturnType();
-	}
-	
-	@Override
-	public boolean isAsynchronous() {
-		return func.isAsynchronous();
-	}
-	
-}
+		DataCollector successor;
+		boolean startAdded = false, endAdded = false;
+		
+		public FakeDataBag(DataCollector successor){
+			super(Datum.DataType.TUPLE);
+			this.successor = successor;
+		}
+		
+		void addStart(){
+			successor.add(DataBag.startBag);
+			startAdded = true;	
+		}
+		
+		void addEnd(){
+			successor.add(DataBag.endBag);
+			endAdded = true;
+		}
+		
+		void addDelimiters(){
+			if (!startAdded)
+				addStart();
+			if (!endAdded)
+				addEnd();	
+		}
+		
+		@Override
+		public void add(Datum d) {
+			synchronized(this){
+				if (!startAdded)
+					addStart();
+			}
+			successor.add(d);
+		}
+		
+		@Override
+		public void markStale(boolean stale) {
+			synchronized (this){
+				if (stale)
+					staleCount++;
+				else{
+					if (staleCount>0){
+						addDelimiters();
+						staleCount--;
+					}
+				}
+				super.markStale(stale);
+			}
+		}
+		
+		public boolean isStale(){
+			synchronized(this){
+				return staleCount > 0;
+			}
+		}
+	}
+	
+	
+	/**
+     * Extend the default deserialization
+     * @param in
+     * @throws IOException
+     * @throws ClassNotFoundException
+     */
+	/*
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException{
+    	in.defaultReadObject();
+    	instantiateFunc();
+    }
+	*/
+	public EvalFunc getFunc() {
+		return func;
+	}
+	
+	public Type getReturnType(){
+		return func.getReturnType();
+	}
+	
+	@Override
+	public boolean isAsynchronous() {
+		return func.isAsynchronous();
+	}
+	
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/GenerateSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/GenerateSpec.java?rev=591143&r1=591142&r2=591143&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/GenerateSpec.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/GenerateSpec.java Thu Nov  1 13:48:16 2007
@@ -111,7 +111,7 @@
     	public DatumBag(){
     		super(null);
     		try{
-    			bag = BagFactory.getInstance().getNewBag();
+    			bag = BagFactory.getInstance().getNewBag(Datum.DataType.TUPLE);
     		}catch(IOException e){
     			throw new RuntimeException(e);
     		}
@@ -124,7 +124,7 @@
     	
     	public Iterator<Datum> content(){
     		return new Iterator<Datum>(){
-    			Iterator<Tuple> iter;
+    			Iterator<Datum> iter;
     			{
     				iter = bag.content();
     			}
@@ -132,11 +132,7 @@
     				return iter.hasNext();
     			}
     			public Datum next() {
-    				try{
-    					return iter.next().getField(0);
-    				}catch(IOException e){
-    					throw new RuntimeException(e);
-    				}
+    				return ((Tuple)iter.next()).getField(0);
     			}
     			public void remove() {
     				throw new RuntimeException("Can't remove from read-only iterator");
@@ -227,14 +223,10 @@
                
                for (int i=0; i< numItems; i++){
             	   if (specs.get(i).isFlattened() && outData[i] instanceof Tuple){
-        			   Tuple t = (Tuple)outData[i];
-        			   try{
-		    			   for (int j=0; j < t.arity(); j++){
-		    				   outTuple.appendField(t.getField(j));
-		    			   }
-        			   }catch (IOException e){
-        				   throw new RuntimeException(e);
-        			   }
+        				Tuple t = (Tuple)outData[i];
+						for (int j=0; j < t.arity(); j++){
+		    			   outTuple.appendField(t.getField(j));
+		    			}
         		   }else{
             		   outTuple.appendField(outData[i]);
             	   }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/MapLookupSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/MapLookupSpec.java?rev=591143&r1=591142&r2=591143&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/MapLookupSpec.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/MapLookupSpec.java Thu Nov  1 13:48:16 2007
@@ -15,61 +15,62 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.pig.impl.eval;
-
-import java.util.ArrayList;
-import java.util.List;
+package org.apache.pig.impl.eval;
+
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.pig.data.DataMap;
 import org.apache.pig.data.Datum;
+import org.apache.pig.data.AtomicDatum;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
-
-
-public class MapLookupSpec extends SimpleEvalSpec {
-	
-	/**
-	 * 
-	 */
-	private static final long serialVersionUID = 1L;
-	protected String keyToLookup;
-	
-	public MapLookupSpec(String keyToLookup){
-		this.keyToLookup = keyToLookup;
-	}
-
-	@Override
-	protected Datum eval(Datum d) {
-		if (!(d instanceof DataMap))
-			throw new RuntimeException("Attempt to lookup on data of type " + d.getClass().getName());
-		return ((DataMap)d).get(keyToLookup);
-	}
-	
-	@Override
-	public boolean amenableToCombiner() {
-		return true;
-	}
-	
-	@Override
-	public List<String> getFuncs() {
-		return new ArrayList<String>();
-	}
-	
-	@Override
-	protected Schema mapInputSchema(Schema schema) {
-		//TODO: until we have map schemas
-		return new TupleSchema();
-	}
-	
-	@Override
-	public String toString() {
-		StringBuilder sb = new StringBuilder();
-		sb.append("[");
-		sb.append("#'");
-		sb.append(keyToLookup);
-		sb.append("'");
-		sb.append("]");
-		return sb.toString();
-	}
-	
-}
+
+
+public class MapLookupSpec extends SimpleEvalSpec {
+	
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = 1L;
+	protected AtomicDatum keyToLookup;
+	
+	public MapLookupSpec(AtomicDatum keyToLookup){
+		this.keyToLookup = keyToLookup;
+	}
+
+	@Override
+	protected Datum eval(Datum d) {
+		if (!(d instanceof DataMap))
+			throw new RuntimeException("Attempt to lookup on data of type " + d.getClass().getName());
+		return ((DataMap)d).get(keyToLookup);
+	}
+	
+	@Override
+	public boolean amenableToCombiner() {
+		return true;
+	}
+	
+	@Override
+	public List<String> getFuncs() {
+		return new ArrayList<String>();
+	}
+	
+	@Override
+	protected Schema mapInputSchema(Schema schema) {
+		//TODO: until we have map schemas
+		return new TupleSchema();
+	}
+	
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder();
+		sb.append("[");
+		sb.append("#'");
+		sb.append(keyToLookup.toString());
+		sb.append("'");
+		sb.append("]");
+		return sb.toString();
+	}
+	
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/ProjectSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/ProjectSpec.java?rev=591143&r1=591142&r2=591143&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/ProjectSpec.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/ProjectSpec.java Thu Nov  1 13:48:16 2007
@@ -15,128 +15,123 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.pig.impl.eval;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+package org.apache.pig.impl.eval;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.pig.data.Datum;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
-
-
-public class ProjectSpec extends SimpleEvalSpec {
-	private static final long serialVersionUID = 1L;
-
-	protected List<Integer> cols;
-	protected boolean wrapInTuple;
-	
-
-	public List<Integer> getCols() {
-		return cols;
-	}
-
-	public ProjectSpec(List<Integer> cols){		
-		this.cols = cols;
-	}
-	
-	public ProjectSpec(int col){		
-		cols = new ArrayList<Integer>();
-		cols.add(col);
-	}
-		
-	@Override
-	public boolean amenableToCombiner() {
-		return true;
-	}
-
-	@Override
-	public List<String> getFuncs() {
-		return new ArrayList<String>();
-	}
-
-	@Override
-	protected Schema mapInputSchema(Schema schema) {
-		if (!wrapInTuple && cols.size()==1){
-			return maskNullSchema(schema.schemaFor(cols.get(0)));
-		}else{
-			TupleSchema output = new TupleSchema();
-			for (int i: cols){
-				output.add(maskNullSchema(schema.schemaFor(i)));
-			}
-			return output;
-		}
-	}
-	
-	private Schema maskNullSchema(Schema s){
-		if (s == null)
-			return new TupleSchema();
-		else
-			return s;
-		
-	}
-	
-	@Override
-	protected Datum eval(Datum d){
-		if (!(d instanceof Tuple)){
-			throw new RuntimeException("Project operator expected a Tuple, found a " + d.getClass().getSimpleName());
-		}
-		Tuple t = (Tuple)d;
-		
-		try{
-			if (!wrapInTuple && cols.size() == 1){
-				return t.getField(cols.get(0));
-			}else{
-				Tuple out = new Tuple();
-				for (int i: cols){
-					out.appendField(t.getField(i));
-				}
-				return out;
-			}
-		}catch (IOException e){
-			//TODO: Based on a strictness level, insert null values here
-				throw new RuntimeException(e);		
-		}
-	}
-
-	@Override
-	public String toString() {
-		StringBuilder sb = new StringBuilder();
-		sb.append("[");
-		sb.append("PROJECT ");
-		boolean first = true;
-		for (int i: cols){
-			if (!first)
-				sb.append(",");
-			else
-				first = false;
-			sb.append("$");
-			sb.append(i);
-		}
-		sb.append("]");
-		return sb.toString();
-	}
-    
-    public int numCols() {
-        return cols.size();
-    }
-    
-    public int getCol(int i) {
-        if (i < 0 || i >= cols.size()) 
-            throw new RuntimeException("Internal error: improper use of getColumn in " + ProjectSpec.class.getName());
-        else return cols.get(i);
-    }
-	
-	public int getCol(){
-		if (cols.size()!=1)
-			throw new RuntimeException("Internal error: improper use of getColumn in " + ProjectSpec.class.getName());
-		return cols.get(0);
-	}
-
-	public void setWrapInTuple(boolean wrapInTuple) {
-		this.wrapInTuple = wrapInTuple;
-	}
-
-}
+
+
+public class ProjectSpec extends SimpleEvalSpec {
+	private static final long serialVersionUID = 1L;
+
+	protected List<Integer> cols;
+	protected boolean wrapInTuple;
+	
+
+	public List<Integer> getCols() {
+		return cols;
+	}
+
+	public ProjectSpec(List<Integer> cols){		
+		this.cols = cols;
+	}
+	
+	public ProjectSpec(int col){		
+		cols = new ArrayList<Integer>();
+		cols.add(col);
+	}
+		
+	@Override
+	public boolean amenableToCombiner() {
+		return true;
+	}
+
+	@Override
+	public List<String> getFuncs() {
+		return new ArrayList<String>();
+	}
+
+	@Override
+	protected Schema mapInputSchema(Schema schema) {
+		if (!wrapInTuple && cols.size()==1){
+			return maskNullSchema(schema.schemaFor(cols.get(0)));
+		}else{
+			TupleSchema output = new TupleSchema();
+			for (int i: cols){
+				output.add(maskNullSchema(schema.schemaFor(i)));
+			}
+			return output;
+		}
+	}
+	
+	private Schema maskNullSchema(Schema s){
+		if (s == null)
+			return new TupleSchema();
+		else
+			return s;
+		
+	}
+	
+	@Override
+	protected Datum eval(Datum d){
+		if (!(d instanceof Tuple)){
+			throw new RuntimeException("Project operator expected a Tuple, found a " + d.getClass().getSimpleName());
+		}
+		Tuple t = (Tuple)d;
+		
+		if (!wrapInTuple && cols.size() == 1){
+			return t.getField(cols.get(0));
+		}else{
+			Tuple out = new Tuple();
+			for (int i: cols){
+				out.appendField(t.getField(i));
+			}
+			return out;
+		}
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder();
+		sb.append("[");
+		sb.append("PROJECT ");
+		boolean first = true;
+		for (int i: cols){
+			if (!first)
+				sb.append(",");
+			else
+				first = false;
+			sb.append("$");
+			sb.append(i);
+		}
+		sb.append("]");
+		return sb.toString();
+	}
+    
+    public int numCols() {
+        return cols.size();
+    }
+    
+    public int getCol(int i) {
+        if (i < 0 || i >= cols.size()) 
+            throw new RuntimeException("Internal error: improper use of getColumn in " + ProjectSpec.class.getName());
+        else return cols.get(i);
+    }
+	
+	public int getCol(){
+		if (cols.size()!=1)
+			throw new RuntimeException("Internal error: improper use of getColumn in " + ProjectSpec.class.getName());
+		return cols.get(0);
+	}
+
+	public void setWrapInTuple(boolean wrapInTuple) {
+		this.wrapInTuple = wrapInTuple;
+	}
+
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/SortDistinctSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/SortDistinctSpec.java?rev=591143&r1=591142&r2=591143&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/SortDistinctSpec.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/SortDistinctSpec.java Thu Nov  1 13:48:16 2007
@@ -15,11 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.pig.impl.eval;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+package org.apache.pig.impl.eval;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
@@ -28,110 +28,110 @@
 import org.apache.pig.impl.FunctionInstantiator;
 import org.apache.pig.impl.eval.collector.DataCollector;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-
-public class SortDistinctSpec extends EvalSpec {
-	private static final long serialVersionUID = 1L;
-	transient DataBag bag;
-	protected EvalSpec sortSpec;
-	protected boolean eliminateDuplicates;
-	
-	
-	public SortDistinctSpec(boolean eliminateDuplicates, EvalSpec sortSpec){
-		this.eliminateDuplicates = eliminateDuplicates;
-		this.sortSpec = sortSpec;
-	}
-		
-	@Override
-	public boolean amenableToCombiner() {
-		//Combiner may potentially be useful if we are eliminating duplicates
-		return eliminateDuplicates;
-	}
-
-	@Override
-	public List<String> getFuncs() {
-		if (sortSpec!=null)
-			return sortSpec.getFuncs();
-		else
-			return new ArrayList<String>();
-	}
-
-	@Override
-	protected Schema mapInputSchema(Schema schema) {
-		return schema;
-	}
-
-	@Override
-	protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
-		return new DataCollector(endOfPipe){
-			
-			@Override
-			public void add(Datum d) {
-				if (inTheMiddleOfBag){
-					if (checkDelimiter(d)){
-						addToSuccessor(bag);
-					}else{
-						if (d instanceof Tuple){
-							bag.add((Tuple)d);
-						}else{
-							bag.add(new Tuple(d));
-						}
-					}
-				}else{
-					if (checkDelimiter(d)){
-						//Bag must have started now
-						try{
-							bag = BagFactory.getInstance().getNewBag();
-							if (eliminateDuplicates)
-								bag.distinct();
-							else
-								bag.sort(sortSpec);
-							
-						}catch(IOException e){
-							throw new RuntimeException(e);
-						}
-					}else{
-						addToSuccessor(d);
-					}
-				}
-			}
-			
-			@Override
-			protected boolean needFlatteningLocally() {
-				return true;
-			}
-			
-			
-			@Override
-			protected void finish() {
-			
-				/*
-				 * To clear the temporary files if it was a big bag
-				 */
-				if (bag!=null)
-					bag.clear();
-			
-			}
-		};
-	}
-
-	@Override
-	public String toString() {
-		StringBuilder sb = new StringBuilder();
-		sb.append("[");
-		sb.append(eliminateDuplicates?"DISTINCT ":"SORT ");
-		if (sortSpec!=null)
-			sb.append(sortSpec.toString());
-		sb.append("]");
-		return sb.toString();
-	}
-
-	@Override
-	public void instantiateFunc(FunctionInstantiator instantiaor)
+
+
+public class SortDistinctSpec extends EvalSpec {
+	private static final long serialVersionUID = 1L;
+	transient DataBag bag;
+	protected EvalSpec sortSpec;
+	protected boolean eliminateDuplicates;
+	
+	
+	public SortDistinctSpec(boolean eliminateDuplicates, EvalSpec sortSpec){
+		this.eliminateDuplicates = eliminateDuplicates;
+		this.sortSpec = sortSpec;
+	}
+		
+	@Override
+	public boolean amenableToCombiner() {
+		//Combiner may potentially be useful if we are eliminating duplicates
+		return eliminateDuplicates;
+	}
+
+	@Override
+	public List<String> getFuncs() {
+		if (sortSpec!=null)
+			return sortSpec.getFuncs();
+		else
+			return new ArrayList<String>();
+	}
+
+	@Override
+	protected Schema mapInputSchema(Schema schema) {
+		return schema;
+	}
+
+	@Override
+	protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+		return new DataCollector(endOfPipe){
+			
+			@Override
+			public void add(Datum d) {
+				if (inTheMiddleOfBag){
+					if (checkDelimiter(d)){
+						addToSuccessor(bag);
+					}else{
+						if (d instanceof Tuple){
+							bag.add((Tuple)d);
+						}else{
+							bag.add(new Tuple(d));
+						}
+					}
+				}else{
+					if (checkDelimiter(d)){
+						//Bag must have started now
+						try{
+							bag = BagFactory.getInstance().getNewBag(Datum.DataType.TUPLE);
+							if (eliminateDuplicates)
+								bag.distinct();
+							else
+								bag.sort(sortSpec);
+							
+						}catch(IOException e){
+							throw new RuntimeException(e);
+						}
+					}else{
+						addToSuccessor(d);
+					}
+				}
+			}
+			
+			@Override
+			protected boolean needFlatteningLocally() {
+				return true;
+			}
+			
+			
+			@Override
+			protected void finish() {
+			
+				/*
+				 * To clear the temporary files if it was a big bag
+				 */
+				if (bag!=null)
+					bag.clear();
+			
+			}
+		};
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder();
+		sb.append("[");
+		sb.append(eliminateDuplicates?"DISTINCT ":"SORT ");
+		if (sortSpec!=null)
+			sb.append(sortSpec.toString());
+		sb.append("]");
+		return sb.toString();
+	}
+
+	@Override
+	public void instantiateFunc(FunctionInstantiator instantiaor)
 			throws IOException {
-		if (sortSpec!=null)
-			sortSpec.instantiateFunc(instantiaor);		
-	}
-
-	
-}
+		if (sortSpec!=null)
+			sortSpec.instantiateFunc(instantiaor);		
+	}
+
+	
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/collector/DataCollector.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/collector/DataCollector.java?rev=591143&r1=591142&r2=591143&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/collector/DataCollector.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/collector/DataCollector.java Thu Nov  1 13:48:16 2007
@@ -84,7 +84,7 @@
 			DataBag bag = (DataBag)d;
 			//flatten the bag and send it through the pipeline
 			successor.add(DataBag.startBag);
-		    Iterator<Tuple> iter = bag.content();
+		    Iterator<Datum> iter = bag.content();
 	    	while(iter.hasNext())
 	    		successor.add(iter.next());
 	    	successor.add(DataBag.endBag);

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/eval/collector/UnflattenCollector.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/eval/collector/UnflattenCollector.java?rev=591143&r1=591142&r2=591143&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/eval/collector/UnflattenCollector.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/eval/collector/UnflattenCollector.java Thu Nov  1 13:48:16 2007
@@ -15,49 +15,49 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.pig.impl.eval.collector;
-
-import java.io.IOException;
+package org.apache.pig.impl.eval.collector;
+
+import java.io.IOException;
 
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Datum;
 import org.apache.pig.data.Tuple;
-
-
-public class UnflattenCollector extends DataCollector {
-	DataBag bag;
-	
-	public UnflattenCollector(DataCollector successor){
-		super(successor);
-	}
-	
-	@Override
-	public void add(Datum d) {
-		if (inTheMiddleOfBag){
-			if (checkDelimiter(d)){
-				successor.add(bag);
-			}else{
-				if (d instanceof Tuple){
-					bag.add((Tuple)d);
-				}else{
-					bag.add(new Tuple(d));
-				}
-			}
-		}else{
-			if (checkDelimiter(d)){
-				//Bag must have started now
-				try{
-					bag = BagFactory.getInstance().getNewBag();
-				}catch(IOException e){
-					throw new RuntimeException(e);
-				}
-			}else{
-				successor.add(d);
-			}
-		}
-	}
-	
-	
-
-}
+
+
+public class UnflattenCollector extends DataCollector {
+	DataBag bag;
+	
+	public UnflattenCollector(DataCollector successor){
+		super(successor);
+	}
+	
+	@Override
+	public void add(Datum d) {
+		if (inTheMiddleOfBag){
+			if (checkDelimiter(d)){
+				successor.add(bag);
+			}else{
+				if (d instanceof Tuple){
+					bag.add((Tuple)d);
+				}else{
+					bag.add(new Tuple(d));
+				}
+			}
+		}else{
+			if (checkDelimiter(d)){
+				//Bag must have started now
+				try{
+					bag = BagFactory.getInstance().getNewBag(Datum.DataType.TUPLE);
+				}catch(IOException e){
+					throw new RuntimeException(e);
+				}
+			}else{
+				successor.add(d);
+			}
+		}
+	}
+	
+	
+
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileReader.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileReader.java?rev=591143&r1=591142&r2=591143&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileReader.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileReader.java Thu Nov  1 13:48:16 2007
@@ -15,71 +15,75 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.pig.impl.io;
-
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.Iterator;
+package org.apache.pig.impl.io;
 
-import org.apache.pig.data.Tuple;
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Iterator;
 
-
-public class DataBagFileReader {
-	File store;
-	
-	public DataBagFileReader(File f) throws IOException{
-		store = f;
-	}
-	
-	private class myIterator implements Iterator<Tuple>{
-		DataInputStream in;
-		Tuple nextTuple;
-		
-		public myIterator() throws IOException{
-			in = new DataInputStream(new BufferedInputStream(new FileInputStream(store)));
-			getNextTuple();
-		}
-		
-		private void getNextTuple() throws IOException{
-			try{
-				nextTuple = new Tuple();
-		        nextTuple.readFields(in);
-			} catch (EOFException e) {
-				in.close();
-				nextTuple = null;
-			}
-		}
-		
-		public boolean hasNext(){
-			return nextTuple != null;
-		}
-		
-		public Tuple next(){
-			Tuple returnValue = nextTuple;
-			if (returnValue!=null){
-				try{
-					getNextTuple();
-				}catch (IOException e){
-					throw new RuntimeException(e.getMessage());
-				}
-			}
-			return returnValue;
-		}
-		
-		public void remove(){
-			throw new RuntimeException("Read only cursor");
-		}
-	}
-
-	public Iterator<Tuple> content() throws IOException{
-		return new myIterator();		
-	}
-	
-	public void clear() throws IOException{
-		store.delete();
-	}
-}
+import org.apache.pig.data.Datum;
+import org.apache.pig.data.DatumImpl;
+
+
+public class DataBagFileReader {
+	File store;
+	
+	public DataBagFileReader(File f) throws IOException{
+		store = f;
+	}
+	
+	private class myIterator implements Iterator<Datum>{
+		DataInputStream in;
+		Datum nextDatum;
+		
+		public myIterator() throws IOException{
+			in = new DataInputStream(new BufferedInputStream(new FileInputStream(store)));
+			getNextDatum();
+		}
+		
+		private void getNextDatum() throws IOException{
+			try{
+				/*
+				nextDatum = new Datum();
+		        nextDatum.readFields(in);
+				*/
+				nextDatum = DatumImpl.readDatum(in);
+			} catch (EOFException e) {
+				in.close();
+				nextDatum = null;
+			}
+		}
+		
+		public boolean hasNext(){
+			return nextDatum != null;
+		}
+		
+		public Datum next(){
+			Datum returnValue = nextDatum;
+			if (returnValue!=null){
+				try{
+					getNextDatum();
+				}catch (IOException e){
+					throw new RuntimeException(e.getMessage());
+				}
+			}
+			return returnValue;
+		}
+		
+		public void remove(){
+			throw new RuntimeException("Read only cursor");
+		}
+	}
+
+	public Iterator<Datum> content() throws IOException{
+		return new myIterator();		
+	}
+	
+	public void clear() throws IOException{
+		store.delete();
+	}
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileWriter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileWriter.java?rev=591143&r1=591142&r2=591143&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileWriter.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileWriter.java Thu Nov  1 13:48:16 2007
@@ -15,44 +15,44 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.pig.impl.io;
-
-import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.pig.data.Tuple;
-
-
-
-public class DataBagFileWriter {
-	File store;
-	DataOutputStream out;
-
-	public DataBagFileWriter(File store) throws IOException{
-		this.store = store;
-		out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(store)));
-	}
-	
-	public void write(Tuple t) throws IOException{
-		t.write(out);
-	}
-	
-	public void write(Iterator<Tuple> iter) throws IOException{
-		while (iter.hasNext())
-			iter.next().write(out);
-	}
-	
-	public void close() throws IOException{
-		flush();
-		out.close();
-	}
-	
-	public void flush() throws IOException{
-		out.flush();
-	}
-	
-}
+package org.apache.pig.impl.io;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.pig.data.Datum;
+
+
+
+public class DataBagFileWriter {
+	File store;
+	DataOutputStream out;
+
+	public DataBagFileWriter(File store) throws IOException{
+		this.store = store;
+		out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(store)));
+	}
+	
+	public void write(Datum d) throws IOException{
+		d.write(out);
+	}
+	
+	public void write(Iterator<Datum> iter) throws IOException{
+		while (iter.hasNext())
+			iter.next().write(out);
+	}
+	
+	public void close() throws IOException{
+		flush();
+		out.close();
+	}
+	
+	public void flush() throws IOException{
+		out.flush();
+	}
+	
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/PigFile.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/PigFile.java?rev=591143&r1=591142&r2=591143&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/io/PigFile.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/io/PigFile.java Thu Nov  1 13:48:16 2007
@@ -27,6 +27,7 @@
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.Datum;
 import org.apache.pig.impl.PigContext;
 
 
@@ -44,7 +45,8 @@
     }
     
     public DataBag load(LoadFunc lfunc, PigContext pigContext) throws IOException {
-        DataBag content = BagFactory.getInstance().getNewBag();
+        DataBag content =
+			BagFactory.getInstance().getNewBag(Datum.DataType.TUPLE);
         InputStream is = FileLocalizer.open(file, pigContext);
         lfunc.bindTo(file, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
         Tuple f = null;
@@ -58,8 +60,8 @@
     public void store(DataBag data, StoreFunc sfunc, PigContext pigContext) throws IOException {
         BufferedOutputStream bos = new BufferedOutputStream(FileLocalizer.create(file, append, pigContext));
         sfunc.bindTo(bos);
-        for (Iterator<Tuple> it = data.content(); it.hasNext();) {
-            Tuple row = it.next();
+        for (Iterator<Datum> it = data.content(); it.hasNext();) {
+            Tuple row = (Tuple)it.next();
             sfunc.putNext(row);
         }
         sfunc.finish();

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java?rev=591143&r1=591142&r2=591143&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java Thu Nov  1 13:48:16 2007
@@ -67,21 +67,17 @@
 		}
 		
 		Datum[] groupAndTuple = new Datum[2];
-		try{
-    		if (output.arity() == 2){
-    			groupAndTuple[0] = output.getField(0);
-    			groupAndTuple[1] = output.getField(1);
-    		}else{
-    			Tuple group = new Tuple();
-    			for (int j=0; j<output.arity()-1; j++){
-    				group.appendField(output.getField(j));
-    			}
-    			groupAndTuple[0] = group;
-    			groupAndTuple[1] = output.getField(output.arity()-1);
+    	if (output.arity() == 2){
+    		groupAndTuple[0] = output.getField(0);
+    		groupAndTuple[1] = output.getField(1);
+    	}else{
+    		Tuple group = new Tuple();
+    		for (int j=0; j<output.arity()-1; j++){
+    			group.appendField(output.getField(j));
     		}
-		}catch (IOException e){
-			throw new RuntimeException(e);
-		}
+    		groupAndTuple[0] = group;
+    		groupAndTuple[1] = output.getField(output.arity()-1);
+    	}
 		return groupAndTuple;
 	}
 	

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java?rev=591143&r1=591142&r2=591143&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java Thu Nov  1 13:48:16 2007
@@ -31,12 +31,14 @@
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.IndexedTuple;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.Datum;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.eval.EvalSpec;
 import org.apache.pig.impl.io.PigFile;
 import org.apache.pig.impl.physicalLayer.POMapreduce;
 import org.apache.pig.impl.util.JarManager;
 import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.PigLogger;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.UTF8;
@@ -89,8 +91,7 @@
      * @throws IOException
      */
     public boolean launchPig(POMapreduce pom) throws IOException {
-	      
-	Logger log = pom.pigContext.getLogger();
+		Logger log = PigLogger.getLogger();
         JobConf conf = new JobConf(pom.pigContext.getConf());
         conf.setJobName(pom.pigContext.getJobName());
         boolean success = false;
@@ -211,7 +212,8 @@
             	
             		// create an empty output file
                 	PigFile f = new PigFile(outputFile.toString(), false);
-                	f.store(new DataBag(), new PigStorage(), pom.pigContext);
+                	f.store(new DataBag(Datum.DataType.TUPLE),
+						new PigStorage(), pom.pigContext);
                 
             	}
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigCombine.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigCombine.java?rev=591143&r1=591142&r2=591143&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigCombine.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigCombine.java Thu Nov  1 13:48:16 2007
@@ -71,7 +71,7 @@
                 
                 bags = new BigDataBag[inputCount];
                 for (int i = 0; i < inputCount; i++) {
-                    bags[i] = BagFactory.getInstance().getNewBigBag();
+                    bags[i] = BagFactory.getInstance().getNewBigBag(Datum.DataType.TUPLE);
                 }
             }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigMapReduce.java?rev=591143&r1=591142&r2=591143&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigMapReduce.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigMapReduce.java Thu Nov  1 13:48:16 2007
@@ -140,7 +140,8 @@
             Tuple t = new Tuple(1 + inputCount);
             t.setField(0, groupName);
             for (int i = 1; i < 1 + inputCount; i++) {
-                bags[i - 1] = BagFactory.getInstance().getNewBag();
+                bags[i - 1] =
+					BagFactory.getInstance().getNewBag(Datum.DataType.TUPLE);
                 t.setField(i, bags[i - 1]);
             }
 



Mime
View raw message