Return-Path: Delivered-To: apmail-incubator-pig-commits-archive@locus.apache.org Received: (qmail 55130 invoked from network); 22 Jan 2008 21:18:21 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 22 Jan 2008 21:18:21 -0000 Received: (qmail 50983 invoked by uid 500); 22 Jan 2008 21:18:12 -0000 Delivered-To: apmail-incubator-pig-commits-archive@incubator.apache.org Received: (qmail 50954 invoked by uid 500); 22 Jan 2008 21:18:12 -0000 Mailing-List: contact pig-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: pig-dev@incubator.apache.org Delivered-To: mailing list pig-commits@incubator.apache.org Received: (qmail 50937 invoked by uid 99); 22 Jan 2008 21:18:11 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 Jan 2008 13:18:11 -0800 X-ASF-Spam-Status: No, hits=-100.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 Jan 2008 21:17:39 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id E466C1A9850; Tue, 22 Jan 2008 13:17:45 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r614325 [5/6] - in /incubator/pig/branches/types: ./ lib/ scripts/ src/org/apache/pig/ src/org/apache/pig/builtin/ src/org/apache/pig/data/ src/org/apache/pig/impl/ src/org/apache/pig/impl/builtin/ src/org/apache/pig/impl/eval/ src/org/apac... Date: Tue, 22 Jan 2008 21:17:22 -0000 To: pig-commits@incubator.apache.org From: gates@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080122211745.E466C1A9850@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POCogroup.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POCogroup.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POCogroup.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POCogroup.java Tue Jan 22 13:17:12 2008 @@ -26,8 +26,9 @@ import org.apache.pig.data.AmendableTuple; import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; -import org.apache.pig.data.Datum; +import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.eval.EvalSpec; import org.apache.pig.impl.eval.collector.DataCollector; import org.apache.pig.impl.logicalLayer.LOCogroup; @@ -40,7 +41,7 @@ * */ private static final long serialVersionUID = 1L; - List[] sortedInputs; + List[] sortedInputs; List specs; public POCogroup(List specs, int outputType) { @@ -61,11 +62,11 @@ for (int i = 0; i < inputs.length; i++) { final int finalI = i; - sortedInputs[i] = new ArrayList(); + sortedInputs[i] = new ArrayList(); DataCollector outputFromSpec = new DataCollector(null){ @Override - public void add(Datum d) { + public void add(Object d) { sortedInputs[finalI].add(LOCogroup.getGroupAndTuple(d)); } }; @@ -78,9 +79,9 @@ } inputToSpec.finishPipe(); - Collections.sort(sortedInputs[i], new Comparator() { - public int compare(Datum[] a, Datum[] b) { - return a[0].compareTo(b[0]); + Collections.sort(sortedInputs[i], new Comparator() { + public int compare(Object[] a, Object[] b) { + return DataType.compare(a[0], b[0]); } }); } @@ -95,11 +96,11 @@ // find the smallest group among all inputs (this is the group we should make a tuple // out of) - Datum smallestGroup = null; + Object smallestGroup = null; for (int i = 0; i < inputs.length; i++) { if (sortedInputs[i].size() > 0) { - Datum g = (sortedInputs[i].get(0))[0]; - if (smallestGroup == null || g.compareTo(smallestGroup)<0) + Object g = (sortedInputs[i].get(0))[0]; + if (smallestGroup == null || DataType.compare(g, smallestGroup)<0) smallestGroup = g; } } @@ -112,26 +113,26 @@ Tuple output; if (outputType == LogicalOperator.AMENDABLE) output = new AmendableTuple(1 + inputs.length, smallestGroup); - else output = new Tuple(1 + inputs.length); + else output = TupleFactory.getInstance().newTuple(1 + inputs.length); // set first field to the group tuple - output.setField(0, smallestGroup); + output.set(0, smallestGroup); if (lineageTracer != null) lineageTracer.insert(output); boolean done = true; for (int i = 0; i < inputs.length; i++) { - DataBag b = - BagFactory.getInstance().getNewBag(Datum.DataType.TUPLE); + DataBag b = BagFactory.getInstance().newDefaultBag(); while (sortedInputs[i].size() > 0) { - Datum g = sortedInputs[i].get(0)[0]; + Object g = sortedInputs[i].get(0)[0]; Tuple t = (Tuple) sortedInputs[i].get(0)[1]; - if (g.compareTo(smallestGroup) < 0) { + int c = DataType.compare(g, smallestGroup); + if (c < 0) { sortedInputs[i].remove(0); // discard this tuple - } else if (g.equals(smallestGroup)) { + } else if (c == 0) { b.add(t); if (lineageTracer != null) lineageTracer.union(t, output); // update lineage sortedInputs[i].remove(0); @@ -140,17 +141,21 @@ } } - if (specs.get(i).isInner() && b.isEmpty()) + if (specs.get(i).isInner() && (b.size() == 0)) done = false; // this input uses "inner" semantics, and it has no tuples for // this group, so suppress the tuple we're currently building - output.setField(1 + i, b); + output.set(1 + i, b); } if (done) return output; } + } + + public void visit(POVisitor v) { + v.visitCogroup(this); } } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POEval.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POEval.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POEval.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POEval.java Tue Jan 22 13:17:12 2008 @@ -97,4 +97,8 @@ } } + public void visit(POVisitor v) { + v.visitEval(this); + } + } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POLoad.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POLoad.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POLoad.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POLoad.java Tue Jan 22 13:17:12 2008 @@ -65,4 +65,9 @@ return lf.getNext(); } + @Override + public void visit(POVisitor v) { + v.visitLoad(this); + } + } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POMapreduce.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POMapreduce.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POMapreduce.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POMapreduce.java Tue Jan 22 13:17:12 2008 @@ -19,7 +19,12 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Comparator; +import org.apache.log4j.Logger; + +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.eval.EvalSpec; @@ -27,25 +32,28 @@ import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.mapreduceExec.MapReduceLauncher; import org.apache.pig.impl.util.ObjectSerializer; - +import org.apache.pig.impl.util.PigLogger; public class POMapreduce extends PhysicalOperator { private static final long serialVersionUID = 1L; public ArrayList toMap = new ArrayList(); - public ArrayList toCombine = null; + //public ArrayList toCombine = null; + public EvalSpec toCombine = null; public EvalSpec toReduce = null; public ArrayList groupFuncs = null; public SplitSpec toSplit = null; public ArrayList inputFileSpecs = new ArrayList(); public FileSpec outputFileSpec = null; public Class partitionFunction = null; + public Class userComparator = null; public String quantilesFile = null; public PigContext pigContext; public int mapParallelism = -1; // -1 means let hadoop decide public int reduceParallelism = -1; + static MapReduceLauncher mapReduceLauncher = new MapReduceLauncher(); @@ -156,16 +164,16 @@ } void print() { - System.out.println("\n----- MapReduce Job -----"); - System.out.println("Input: " + inputFileSpecs); - System.out.println("Map: " + toMap); - System.out.println("Group: " + groupFuncs); - System.out.println("Combine: " + toCombine); - System.out.println("Reduce: " + toReduce); - System.out.println("Output: " + outputFileSpec); - System.out.println("Split: " + toSplit); - System.out.println("Map parallelism: " + mapParallelism); - System.out.println("Reduce parallelism: " + reduceParallelism); + Logger log = PigLogger.getLogger(); + log.debug("Input: " + inputFileSpecs); + log.debug("Map: " + toMap); + log.debug("Group: " + groupFuncs); + log.debug("Combine: " + toCombine); + log.debug("Reduce: " + toReduce); + log.debug("Output: " + outputFileSpec); + log.debug("Split: " + toSplit); + log.debug("Map parallelism: " + mapParallelism); + log.debug("Reduce parallelism: " + reduceParallelism); } public POMapreduce copy(){ @@ -199,6 +207,10 @@ toReduce = spec; else toReduce = toReduce.addSpec(spec); + } + + public void visit(POVisitor v) { + v.visitMapreduce(this); } } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PORead.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PORead.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PORead.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PORead.java Tue Jan 22 13:17:12 2008 @@ -22,7 +22,6 @@ import org.apache.pig.data.DataBag; import org.apache.pig.data.Tuple; -import org.apache.pig.data.Datum; public class PORead extends PhysicalOperator { @@ -31,7 +30,7 @@ */ private static final long serialVersionUID = 1L; DataBag bag; - Iterator it; + Iterator it; public PORead(DataBag bagIn, int outputType) { super(outputType); @@ -47,7 +46,7 @@ if (continueFromLast){ throw new RuntimeException("LOReads should not occur in continuous plans"); } - it = bag.content(); + it = bag.iterator(); return true; } @@ -55,9 +54,13 @@ @Override public Tuple getNext() throws IOException { if (it.hasNext()) - return (Tuple)it.next(); + return it.next(); else return null; + } + + public void visit(POVisitor v) { + v.visitRead(this); } } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSort.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSort.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSort.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSort.java Tue Jan 22 13:17:12 2008 @@ -24,14 +24,13 @@ import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; import org.apache.pig.data.Tuple; -import org.apache.pig.data.Datum; import org.apache.pig.impl.eval.EvalSpec; public class POSort extends PhysicalOperator { static final long serialVersionUID = 1L; EvalSpec sortSpec; - transient Iterator iter; + transient Iterator iter; public POSort(EvalSpec sortSpec, int outputType) { @@ -44,24 +43,27 @@ public boolean open(boolean continueFromLast) throws IOException { if (!super.open(continueFromLast)) return false; - DataBag bag = - BagFactory.getInstance().getNewBag(Datum.DataType.TUPLE); + DataBag bag = BagFactory.getInstance().newSortedBag(sortSpec); - bag.sort(sortSpec); Tuple t; while((t = inputs[0].getNext())!=null){ bag.add(t); } - iter = bag.content(); + iter = bag.iterator(); return true; } @Override public Tuple getNext() throws IOException { if (iter.hasNext()) - return (Tuple)iter.next(); + return iter.next(); else return null; } + + @Override + public void visit(POVisitor v) { + v.visitSort(this); + } } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSplitMaster.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSplitMaster.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSplitMaster.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSplitMaster.java Tue Jan 22 13:17:12 2008 @@ -109,4 +109,9 @@ } } */ + + public void visit(POVisitor v) { + v.visitSplitMaster(this); + } + } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSplitSlave.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSplitSlave.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSplitSlave.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POSplitSlave.java Tue Jan 22 13:17:12 2008 @@ -42,4 +42,8 @@ return master.slaveGetNext(this); } + public void visit(POVisitor v) { + v.visitSplit(this); + } + } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POStore.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POStore.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POStore.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POStore.java Tue Jan 22 13:17:12 2008 @@ -21,8 +21,8 @@ import org.apache.pig.StoreFunc; import org.apache.pig.data.DataBag; +import org.apache.pig.data.BagFactory; import org.apache.pig.data.Tuple; -import org.apache.pig.data.Datum; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.io.PigFile; @@ -63,7 +63,7 @@ @Override public Tuple getNext() throws IOException { // get all tuples from input, and store them. - DataBag b = new DataBag(Datum.DataType.TUPLE); + DataBag b = BagFactory.getInstance().newDefaultBag(); Tuple t; while ((t = (Tuple) inputs[0].getNext()) != null) { b.add(t); @@ -88,6 +88,11 @@ new RuntimeException().printStackTrace(); System.exit(1); return -1; + } + + @Override + public void visit(POVisitor v) { + v.visitStore(this); } } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POUnion.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POUnion.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POUnion.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/POUnion.java Tue Jan 22 13:17:12 2008 @@ -72,4 +72,8 @@ return null; } + public void visit(POVisitor v) { + v.visitUnion(this); + } + } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java Tue Jan 22 13:17:12 2008 @@ -71,4 +71,6 @@ public int getOutputType(){ return outputType; } + + public abstract void visit(POVisitor v); } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalPlan.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalPlan.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalPlan.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalPlan.java Tue Jan 22 13:17:12 2008 @@ -21,7 +21,7 @@ import java.util.Map; import org.apache.pig.data.DataBag; -import org.apache.pig.data.Datum; +import org.apache.pig.data.BagFactory; import org.apache.pig.data.Tuple; import org.apache.pig.impl.logicalLayer.LogicalPlan; @@ -34,7 +34,7 @@ } public DataBag exec(boolean continueFromLast) throws IOException { - DataBag results = new DataBag(Datum.DataType.TUPLE); + DataBag results = BagFactory.getInstance().newDefaultBag(); root.open(continueFromLast); Tuple t; Modified: incubator/pig/branches/types/src/org/apache/pig/impl/util/DataBuffer.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/util/DataBuffer.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/util/DataBuffer.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/util/DataBuffer.java Tue Jan 22 13:17:12 2008 @@ -19,7 +19,6 @@ import java.util.*; -import org.apache.pig.data.Datum; import org.apache.pig.impl.eval.collector.DataCollector; @@ -30,14 +29,14 @@ super(null); } - List buf = Collections.synchronizedList(new LinkedList()); + List buf = Collections.synchronizedList(new LinkedList()); @Override - public void add(Datum d){ + public void add(Object d){ if (d != null) buf.add(d); } - public Datum removeFirst(){ + public Object removeFirst(){ if (buf.isEmpty()) return null; else @@ -48,8 +47,8 @@ * This is a sequence we want to do frequently to accomodate the simple eval case, i.e., cases * where we know that running an eval spec one item should produce one and only one item. */ - public Datum removeFirstAndAssertEmpty(){ - Datum d; + public Object removeFirstAndAssertEmpty(){ + Object d; if (isStale() || (d = removeFirst()) == null){ throw new RuntimeException("Simple eval used but buffer found to be empty or stale"); } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/util/JarManager.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/util/JarManager.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/util/JarManager.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/util/JarManager.java Tue Jan 22 13:17:12 2008 @@ -168,17 +168,29 @@ */ private static void mergeJar(JarOutputStream jarFile, String jar, String prefix, Map contents) throws FileNotFoundException, IOException { - JarInputStream jis = new JarInputStream(new FileInputStream(jar)); + JarInputStream jarInput = new JarInputStream(new FileInputStream(jar)); + + mergeJar(jarFile, jarInput, prefix, contents); + } + + private static void mergeJar(JarOutputStream jarFile, URL jar, String prefix, Map contents) + throws FileNotFoundException, IOException { + JarInputStream jarInput = new JarInputStream(jar.openStream()); + + mergeJar(jarFile, jarInput, prefix, contents); + } + + private static void mergeJar(JarOutputStream jarFile, JarInputStream jarInput, String prefix, Map contents) + throws FileNotFoundException, IOException { JarEntry entry; - while ((entry = jis.getNextJarEntry()) != null) { + while ((entry = jarInput.getNextJarEntry()) != null) { if (prefix != null && !entry.getName().startsWith(prefix)) { continue; } - addStream(jarFile, entry.getName(), jis, contents); + addStream(jarFile, entry.getName(), jarInput, contents); } } - - /** + /** * Adds a stream to a Jar file. * * @param os Modified: incubator/pig/branches/types/src/org/apache/pig/impl/util/PigLogger.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/util/PigLogger.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/util/PigLogger.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/util/PigLogger.java Tue Jan 22 13:17:12 2008 @@ -36,7 +36,6 @@ { if (mLogger == null) { mLogger = Logger.getLogger("org.apache.pig"); - mLogger.setAdditivity(false); } return mLogger; } Propchange: incubator/pig/branches/types/src/org/apache/pig/tools/grunt/ ------------------------------------------------------------------------------ --- svn:ignore (added) +++ svn:ignore Tue Jan 22 13:17:12 2008 @@ -0,0 +1,7 @@ + +TokenMgrError.java +Token.java +SimpleCharStream.java +ParseException.java +GruntParserTokenManager.java +GruntParserConstants.java Propchange: incubator/pig/branches/types/src/org/apache/pig/tools/pigscript/parser/ ------------------------------------------------------------------------------ --- svn:ignore (added) +++ svn:ignore Tue Jan 22 13:17:12 2008 @@ -0,0 +1,8 @@ + +TokenMgrError.java +Token.java +SimpleCharStream.java +PigScriptParserTokenManager.java +PigScriptParserConstants.java +PigScriptParser.java +ParseException.java Propchange: incubator/pig/branches/types/test/ ------------------------------------------------------------------------------ --- svn:ignore (added) +++ svn:ignore Tue Jan 22 13:17:12 2008 @@ -0,0 +1,2 @@ + +reports Added: incubator/pig/branches/types/test/org/apache/pig/test/NonDefaultBagFactory.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/NonDefaultBagFactory.java?rev=614325&view=auto ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/NonDefaultBagFactory.java (added) +++ incubator/pig/branches/types/test/org/apache/pig/test/NonDefaultBagFactory.java Tue Jan 22 13:17:12 2008 @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.test; + +import org.apache.pig.data.*; +import org.apache.pig.impl.eval.EvalSpec; + +// Test data bag factory, for testing that we can propery provide a non +// default bag factory. +public class NonDefaultBagFactory extends BagFactory { + public DataBag newDefaultBag() { return null; } + public DataBag newSortedBag(EvalSpec sortSpec) { return null; } + public DataBag newDistinctBag() { return null; } + + public NonDefaultBagFactory() { super(); } +} + Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java Tue Jan 22 13:17:12 2008 @@ -33,6 +33,36 @@ public class TestAlgebraicEval extends TestCase { private String initString = "mapreduce"; + + @Test + public void testGroupCountWithMultipleFields() throws Exception { + int LOOP_COUNT = 1024; + PigServer pig = new PigServer(initString); + File tmpFile = File.createTempFile("test", "txt"); + PrintStream ps = new PrintStream(new FileOutputStream(tmpFile)); + for(int i = 0; i < LOOP_COUNT; i++) { + for(int j=0; j< LOOP_COUNT; j++) { + ps.println(i + "\t" + i + "\t" + j%2); + } + } + ps.close(); + String query = "myid = foreach (group (load 'file:" + tmpFile + "') all) generate group, COUNT($1) ;"; + System.out.println(query); + pig.registerQuery(" a = group (load 'file:" + tmpFile + "') by ($0,$1);"); + pig.registerQuery("b = foreach a generate flatten(group), SUM($1.$2);"); + Iterator it = pig.openIterator("b"); + tmpFile.delete(); + int count = 0; + while(it.hasNext()){ + int sum = it.next().getAtomField(2).numval().intValue(); + assertEquals(LOOP_COUNT/2, sum); + count++; + } + assertEquals(count, LOOP_COUNT); + } + + + @Test public void testSimpleCount() throws Exception { int LOOP_COUNT = 1024; @@ -72,6 +102,8 @@ Double count = t.getAtomField(1).numval(); assertEquals(count, (double)LOOP_COUNT); } + + @Test public void testGroupReorderCount() throws Exception { Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestBuiltin.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestBuiltin.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestBuiltin.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestBuiltin.java Tue Jan 22 13:17:12 2008 @@ -62,6 +62,48 @@ } @Test + public void testAVGInitial() throws Exception { + int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; + + EvalFunc avg = new AVG.Initial(); + Tuple tup = Util.loadNestTuple(new Tuple(1), input); + Tuple output = new Tuple(); + avg.exec(tup, output); + + assertEquals("Expected sum to be 55.0", 55.0, + output.getAtomField(0).numval()); + assertEquals("Expected count to be 10", 10, + output.getAtomField(1).longVal()); + } + + @Test + public void testAVGFinal() throws Exception { + Tuple t1 = new Tuple(2); + t1.setField(0, 55.0); + t1.setField(1, 10); + Tuple t2 = new Tuple(2); + t2.setField(0, 28.0); + t2.setField(1, 7); + Tuple t3 = new Tuple(2); + t3.setField(0, 82.0); + t3.setField(1, 17); + DataBag bag = BagFactory.getInstance().newDefaultBag(); + bag.add(t1); + bag.add(t2); + bag.add(t3); + + Tuple tup = new Tuple(bag); + + EvalFunc avg = new AVG.Final(); + DataAtom output = new DataAtom(); + avg.exec(tup, output); + + assertEquals("Expected average to be 4.852941176470588", + 4.852941176470588, output.numval()); + } + + + @Test public void testCOUNT() throws Exception { int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; double expected = input.length; @@ -91,7 +133,7 @@ count.exec(tup,output); assertTrue(output.numval() == 0); - map.put("a", "a"); + map.put("a", new DataAtom("a")); assertFalse(isEmpty.exec(tup)); count.exec(tup,output); @@ -105,7 +147,32 @@ assertTrue(output.numval() == 2); } - + + @Test + public void testCOUNTInitial() throws Exception { + int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; + + EvalFunc count = new COUNT.Initial(); + Tuple tup = Util.loadNestTuple(new Tuple(1), input); + Tuple output = new Tuple(); + count.exec(tup, output); + + assertEquals("Expected count to be 10", 10, + output.getAtomField(0).longVal()); + } + + @Test + public void testCOUNTFinal() throws Exception { + int input[] = { 23, 38, 39 }; + Tuple tup = Util.loadNestTuple(new Tuple(1), input); + + EvalFunc count = new COUNT.Final(); + DataAtom output = new DataAtom(); + count.exec(tup, output); + + assertEquals("Expected count to be 100", 100, output.longVal()); + } + @Test public void testSUM() throws Exception { int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; @@ -121,6 +188,108 @@ assertTrue(actual == expected); } + @Test + public void testSUMInitial() throws Exception { + int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; + + EvalFunc sum = new SUM.Initial(); + Tuple tup = Util.loadNestTuple(new Tuple(1), input); + Tuple output = new Tuple(); + sum.exec(tup, output); + + assertEquals("Expected sum to be 55.0", 55.0, + output.getAtomField(0).numval()); + } + + @Test + public void testSUMFinal() throws Exception { + int input[] = { 23, 38, 39 }; + Tuple tup = Util.loadNestTuple(new Tuple(1), input); + + EvalFunc sum = new SUM.Final(); + DataAtom output = new DataAtom(); + sum.exec(tup, output); + + assertEquals("Expected sum to be 100.0", 100.0, output.numval()); + } + + @Test + public void testMIN() throws Exception { + int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; + + EvalFunc min = new MIN(); + Tuple tup = Util.loadNestTuple(new Tuple(1), input); + DataAtom output = new DataAtom(); + min.exec(tup, output); + + assertEquals("Expected min to be 1.0", 1.0, output.numval()); + } + + + @Test + public void testMINInitial() throws Exception { + int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; + + EvalFunc min = new MIN.Initial(); + Tuple tup = Util.loadNestTuple(new Tuple(1), input); + Tuple output = new Tuple(); + min.exec(tup, output); + + assertEquals("Expected min to be 1.0", 1.0, + output.getAtomField(0).numval()); + } + + @Test + public void testMINFinal() throws Exception { + int input[] = { 23, 38, 39 }; + Tuple tup = Util.loadNestTuple(new Tuple(1), input); + + EvalFunc min = new MIN.Final(); + DataAtom output = new DataAtom(); + min.exec(tup, output); + + assertEquals("Expected sum to be 23.0", 23.0, output.numval()); + } + + @Test + public void testMAX() throws Exception { + int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; + + EvalFunc max = new MAX(); + Tuple tup = Util.loadNestTuple(new Tuple(1), input); + DataAtom output = new DataAtom(); + max.exec(tup, output); + + assertEquals("Expected max to be 10.0", 10.0, output.numval()); + } + + + @Test + public void testMAXInitial() throws Exception { + int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; + + EvalFunc max = new MAX.Initial(); + Tuple tup = Util.loadNestTuple(new Tuple(1), input); + Tuple output = new Tuple(); + max.exec(tup, output); + + assertEquals("Expected max to be 10.0", 10.0, + output.getAtomField(0).numval()); + } + + @Test + public void testMAXFinal() throws Exception { + int input[] = { 23, 38, 39 }; + Tuple tup = Util.loadNestTuple(new Tuple(1), input); + + EvalFunc max = new MAX.Final(); + DataAtom output = new DataAtom(); + max.exec(tup, output); + + assertEquals("Expected sum to be 39.0", 39.0, output.numval()); + } + + // Builtin APPLY Functions // ======================== @@ -159,6 +328,7 @@ assertTrue(f3.arity() == arity3); } + /* @Test public void testLFBin() throws Exception { @@ -172,8 +342,7 @@ t2.setField(0,a); Tuple t3 = new Tuple(1); t3.setField(0, b); - DataBag bag = - BagFactory.getInstance().getNewBigBag(Datum.DataType.TUPLE); + DataBag bag = BagFactory.getInstance().getNewBigBag(); bag.add(t2); bag.add(t3); Tuple t4 = new Tuple(2); @@ -192,8 +361,7 @@ t6.setField(0,c); Tuple t7 = new Tuple(1); t7.setField(0, d); - DataBag bag2 = - BagFactory.getInstance().getNewBigBag(Datum.DataType.TUPLE); + DataBag bag2 = BagFactory.getInstance().getNewBigBag(); for(int i = 0; i < 10; i ++) { bag2.add(t6); bag2.add(t7); @@ -224,6 +392,7 @@ assertTrue(r1.equals(t1)); assertTrue(r2.equals(t5)); } + */ @Test @@ -316,12 +485,8 @@ for (int i=0; i< numTimes; i++){ Tuple t = iter.next(); - - Tuple t0 = (Tuple)t.getBagField(0).content().next(); - Tuple t1 = (Tuple)t.getBagField(1).content().next(); - assertEquals(i+"AA", t0.getAtomField(0).strval()); - assertEquals(i+"BB", t1.getAtomField(0).strval()); - + assertEquals(i+"AA", t.getBagField(0).iterator().next().getAtomField(0).strval()); + assertEquals(i+"BB", t.getBagField(1).iterator().next().getAtomField(0).strval()); } assertFalse(iter.hasNext()); Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestDataBag.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestDataBag.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestDataBag.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestDataBag.java Tue Jan 22 13:17:12 2008 @@ -17,698 +17,728 @@ */ package org.apache.pig.test; -import java.io.DataInput; -import java.io.DataOutput; +/* import java.io.DataInputStream; import java.io.DataOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.FileInputStream; import java.io.IOException; - -import java.util.List; -import java.util.ArrayList; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; import java.util.Iterator; +import java.util.Random; +*/ + +import java.util.*; +import java.io.IOException; import org.junit.Test; import org.apache.pig.data.*; +import org.apache.pig.impl.eval.*; +import org.apache.pig.impl.util.Spillable; /** - * This class will exercise the data bag data type. + * This class will exercise the basic Pig data model and members. It tests for proper behavior in + * assigment and comparision, as well as function application. * - * @author gates + * @author dnm */ -public class TestDataBag extends junit.framework.TestCase -{ - -public void testDefaultConstructor() throws Exception -{ - DataBag bag = new DataBag(Datum.DataType.INT); - - assertEquals("getType", Datum.DataType.BAG, bag.getType()); - assertFalse("is null", bag.isNull()); - assertTrue("bag of ints", bag.bagOf() == Datum.DataType.INT); - - assertEquals("Default constructor size before", 0, bag.size()); - DataInteger val = new DataInteger(42); - - bag.add(val); - assertEquals("Default constructor size after", 1, bag.size()); - - Iterator i = bag.content(); - Datum d = i.next(); - - assertTrue("should be an integer", d.getType() == Datum.DataType.INT); - assertNotNull("get with entry in bag", d); - assertEquals("value of val", 42, ((DataInteger)d).get()); -} - -public void testListConstructor() throws Exception -{ - List list = new ArrayList(); - list.add(new DataInteger(10)); - list.add(new DataInteger(11)); - list.add(new DataInteger(9)); - - DataBag bag = new DataBag(list); - - assertEquals("list construct size", 3L, bag.size()); - - Iterator i = bag.content(); - Datum d = i.next(); - assertNotNull("get first entry in bag", d); - assertTrue("should be an integer", d.getType() == Datum.DataType.INT); - assertEquals("first value of val", 10, ((DataInteger)d).get()); - d = i.next(); - assertNotNull("get second entry in bag", d); - assertTrue("should be an integer", d.getType() == Datum.DataType.INT); - assertEquals("second value of val", 11, ((DataInteger)d).get()); - d = i.next(); - assertNotNull("get third entry in bag", d); - assertTrue("should be an integer", d.getType() == Datum.DataType.INT); - assertEquals("third value of val", 9, ((DataInteger)d).get()); - assertFalse("bag should be exhausted now", i.hasNext()); - - bag.add(new DataInteger(4)); - i = bag.content(); - d = i.next(); - d = i.next(); - d = i.next(); - d = i.next(); - assertNotNull("get fourth entry in bag", d); - assertTrue("should be an integer", d.getType() == Datum.DataType.INT); - assertEquals("fourth value of val", 4, ((DataInteger)d).get()); - assertFalse("bag should be exhausted now", i.hasNext()); -} - - -public void testBigBag() throws Exception -{ - DataBag bag = new DataBag(Datum.DataType.INT); - - for (int i = 0; i < 10000; i++) { - bag.add(new DataInteger(i)); - } - - assertEquals("big size after loading", 10000, bag.size()); - - Iterator i = bag.content(); - for (int j = 0; j < 10000; j++) { - assertTrue("should still have data", i.hasNext()); - Datum val = i.next(); - assertTrue("should be an integer", val.getType() == Datum.DataType.INT); - assertEquals("value of val", j, ((DataInteger)val).get()); - } - assertFalse("bag should be exhausted now", i.hasNext()); -} - -public void testToString() throws Exception -{ - DataBag bag = new DataBag(Datum.DataType.INT); - - bag.add(new DataInteger(1)); - bag.add(new DataInteger(1)); - bag.add(new DataInteger(3)); - - assertEquals("toString", "{1, 1, 3}", bag.toString()); -} - -public void testEquals() throws Exception -{ - DataBag bag1 = new DataBag(Datum.DataType.INT); - DataBag bag2 = new DataBag(Datum.DataType.INT); - - bag1.add(new DataInteger(3)); - bag2.add(new DataInteger(3)); - - assertFalse("different object", bag1.equals(new String())); - - assertTrue("same data", bag1.equals(bag2)); - - bag2 = new DataBag(Datum.DataType.INT); - bag2.add(new DataInteger(4)); - assertFalse("different data", bag1.equals(bag2)); - - bag2 = new DataBag(Datum.DataType.INT); - bag2.add(new DataInteger(3)); - bag2.add(new DataInteger(3)); - assertFalse("different size", bag1.equals(bag2)); - - bag2 = new DataBag(Datum.DataType.LONG); - bag2.add(new DataLong(3)); - assertFalse("different type of bag", bag1.equals(bag2)); -} - -public void testCompareTo() throws Exception -{ - DataBag bag1 = new DataBag(Datum.DataType.INT); - DataBag bag2 = new DataBag(Datum.DataType.INT); - - bag1.add(new DataInteger(3)); - bag2.add(new DataInteger(3)); - - assertEquals("different object less than", -1, bag1.compareTo(new String())); - - Tuple t = new Tuple(); - assertTrue("less than tuple", bag1.compareTo(t) < 0); - DataMap map = new DataMap(); - assertTrue("less than map", bag1.compareTo(map) < 0); - DataLong l = new DataLong(); - assertTrue("less than long", bag1.compareTo(l) < 0); - DataFloat f = new DataFloat(); - assertTrue("less than float", bag1.compareTo(f) < 0); - DataDouble d = new DataDouble(); - assertTrue("less than double", bag1.compareTo(d) < 0); - DataUnknown unk = new DataUnknown(); - assertTrue("less than unknown", bag1.compareTo(unk) < 0); - DataCharArrayUtf16 utf16 = new DataCharArrayUtf16(); - assertTrue("less than utf16", bag1.compareTo(utf16) < 0); - - assertEquals("same data equal", 0, bag1.compareTo(bag2)); - - bag2 = new DataBag(Datum.DataType.INT); - bag2.add(new DataInteger(2)); - assertEquals("greater than bag with lesser value", 1, bag1.compareTo(bag2)); - - bag2 = new DataBag(Datum.DataType.INT); - bag2.add(new DataInteger(4)); - assertEquals("less than bag with greater value", -1, bag1.compareTo(bag2)); - - bag2 = new DataBag(Datum.DataType.INT); - bag2.add(new DataInteger(3)); - bag2.add(new DataInteger(4)); - assertEquals("less than bigger bag", -1, bag1.compareTo(bag2)); - - bag2 = new DataBag(Datum.DataType.INT); - assertEquals("greater than smaller bag", 1, bag1.compareTo(bag2)); - - bag2 = new DataBag(Datum.DataType.LONG); - bag2.add(new DataLong(3)); - assertEquals("different type of bag", -1, bag1.compareTo(bag2)); -} - - -public void testWriteReadUnknown() throws Exception -{ - DataBag before = new DataBag(Datum.DataType.UNKNOWN); - - String s = new String("zzz"); - before.add(new DataUnknown(s.getBytes())); - s = new String("yyy"); - before.add(new DataUnknown(s.getBytes())); - s = new String("xxx"); - before.add(new DataUnknown(s.getBytes())); - - File file = null; - file = File.createTempFile("DataBagUnknown", "put"); - FileOutputStream fos = new FileOutputStream(file); - DataOutput out = new DataOutputStream(fos); - before.write(out); - fos.close(); - - FileInputStream fis = new FileInputStream(file); - DataInput in = new DataInputStream(fis); - Datum a = DatumImpl.readDatum(in); - - assertTrue("isa DataBag", a instanceof DataBag); - - DataBag after = (DataBag)a; - - assertTrue("bag of unknowns", after.bagOf() == Datum.DataType.UNKNOWN); - assertEquals("after read, size", 3, after.size()); - - Iterator j = after.content(); - - Datum valAfter = j.next(); - assertTrue("should be an unknown", - valAfter.getType() == Datum.DataType.UNKNOWN); - for (int i = 0; i < 3; i++) { - assertEquals("value of valAfter", (byte)0x7a, - ((DataUnknown)valAfter).get()[i]); - } - - valAfter = j.next(); - assertTrue("should be an unknown", - valAfter.getType() == Datum.DataType.UNKNOWN); - for (int i = 0; i < 3; i++) { - assertEquals("value of valAfter", (byte)0x79, - ((DataUnknown)valAfter).get()[i]); - } - - valAfter = j.next(); - assertTrue("should be an unknown", - valAfter.getType() == Datum.DataType.UNKNOWN); - for (int i = 0; i < 3; i++) { - assertEquals("value of valAfter", (byte)0x78, - ((DataUnknown)valAfter).get()[i]); - } - - assertFalse("should have read all values in bag", j.hasNext()); - - file.delete(); -} - -public void testWriteReadInt() throws Exception -{ - DataBag before = new DataBag(Datum.DataType.INT); - - before.add(new DataInteger(99)); - before.add(new DataInteger(-98)); - before.add(new DataInteger(97)); - - File file = null; - file = File.createTempFile("DataBagInteger", "put"); - FileOutputStream fos = new FileOutputStream(file); - DataOutput out = new DataOutputStream(fos); - before.write(out); - fos.close(); - - FileInputStream fis = new FileInputStream(file); - DataInput in = new DataInputStream(fis); - Datum a = DatumImpl.readDatum(in); - - assertTrue("isa DataBag", a instanceof DataBag); - - DataBag after = (DataBag)a; - - assertTrue("bag of ints", after.bagOf() == Datum.DataType.INT); - - assertEquals("after read, size", 3, after.size()); - - Iterator j = after.content(); - - Datum val = j.next(); - assertTrue("should be an integer", val.getType() == Datum.DataType.INT); - assertEquals("value of valAfter", 99, ((DataInteger)val).get()); - - val = j.next(); - assertTrue("should be an integer", val.getType() == Datum.DataType.INT); - assertEquals("value of valAfter2", -98, ((DataInteger)val).get()); - - val = j.next(); - assertTrue("should be an integer", val.getType() == Datum.DataType.INT); - assertEquals("value of valAfter", 97, ((DataInteger)val).get()); - - assertFalse("should have read all values in bag", j.hasNext()); - - file.delete(); -} - -public void testWriteReadLong() throws Exception -{ - DataBag before = new DataBag(Datum.DataType.LONG); - - before.add(new DataLong(99000000000L)); - before.add(new DataLong(-98L)); - before.add(new DataLong(97L)); - - File file = null; - file = File.createTempFile("DataBagLong", "put"); - FileOutputStream fos = new FileOutputStream(file); - DataOutput out = new DataOutputStream(fos); - before.write(out); - fos.close(); - - FileInputStream fis = new FileInputStream(file); - DataInput in = new DataInputStream(fis); - Datum a = DatumImpl.readDatum(in); - - assertTrue("isa DataBag", a instanceof DataBag); - - DataBag after = (DataBag)a; - - assertTrue("bag of longs", after.bagOf() == Datum.DataType.LONG); - assertEquals("after read, size", 3, after.size()); - - Iterator j = after.content(); - - Datum val = j.next(); - assertTrue("should be a long", val.getType() == Datum.DataType.LONG); - assertEquals("value of valAfter", 99000000000L, ((DataLong)val).get()); - - val = j.next(); - assertTrue("should be a long", val.getType() == Datum.DataType.LONG); - assertEquals("value of valAfter2", -98L, ((DataLong)val).get()); - - val = j.next(); - assertTrue("should be a long", val.getType() == Datum.DataType.LONG); - assertEquals("value of valAfter", 97L, ((DataLong)val).get()); - - assertFalse("should have read all values in bag", j.hasNext()); - - file.delete(); -} - -public void testWriteReadFloat() throws Exception -{ - DataBag before = new DataBag(Datum.DataType.FLOAT); - - before.add(new DataFloat(3.2e32f)); - before.add(new DataFloat(-9.929292e-29f)); - before.add(new DataFloat(97.0f)); - - File file = null; - file = File.createTempFile("DataBagFloat", "put"); - FileOutputStream fos = new FileOutputStream(file); - DataOutput out = new DataOutputStream(fos); - before.write(out); - fos.close(); - - FileInputStream fis = new FileInputStream(file); - DataInput in = new DataInputStream(fis); - Datum a = DatumImpl.readDatum(in); - - assertTrue("isa DataBag", a instanceof DataBag); - - DataBag after = (DataBag)a; - - assertTrue("bag of floats", after.bagOf() == Datum.DataType.FLOAT); - assertEquals("after read, size", 3, after.size()); - - Iterator j = after.content(); - - Datum val = j.next(); - assertTrue("should be a float", val.getType() == Datum.DataType.FLOAT); - assertEquals("value of valAfter", 3.2e32f, ((DataFloat)val).get()); +public class TestDataBag extends junit.framework.TestCase { - val = j.next(); - assertTrue("should be a float", val.getType() == Datum.DataType.FLOAT); - assertEquals("value of valAfter2", -9.929292e-29f, ((DataFloat)val).get()); + private Random rand = new Random(); - val = j.next(); - assertTrue("should be a float", val.getType() == Datum.DataType.FLOAT); - assertEquals("value of valAfter", 97.0f, ((DataFloat)val).get()); + private class TestMemoryManager { + ArrayList mManagedObjects = new ArrayList(); - assertFalse("should have read all values in bag", j.hasNext()); - - file.delete(); -} - -public void testWriteReadDouble() throws Exception -{ - DataBag before = new DataBag(Datum.DataType.DOUBLE); - - before.add(new DataDouble(3.2e132)); - before.add(new DataDouble(-9.929292e-129)); - before.add(new DataDouble(97.0)); - - File file = null; - file = File.createTempFile("DataBagDouble", "put"); - FileOutputStream fos = new FileOutputStream(file); - DataOutput out = new DataOutputStream(fos); - before.write(out); - fos.close(); - - FileInputStream fis = new FileInputStream(file); - DataInput in = new DataInputStream(fis); - Datum a = DatumImpl.readDatum(in); - - assertTrue("isa DataBag", a instanceof DataBag); - - DataBag after = (DataBag)a; - - assertTrue("bag of double", after.bagOf() == Datum.DataType.DOUBLE); - assertEquals("after read, size", 3, after.size()); - - Iterator j = after.content(); - - Datum val = j.next(); - assertTrue("should be a double", val.getType() == Datum.DataType.DOUBLE); - assertEquals("value of valAfter", 3.2e132, ((DataDouble)val).get()); - - val = j.next(); - assertTrue("should be a double", val.getType() == Datum.DataType.DOUBLE); - assertEquals("value of valAfter2", -9.929292e-129, ((DataDouble)val).get()); - - val = j.next(); - assertTrue("should be a double", val.getType() == Datum.DataType.DOUBLE); - assertEquals("value of valAfter", 97.0, ((DataDouble)val).get()); - - assertFalse("should have read all values in bag", j.hasNext()); - - file.delete(); -} - -public void testWriteReadUtf16() throws Exception -{ - DataBag before = new DataBag(Datum.DataType.CHARARRAY); - - before.add(new DataCharArrayUtf16("zzz")); - before.add(new DataCharArrayUtf16("yyy")); - before.add(new DataCharArrayUtf16("xxx")); - - File file = null; - file = File.createTempFile("DataBagUtf16", "put"); - FileOutputStream fos = new FileOutputStream(file); - DataOutput out = new DataOutputStream(fos); - before.write(out); - fos.close(); - - FileInputStream fis = new FileInputStream(file); - DataInput in = new DataInputStream(fis); - Datum a = DatumImpl.readDatum(in); - - assertTrue("isa DataBag", a instanceof DataBag); - - DataBag after = (DataBag)a; - - assertTrue("bag of chararray", after.bagOf() == Datum.DataType.CHARARRAY); - assertEquals("after read, size", 3, after.size()); - - Iterator j = after.content(); - - Datum val = j.next(); - assertTrue("should be a chararray", val.getType() == Datum.DataType.CHARARRAY); - assertTrue("encoding should be utf16", - ((DataCharArray)val).getEncoding() == DataCharArray.Encoding.UTF16); - assertEquals("value of valAfter", "zzz", ((DataCharArrayUtf16)val).get()); - - val = j.next(); - assertTrue("should be a chararray", val.getType() == Datum.DataType.CHARARRAY); - assertTrue("encoding should be utf16", - ((DataCharArray)val).getEncoding() == DataCharArray.Encoding.UTF16); - assertEquals("value of valAfter2", "yyy", ((DataCharArrayUtf16)val).get()); - - val = j.next(); - assertTrue("should be a chararray", val.getType() == Datum.DataType.CHARARRAY); - assertTrue("encoding should be utf16", - ((DataCharArray)val).getEncoding() == DataCharArray.Encoding.UTF16); - assertEquals("value of valAfter", "xxx", ((DataCharArrayUtf16)val).get()); - - assertFalse("should have read all values in bag", j.hasNext()); - - file.delete(); -} + public void register(Spillable s) { + mManagedObjects.add(s); + } + + public void forceSpill() throws IOException { + Iterator i = mManagedObjects.iterator(); + while (i.hasNext()) i.next().spill(); + } + } + + // Need to override the regular bag factory so I can register with my local + // memory manager. + private class LocalBagFactory { + TestMemoryManager mMemMgr; + + public LocalBagFactory(TestMemoryManager mgr) { + mMemMgr = mgr; + } + + public DataBag newDefaultBag() { + DataBag bag = new DefaultDataBag(); + mMemMgr.register(bag); + return bag; + } + + public DataBag newSortedBag(EvalSpec sortSpec) { + DataBag bag = new SortedDataBag(sortSpec); + mMemMgr.register(bag); + return bag; + } + + public DataBag newDistinctBag() { + DataBag bag = new DistinctDataBag(); + mMemMgr.register(bag); + return bag; + } + } + + // Test reading and writing default from memory, no spills. + @Test + public void testDefaultInMemory() throws Exception { + TestMemoryManager mgr = new TestMemoryManager(); + LocalBagFactory factory = new LocalBagFactory(mgr); + DataBag b = factory.newDefaultBag(); + ArrayList rightAnswer = new ArrayList(10); + + // Write tuples into both + for (int i = 0; i < 10; i++) { + Tuple t = new Tuple(new DataAtom(i)); + b.add(t); + rightAnswer.add(t); + } + + // Read tuples back, hopefully they come out in the same order. + Iterator bIter = b.iterator(); + Iterator rIter = rightAnswer.iterator(); + + while (rIter.hasNext()) { + assertTrue("bag ran out of tuples before answer", bIter.hasNext()); + assertEquals("tuples should be the same", bIter.next(), rIter.next()); + } + + assertFalse("right answer ran out of tuples before the bag", + bIter.hasNext()); + } + + // Test reading and writing default from file with one spill + @Test + public void testDefaultSingleSpill() throws Exception { + TestMemoryManager mgr = new TestMemoryManager(); + LocalBagFactory factory = new LocalBagFactory(mgr); + DataBag b = factory.newDefaultBag(); + ArrayList rightAnswer = new ArrayList(10); + + // Write tuples into both + for (int i = 0; i < 10; i++) { + Tuple t = new Tuple(new DataAtom(i)); + b.add(t); + rightAnswer.add(t); + } + mgr.forceSpill(); + + // Read tuples back, hopefully they come out in the same order. + Iterator bIter = b.iterator(); + Iterator rIter = rightAnswer.iterator(); + + while (rIter.hasNext()) { + assertTrue("bag ran out of tuples before answer", bIter.hasNext()); + assertEquals("tuples should be the same", bIter.next(), rIter.next()); + } + + assertFalse("right answer ran out of tuples before the bag", + bIter.hasNext()); + } + + // Test reading and writing default from file with three spills + @Test + public void testDefaultTripleSpill() throws Exception { + TestMemoryManager mgr = new TestMemoryManager(); + LocalBagFactory factory = new LocalBagFactory(mgr); + DataBag b = factory.newDefaultBag(); + ArrayList rightAnswer = new ArrayList(30); + + // Write tuples into both + for (int j = 0; j < 3; j++) { + for (int i = 0; i < 10; i++) { + Tuple t = new Tuple(new DataAtom(i)); + b.add(t); + rightAnswer.add(t); + } + mgr.forceSpill(); + } + + // Read tuples back, hopefully they come out in the same order. + Iterator bIter = b.iterator(); + Iterator rIter = rightAnswer.iterator(); + + while (rIter.hasNext()) { + assertTrue("bag ran out of tuples before answer", bIter.hasNext()); + assertEquals("tuples should be the same", bIter.next(), rIter.next()); + } + + assertFalse("right answer ran out of tuples before the bag", + bIter.hasNext()); + } + + // Test reading with some in file, some in memory. + @Test + public void testDefaultInMemInFile() throws Exception { + TestMemoryManager mgr = new TestMemoryManager(); + LocalBagFactory factory = new LocalBagFactory(mgr); + DataBag b = factory.newDefaultBag(); + ArrayList rightAnswer = new ArrayList(20); + + // Write tuples into both + for (int i = 0; i < 10; i++) { + Tuple t = new Tuple(new DataAtom(i)); + b.add(t); + rightAnswer.add(t); + } + mgr.forceSpill(); + + for (int i = 0; i < 10; i++) { + Tuple t = new Tuple(new DataAtom(i)); + b.add(t); + rightAnswer.add(t); + } + + // Read tuples back, hopefully they come out in the same order. + Iterator bIter = b.iterator(); + Iterator rIter = rightAnswer.iterator(); + + while (rIter.hasNext()) { + assertTrue("bag ran out of tuples before answer", bIter.hasNext()); + assertEquals("tuples should be the same", bIter.next(), rIter.next()); + } + + assertFalse("right answer ran out of tuples before the bag", + bIter.hasNext()); + } + + // Test reading with a spill happening in the middle of the read. + @Test + public void testDefaultSpillDuringRead() throws Exception { + TestMemoryManager mgr = new TestMemoryManager(); + LocalBagFactory factory = new LocalBagFactory(mgr); + DataBag b = factory.newDefaultBag(); + ArrayList rightAnswer = new ArrayList(20); + + // Write tuples into both + for (int i = 0; i < 10; i++) { + Tuple t = new Tuple(new DataAtom(i)); + b.add(t); + rightAnswer.add(t); + } + mgr.forceSpill(); + + for (int i = 0; i < 10; i++) { + Tuple t = new Tuple(new DataAtom(i)); + b.add(t); + rightAnswer.add(t); + } + + // Read tuples back, hopefully they come out in the same order. + Iterator bIter = b.iterator(); + Iterator rIter = rightAnswer.iterator(); + + for (int i = 0; i < 15; i++) { + assertTrue("bag ran out of tuples before answer", bIter.hasNext()); + assertEquals("tuples should be the same", bIter.next(), rIter.next()); + } + + mgr.forceSpill(); + + while (rIter.hasNext()) { + assertTrue("bag ran out of tuples before answer", bIter.hasNext()); + assertEquals("tuples should be the same", bIter.next(), rIter.next()); + } + + assertFalse("right answer ran out of tuples before the bag", + bIter.hasNext()); + } + + // Test reading and writing sorted from memory, no spills. + @Test + public void testSortedInMemory() throws Exception { + TestMemoryManager mgr = new TestMemoryManager(); + LocalBagFactory factory = new LocalBagFactory(mgr); + DataBag b = factory.newSortedBag(null); + PriorityQueue rightAnswer = new PriorityQueue(10); + + // Write tuples into both + for (int i = 0; i < 10; i++) { + Tuple t = new Tuple(new DataAtom(rand.nextInt())); + b.add(t); + rightAnswer.add(t); + } + + // Read tuples back, hopefully they come out in the same order. + Iterator bIter = b.iterator(); + + Tuple t; + while ((t = rightAnswer.poll()) != null) { + assertTrue("bag ran out of tuples before answer", bIter.hasNext()); + assertEquals("tuples should be the same", bIter.next(), t); + } + + assertFalse("right answer ran out of tuples before the bag", + bIter.hasNext()); + } + + // Test reading and writing default from file with one spill + @Test + public void testSortedSingleSpill() throws Exception { + TestMemoryManager mgr = new TestMemoryManager(); + LocalBagFactory factory = new LocalBagFactory(mgr); + DataBag b = factory.newSortedBag(null); + PriorityQueue rightAnswer = new PriorityQueue(10); + + // Write tuples into both + for (int i = 0; i < 10; i++) { + Tuple t = new Tuple(new DataAtom(rand.nextInt())); + b.add(t); + rightAnswer.add(t); + } + mgr.forceSpill(); + + // Read tuples back, hopefully they come out in the same order. + Iterator bIter = b.iterator(); + Tuple t; + while ((t = rightAnswer.poll()) != null) { + assertTrue("bag ran out of tuples before answer", bIter.hasNext()); + assertEquals("tuples should be the same", bIter.next(), t); + } + + assertFalse("right answer ran out of tuples before the bag", + bIter.hasNext()); + } + + // Test reading and writing default from file with three spills + @Test + public void testSortedTripleSpill() throws Exception { + TestMemoryManager mgr = new TestMemoryManager(); + LocalBagFactory factory = new LocalBagFactory(mgr); + DataBag b = factory.newSortedBag(null); + PriorityQueue rightAnswer = new PriorityQueue(30); + + // Write tuples into both + for (int j = 0; j < 3; j++) { + for (int i = 0; i < 10; i++) { + Tuple t = new Tuple(new DataAtom(rand.nextInt())); + b.add(t); + rightAnswer.add(t); + } + mgr.forceSpill(); + } + + // Read tuples back, hopefully they come out in the same order. + Iterator bIter = b.iterator(); + + Tuple t; + while ((t = rightAnswer.poll()) != null) { + assertTrue("bag ran out of tuples before answer", bIter.hasNext()); + assertEquals("tuples should be the same", bIter.next(), t); + } + + assertFalse("right answer ran out of tuples before the bag", + bIter.hasNext()); + } + + // Test reading with some in file, some in memory. + @Test + public void testSortedInMemInFile() throws Exception { + TestMemoryManager mgr = new TestMemoryManager(); + LocalBagFactory factory = new LocalBagFactory(mgr); + DataBag b = factory.newSortedBag(null); + PriorityQueue rightAnswer = new PriorityQueue(20); + + // Write tuples into both + for (int i = 0; i < 10; i++) { + Tuple t = new Tuple(new DataAtom(rand.nextInt())); + b.add(t); + rightAnswer.add(t); + } + mgr.forceSpill(); + + for (int i = 0; i < 10; i++) { + Tuple t = new Tuple(new DataAtom(rand.nextInt())); + b.add(t); + rightAnswer.add(t); + } + + // Read tuples back, hopefully they come out in the same order. + Iterator bIter = b.iterator(); + Tuple t; + while ((t = rightAnswer.poll()) != null) { + assertTrue("bag ran out of tuples before answer", bIter.hasNext()); + assertEquals("tuples should be the same", bIter.next(), t); + } + + assertFalse("right answer ran out of tuples before the bag", + bIter.hasNext()); + } + + // Test reading with a spill happening in the middle of the read. + @Test + public void testSortedSpillDuringRead() throws Exception { + TestMemoryManager mgr = new TestMemoryManager(); + LocalBagFactory factory = new LocalBagFactory(mgr); + DataBag b = factory.newSortedBag(null); + PriorityQueue rightAnswer = new PriorityQueue(20); + + // Write tuples into both + for (int i = 0; i < 10; i++) { + Tuple t = new Tuple(new DataAtom(rand.nextInt())); + b.add(t); + rightAnswer.add(t); + } + mgr.forceSpill(); + + for (int i = 0; i < 10; i++) { + Tuple t = new Tuple(new DataAtom(rand.nextInt())); + b.add(t); + rightAnswer.add(t); + } + + // Read tuples back, hopefully they come out in the same order. + Iterator bIter = b.iterator(); + + for (int i = 0; i < 15; i++) { + assertTrue("bag ran out of tuples before answer", bIter.hasNext()); + assertEquals("tuples should be the same", bIter.next(), rightAnswer.poll()); + } + + mgr.forceSpill(); + + Tuple t; + while ((t = rightAnswer.poll()) != null) { + assertTrue("bag ran out of tuples before answer", bIter.hasNext()); + assertEquals("tuples should be the same", bIter.next(), t); + } + + assertFalse("right answer ran out of tuples before the bag", + bIter.hasNext()); + } + + // Test reading with first spill happening in the middle of the read. + @Test + public void testSortedFirstSpillDuringRead() throws Exception { + TestMemoryManager mgr = new TestMemoryManager(); + LocalBagFactory factory = new LocalBagFactory(mgr); + DataBag b = factory.newSortedBag(null); + PriorityQueue rightAnswer = new PriorityQueue(20); + + for (int i = 0; i < 10; i++) { + Tuple t = new Tuple(new DataAtom(rand.nextInt())); + b.add(t); + rightAnswer.add(t); + } + + // Read tuples back, hopefully they come out in the same order. + Iterator bIter = b.iterator(); + + for (int i = 0; i < 5; i++) { + assertTrue("bag ran out of tuples before answer", bIter.hasNext()); + assertEquals("tuples should be the same", bIter.next(), rightAnswer.poll()); + } + + mgr.forceSpill(); + + Tuple t; + while ((t = rightAnswer.poll()) != null) { + assertTrue("bag ran out of tuples before answer", bIter.hasNext()); + assertEquals("tuples should be the same", bIter.next(), t); + } + + assertFalse("right answer ran out of tuples before the bag", + bIter.hasNext()); + } + + // Test reading and writing sorted file with so many spills it requires + // premerge. + @Test + public void testSortedPreMerge() throws Exception { + TestMemoryManager mgr = new TestMemoryManager(); + LocalBagFactory factory = new LocalBagFactory(mgr); + DataBag b = factory.newSortedBag(null); + PriorityQueue rightAnswer = new PriorityQueue(30); + + // Write tuples into both + for (int j = 0; j < 373; j++) { + for (int i = 0; i < 10; i++) { + Tuple t = new Tuple(new DataAtom(rand.nextInt())); + b.add(t); + rightAnswer.add(t); + } + mgr.forceSpill(); + } + + // Read tuples back, hopefully they come out in the same order. + Iterator bIter = b.iterator(); + + Tuple t; + while ((t = rightAnswer.poll()) != null) { + assertTrue("bag ran out of tuples before answer", bIter.hasNext()); + assertEquals("tuples should be the same", bIter.next(), t); + } + + assertFalse("right answer ran out of tuples before the bag", + bIter.hasNext()); + } + + // Test reading and writing distinct from memory, no spills. + @Test + public void testDistinctInMemory() throws Exception { + TestMemoryManager mgr = new TestMemoryManager(); + LocalBagFactory factory = new LocalBagFactory(mgr); + DataBag b = factory.newDistinctBag(); + TreeSet rightAnswer = new TreeSet(); + + // Write tuples into both + for (int i = 0; i < 50; i++) { + Tuple t = new Tuple(new DataAtom(rand.nextInt() % 5)); + b.add(t); + rightAnswer.add(t); + } + + // Read tuples back, hopefully they come out in the same order. + Iterator bIter = b.iterator(); + Iterator rIter = rightAnswer.iterator(); + + while (rIter.hasNext()) { + assertTrue("bag ran out of tuples before answer", bIter.hasNext()); + assertEquals("tuples should be the same", bIter.next(), rIter.next()); + } + + assertFalse("right answer ran out of tuples before the bag", + bIter.hasNext()); + } + + // Test reading and writing distinct from file with one spill + @Test + public void testDistinctSingleSpill() throws Exception { + TestMemoryManager mgr = new TestMemoryManager(); + LocalBagFactory factory = new LocalBagFactory(mgr); + DataBag b = factory.newDistinctBag(); + TreeSet rightAnswer = new TreeSet(); + + // Write tuples into both + for (int i = 0; i < 50; i++) { + Tuple t = new Tuple(new DataAtom(rand.nextInt() % 5)); + b.add(t); + rightAnswer.add(t); + } + mgr.forceSpill(); + + // Read tuples back, hopefully they come out in the same order. + Iterator bIter = b.iterator(); + Iterator rIter = rightAnswer.iterator(); + + while (rIter.hasNext()) { + assertTrue("bag ran out of tuples before answer", bIter.hasNext()); + assertEquals("tuples should be the same", bIter.next(), rIter.next()); + } + + assertFalse("right answer ran out of tuples before the bag", + bIter.hasNext()); + } + + // Test reading and writing distinct from file with three spills + @Test + public void testDistinctTripleSpill() throws Exception { + TestMemoryManager mgr = new TestMemoryManager(); + LocalBagFactory factory = new LocalBagFactory(mgr); + DataBag b = factory.newDistinctBag(); + TreeSet rightAnswer = new TreeSet(); + + // Write tuples into both + for (int j = 0; j < 3; j++) { + for (int i = 0; i < 50; i++) { + Tuple t = new Tuple(new DataAtom(rand.nextInt() % 5)); + b.add(t); + rightAnswer.add(t); + } + mgr.forceSpill(); + } + + // Read tuples back, hopefully they come out in the same order. + Iterator bIter = b.iterator(); + Iterator rIter = rightAnswer.iterator(); + + while (rIter.hasNext()) { + assertTrue("bag ran out of tuples before answer", bIter.hasNext()); + assertEquals("tuples should be the same", bIter.next(), rIter.next()); + } + + assertFalse("right answer ran out of tuples before the bag", + bIter.hasNext()); + } + + // Test reading with some in file, some in memory. + @Test + public void testDistinctInMemInFile() throws Exception { + TestMemoryManager mgr = new TestMemoryManager(); + LocalBagFactory factory = new LocalBagFactory(mgr); + DataBag b = factory.newDistinctBag(); + TreeSet rightAnswer = new TreeSet(); + + // Write tuples into both + for (int i = 0; i < 50; i++) { + Tuple t = new Tuple(new DataAtom(rand.nextInt() % 5)); + b.add(t); + rightAnswer.add(t); + } + mgr.forceSpill(); + + for (int i = 0; i < 50; i++) { + Tuple t = new Tuple(new DataAtom(i)); + b.add(t); + rightAnswer.add(t); + } + + // Read tuples back, hopefully they come out in the same order. + Iterator bIter = b.iterator(); + Iterator rIter = rightAnswer.iterator(); + + while (rIter.hasNext()) { + assertTrue("bag ran out of tuples before answer", bIter.hasNext()); + assertEquals("tuples should be the same", bIter.next(), rIter.next()); + } + + assertFalse("right answer ran out of tuples before the bag", + bIter.hasNext()); + } + + // Test reading with a spill happening in the middle of the read. + @Test + public void testDistinctSpillDuringRead() throws Exception { + TestMemoryManager mgr = new TestMemoryManager(); + LocalBagFactory factory = new LocalBagFactory(mgr); + DataBag b = factory.newDistinctBag(); + TreeSet rightAnswer = new TreeSet(); + + // Write tuples into both + for (int i = 0; i < 50; i++) { + Tuple t = new Tuple(new DataAtom(rand.nextInt() % 5)); + b.add(t); + rightAnswer.add(t); + } + mgr.forceSpill(); + + for (int i = 0; i < 50; i++) { + Tuple t = new Tuple(new DataAtom(i)); + b.add(t); + rightAnswer.add(t); + } + + // Read tuples back, hopefully they come out in the same order. + Iterator bIter = b.iterator(); + Iterator rIter = rightAnswer.iterator(); + + for (int i = 0; i < 5; i++) { + assertTrue("bag ran out of tuples before answer", bIter.hasNext()); + assertEquals("tuples should be the same", bIter.next(), rIter.next()); + } + + mgr.forceSpill(); + + while (rIter.hasNext()) { + assertTrue("bag ran out of tuples before answer", bIter.hasNext()); + assertEquals("tuples should be the same", bIter.next(), rIter.next()); + } + + assertFalse("right answer ran out of tuples before the bag", + bIter.hasNext()); + } + + // Test reading and writing distinct from file with enough spills to + // force a pre-merge + @Test + public void testDistinctPreMerge() throws Exception { + TestMemoryManager mgr = new TestMemoryManager(); + LocalBagFactory factory = new LocalBagFactory(mgr); + DataBag b = factory.newDistinctBag(); + TreeSet rightAnswer = new TreeSet(); + + // Write tuples into both + for (int j = 0; j < 321; j++) { + for (int i = 0; i < 50; i++) { + Tuple t = new Tuple(new DataAtom(rand.nextInt() % 5)); + b.add(t); + rightAnswer.add(t); + } + mgr.forceSpill(); + } + + // Read tuples back, hopefully they come out in the same order. + Iterator bIter = b.iterator(); + Iterator rIter = rightAnswer.iterator(); + + while (rIter.hasNext()) { + assertTrue("bag ran out of tuples before answer", bIter.hasNext()); + assertEquals("tuples should be the same", bIter.next(), rIter.next()); + } + + assertFalse("right answer ran out of tuples before the bag", + bIter.hasNext()); + } + + // Test the default bag factory. + @Test + public void testDefaultBagFactory() throws Exception { + BagFactory f = BagFactory.getInstance(); + + DataBag bag = f.newDefaultBag(); + DataBag sorted = f.newSortedBag(null); + DataBag distinct = f.newDistinctBag(); + + assertTrue("Expected a default bag", (bag instanceof DefaultDataBag)); + assertTrue("Expected a sorted bag", (sorted instanceof SortedDataBag)); + assertTrue("Expected a distinct bag", (distinct instanceof DistinctDataBag)); + } + + @Test + public void testProvidedBagFactory() throws Exception { + // Test bogus factory name. + BagFactory.resetSelf(); + System.setProperty("pig.data.bag.factory.name", "no such class"); + System.setProperty("pig.data.bag.factory.jar", "file:./pig.jar"); + boolean caughtIt = false; + try { + BagFactory f = BagFactory.getInstance(); + } catch (RuntimeException re) { + assertEquals("Expected Unable to instantiate message", + "Unable to instantiate bag factory no such class", + re.getMessage()); + caughtIt = true; + } + assertTrue("Expected to catch exception", caughtIt); + + // Test factory that isn't a BagFactory + BagFactory.resetSelf(); + System.setProperty("pig.data.bag.factory.name", + "org.apache.pig.test.TestDataBag"); + System.setProperty("pig.data.bag.factory.jar", + "file:./pig.jar"); + caughtIt = false; + try { + BagFactory f = BagFactory.getInstance(); + } catch (RuntimeException re) { + assertEquals("Expected does not extend BagFactory message", + "Provided factory org.apache.pig.test.TestDataBag does not extend BagFactory!", + re.getMessage()); + caughtIt = true; + } + assertTrue("Expected to catch exception", caughtIt); + + // Test that we can instantiate our test factory. + BagFactory.resetSelf(); + System.setProperty("pig.data.bag.factory.name", + "org.apache.pig.test.NonDefaultBagFactory"); + System.setProperty("pig.data.bag.factory.jar", "file:./pig.jar"); + BagFactory f = BagFactory.getInstance(); + DataBag b = f.newDefaultBag(); + b = f.newSortedBag(null); + b = f.newDistinctBag(); -public void testWriteReadNone() throws Exception -{ - DataBag before = new DataBag(Datum.DataType.CHARARRAY); - - String s = new String("zzz"); - before.add(new DataCharArrayNone(s.getBytes())); - s = new String("yyy"); - before.add(new DataCharArrayNone(s.getBytes())); - s = new String("xxx"); - before.add(new DataCharArrayNone(s.getBytes())); - - File file = null; - file = File.createTempFile("DataBagCharArrayNone", "put"); - FileOutputStream fos = new FileOutputStream(file); - DataOutput out = new DataOutputStream(fos); - before.write(out); - fos.close(); - - FileInputStream fis = new FileInputStream(file); - DataInput in = new DataInputStream(fis); - Datum a = DatumImpl.readDatum(in); - - assertTrue("isa DataBag", a instanceof DataBag); - - DataBag after = (DataBag)a; - - assertTrue("bag of chararray", after.bagOf() == Datum.DataType.CHARARRAY); - assertEquals("after read, size", 3, after.size()); - - Iterator j = after.content(); - - Datum valAfter = j.next(); - assertTrue("should be a chararray", valAfter.getType() == Datum.DataType.CHARARRAY); - assertTrue("encoding should be none", - ((DataCharArray)valAfter).getEncoding() == DataCharArray.Encoding.NONE); - for (int i = 0; i < 3; i++) { - assertEquals("value of valAfter", (byte)0x7a, - ((DataCharArrayNone)valAfter).get()[i]); - } - - valAfter = j.next(); - assertTrue("should be a chararray", valAfter.getType() == Datum.DataType.CHARARRAY); - assertTrue("encoding should be none", - ((DataCharArray)valAfter).getEncoding() == DataCharArray.Encoding.NONE); - for (int i = 0; i < 3; i++) { - assertEquals("value of valAfter", (byte)0x79, - ((DataCharArrayNone)valAfter).get()[i]); - } - - valAfter = j.next(); - assertTrue("should be a chararray", valAfter.getType() == Datum.DataType.CHARARRAY); - assertTrue("encoding should be none", - ((DataCharArray)valAfter).getEncoding() == DataCharArray.Encoding.NONE); - for (int i = 0; i < 3; i++) { - assertEquals("value of valAfter", (byte)0x78, - ((DataCharArrayNone)valAfter).get()[i]); - } - - assertFalse("should have read all values in bag", j.hasNext()); - - file.delete(); + BagFactory.resetSelf(); + } } -public void testWriteReadMap() throws Exception -{ - DataBag before = new DataBag(Datum.DataType.MAP); - - DataMap map = new DataMap(); - - DataInteger key = new DataInteger(1); - Datum val = new DataInteger(99); - map.put(key, val); - - before.add(map); - - File file = null; - file = File.createTempFile("DataBagCharArrayNone", "put"); - FileOutputStream fos = new FileOutputStream(file); - DataOutput out = new DataOutputStream(fos); - before.write(out); - fos.close(); - - FileInputStream fis = new FileInputStream(file); - DataInput in = new DataInputStream(fis); - Datum a = DatumImpl.readDatum(in); - - assertTrue("isa DataBag", a instanceof DataBag); - - DataBag after = (DataBag)a; - - assertTrue("bag of maps", after.bagOf() == Datum.DataType.MAP); - assertEquals("after read, size", 1, after.size()); - - Iterator j = after.content(); - - Datum v = j.next(); - assertTrue("valAfter should be a map", v.getType() == Datum.DataType.MAP); - DataMap valAfter = (DataMap)v; - - assertEquals("valAfter size", 1L, valAfter.size()); - - DataInteger nosuch = new DataInteger(-1); - Datum d = valAfter.get(nosuch); - assertTrue("after read, no such key", d.isNull()); - - Datum mapValAfter = valAfter.get(key); - assertTrue("mapValAfter isa integer", mapValAfter instanceof DataInteger); - assertEquals("value of valAfter", 99, ((DataInteger)mapValAfter).get()); - - assertFalse("should have read all values in bag", j.hasNext()); - - file.delete(); -} - -public void testWriteReadTuple() throws Exception -{ - DataBag before = new DataBag(Datum.DataType.TUPLE); - - Tuple t = new Tuple(1); - t.setField(0, new DataInteger(1)); - before.add(t); - - File file = null; - file = File.createTempFile("DataBagCharArrayNone", "put"); - FileOutputStream fos = new FileOutputStream(file); - DataOutput out = new DataOutputStream(fos); - before.write(out); - fos.close(); - - FileInputStream fis = new FileInputStream(file); - DataInput in = new DataInputStream(fis); - Datum a = DatumImpl.readDatum(in); - - assertTrue("isa DataBag", a instanceof DataBag); - - DataBag after = (DataBag)a; - - assertTrue("bag of tuples", after.bagOf() == Datum.DataType.TUPLE); - assertEquals("after read, size", 1, after.size()); - - Iterator j = after.content(); - - Datum v = j.next(); - assertTrue("valAfter should be a tuple", - v.getType() == Datum.DataType.TUPLE); - - Tuple valAfter = (Tuple)v; - - assertEquals("valAfter size", 1L, valAfter.size()); - - Datum tupleValAfter = valAfter.getField(0); - assertTrue("tupleValAfter isa integer", tupleValAfter instanceof DataInteger); - assertEquals("value of valAfter", 1, ((DataInteger)tupleValAfter).get()); - - assertFalse("should have read all values in bag", j.hasNext()); - - file.delete(); -} - -public void testWriteReadBag() throws Exception -{ - DataBag before = new DataBag(Datum.DataType.BAG); - - DataBag b = new DataBag(Datum.DataType.INT); - b.add(new DataInteger(2)); - before.add(b); - - File file = null; - file = File.createTempFile("DataBagCharArrayNone", "put"); - FileOutputStream fos = new FileOutputStream(file); - DataOutput out = new DataOutputStream(fos); - before.write(out); - fos.close(); - - FileInputStream fis = new FileInputStream(file); - DataInput in = new DataInputStream(fis); - Datum a = DatumImpl.readDatum(in); - - assertTrue("isa DataBag", a instanceof DataBag); - - DataBag after = (DataBag)a; - - assertTrue("bag of bags", after.bagOf() == Datum.DataType.BAG); - assertEquals("after read, size", 1, after.size()); - - Iterator j = after.content(); - - Datum v = j.next(); - assertTrue("valAfter should be a bag", v.getType() == Datum.DataType.BAG); - DataBag valAfter = (DataBag)v; - - assertEquals("valAfter size", 1L, valAfter.size()); - - Iterator k = valAfter.content(); - Datum w = k.next(); - assertTrue("bagValAfter should be an integer", - w.getType() == Datum.DataType.INT); - DataInteger bagValAfter = (DataInteger)w; - - assertEquals("value of valAfter", 2, bagValAfter.get()); - - assertFalse("should have read all values in inner bag", k.hasNext()); - assertFalse("should have read all values in bag", j.hasNext()); - - file.delete(); -} - -} -