pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ga...@apache.org
Subject svn commit: r591143 [1/4] - in /incubator/pig/branches/types: src/org/apache/pig/ src/org/apache/pig/builtin/ src/org/apache/pig/data/ src/org/apache/pig/impl/ src/org/apache/pig/impl/builtin/ src/org/apache/pig/impl/eval/ src/org/apache/pig/impl/eval/...
Date Thu, 01 Nov 2007 20:48:22 GMT
Author: gates
Date: Thu Nov  1 13:48:16 2007
New Revision: 591143

URL: http://svn.apache.org/viewvc?rev=591143&view=rev
Log:
Initial check-in of types.  Added classes for atomic types, plus a number of
unit tests.  Adapted existing code to changes in the type structure, and to the
fact that a bag can now be a bag of anything, not just a bag of tuples.


Added:
    incubator/pig/branches/types/src/org/apache/pig/data/DataCharArray.java
    incubator/pig/branches/types/src/org/apache/pig/data/DataCharArrayNone.java
    incubator/pig/branches/types/src/org/apache/pig/data/DataCharArrayUtf16.java
    incubator/pig/branches/types/src/org/apache/pig/data/DataDouble.java
    incubator/pig/branches/types/src/org/apache/pig/data/DataFloat.java
    incubator/pig/branches/types/src/org/apache/pig/data/DataInteger.java
    incubator/pig/branches/types/src/org/apache/pig/data/DataLong.java
    incubator/pig/branches/types/src/org/apache/pig/data/DatumImpl.java
    incubator/pig/branches/types/src/org/apache/pig/impl/util/PigLogger.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestDataBag.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestDataCharArrayNone.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestDataCharArrayUtf16.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestDataDouble.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestDataFloat.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestDataInteger.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestDataLong.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestDataMap.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestDataUnknown.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestTuple.java
Modified:
    incubator/pig/branches/types/src/org/apache/pig/Main.java
    incubator/pig/branches/types/src/org/apache/pig/PigServer.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/DIFF.java
    incubator/pig/branches/types/src/org/apache/pig/data/BagFactory.java
    incubator/pig/branches/types/src/org/apache/pig/data/BigDataBag.java
    incubator/pig/branches/types/src/org/apache/pig/data/DataAtom.java
    incubator/pig/branches/types/src/org/apache/pig/data/DataBag.java
    incubator/pig/branches/types/src/org/apache/pig/data/DataMap.java
    incubator/pig/branches/types/src/org/apache/pig/data/Datum.java
    incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java
    incubator/pig/branches/types/src/org/apache/pig/data/TimestampedTuple.java
    incubator/pig/branches/types/src/org/apache/pig/data/Tuple.java
    incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java
    incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java
    incubator/pig/branches/types/src/org/apache/pig/impl/builtin/GFCross.java
    incubator/pig/branches/types/src/org/apache/pig/impl/builtin/ShellBagEvalFunc.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/EvalSpec.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/FuncEvalSpec.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/GenerateSpec.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/MapLookupSpec.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/ProjectSpec.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/SortDistinctSpec.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/collector/DataCollector.java
    incubator/pig/branches/types/src/org/apache/pig/impl/eval/collector/UnflattenCollector.java
    incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileReader.java
    incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileWriter.java
    incubator/pig/branches/types/src/org/apache/pig/impl/io/PigFile.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigCombine.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigMapReduce.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/IntermedResult.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POCogroup.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PORead.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSort.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POStore.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalPlan.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestBuiltin.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestFilterOpNumeric.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestFilterOpString.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPigFile.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPigSplit.java
    incubator/pig/branches/types/test/org/apache/pig/test/Util.java

Modified: incubator/pig/branches/types/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/Main.java?rev=591143&r1=591142&r2=591143&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/Main.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/Main.java Thu Nov  1 13:48:16 2007
@@ -31,6 +31,7 @@
 import org.apache.log4j.PatternLayout;
 import org.apache.pig.PigServer.ExecType;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.PigLogger;
 import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
 import org.apache.pig.tools.cmdline.CmdLineParser;
 import org.apache.pig.tools.grunt.Grunt;
@@ -155,7 +156,7 @@
 		LogicalPlanBuilder.classloader = pigContext.createCl(null);
 
 		// Set the log level, and set up appenders
-		Logger log = pigContext.getLogger();
+		Logger log = PigLogger.getLogger();
 		log.setLevel(logLevel);
 		ConsoleAppender screen = new ConsoleAppender(new PatternLayout());
 		if (verbose) screen.setThreshold(logLevel);

Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=591143&r1=591142&r2=591143&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Thu Nov  1 13:48:16 2007
@@ -29,6 +29,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.Datum;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
 import org.apache.pig.impl.io.FileLocalizer;
@@ -243,7 +244,12 @@
        		pp = physicalPlans.get(readFrom);
     	}
     	
-    	return pp.exec(continueFromLast).content();
+		// Data bags are guaranteed to contain tuples.
+    	//return pp.exec(continueFromLast).content();
+		// A direct subversion of the type system, this has to be bad.
+		Iterator<Datum> i = pp.exec(continueFromLast).content();
+		Object o = i;
+    	return (Iterator<Tuple>)o;
     	
     }
     
@@ -259,8 +265,10 @@
 
         readFrom.compile(queryResults);
         readFrom.exec();
-        if (pigContext.getExecType() == ExecType.LOCAL)
-            return readFrom.read().content();
+        if (pigContext.getExecType() == ExecType.LOCAL) {
+            Object o = readFrom.read().content();
+			return (Iterator<Tuple>)o;
+		}
         final LoadFunc p;
         
         try{

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java?rev=591143&r1=591142&r2=591143&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java Thu Nov  1 13:48:16 2007
@@ -87,8 +87,8 @@
     static protected double sum(Tuple input) throws IOException {
         DataBag values = input.getBagField(0);
         double sum = 0;
-        for (Iterator<Tuple> it = values.content(); it.hasNext();) {
-            Tuple t = it.next();
+        for (Iterator<Datum> it = values.content(); it.hasNext();) {
+            Tuple t = (Tuple)it.next();
             try {
                 sum += t.getAtomField(0).numval();
             } catch (NumberFormatException exp) {

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/DIFF.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/DIFF.java?rev=591143&r1=591142&r2=591143&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/DIFF.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/DIFF.java Thu Nov  1 13:48:16 2007
@@ -24,6 +24,8 @@
 import org.apache.pig.data.DataAtom;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.Datum;
+import org.apache.pig.data.AtomicDatum;
 
 
 /**
@@ -45,16 +47,16 @@
         if (input.arity() != 2) {
             throw new IOException("DIFF must compare two fields not " + input.arity());
         }
-        if (input.getField(0) instanceof DataBag) {
+		if (input.getField(0).getType() == Datum.DataType.BAG) {
             DataBag field1 = input.getBagField(0);
             DataBag field2 = input.getBagField(1);
-            Iterator<Tuple> it1 = field1.content();
+            Iterator<Datum> it1 = field1.content();
             checkInBag(field2, it1, output);
-            Iterator<Tuple> it2 = field2.content();
+            Iterator<Datum> it2 = field2.content();
             checkInBag(field1, it2, output);
         } else {
-            DataAtom d1 = input.getAtomField(0);
-            DataAtom d2 = input.getAtomField(1);
+            AtomicDatum d1 = input.getAtomField(0);
+            AtomicDatum d2 = input.getAtomField(1);
             if (!d1.equals(d2)) {
                 output.add(new Tuple(d1));
                 output.add(new Tuple(d2));
@@ -62,10 +64,10 @@
         }
     }
 
-    private void checkInBag(DataBag bag, Iterator<Tuple> iterator, DataBag emitTo) throws IOException {
+    private void checkInBag(DataBag bag, Iterator<Datum> iterator, DataBag emitTo) throws IOException {
         while(iterator.hasNext()) {
-            Tuple t = iterator.next();
-            Iterator<Tuple> it2 = bag.content();
+            Datum t = iterator.next();
+            Iterator<Datum> it2 = bag.content();
             boolean found = false;
             while(it2.hasNext()) {
                 if (t.equals(it2.next())) {

Modified: incubator/pig/branches/types/src/org/apache/pig/data/BagFactory.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/BagFactory.java?rev=591143&r1=591142&r2=591143&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/BagFactory.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/BagFactory.java Thu Nov  1 13:48:16 2007
@@ -42,15 +42,15 @@
     }
     
     // Get BigBag or Bag, depending on whether the temp directory has been set up
-    public DataBag getNewBag() throws IOException {
-        if (tmpdir == null) return new DataBag();
-        else return getNewBigBag();
+    public DataBag getNewBag(Datum.DataType type) throws IOException {
+        if (tmpdir == null) return new DataBag(type);
+        else return getNewBigBag(type);
     }
     
     // Need a Big Bag, dammit!
-    public BigDataBag getNewBigBag() throws IOException {
+    public BigDataBag getNewBigBag(Datum.DataType type) throws IOException {
         if (tmpdir == null) throw new IOException("No temp directory given for BigDataBag.");
-        else return new BigDataBag(tmpdir);
+        else return new BigDataBag(type, tmpdir);
     }
 
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/data/BigDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/BigDataBag.java?rev=591143&r1=591142&r2=591143&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/BigDataBag.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/BigDataBag.java Thu Nov  1 13:48:16 2007
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *	 http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -32,381 +32,405 @@
 
 
 public class BigDataBag extends DataBag {
-    
-	File tempdir;
-    LinkedList<File> stores = new LinkedList<File>();
-    DataBagFileWriter writer = null;
-    
-    boolean finishedAdds = false,wantSorting = false, doneSorting = false, sortInProgress = false, wroteUnsortedFile = false;
-    int trueCount = 0;
-
-    boolean eliminateDuplicates = false;
-    EvalSpec spec = null;
-    
-    public static long MAX_MEMORY = Runtime.getRuntime().maxMemory();
-    /**
-     * Sets the limit of remaining memory that will
-     * cause us to switch to disk backed mode
-     */
-    public static long FREE_MEMORY_TO_MAINTAIN = (long)(MAX_MEMORY*.25);
-    
-    
-    public BigDataBag(File tempdir) throws IOException {
-    	this.tempdir = tempdir;
-    }
-    
-    private boolean isMemoryAvailable(long memLimit){
-    	long freeMemory = Runtime.getRuntime().freeMemory();
+	
+File tempdir;
+LinkedList<File> stores = new LinkedList<File>();
+DataBagFileWriter writer = null;
+	
+boolean finishedAdds = false,wantSorting = false, doneSorting = false, sortInProgress = false, wroteUnsortedFile = false;
+long trueCount = 0;
+
+boolean eliminateDuplicates = false;
+EvalSpec spec = null;
+	
+public static long MAX_MEMORY = Runtime.getRuntime().maxMemory();
+/**
+ * Sets the limit of remaining memory that will
+ * cause us to switch to disk backed mode
+ */
+public static long FREE_MEMORY_TO_MAINTAIN = (long)(MAX_MEMORY*.25);
+	
+	
+public BigDataBag(Datum.DataType elementType, File tempdir) throws IOException
+{
+	super(elementType);
+	this.tempdir = tempdir;
+}
+	
+private boolean isMemoryAvailable(long memLimit)
+{
+	long freeMemory = Runtime.getRuntime().freeMemory();
 	long usedMemory = Runtime.getRuntime().totalMemory() - freeMemory;
-    	return MAX_MEMORY-usedMemory > memLimit;
-    }
-        
-    private void writeContentToDisk() throws IOException{
-    	
-    	if (writer==null){
-			File store = File.createTempFile("bag",".dat",tempdir);
-			stores.add(store);
-			writer = new DataBagFileWriter(store);
+	return MAX_MEMORY-usedMemory > memLimit;
+}
+		
+private void writeContentToDisk() throws IOException
+{
+	if (writer==null){
+		File store = File.createTempFile("bag",".dat",tempdir);
+		stores.add(store);
+		writer = new DataBagFileWriter(store);
+	}
+	if (wantSorting && !wroteUnsortedFile){
+		if (eliminateDuplicates)
+			super.distinct();
+		else
+			super.sort(spec);
+	}else{
+		wroteUnsortedFile = true;
+	}
+	writer.write(content());
+	super.clear();
+	if (wantSorting){
+		writer.close();
+		writer = null;
+	}
+		
+}
+	
+@Override
+public void add(Datum d)
+{
+	if (d==null)
+		return;
+	if (finishedAdds) {
+		throw new RuntimeException("DataBag has been read from");
+	}
+	try{
+		if (writer == null) {
+			//Want to add in memory
+			super.add(d);
+			if (!isMemoryAvailable(FREE_MEMORY_TO_MAINTAIN) && trueCount > 10) {
+				writeContentToDisk();
+			}	
+		}else{
+			writer.write(d);
 		}
-    	if (wantSorting && !wroteUnsortedFile){
-    		if (eliminateDuplicates)
-    			super.distinct();
-    		else
-    			super.sort(spec);
-    	}else{
-    		wroteUnsortedFile = true;
-    	}
-    	writer.write(content.iterator());
-    	super.clear();
-    	if (wantSorting){
-        	writer.close();
-        	writer = null;
-        }
-        
-    }
-    
-    @Override
-	public void add(Tuple t) {
-    	if (t==null)
-    		return;
-    	if (finishedAdds) {
-            throw new RuntimeException("DataBag has been read from");
-        }
-    	try{
-	        if (writer == null) {
-	        	//Want to add in memory
-	        	super.add(t);
-	            if (!isMemoryAvailable(FREE_MEMORY_TO_MAINTAIN) && trueCount > 10) {
-	            	writeContentToDisk();
-	            }	
-	        }else{
-	        	writer.write(t);
-	        }
-	        trueCount++;
-        } catch(IOException e) {
-            throw new RuntimeException(e.getMessage());
-        }
-    }
-    
-    
-    @Override
-	public int cardinality() {
-    	if (!wantSorting || !eliminateDuplicates || doneSorting)
-    		return trueCount;
-    	
-    	if (sortInProgress)
-			throw new RuntimeException("Can't ask for cardinality in the middle of a sort");
-    	
-		//Now ask for the content to set the count right
-		Iterator<Tuple> iter = content();
+		trueCount++;
+	} catch(IOException e) {
+		throw new RuntimeException(e.getMessage());
+	}
+}
+	
+	
+/**
+ * @deprecated Use size instead.
+ */
+@Override
+public int cardinality() { return (int)size(); }
+
+@Override
+public long size()
+{
+	if (!wantSorting || !eliminateDuplicates || doneSorting)
+		return trueCount;
+		
+	if (sortInProgress)
+		throw new RuntimeException("Can't ask for cardinality in the middle of a sort");
+		
+	//Now ask for the content to set the count right
+	Iterator<Datum> iter = content();
+		
+	if (sortInProgress){
+		//Must go through the entire iterator to set the count right
+		while(iter.hasNext())
+			iter.next();
+	}
 		
-		if (sortInProgress){
-			//Must go through the entire iterator to set the count right
+	return trueCount;
+}
+	
+private void createSortedRuns() throws IOException
+{
+	DataBagFileReader reader = new DataBagFileReader(stores.removeFirst());
+	Iterator<Datum> iter = reader.content();
+	while(iter.hasNext()){
+		DataBag bag = new DataBag(bagOf());
+		while( iter.hasNext() && isMemoryAvailable(FREE_MEMORY_TO_MAINTAIN/2)){
+			bag.add(iter.next());
+		}
+		if(eliminateDuplicates){
+			bag.distinct();
+			trueCount = bag.size();
+		}else
+			bag.sort(spec);
+		File f = File.createTempFile("bag", ".dat",tempdir);
+		stores.add(f);
+		DataBagFileWriter writer = new DataBagFileWriter(f);
+		writer.write(bag.content());
+		bag.clear();
+		writer.close();
+	}
+	reader.clear();
+}
+	
+private Iterator<Datum> doSorting() throws IOException{
+	if (wroteUnsortedFile){
+		createSortedRuns();
+	}
+	
+	if (stores.size()==1){
+		doneSorting = true;
+		return new DataBagFileReader(stores.peek()).content();
+	}
+	
+	sortInProgress = true;
+	while (true){
+		Iterator<Datum> iter = new FileMerger();
+		
+		if (stores.size() > 1){
 			while(iter.hasNext())
 				iter.next();
+		}else{
+			return iter;
 		}
+	}
+}
+	
+@Override
+public Iterator<Datum> content()
+{
+	if (sortInProgress)
+		throw new RuntimeException("Cannot open another iterator: a sort is in progress");
 		
-		return trueCount;
-    }
-    
-    private void createSortedRuns() throws IOException{
-    	DataBagFileReader reader = new DataBagFileReader(stores.removeFirst());
-    	Iterator<Tuple> iter = reader.content();
-    	while(iter.hasNext()){
-    		DataBag bag = new DataBag();
-    		while( iter.hasNext() && isMemoryAvailable(FREE_MEMORY_TO_MAINTAIN/2)){
-    			bag.add(iter.next());
-    		}
-    		if(eliminateDuplicates){
-    			bag.distinct();
-    			trueCount = bag.cardinality();
-    		}else
-    			bag.sort(spec);
-    		File f = File.createTempFile("bag", ".dat",tempdir);
-    		stores.add(f);
-    		DataBagFileWriter writer = new DataBagFileWriter(f);
-    		writer.write(bag.content());
-    		bag.clear();
-    		writer.close();
-    	}
-    	reader.clear();
-    }
-    
-    private Iterator<Tuple> doSorting() throws IOException{
-    	if (wroteUnsortedFile){
-    		createSortedRuns();
-    	}
-    	
-    	if (stores.size()==1){
-    		doneSorting = true;
-    		return new DataBagFileReader(stores.peek()).content();
-    	}
-    	
-    	sortInProgress = true;
-    	while (true){
-    		Iterator<Tuple> iter = new FileMerger();
-    		
-    		if (stores.size() > 1){
-    			while(iter.hasNext())
-    				iter.next();
-    		}else{
-    			return iter;
-    		}
-    	}
-    }
-    
-    @Override
-	public Iterator<Tuple> content() {
-    	
-    	if (sortInProgress)
-    		throw new RuntimeException("Cannot open another iterator: a sort is in progress");
-    	
-    	finishedAdds = true;
-    	
-    	//memory only case
-    	if (stores.isEmpty()){
-    		if (wantSorting && !doneSorting){
-    			if (eliminateDuplicates){
-    				super.distinct();
-    				trueCount = super.cardinality();
-    			}
-    			else
-    				super.sort(spec);
-    			doneSorting = true;
-    		}
-    		return super.content(); 
-    	}
-    	
-    	//disk case
-    	try{
-	    	//first flush all remaining contents to disk too, and close any open files
-	    	if (!content.isEmpty())
-	    		writeContentToDisk();
-	    	
-	    	close();
-	    	
-	    	//Now if not already sorted, sort the contents and return the iterator on 
-	    	//the merged file
-	    	if (wantSorting && !doneSorting){
-	    		return doSorting();
-	    	}
-	    	
-	    	//the list stores should be of length 1 at this time
-	    	//because sorting always leaves it so
-	    	//else just return the iterator on the singelton store file
-	    	return new DataBagFileReader(stores.peek()).content();
-    	}catch(IOException e){
-    		throw new RuntimeException(e.getMessage());
-    	}
-    	
-    }
-    
-    @Override
-	public boolean isEmpty() {
-        return trueCount == 0;
-    }
-    
-    @Override
-	public void remove(Tuple d) {
-        throw new RuntimeException("BigDataBag is append only");
-    }
-    
-    public void close(){
-    	if (writer != null){
-            try {
-                writer.close();
-                writer = null;
-            } catch (IOException e) {
-                RuntimeException ne = new RuntimeException(e.getMessage());
-                ne.setStackTrace(e.getStackTrace());
-                throw ne;
-            }
-        }
-    }
-    
-    @Override
-	public void clear() {
-        close();
-        while(!stores.isEmpty())
-        	stores.removeFirst().delete();
-
-        finishedAdds = false;
-        trueCount = 0;
-        wantSorting = false; doneSorting = false; sortInProgress = false; wroteUnsortedFile = false;
-        super.clear();
-    }
-        
-    private class HeapEntry{
-    	DataBagFileReader reader;
-    	Iterator<Tuple> iter;
-    	Tuple	tuple;
-
-    	public HeapEntry(DataBagFileReader reader, Iterator<Tuple> iter, Tuple tuple) {
-    		this.reader = reader;
-    		this.iter = iter;
-    		this.tuple = tuple;
+	finishedAdds = true;
+		
+	//memory only case
+	if (stores.isEmpty()){
+		if (wantSorting && !doneSorting){
+			if (eliminateDuplicates){
+				super.distinct();
+				trueCount = super.size();
+			}
+			else
+				super.sort(spec);
+			doneSorting = true;
 		}
-    	
-    }
-    
-    private class FileMerger implements Iterator<Tuple>{
-    	PriorityQueue<HeapEntry> heap;
-    	private final int FANIN_LIMIT = 25;
-    	DataBagFileWriter writer;
-    	HeapEntry nextEntry;
-    	
-    	public FileMerger() throws IOException{
-        	
-    		Comparator<HeapEntry> comp = new Comparator<HeapEntry>(){
-        		public int compare(HeapEntry he1, HeapEntry he2){
-				try
-				{
-        				return spec.getComparator().compare(he1.tuple, he2.tuple);
-				}
-				catch (RuntimeException e)
-				{
-					String msg = "spec = " + spec.toString() + "\n he1 = ";
-					msg +=	he1.tuple.toString();
-					msg += "\n hev2 = ";
-					msg += he2.tuple.toString();	  
-
-					throw new RuntimeException(e.getMessage() + ", additional info: " + msg);
-				}
-        		}
-        	};
-        	
-        	heap = new PriorityQueue<HeapEntry>(10,comp);
-    		
-        	for (int i=0; i < FANIN_LIMIT && !stores.isEmpty(); i++){
-        		DataBagFileReader reader = new DataBagFileReader(stores.removeFirst());
-        		Iterator<Tuple> iter = reader.content();
-        		if (iter.hasNext()){
-        			heap.add(new HeapEntry(reader,iter,iter.next()));
-        		}else{
-        			reader.clear();
-        		}
-        	}
-        	
-        	File outputFile  = File.createTempFile("bag",".dat",tempdir);
-        	stores.add(outputFile);
-        	writer = new DataBagFileWriter(outputFile);
-        	
-        	getNextEntry();
-        	if (eliminateDuplicates)
-        		trueCount = 0;
-    	}
-    	
-    	private void getNextEntry() throws IOException{
-    		if (heap.isEmpty()){
-        		nextEntry = null;
-        		writer.close();
-        		if (stores.size()==1){
-        			sortInProgress = false;
-        			doneSorting = true;
-        		}
-        		return;
-        	}else{
-        		nextEntry = heap.poll();
-        		Iterator<Tuple> iter = nextEntry.iter;
-        		if(iter.hasNext()){
-        			heap.add(new HeapEntry(nextEntry.reader,iter,iter.next()));
-        		}else{
-        			nextEntry.reader.clear();
-        		}
-        	}
-    	}
-    	
-    	public boolean hasNext(){
-    		return nextEntry != null;
-    	}
-    	
-    	public Tuple next(){
-	    	HeapEntry prevEntry = nextEntry;
-	    	try{
-	    		writer.write(prevEntry.tuple);
-	    		if (eliminateDuplicates)
-	    			trueCount++;
-	    		do{
-	    			getNextEntry();
-	    		}while(nextEntry!=null && eliminateDuplicates && spec.getComparator().compare(prevEntry.tuple, nextEntry.tuple)==0);    		
-	    		
-    		}catch(IOException e){
-    			RuntimeException re = new RuntimeException(e.getMessage());
-    			re.setStackTrace(e.getStackTrace());
-    			throw re;
-    		}
-    		return prevEntry.tuple;
-    	}
-    	
-    	public void remove(){
-    		throw new RuntimeException("Read only cursor");
-    	}
-    	
-    }
-    
-    private void sort(EvalSpec spec, boolean eliminateDuplicates){
-    	if (wantSorting)
-    		throw new RuntimeException("Can't request sorting again");
-    	if (trueCount > 0){
-    		//This is as good as starting to read, since we want to allow
-    		//sort specifications only in the beginning or the end
-    		finishedAdds = true;
-    	}
-    		
-    	wantSorting = true;
-    	this.spec = spec;
-    	this.eliminateDuplicates = eliminateDuplicates;
-    }
-    
-    @Override
-	public void sort() {
-        sort(new StarSpec(),false);
-        isSorted = true;
-    }
-    
-    
-    @Override
-	public void sort(EvalSpec spec) {
-    	sort(spec,false);
-    	isSorted = true;
-    }
-    
-    @Override
-	public void arrange(EvalSpec spec) {
-        sort(spec,false);
-        isSorted = true;
-    }
-    
-    @Override
-	public void distinct() {
-    	sort(null,true);
-    	isSorted = true;
-    }
-
-    @Override
-    protected void finalize() throws Throwable {
-    	clear();
-    	super.finalize();
-    }
-    
+		return super.content(); 
+	}
+	
+	//disk case
+	try{
+		//first flush all remaining contents to disk too, and close any open files
+		if (!isEmpty()) writeContentToDisk();
+		
+		close();
+		
+		//Now if not already sorted, sort the contents and return the iterator on 
+		//the merged file
+		if (wantSorting && !doneSorting){
+			return doSorting();
+		}
+		
+		//the list stores should be of length 1 at this time
+		//because sorting always leaves it so
+		//else just return the iterator on the singelton store file
+		return new DataBagFileReader(stores.peek()).content();
+	}catch(IOException e){
+		throw new RuntimeException(e.getMessage());
+	}
+		
+}
+	
+@Override
+public boolean isEmpty() 
+{
+	return trueCount == 0;
+}
+	
+@Override
+public void remove(Datum d)
+{
+	throw new RuntimeException("BigDataBag is append only");
+}
+	
+public void close()
+{
+	if (writer != null){
+		try {
+			writer.close();
+			writer = null;
+		} catch (IOException e) {
+			RuntimeException ne = new RuntimeException(e.getMessage());
+			ne.setStackTrace(e.getStackTrace());
+			throw ne;
+		}
+	}
+}
+	
+@Override
+public void clear() {
+	close();
+	while(!stores.isEmpty())
+		stores.removeFirst().delete();
+
+	finishedAdds = false;
+	trueCount = 0;
+	wantSorting = false; doneSorting = false; sortInProgress = false; wroteUnsortedFile = false;
+	super.clear();
+}
+		
+private class HeapEntry
+{
+	DataBagFileReader reader;
+	Iterator<Datum> iter;
+	Datum	datum;
+
+public HeapEntry(DataBagFileReader reader, Iterator<Datum> iter, Datum datum)
+{
+	this.reader = reader;
+	this.iter = iter;
+	this.datum = datum;
+	}
+	
+}
+	
+private class FileMerger implements Iterator<Datum>
+{
+	PriorityQueue<HeapEntry> heap;
+	private final int FANIN_LIMIT = 25;
+	DataBagFileWriter writer;
+	HeapEntry nextEntry;
+		
+public FileMerger() throws IOException
+{
+	Comparator<HeapEntry> comp = new Comparator<HeapEntry>(){
+
+		public int compare(HeapEntry he1, HeapEntry he2) {
+			try {
+				return spec.getComparator().compare(he1.datum, he2.datum);
+			} catch (RuntimeException e) {
+				String msg = "spec = " + spec.toString() + "\n he1 = ";
+				msg +=	he1.datum.toString();
+				msg += "\n hev2 = ";
+				msg += he2.datum.toString();	  
+		
+				throw new RuntimeException(e.getMessage() + ", additional info: " + msg);
+			}
+		}
+	};
+			
+	heap = new PriorityQueue<HeapEntry>(10,comp);
+			
+	for (int i=0; i < FANIN_LIMIT && !stores.isEmpty(); i++){
+		DataBagFileReader reader = new DataBagFileReader(stores.removeFirst());
+		Iterator<Datum> iter = reader.content();
+		if (iter.hasNext()){
+			heap.add(new HeapEntry(reader,iter,iter.next()));
+		}else{
+			reader.clear();
+		}
+	}
+			
+	File outputFile  = File.createTempFile("bag",".dat",tempdir);
+	stores.add(outputFile);
+	writer = new DataBagFileWriter(outputFile);
+			
+	getNextEntry();
+	if (eliminateDuplicates)
+		trueCount = 0;
+}
+		
+private void getNextEntry() throws IOException
+{
+	if (heap.isEmpty()){
+		nextEntry = null;
+		writer.close();
+		if (stores.size()==1){
+			sortInProgress = false;
+			doneSorting = true;
+		}
+		return;
+	}else{
+		nextEntry = heap.poll();
+		Iterator<Datum> iter = nextEntry.iter;
+		if(iter.hasNext()){
+			heap.add(new HeapEntry(nextEntry.reader,iter,iter.next()));
+		}else{
+			nextEntry.reader.clear();
+		}
+	}
+}
+		
+public boolean hasNext()
+{
+	return nextEntry != null;
+}
+		
+public Datum next()
+{
+	HeapEntry prevEntry = nextEntry;
+	try{
+		writer.write(prevEntry.datum);
+		if (eliminateDuplicates)
+			trueCount++;
+		do{
+			getNextEntry();
+		}while(nextEntry!=null && eliminateDuplicates && spec.getComparator().compare(prevEntry.datum, nextEntry.datum)==0);			
+				
+	}catch(IOException e){
+		RuntimeException re = new RuntimeException(e.getMessage());
+		re.setStackTrace(e.getStackTrace());
+		throw re;
+	}
+	return prevEntry.datum;
+}
+		
+public void remove()
+{
+	throw new RuntimeException("Read only cursor");
+}
+		
+}
+	
+private void sort(EvalSpec spec, boolean eliminateDuplicates)
+{
+	if (wantSorting)
+		throw new RuntimeException("Can't request sorting again");
+	if (trueCount > 0){
+		//This is as good as starting to read, since we want to allow
+		//sort specifications only in the beginning or the end
+		finishedAdds = true;
+	}
+			
+	wantSorting = true;
+	this.spec = spec;
+	this.eliminateDuplicates = eliminateDuplicates;
+}
+	
+@Override
+public void sort()
+{
+	sort(new StarSpec(),false);
+	mIsSorted = true;
+}
+	
+@Override
+public void sort(EvalSpec spec)
+{
+	sort(spec,false);
+	mIsSorted = true;
+}
+	
+@Override
+public void arrange(EvalSpec spec)
+{
+	sort(spec,false);
+	mIsSorted = true;
+}
+	
+@Override
+public void distinct()
+{
+	sort(null,true);
+	mIsSorted = true;
+}
+
+@Override
+protected void finalize() throws Throwable
+{
+	clear();
+	super.finalize();
+}
+
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/data/DataAtom.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataAtom.java?rev=591143&r1=591142&r2=591143&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DataAtom.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DataAtom.java Thu Nov  1 13:48:16 2007
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *	 http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,128 +22,83 @@
 import java.io.IOException;
 
 /**
- * The basic data unit. For now, we represent all atomic data objects as strings
+ * @deprecated Use new data types instead.  This type is not optimized for
+ * performance.
  */
-final public class DataAtom extends Datum {
-    String stringVal = null;
-    Double doubleVal = null;
-    public static String EMPTY = "";
-    byte[] binaryVal = null;
-    
-    public DataAtom() {
-        stringVal = EMPTY;
-        doubleVal = Double.POSITIVE_INFINITY;
-    }
-
-    public DataAtom(String valIn) {
-        setValue(valIn);
-    }
-
-    public DataAtom(int valIn) {
-        setValue(valIn);
-    }
-
-    public DataAtom(long valIn) {
-        setValue(valIn);
-    }
-    
-    public DataAtom(byte[] valIn){
-    	setValue(valIn);
-    }
-
-    public DataAtom(double valIn) {
-        setValue(valIn);
-    }
-
-    public void setValue(String valIn) {
-        stringVal = valIn;
-        doubleVal = Double.POSITIVE_INFINITY;
-    }
-    
-    public void setValue(byte[] valIn) {
-    	binaryVal = valIn;
-    	stringVal = null;
-    	doubleVal = Double.POSITIVE_INFINITY;
-    }
-
-    public void setValue(int valIn) {
-        // conversion is cheap, do it now
-        doubleVal = new Double(valIn);
-        stringVal = Integer.toString(valIn);
-    }
-
-    public void setValue(long valIn) {
-        // conversion is cheap, do it now
-        doubleVal = new Double(valIn);
-        stringVal = Long.toString(valIn);
-    }
-
-    public void setValue(double valIn) {
-        // conversion is cheap, do it now
-        doubleVal = new Double(valIn);
-        stringVal = Double.toString(valIn);
-    }
-
-    public String strval() {
-        return stringVal;
-    }
-
-    public Double numval() {
-        // lazy parse and create the numeric member value
-        if (doubleVal == Double.POSITIVE_INFINITY) {
-            doubleVal = new Double(stringVal);
-        }
-        return doubleVal;
-    }
-
-    @Override
-	public String toString() {
-        return stringVal;
-    }
-
-    
-    @Override
-	public boolean equals(Object other) {
-        
-    	return compareTo(other) == 0;
-    }    
-    
-    public int compareTo(Object other) {
-    	if (!(other instanceof DataAtom))
-    		return -1;
-        DataAtom dOther = (DataAtom) other;
-        
-        return stringVal.compareTo(dOther.stringVal);
-            
-    }
-
-    @Override
-	public void write(DataOutput out) throws IOException {
-    	 out.write(ATOM);
-         byte[] data;
-         try {
-             data = strval().getBytes("UTF-8");
-         } catch (Exception e) {
-             long size = strval().length();
-             throw new RuntimeException("Error dealing with DataAtom of size " + size);
-         }
-         Tuple.encodeInt(out, data.length);
-         out.write(data);	
-    }
-    
-    static DataAtom read(DataInput in) throws IOException {
-        int len = Tuple.decodeInt(in);
-        DataAtom ret = new DataAtom();
-        byte[] data = new byte[len];
-        in.readFully(data);
-        ret.setValue(new String(data, "UTF-8"));
-        return ret;
-    }
-
-    
-    @Override
-	public int hashCode() {
-        return stringVal.hashCode();
-    }
+final public class DataAtom extends DataCharArrayUtf16 {
+	
+public DataAtom() { }
+
+public DataAtom(String valIn) { setValue(valIn); }
+
+public DataAtom(int valIn) { setValue(valIn); }
+
+public DataAtom(long valIn) { setValue(valIn); }
+	
+public DataAtom(byte[] valIn) { setValue(valIn); }
+
+public DataAtom(double valIn) { setValue(valIn); }
+
+public void setValue(String valIn) { mVal = valIn; }
+	
+public void setValue(byte[] valIn) { mVal = new String(valIn); }
+
+public void setValue(int valIn) { mVal = Integer.toString(valIn); }
+
+public void setValue(long valIn) { mVal = Long.toString(valIn); }
+
+public void setValue(double valIn) { mVal = Double.toString(valIn); }
+
+public String strval() { return mVal; }
+
+public Double numval() { return Double.parseDouble(mVal); }
+
+@Override
+public String toString()
+{
+	return mVal;
+}
+
+@Override
+public boolean equals(Object other)
+{
+	return compareTo(other) == 0;
+}
+
+public int compareTo(Object other)
+{
+	if (!(other instanceof DataAtom)) return -1;
+	DataAtom dOther = (DataAtom) other;
+
+	return mVal.compareTo(dOther.mVal);
+}
+
+@Override
+public void write(DataOutput out) throws IOException
+{
+	out.write(Datum.DataType.ATOM.getMarker());
+	byte[] data;
+	try {
+		data = strval().getBytes("UTF-8");
+	} catch (Exception e) {
+		long size = strval().length();
+		throw new RuntimeException("Error dealing with DataAtom of size " + size);
+	}
+	out.writeInt(data.length);
+	out.write(data);
+}
+
+static DataAtom read(DataInput in) throws IOException
+{
+	int len = in.readInt();
+	DataAtom ret = new DataAtom();
+	byte[] data = new byte[len];
+	in.readFully(data);
+	ret.setValue(new String(data, "UTF-8"));
+	return ret;
+}
+
+@Override
+public int hashCode() { return mVal.hashCode(); }
 
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/data/DataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataBag.java?rev=591143&r1=591142&r2=591143&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DataBag.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DataBag.java Thu Nov  1 13:48:16 2007
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *	 http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -30,211 +30,306 @@
 
 
 /**
- * A collection of Tuples
+ * A collection of Data values of a given type.  For performance reasons
+ * types are not checked on add or read.
  */
-public class DataBag extends Datum{
-    protected List<Tuple> content;
-    protected boolean isSorted = false;
-    
-    public DataBag() {
-        content = new ArrayList<Tuple>();
-    }
-
-    public DataBag(List<Tuple> c) {
-        content = c;
-    }
-
-    public DataBag(Tuple t) {
-        content = new ArrayList<Tuple>();
-        content.add(t);
-    }
-
-    public int cardinality() {
-        return content.size();
-    }
-
-    public boolean isEmpty() {
-        return content.size() == 0;
-    }
-    
-    public int compareTo(Object other) {
-    	if (this == other)
-    		return 0;
-    	if (other instanceof DataAtom) return +1;
-        if (other instanceof Tuple) return -1;
-        if (other instanceof DataBag){
-        	DataBag bOther = (DataBag) other;
-	        if (this.cardinality() != bOther.cardinality()) {
-	            return (this.cardinality() - bOther.cardinality());
-	        }
-	        
-	        // same cardinality, so compare tuple by tuple ...
-	        if (!isSorted())
-	        	this.sort();
-	        if (!bOther.isSorted())
-	        	bOther.sort();
-	        
-	        Iterator<Tuple> thisIt = this.content();
-	        Iterator<Tuple> otherIt = bOther.content();
-	        while (thisIt.hasNext() && otherIt.hasNext()) {
-	            Tuple thisT = thisIt.next();
-	            Tuple otherT = otherIt.next();
-	            
-	            int c = thisT.compareTo(otherT);
-	            if (c != 0) return c;
-	        }
-	        
-	        return 0;   // if we got this far, they must be equal
-        }else{
-        	return -1;
-        }
-	        	
-    }
-    
-    @Override
-	public boolean equals(Object other) {
-        return (compareTo(other) == 0);
-    }
-    
-    public void sort() {
-        Collections.sort(content);
-    }
-    
-    public void sort(EvalSpec spec) {
-        Collections.sort(content, spec.getComparator());
-        isSorted = true;
-    }
-    
-    public void arrange(EvalSpec spec) {
-        sort(spec);
-        isSorted = true;
-    }
-    
-    public void distinct() {
-        
-        Collections.sort(content);
-        isSorted = true;
-        
-        Tuple lastTup = null;
-        for (Iterator<Tuple> it = content.iterator(); it.hasNext(); ) {
-            Tuple thisTup = it.next();
-            
-            if (lastTup == null) {
-                lastTup = thisTup;
-                continue;
-            }
-            
-            if (thisTup.compareTo(lastTup) == 0) {
-                it.remove();
-            } else {
-                lastTup = thisTup;
-            }
-        }
-    }
-
-    public Iterator<Tuple> content() {
-        return content.iterator();
-    }
-    
-
-    public void add(Tuple t) {
-        if (t!=null)
-        	content.add(t);
-    }
-
-    public void addAll(DataBag b) {
-        
-        Iterator<Tuple> it = b.content();
-        while (it.hasNext()) {
-            add(it.next());
-        }
-    }
-
-    public void remove(Tuple d) {
-        content.remove(d);
-    }
-
-    /**
-     * Returns the value of field i. Since there may be more than one tuple in the bag, this
-     * function throws an exception if it is not the case that all tuples agree on this field
-     */
-    public DataAtom getField(int i) throws IOException {
-        DataAtom val = null;
-
-        for (Iterator<Tuple> it = content(); it.hasNext();) {
-            DataAtom currentVal = it.next().getAtomField(i);
-
-            if (val == null) {
-                val = currentVal;
-            } else {
-                if (!val.strval().equals(currentVal.strval()))
-                    throw new IOException("Cannot call getField on a databag unless all tuples agree.");
-            }
-        }
-
-        if (val == null)
-            throw new IOException("Cannot call getField on an empty databag.");
-
-        return val;
-    }
-
-    public void clear(){
-    	content.clear();
-    	isSorted = false;
-    }
-
-    
-    @Override
-	public void write(DataOutput out) throws IOException {
-    	 out.write(BAG);
-         Tuple.encodeInt(out, cardinality());
-         Iterator<Tuple> it = content();
-         while (it.hasNext()) {
-             Tuple item = it.next();
-             item.write(out);
-         }	
-    }
-    
-    public static abstract class BagDelimiterTuple extends Tuple{}
-    public static class StartBag extends BagDelimiterTuple{}
-    
-    public static class EndBag extends BagDelimiterTuple{}
-    
-    public static final Tuple startBag = new StartBag();
-    public static final Tuple endBag = new EndBag();
-    
-    static DataBag read(DataInput in) throws IOException {
-        int size = Tuple.decodeInt(in);
-        DataBag ret = BagFactory.getInstance().getNewBag();
-        
-        for (int i = 0; i < size; i++) {
-            Tuple t = new Tuple();
-            t.readFields(in);
-            ret.add(t);
-        }
-        return ret;
-    }
-    
-    public void markStale(boolean stale){}
-    
-    @Override
-	public String toString() {
-        StringBuffer sb = new StringBuffer();
-        sb.append('{');
-        Iterator<Tuple> it = content();
-        while ( it.hasNext() ) {
-        	Tuple t = it.next();
-        	String s = t.toString();
-        	sb.append(s);
-            if (it.hasNext())
-                sb.append(", ");
-        }
-        sb.append('}');
-        return sb.toString();
-    }
-    
-    public boolean isSorted(){
-    	return isSorted;
-    }
-    
+public class DataBag extends ComplexDatum {
+
+/**
+ * Create an empty bag of the indicated type.
+ * @param elementType what this will be a bag of.
+ */
+public DataBag(Datum.DataType elementType)
+{
+	mContent = new ArrayList<Datum>();
+	mElementType = elementType;
+}
+
+/**
+ * Create a bag based on a list.  The element type will be taken from the
+ * first Datum in the list.  The list is used directly, not copied.
+ * @param c list of data to place in the bag.  This list cannot be empty.
+ * @throws IOException if the list is empty.
+ */
+public DataBag(List<Datum> c) throws IOException
+{
+	mContent = c;
+	if (c.size() == 0) {
+		throw new IOException("Attempt to instantiate empty bag with no type.");
+	}
+	mElementType = c.get(0).getType();
+}
+
+/**
+ * Create a bag given a single datum.  The element type will be taken from
+ * the datum.
+ * @param t The datum to use.  The datum must not be java null (pig null
+ * is ok).
+ * @throws IOException if the datum is null.
+ */
+public DataBag(Datum t) throws IOException
+{
+	if (t == null) {
+		throw new IOException("Attempt to instantiate empty bag with no type.");
+	}
+	mContent = new ArrayList<Datum>();
+	mContent.add(t);
+	mElementType = t.getType();
+}
+
+/**
+ * @return BAG
+ */
+public DataType getType() { return Datum.DataType.BAG; }
+
+/**
+ * Find out what this is a bag of.
+ * @return datatype
+ */
+public DataType bagOf() { return mElementType; }
+
+/**
+ * @return the number of elements in the bag.
+ */
+public long size() { return mContent.size(); }
+
+/**
+ * @deprecated Use size() instead.
+ */
+public int cardinality() { return (int)size(); }
+
+/**
+ * Checks if the bag is empty.
+ */
+public boolean isEmpty() { return mContent.size() == 0; }
+
+/**
+ * Compare with another object.  Returns -1 for non-Datum objects and uses
+ * crossTypeCompare() for non-bag data.  Bags are ordered by element type,
+ * then by size, then element by element in iteration order (the bags are
+ * not sorted first).
+ */
+public int compareTo(Object other)
+{
+	if (!(other instanceof Datum)) return -1;
+
+	Datum od = (Datum)other;
+
+	if (od.getType() != Datum.DataType.BAG) return crossTypeCompare(od);
+
+	DataBag bag = (DataBag)od;
+
+	Datum.DataType dt = bagOf();
+	Datum.DataType dto = bag.bagOf();
+	if (dt != dto) return dt.compareTo(dto);
+
+	long sz = size();
+	long tsz = bag.size();
+	if (sz < tsz) return -1;
+	else if (sz > tsz) return 1;
+
+	Iterator<Datum> i = content();
+	Iterator<Datum> j = bag.content();
+	while (i.hasNext()) {
+		Datum us = i.next();
+		Datum them = j.next();
+		int rc = us.compareTo(them);
+		if (rc != 0) return rc;
+	}
+
+	return 0;
+}
+
+// Don't make this use compareTo.  These functions are used in things like hashes
+// and we want them to be as fast as possible.
+@Override
+public boolean equals(Object other)
+{
+	if (!(other instanceof DataBag)) return false;
+
+	DataBag bag = (DataBag)other;
+
+	long sz = size();
+
+	if (bagOf() != bag.bagOf()) return false;
+	if (bag.size() != sz) return false;
+
+	Iterator<Datum> i = content();
+	Iterator<Datum> j = bag.content();
+	while (i.hasNext()) {
+		Datum us = i.next();
+		Datum them = j.next();
+		if (!us.equals(them)) return false;
+	}
+
+	return true;
+}
+
+/**
+ * Hash code consistent with equals(): combines the element hash codes in
+ * iteration order, since equals() compares elements pairwise in that order.
+ */
+@Override
+public int hashCode()
+{
+	int hash = 1;
+	Iterator<Datum> it = content();
+	while (it.hasNext()) {
+		Datum d = it.next();
+		hash = 31 * hash + (d == null ? 0 : d.hashCode());
+	}
+	return hash;
+}
+
+public void sort()
+{
+	Collections.sort(mContent);
+	mIsSorted = true;
+}
+
+public void sort(EvalSpec spec)
+{
+	Collections.sort(mContent, spec.getComparator());
+	mIsSorted = true;
+}
+
+public void arrange(EvalSpec spec)
+{
+	sort(spec);
+	mIsSorted = true;
+}
+
+/**
+ * Sort the bag by natural ordering and remove adjacent duplicates.
+ */
+public void distinct()
+{
+	// TODO: this sorts everything and then removes duplicates; it would be
+	// cheaper to drop duplicates during the sort itself.
+	Collections.sort(mContent);
+	mIsSorted = true;
+
+	// Elements may be any Datum type, not just tuples, so compare them as
+	// Datum rather than casting to Tuple.
+	Datum last = null;
+	for (Iterator<Datum> it = mContent.iterator(); it.hasNext(); ) {
+		Datum current = it.next();
+		if (last != null && current.compareTo(last) == 0) {
+			it.remove();
+		} else {
+			last = current;
+		}
+	}
+}
+
+/**
+ * Get an iterator to the contents of the bag.  The iterator is an
+ * iterator of Datum.  If something else is expected the caller will have to
+ * cast it.
+ */
+public Iterator<Datum> content() { return mContent.iterator(); }
+
+/**
+ * Add a datum to the bag.  The datatype of the datum should match the
+ * result of bagOf(), but that will not be checked in the interest of
+ * speed.  Java null is silently ignored.  Would like this method to be
+ * final, but BigDataBag overrides it.
+ */
+public void add(Datum e)
+{
+	if (e != null) mContent.add(e);
+}
+
+/**
+ * Add the contents of a bag to the bag.  The datatype of the data should match the
+ * result of bagOf(), but that will not be checked in the interest of
+ * speed.
+ */
+public final void addAll(DataBag b)
+{
+	Iterator<Datum> it = b.content();
+	while (it.hasNext()) {
+		add(it.next());
+	}
+}
+
+/**
+ * Remove a particular datum from the bag.  This operation will be slow
+ * and should not be used much.
+ */
+public void remove(Datum d) { mContent.remove(d); }
+
+/**
+ * Returns the value of field i. Since there may be more than one tuple in the bag, this
+ * function throws an exception if it is not the case that all tuples agree on this field
+ */
+/*
+public DataAtom getField(int i) throws IOException
+{
+	DataAtom val = null;
+
+	for (Iterator<Tuple> it = mContent(); it.hasNext();) {
+		DataAtom currentVal = it.next().getAtomField(i);
+
+		if (val == null) {
+			val = currentVal;
+		} else {
+			if (!val.strval().equals(currentVal.strval()))
+				throw new IOException("Cannot call getField on a databag unless all tuples agree.");
+		}
+	}
+
+	if (val == null)
+		throw new IOException("Cannot call getField on an empty databag.");
+
+	return val;
+}
+*/
+
+/**
+ * Empty the bag of its contents.  It retains the type of bag it is.
+ */
+public void clear()
+{
+	mContent.clear();
+	mIsSorted = false;
+}
+
+/**
+ * Write the BAG marker, the element type marker, the size as a long, and
+ * then each element.
+ */
+@Override
+public void write(DataOutput out) throws IOException
+{
+	out.write(Datum.DataType.BAG.getMarker());
+	// Now write out the element type, so the reader knows what kind of bag to
+	// instantiate.
+	out.write(mElementType.getMarker());
+	out.writeLong(size());
+	Iterator<Datum> it = content();
+	while (it.hasNext()) {
+		Datum item = it.next();
+		item.write(out);
+	}
+}
+
+public static abstract class BagDelimiterTuple extends Tuple{}
+public static class StartBag extends BagDelimiterTuple{}
+
+public static class EndBag extends BagDelimiterTuple{}
+
+public static final Tuple startBag = new StartBag();
+public static final Tuple endBag = new EndBag();
+
+/**
+ * Read a bag written by write().  The BAG marker must already have been
+ * consumed by the caller; this reads the element type, the size, and then
+ * each element.
+ */
+static DataBag read(DataInput in) throws IOException
+{
+	DataType etype = Datum.DataType.markerToType(in.readByte());
+
+	long size = in.readLong();
+	DataBag ret = new DataBag(etype);
+	// TODO
+	//DataBag ret = BagFactory.getInstance().getNewBag();
+
+	// size is a long, so the index must be too; an int index would overflow
+	// (and loop forever) for sizes above Integer.MAX_VALUE.
+	for (long i = 0; i < size; i++) {
+		ret.add(DatumImpl.readDatum(in));
+	}
+	return ret;
+}
+
+public void markStale(boolean stale){}
+
+@Override
+public String toString() {
+	StringBuilder sb = new StringBuilder();
+	sb.append('{');
+	Iterator<Datum> it = content();
+	while ( it.hasNext() ) {
+		Datum t = it.next();
+		sb.append(t.toString());
+		if (it.hasNext())
+			sb.append(", ");
+	}
+	sb.append('}');
+	return sb.toString();
+}
+
+/**
+ * @return the mIsSorted flag, set by the sort/arrange/distinct methods and
+ * cleared by clear().
+ */
+public boolean mIsSorted(){ return mIsSorted; }
+
+/**
+ * Same as mIsSorted(); keeps the accessor name the previous DataBag exposed
+ * so existing callers continue to compile.
+ */
+public boolean isSorted() { return mIsSorted; }
+
+protected boolean mIsSorted = false;
+protected List<Datum> mContent;
+protected Datum.DataType mElementType;
+
 }

Added: incubator/pig/branches/types/src/org/apache/pig/data/DataCharArray.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataCharArray.java?rev=591143&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DataCharArray.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DataCharArray.java Thu Nov  1 13:48:16 2007
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *	 http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+
+/**
+ * Indicates a character array (string) datum.
+ */
+
+public abstract class DataCharArray extends AtomicDatum {
+
+/** How the characters of the array are held. */
+public enum Encoding { UTF16, NONE }
+
+/** Marker written after the CHARARRAY marker for UTF16 data. */
+public static final byte ENC_UTF16 = 0x1;
+/** Marker written after the CHARARRAY marker for unencoded data. */
+public static final byte ENC_NONE  = 0x2;
+
+public DataCharArray(Encoding enc) { mEncoding = enc; }
+
+public DataType getType() { return Datum.DataType.CHARARRAY; }
+
+public Encoding getEncoding() { return mEncoding; }
+
+/**
+ * Read a character array written by a subclass's write() method.  The
+ * CHARARRAY marker must already have been consumed by the caller in order
+ * to select this function; this reads the encoding marker and dispatches to
+ * the matching subclass.
+ * @throws IOException if the stream cannot be read or holds an unknown
+ * encoding marker (corrupt input is a data error, not a program bug).
+ */
+static DataCharArray read(DataInput in) throws IOException
+{
+	byte enc = in.readByte();
+	switch (enc) {
+	case ENC_UTF16:
+		return DataCharArrayUtf16.read(in);
+
+	case ENC_NONE:
+		return DataCharArrayNone.read(in);
+
+	default:
+		throw new IOException("Unknown encoding type " + enc);
+	}
+}
+
+private Encoding mEncoding;
+
+}
+

Added: incubator/pig/branches/types/src/org/apache/pig/data/DataCharArrayNone.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataCharArrayNone.java?rev=591143&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DataCharArrayNone.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DataCharArrayNone.java Thu Nov  1 13:48:16 2007
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *	 http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+
+/**
+ * Indicates a character array (string) datum with no encoding; the value is
+ * held as raw bytes.
+ */
+
+public class DataCharArrayNone extends DataCharArray {
+
+/** Create a datum with no value (null); size() reports 0. */
+public DataCharArrayNone()
+{
+	super(DataCharArray.Encoding.NONE);
+	mVal = null;
+}
+
+/**
+ * Construct a new datum using this byte[].  Makes a copy of the byte[].
+ */
+public DataCharArrayNone(byte[] b)
+{
+	super(DataCharArray.Encoding.NONE);
+	mVal = b.clone();
+}
+
+public long size()
+{
+	if (mVal == null) return 0;
+	else return mVal.length;
+}
+
+/**
+ * Get the value of the datum, as a byte[]
+ * @return byte[] value
+ */
+public final byte[] get() { return mVal; }
+
+/**
+ * Set the value of the datum.  Does not make a copy of the byte[], just
+ * stores a reference to it.
+ * @param val Value to be set.
+ */
+public final void set(byte[] val) { mVal = val; }
+
+/**
+ * @return the bytes decoded with the platform default charset, or "" when
+ * no value has been set (consistent with size() reporting 0).
+ */
+@Override
+public String toString()
+{
+	if (mVal == null) return "";
+	return new String(mVal);
+}
+
+/**
+ * Hash over every byte of the value (0 when no value has been set).
+ */
+@Override
+public int hashCode()
+{
+	return java.util.Arrays.hashCode(mVal);
+}
+
+// Don't make this use compareTo.  These functions are used in things like hashes
+// and we want them to be as fast as possible.
+@Override
+public boolean equals(Object other)
+{
+	if (!(other instanceof DataCharArrayNone)) return false;
+	DataCharArrayNone o = (DataCharArrayNone)other;
+	return java.util.Arrays.equals(mVal, o.mVal);
+}
+
+public int compareTo(Object other)
+{
+	if (!(other instanceof Datum)) return -1;
+
+	Datum od = (Datum)other;
+
+	if (od.getType() != Datum.DataType.CHARARRAY) return crossTypeCompare(od);
+
+	DataCharArray dco = (DataCharArray)od;
+
+	if (dco.getEncoding() != getEncoding()) {
+		return getEncoding().compareTo(dco.getEncoding());
+	}
+
+	DataCharArrayNone noneOther = (DataCharArrayNone)dco;
+
+	// NOTE(review): Java bytes are signed, so bytes >= 0x80 sort before
+	// ASCII bytes here; confirm this ordering is intended.
+	int i;
+	for (i = 0; i < mVal.length && i < noneOther.mVal.length; i++) {
+		if (mVal[i] < noneOther.mVal[i]) return -1;
+		else if (mVal[i] > noneOther.mVal[i]) return 1;
+	}
+
+	// Ran out of the other before we ran out of us.
+	if (i < mVal.length) return 1;
+	// Ran out of us before we ran out of the other.
+	else if (i < noneOther.mVal.length) return -1;
+	else return 0;
+}
+
+/**
+ * Write the CHARARRAY marker, the ENC_NONE marker, the length as an int,
+ * then the raw bytes.
+ */
+public void write(DataOutput out) throws IOException
+{
+	out.write(Datum.DataType.CHARARRAY.getMarker());
+	out.write(ENC_NONE);
+	out.writeInt(mVal.length);
+	out.write(mVal);
+}
+
+static DataCharArrayNone read(DataInput in) throws IOException
+{
+	// Assumes that the chararray and encoding indicators have already
+	// been read in order to select this function.
+	int len = in.readInt();
+	DataCharArrayNone ret = new DataCharArrayNone();
+	byte[] data = new byte[len];
+	in.readFully(data);
+	ret.mVal = data;
+	return ret;
+}
+
+private byte[] mVal;
+
+}
+

Added: incubator/pig/branches/types/src/org/apache/pig/data/DataCharArrayUtf16.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataCharArrayUtf16.java?rev=591143&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DataCharArrayUtf16.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DataCharArrayUtf16.java Thu Nov  1 13:48:16 2007
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *	 http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+
+/**
+ * Indicates a character array (string) datum held as a java String.
+ */
+
+public class DataCharArrayUtf16 extends DataCharArray {
+
+/** Create a datum with no value (null); size() reports 0. */
+public DataCharArrayUtf16()
+{
+	super(DataCharArray.Encoding.UTF16);
+	mVal = null;
+}
+
+/**
+ * Construct a new Datum using this string.  Makes a copy of the string.
+ */
+public DataCharArrayUtf16(String s)
+{
+	super(DataCharArray.Encoding.UTF16);
+	mVal = new String(s);
+}
+
+/**
+ * Construct a new Datum from these bytes, decoded as UTF-8 (the encoding
+ * write() and read() use) rather than the platform default charset.
+ */
+public DataCharArrayUtf16(byte[] b)
+{
+	super(DataCharArray.Encoding.UTF16);
+	try {
+		mVal = new String(b, "UTF-8");
+	} catch (java.io.UnsupportedEncodingException e) {
+		// UTF-8 support is required of every JVM, so this cannot happen.
+		throw new RuntimeException(e);
+	}
+}
+
+public long size()
+{
+	if (mVal == null) return 0;
+	else return mVal.length();
+}
+
+/**
+ * Get the value of the datum, as a String
+ * @return String value
+ */
+public final String get() { return mVal; }
+
+/**
+ * Set the value of the datum.  Does not make a copy of the string, just
+ * stores a reference to it.
+ * @param val Value to be set.
+ */
+public final void set(String val) { mVal = val; }
+
+/**
+ * @return the string value, or "" when no value has been set (consistent
+ * with size() reporting 0).
+ */
+@Override
+public String toString()
+{
+	return (mVal == null) ? "" : mVal;
+}
+
+@Override
+public int hashCode()
+{
+	return (mVal == null) ? 0 : mVal.hashCode();
+}
+
+// Don't make this use compareTo.  These functions are used in things like hashes
+// and we want them to be as fast as possible.
+@Override
+public boolean equals(Object other)
+{
+	if (!(other instanceof DataCharArrayUtf16)) return false;
+	DataCharArrayUtf16 o = (DataCharArrayUtf16)other;
+	return (mVal == null) ? o.mVal == null : mVal.equals(o.mVal);
+}
+
+public int compareTo(Object other)
+{
+	if (!(other instanceof Datum)) return -1;
+
+	Datum od = (Datum)other;
+
+	if (od.getType() != Datum.DataType.CHARARRAY) return crossTypeCompare(od);
+
+	DataCharArray dco = (DataCharArray)od;
+
+	if (dco.getEncoding() != getEncoding()) {
+		return getEncoding().compareTo(dco.getEncoding());
+	}
+
+	DataCharArrayUtf16 utf16Other = (DataCharArrayUtf16)dco;
+
+	return mVal.compareTo(utf16Other.mVal);
+}
+
+/**
+ * Write the CHARARRAY marker, the ENC_UTF16 marker, the UTF-8 byte length as
+ * an int, then the UTF-8 bytes.
+ */
+public void write(DataOutput out) throws IOException
+{
+	// I'm not sure if it's faster to do the in-memory translations from
+	// utf16->utf8 and back or store double the bytes while leaving it
+	// utf16.  But if we store it as utf16 it will be impossible to read
+	// with a basic editor, and make debugging that much more painful.
+	out.write(Datum.DataType.CHARARRAY.getMarker());
+	out.write(ENC_UTF16);
+	byte[] data;
+	try {
+		data = mVal.getBytes("UTF-8");
+	} catch (Exception e) {
+		long size = mVal.length();
+		throw new RuntimeException("Error dealing with DataCharArrayUtf16 of size " + size, e);
+	}
+	out.writeInt(data.length);
+	out.write(data);
+}
+
+static DataCharArrayUtf16 read(DataInput in) throws IOException
+{
+	// Assumes that the chararray and encoding indicators have already
+	// been read in order to select this function.
+	int len = in.readInt();
+	DataCharArrayUtf16 ret = new DataCharArrayUtf16();
+	byte[] data = new byte[len];
+	in.readFully(data);
+	ret.mVal = new String(data, "UTF-8");
+	return ret;
+}
+
+// Protected so DataAtom can read it.
+protected String mVal;
+
+}
+

Added: incubator/pig/branches/types/src/org/apache/pig/data/DataDouble.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataDouble.java?rev=591143&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DataDouble.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DataDouble.java Thu Nov  1 13:48:16 2007
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *	 http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+
+/**
+ * Indicates an double datum.
+ */
+
+public class DataDouble extends AtomicDatum {
+
+public DataDouble() { mVal = 0; }
+
+public DataDouble(double val) { mVal = val; }
+
+public DataType getType() { return Datum.DataType.DOUBLE; }
+
+public long size() { return 4L; }
+
+/**
+ * Get the value of the datum, as a double.
+ * @return double value
+ */
+public final double get() { return mVal; }
+
+/**
+ * Set the value of the datum.
+ * @param val Value to be set.
+ */
+public final void set(double val) { mVal = val; }
+
+public String toString()
+{
+	return String.valueOf(mVal);
+}
+
+public int hashCode()
+{
+	Double dd = new Double(mVal);
+	return dd.hashCode();
+}
+
+// Don't make this use compareTo.  These functions are used in things like hashs
+// and we want them to be as fast as possible.
+public boolean equals(Object other)
+{
+	if (!(other instanceof DataDouble)) return false;
+	DataDouble o = (DataDouble)other;
+	return mVal == o.mVal;
+}
+
+public int compareTo(Object other)
+{
+	if (!(other instanceof Datum)) return -1;
+
+	Datum od = (Datum)other;
+
+	if (od.getType() != Datum.DataType.DOUBLE) return crossTypeCompare(od);
+
+	DataDouble d = (DataDouble)od;
+
+	if (mVal < d.mVal) return -1;
+	else if (mVal > d.mVal) return 1;
+	else return 0;
+}
+
+public void write(DataOutput out) throws IOException 
+{
+	out.write(Datum.DataType.DOUBLE.getMarker());
+	out.writeDouble(mVal);	
+}
+
+static DataDouble read(DataInput in) throws IOException
+{
+	// Assumes that the double indicator has already been read in order to
+	// select his function.
+	return new DataDouble(in.readDouble());
+}
+ 
+private double mVal;
+
+}
+

Added: incubator/pig/branches/types/src/org/apache/pig/data/DataFloat.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataFloat.java?rev=591143&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DataFloat.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DataFloat.java Thu Nov  1 13:48:16 2007
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *	 http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+
+/**
+ * A datum holding a float value.
+ */
+
+public class DataFloat extends AtomicDatum {
+
+public DataFloat() { mVal = 0; }
+
+public DataFloat(float val) { mVal = val; }
+
+public DataType getType() { return Datum.DataType.FLOAT; }
+
+public long size() { return 4L; }
+
+/**
+ * Get the value of the datum, as a float.
+ * @return float value
+ */
+public final float get() { return mVal; }
+
+/**
+ * Set the value of the datum.
+ * @param val Value to be set.
+ */
+public final void set(float val) { mVal = val; }
+
+public String toString()
+{
+	return String.valueOf(mVal);
+}
+
+/**
+ * Computes the same hash as java.lang.Float.hashCode().
+ */
+public int hashCode()
+{
+	// Hash the float's own 32 bits.  Widening to double and truncating
+	// doubleToLongBits() to an int would keep only the lowest 3 mantissa
+	// bits of the float, so most values would collide.
+	return Float.floatToIntBits(mVal);
+}
+
+// Don't make this use compareTo.  These functions are used in things like hashes
+// and we want them to be as fast as possible.
+// Compare bit patterns, as Float.equals() does, rather than using ==: with ==,
+// NaN would not equal itself and 0.0f would equal -0.0f even though the two
+// have different hash codes.
+public boolean equals(Object other)
+{
+	if (!(other instanceof DataFloat)) return false;
+	DataFloat o = (DataFloat)other;
+	return Float.floatToIntBits(mVal) == Float.floatToIntBits(o.mVal);
+}
+
+/**
+ * Compares to another object.  A non-Datum object sorts after this one, a
+ * datum of another type is ordered by crossTypeCompare(), and two floats
+ * are ordered by Float.compare(), whose total order (-0.0f before 0.0f, NaN
+ * last and equal to itself) is consistent with equals().
+ * @param other object to compare against
+ * @return negative, zero or positive per the Comparable convention
+ */
+public int compareTo(Object other)
+{
+	if (!(other instanceof Datum)) return -1;
+
+	Datum od = (Datum)other;
+
+	if (od.getType() != Datum.DataType.FLOAT) return crossTypeCompare(od);
+
+	DataFloat f = (DataFloat)od;
+	return Float.compare(mVal, f.mVal);
+}
+
+/**
+ * Writes the FLOAT type marker followed by the 4-byte value.
+ * @param out stream to write to
+ * @throws IOException if writing fails
+ */
+public void write(DataOutput out) throws IOException
+{
+	out.write(Datum.DataType.FLOAT.getMarker());
+	out.writeFloat(mVal);
+}
+
+static DataFloat read(DataInput in) throws IOException
+{
+	// Assumes that the float indicator has already been read in order to
+	// select this function.
+	return new DataFloat(in.readFloat());
+}
+
+private float mVal;
+
+}
+

Added: incubator/pig/branches/types/src/org/apache/pig/data/DataInteger.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataInteger.java?rev=591143&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DataInteger.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DataInteger.java Thu Nov  1 13:48:16 2007
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *	 http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+
+/**
+ * Indicates an integer datum.
+ */
+
+public class DataInteger extends AtomicDatum {
+
+public DataInteger() { mVal = 0; }
+
+public DataInteger(int val) { mVal = val; }
+
+public DataType getType() { return Datum.DataType.INT; }
+
+public long size() { return 4L; }
+
+/**
+ * Get the value of the datum, as an integer.
+ * @return integer value
+ */
+public final int get() { return mVal; }
+
+/**
+ * Set the value of the datum.
+ * @param val Value to be set.
+ */
+public final void set(int val) { mVal = val; }
+
+public void write(DataOutput out) throws IOException 
+{
+	out.write(Datum.DataType.INT.getMarker());
+	out.writeInt(mVal);	
+}
+
+public String toString()
+{
+	return String.valueOf(mVal);
+}
+
+public int hashCode()
+{
+	return mVal;
+}
+
+// Don't make this use compareTo.  These functions are used in things like hashs
+// and we want them to be as fast as possible.
+public boolean equals(Object other)
+{
+	if (!(other instanceof DataInteger)) return false;
+	DataInteger o = (DataInteger)other;
+	return mVal == o.mVal;
+}
+
+public int compareTo(Object other)
+{
+	if (!(other instanceof Datum)) return -1;
+
+	Datum od = (Datum)other;
+
+	if (od.getType() != Datum.DataType.INT) return crossTypeCompare(od);
+
+	DataInteger di = (DataInteger)od;
+
+	if (mVal < di.mVal) return -1;
+	else if (mVal > di.mVal) return 1;
+	else return 0;
+}
+
+static DataInteger read(DataInput in) throws IOException
+{
+	// Assumes that the integer indicator has already been read in order to
+	// select his function.
+	return new DataInteger(in.readInt());
+}
+ 
+private int mVal;
+
+}
+

Added: incubator/pig/branches/types/src/org/apache/pig/data/DataLong.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataLong.java?rev=591143&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DataLong.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DataLong.java Thu Nov  1 13:48:16 2007
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *	 http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+
+/**
+ * A datum holding a long value.
+ */
+
+public class DataLong extends AtomicDatum {
+
+public DataLong() { mVal = 0; }
+
+public DataLong(long val) { mVal = val; }
+
+public DataType getType() { return Datum.DataType.LONG; }
+
+// NOTE(review): a Java long occupies 8 bytes, but this reports 4L, the
+// same value DataInteger reports.  Confirm what Datum.size() is meant to
+// measure before changing it.
+public long size() { return 4L; }
+
+/**
+ * Get the value of the datum, as a long.
+ * @return long value
+ */
+public final long get() { return mVal; }
+
+/**
+ * Set the value of the datum.
+ * @param val Value to be set.
+ */
+public final void set(long val) { mVal = val; }
+
+public String toString()
+{
+	return String.valueOf(mVal);
+}
+
+/**
+ * Computes the same hash as java.lang.Long.hashCode().
+ */
+public int hashCode()
+{
+	// Fold the high word into the low word; a plain (int) cast would
+	// ignore the upper 32 bits, so e.g. 0 and 1L << 32 would collide.
+	return (int)(mVal ^ (mVal >>> 32));
+}
+
+// Don't make this use compareTo.  These functions are used in things like hashes
+// and we want them to be as fast as possible.
+public boolean equals(Object other)
+{
+	if (!(other instanceof DataLong)) return false;
+	DataLong o = (DataLong)other;
+	return mVal == o.mVal;
+}
+
+/**
+ * Compares to another object.  A non-Datum object sorts after this one, a
+ * datum of another type is ordered by crossTypeCompare(), and two longs are
+ * ordered numerically.
+ * @param other object to compare against
+ * @return -1, 0 or 1 for DataLong arguments
+ */
+public int compareTo(Object other)
+{
+	if (!(other instanceof Datum)) return -1;
+
+	Datum od = (Datum)other;
+
+	if (od.getType() != Datum.DataType.LONG) return crossTypeCompare(od);
+
+	DataLong dl = (DataLong)od;
+
+	if (mVal < dl.mVal) return -1;
+	else if (mVal > dl.mVal) return 1;
+	else return 0;
+}
+
+/**
+ * Writes the LONG type marker followed by the 8-byte value.
+ * @param out stream to write to
+ * @throws IOException if writing fails
+ */
+public void write(DataOutput out) throws IOException
+{
+	out.write(Datum.DataType.LONG.getMarker());
+	out.writeLong(mVal);
+}
+
+static DataLong read(DataInput in) throws IOException
+{
+	// Assumes that the long indicator has already been read in order to
+	// select this function.
+	return new DataLong(in.readLong());
+}
+
+private long mVal;
+
+}
+

Modified: incubator/pig/branches/types/src/org/apache/pig/data/DataMap.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataMap.java?rev=591143&r1=591142&r2=591143&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DataMap.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DataMap.java Thu Nov  1 13:48:16 2007
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *	 http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,117 +15,211 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.pig.data;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-public class DataMap extends Datum {
-
-	Map<String, Datum> content = new HashMap<String, Datum>();
-	
-	@Override
-	public boolean equals(Object other) {
-		return compareTo(other) == 0;
-	}
-
-	public int compareTo(Object other) {
-		if (!(other instanceof DataMap))
-			return -1;
-		DataMap mbOther = (DataMap) other;
-		if (mbOther.cardinality()!=cardinality())
-			return cardinality() - mbOther.cardinality();
-		for (String key: content.keySet()){
-			if (!content.get(key).equals(mbOther.get(key)))
-				return -1;
-		}
-		return 0;
-	}
-	
-	/**
-	 * 
-	 * @return the cardinality of the data map
-	 */
-	public int cardinality(){
-		return content.size();
-	}
-	
-	/**
-	 * Adds the key value pair to the map
-	 * @param key
-	 * @param value
-	 */
-	public void put(String key, Datum value){
-		content.put(key, value);
-	}
-	
-	/**
-	 * Adds the value as a data atom mapped to the given key
-	 * @param key
-	 * @param value
-	 */
-	public void put(String key, String value){
-		content.put(key, new DataAtom(value));
-	}
-
-	/**
-	 * Adds the value as a data atom mapped to the given key
-	 * @param key
-	 * @param value
-	 */
-	
-	public void put(String key, int value){
-		content.put(key, new DataAtom(value));
-	}
-
-
-	/**
-	 * Fetch the value corresponding to a given key
-	 * @param key
-	 * @return
-	 */
-	public Datum get(String key){
-		Datum d = content.get(key);
-		if (d == null)
-			return new DataAtom("");
-		else
-			return d;
-	}
-	
-	@Override
-	public String toString(){
-		return content.toString();
-	}
-	
-	public static DataMap read(DataInput in) throws IOException{
-		int size = Tuple.decodeInt(in);
-		DataMap ret = new DataMap();
-        byte[] b = new byte[1];
-               
-        for (int i = 0; i < size; i++) {
-            in.readFully(b);
-            if (b[0]!=ATOM)
-            	throw new IOException("Invalid data when reading map from binary file");
-            String key = DataAtom.read(in).strval();
-            Datum value = Tuple.readDatum(in);
-            ret.put(key, value);
-        }
-        return ret;
-	}
-	
-	@Override
-	public void write(DataOutput out) throws IOException {
-		out.write(MAP);
-        Tuple.encodeInt(out, cardinality());
-        for (Entry<String, Datum> e: content.entrySet()){
-        	DataAtom d = new DataAtom(e.getKey());
-        	d.write(out);
-        	e.getValue().write(out);
-        }
- 	}
-
-}
+package org.apache.pig.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * A datum that contains a map from atomic keys to datum values.
+ */
+
+public class DataMap extends ComplexDatum {
+
+public DataMap()
+{
+	mContent = new HashMap<AtomicDatum, Datum>();
+}
+
+public DataType getType() { return Datum.DataType.MAP; }
+
+/**
+ * @deprecated Use size()
+ * @return the cardinality of the data map
+ */
+@Deprecated
+public int cardinality() { return (int)size(); }
+
+/**
+ * @return the number of key/value pairs in the map
+ */
+public long size() { return mContent.size(); }
+
+/**
+ * Adds the key value pair to the map, replacing any value already stored
+ * under an equal key.  Does not make a copy of the key or value, simply
+ * stores a reference to them.
+ * @param key map key
+ * @param value value to associate with key
+ */
+public final void put(AtomicDatum key, Datum value) { mContent.put(key, value); }
+
+/**
+ * Add a string key with a Datum value to a map.  The key is wrapped in a
+ * DataAtom.
+ * @deprecated Included for backward compatibility only; use
+ * put(AtomicDatum, Datum).
+ * @param key
+ * @param value
+ */
+@Deprecated
+public final void put(String key, Datum value)
+{
+	put(new DataAtom(key), value);
+}
+
+/**
+ * Add a string key/value pair to a map.  Both key and value are wrapped in
+ * DataAtoms.
+ * @deprecated Included for backward compatibility only; use
+ * put(AtomicDatum, Datum).
+ * @param key
+ * @param value
+ */
+@Deprecated
+public final void put(String key, String value)
+{
+	put(new DataAtom(key), new DataAtom(value));
+}
+
+/**
+ * Fetch the value corresponding to a given key.
+ * @param key key to look up
+ * @return the stored value or, when no non-null value is stored under the
+ * key, a new DataUnknown with its null flag set.  Never returns Java null.
+ */
+public final Datum get(AtomicDatum key)
+{
+	Datum d = mContent.get(key);
+	if (d != null) return d;
+	Datum missing = new DataUnknown();
+	missing.setNull(true);
+	return missing;
+}
+
+/**
+ * Renders the map as [key#value, key#value, ...] in the map's iteration
+ * order.
+ */
+@Override
+public String toString()
+{
+	StringBuilder sb = new StringBuilder();
+	sb.append('[');
+	Iterator<Map.Entry<AtomicDatum, Datum> > i = mContent.entrySet().iterator();
+	while (i.hasNext()) {
+		Map.Entry<AtomicDatum, Datum> e = i.next();
+		sb.append(e.getKey().toString());
+		sb.append('#');
+		sb.append(e.getValue().toString());
+		if (i.hasNext()) sb.append(", ");
+	}
+	sb.append(']');
+	return sb.toString();
+}
+
+/**
+ * Sums (key hash XOR value hash) over all entries.  Addition is
+ * commutative, so the result does not depend on iteration order.
+ */
+@Override
+public int hashCode()
+{
+	int hash = 0;
+	for (Map.Entry<AtomicDatum, Datum> e : mContent.entrySet()) {
+		hash += (e.getKey().hashCode() ^ e.getValue().hashCode());
+	}
+	return hash;
+}
+
+// Don't make this use compareTo.  These functions are used in things like hashes
+// and we want them to be as fast as possible.
+@Override
+public boolean equals(Object other)
+{
+	if (!(other instanceof DataMap)) return false;
+	DataMap o = (DataMap)other;
+	if (mContent.size() != o.mContent.size()) return false;
+
+	for (Map.Entry<AtomicDatum, Datum> us : mContent.entrySet()) {
+		// Look in the raw map: get() never returns null, it fabricates a
+		// null-flagged DataUnknown for missing keys.
+		Datum val = o.mContent.get(us.getKey());
+		if (val == null) return false;
+		if (!val.equals(us.getValue())) return false;
+	}
+
+	return true;
+}
+
+/**
+ * Compares to another object.  A non-Datum object sorts after this map, a
+ * datum of another type is ordered by crossTypeCompare(), and two maps are
+ * ordered first by entry count and then entry by entry in ascending key
+ * order, comparing keys first and then values.
+ * @param other object to compare against
+ * @return negative, zero or positive per the Comparable convention
+ */
+public int compareTo(Object other)
+{
+	if (!(other instanceof Datum)) return -1;
+
+	Datum od = (Datum)other;
+
+	if (od.getType() != Datum.DataType.MAP) return crossTypeCompare(od);
+
+	DataMap map = (DataMap)od;
+
+	if (mContent.size() < map.mContent.size()) return -1;
+	else if (mContent.size() > map.mContent.size()) return 1;
+
+	// HashMap iteration order is unspecified and can differ between two maps
+	// holding the same keys, so walk both maps in sorted key order.
+	Iterator<Map.Entry<AtomicDatum, Datum> > i = sortedEntries().iterator();
+	Iterator<Map.Entry<AtomicDatum, Datum> > j = map.sortedEntries().iterator();
+	while (i.hasNext()) {
+		Map.Entry<AtomicDatum, Datum> us = i.next();
+		Map.Entry<AtomicDatum, Datum> them = j.next();
+		int keyrc = us.getKey().compareTo(them.getKey());
+		if (keyrc != 0) return keyrc;
+		int valrc = us.getValue().compareTo(them.getValue());
+		if (valrc != 0) return valrc;
+	}
+
+	return 0;
+}
+
+/**
+ * Returns a snapshot list of this map's entries sorted by key, using the
+ * keys' compareTo().
+ */
+private List<Map.Entry<AtomicDatum, Datum> > sortedEntries()
+{
+	List<Map.Entry<AtomicDatum, Datum> > entries =
+		new ArrayList<Map.Entry<AtomicDatum, Datum> >(mContent.entrySet());
+	Collections.sort(entries, KEY_ORDER);
+	return entries;
+}
+
+/**
+ * Reads a map in the format produced by write(), starting at the entry
+ * count.  Since write() emits the MAP marker first and this method does not
+ * read it, the caller is presumably expected to have consumed it already.
+ * @param in stream to read from
+ * @return the deserialized map
+ * @throws IOException if the stream fails or a key is not an atomic datum
+ */
+public static DataMap read(DataInput in) throws IOException
+{
+	long size = in.readLong();
+	DataMap ret = new DataMap();
+	for (long i = 0; i < size; i++) {
+		Datum key = DatumImpl.readDatum(in);
+		// Also check the runtime type so the cast below cannot fail with a
+		// ClassCastException on malformed input.
+		if (key.getDimension() == Datum.DataDimension.COMPLEX ||
+				!(key instanceof AtomicDatum)) {
+			throw new IOException("Maps only accept atomic and unknown data types as keys");
+		}
+		ret.put((AtomicDatum)key, DatumImpl.readDatum(in));
+	}
+	return ret;
+}
+
+/**
+ * Writes the MAP type marker, the entry count as an 8-byte long, and then
+ * each key followed by its value.
+ * @param out stream to write to
+ * @throws IOException if writing fails
+ */
+@Override
+public void write(DataOutput out) throws IOException
+{
+	out.write(Datum.DataType.MAP.getMarker());
+	out.writeLong(size());
+	for (Entry<AtomicDatum, Datum> e : mContent.entrySet()) {
+		e.getKey().write(out);
+		e.getValue().write(out);
+	}
+}
+
+// Orders map entries by key, used to give compareTo() a deterministic walk.
+private static final Comparator<Map.Entry<AtomicDatum, Datum> > KEY_ORDER =
+	new Comparator<Map.Entry<AtomicDatum, Datum> >() {
+		public int compare(Map.Entry<AtomicDatum, Datum> a,
+				Map.Entry<AtomicDatum, Datum> b) {
+			return a.getKey().compareTo(b.getKey());
+		}
+	};
+
+private Map<AtomicDatum, Datum> mContent;
+
+}



Mime
View raw message