Return-Path: Delivered-To: apmail-incubator-pig-commits-archive@locus.apache.org Received: (qmail 55150 invoked from network); 22 Jan 2008 21:18:22 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 22 Jan 2008 21:18:22 -0000 Received: (qmail 51014 invoked by uid 500); 22 Jan 2008 21:18:12 -0000 Delivered-To: apmail-incubator-pig-commits-archive@incubator.apache.org Received: (qmail 50957 invoked by uid 500); 22 Jan 2008 21:18:12 -0000 Mailing-List: contact pig-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: pig-dev@incubator.apache.org Delivered-To: mailing list pig-commits@incubator.apache.org Received: (qmail 50938 invoked by uid 99); 22 Jan 2008 21:18:11 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 Jan 2008 13:18:11 -0800 X-ASF-Spam-Status: No, hits=-100.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 Jan 2008 21:17:55 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id C15E41A984E; Tue, 22 Jan 2008 13:17:45 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r614325 [4/6] - in /incubator/pig/branches/types: ./ lib/ scripts/ src/org/apache/pig/ src/org/apache/pig/builtin/ src/org/apache/pig/data/ src/org/apache/pig/impl/ src/org/apache/pig/impl/builtin/ src/org/apache/pig/impl/eval/ src/org/apac... Date: Tue, 22 Jan 2008 21:17:22 -0000 To: pig-commits@incubator.apache.org From: gates@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080122211745.C15E41A984E@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileReader.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileReader.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileReader.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileReader.java Tue Jan 22 13:17:12 2008 @@ -15,75 +15,87 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.pig.impl.io; +/* +package org.apache.pig.impl.io; + +import java.io.BufferedInputStream; +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Iterator; -import java.io.BufferedInputStream; -import java.io.DataInputStream; -import java.io.EOFException; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.util.Iterator; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.mapreduceExec.PigMapReduce; -import org.apache.pig.data.Datum; -import org.apache.pig.data.DatumImpl; + +public class DataBagFileReader { + File store; + + public DataBagFileReader(File f) throws IOException{ + store = f; + } + + public static int notifyInterval = 1000; + public int numNotifies; + private class myIterator implements Iterator{ + DataInputStream in; + Tuple nextTuple; + int curCall; + + public myIterator() throws IOException{ + numNotifies = 0; + in = new DataInputStream(new BufferedInputStream(new FileInputStream(store))); + getNextTuple(); + } + + private void getNextTuple() throws IOException{ + if (curCall < notifyInterval - 1) + curCall ++; + else{ + if (PigMapReduce.reporter != null) + PigMapReduce.reporter.progress(); + curCall = 0; + numNotifies ++; + } - -public class DataBagFileReader { - File store; - - public DataBagFileReader(File f) throws IOException{ - store = f; - } - - private class myIterator implements Iterator{ - DataInputStream in; - Datum nextDatum; - - public myIterator() throws IOException{ - in = new DataInputStream(new BufferedInputStream(new FileInputStream(store))); - getNextDatum(); - } - - private void getNextDatum() throws IOException{ - try{ - /* - nextDatum = new Datum(); - nextDatum.readFields(in); - */ - nextDatum = DatumImpl.readDatum(in); - } catch (EOFException e) { - in.close(); - nextDatum = null; - } - } - - public boolean hasNext(){ - return nextDatum != null; - } - - public Datum next(){ - Datum returnValue = nextDatum; - if (returnValue!=null){ - try{ - getNextDatum(); - }catch (IOException e){ - throw new RuntimeException(e.getMessage()); - } - } - return returnValue; - } - - public void remove(){ - throw new RuntimeException("Read only cursor"); - } - } - - public Iterator content() throws IOException{ - return new myIterator(); - } - - public void clear() throws IOException{ - store.delete(); - } -} + try{ + nextTuple = new Tuple(); + nextTuple.readFields(in); + } catch (EOFException e) { + in.close(); + nextTuple = null; + } + } + + public boolean hasNext(){ + return nextTuple != null; + } + + public Tuple next(){ + Tuple returnValue = nextTuple; + if (returnValue!=null){ + try{ + getNextTuple(); + }catch (IOException e){ + throw new RuntimeException(e.getMessage()); + } + } + return returnValue; + } + + public void remove(){ + throw new RuntimeException("Read only cursor"); + } + } + + public Iterator content() throws IOException{ + return new myIterator(); + } + + public void clear() throws IOException{ + store.delete(); + } +} +*/ Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileWriter.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileWriter.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileWriter.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/io/DataBagFileWriter.java Tue Jan 22 13:17:12 2008 @@ -15,44 +15,56 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.pig.impl.io; - -import java.io.BufferedOutputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.util.Iterator; - -import org.apache.pig.data.Datum; - - - -public class DataBagFileWriter { - File store; - DataOutputStream out; - - public DataBagFileWriter(File store) throws IOException{ - this.store = store; - out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(store))); - } - - public void write(Datum d) throws IOException{ - d.write(out); - } + /* +package org.apache.pig.impl.io; + +import java.io.BufferedOutputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Iterator; + +import org.apache.pig.data.Tuple; + + + +public class DataBagFileWriter { + File store; + DataOutputStream out; + + public DataBagFileWriter(File store) throws IOException{ + this.store = store; + out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(store))); + } + + public void write(Tuple t) throws IOException{ + t.write(out); + } + + public long write(Iterator iter) throws IOException{ - public void write(Iterator iter) throws IOException{ - while (iter.hasNext()) + long initialSize = getFileLength(); + while (iter.hasNext()) iter.next().write(out); - } - - public void close() throws IOException{ - flush(); - out.close(); - } + + return getFileLength() - initialSize; + } - public void flush() throws IOException{ + public long getFileLength() throws IOException{ out.flush(); + return store.length(); } -} + + public void close() throws IOException{ + flush(); + out.close(); + } + + public void flush() throws IOException{ + out.flush(); + } + +} +*/ Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java Tue Jan 22 13:17:12 2008 @@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.mapred.JobConf; import org.apache.pig.PigServer.ExecType; import org.apache.pig.impl.PigContext; @@ -158,7 +159,10 @@ Path paths[] = null; if (fs.exists(path)) { if (fs.isFile(path)) return fs.open(path); - paths = fs.listPaths(path); + FileStatus fileStat[] = fs.listStatus(path); + paths = new Path[fileStat.length]; + for (int i = 0; i < fileStat.length; i++) + paths[i] = fileStat[i].getPath(); } else { // It might be a glob if (!globMatchesFiles(path, paths, fs)) throw new IOException(path + " does not exist"); Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/PigFile.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/PigFile.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/io/PigFile.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/io/PigFile.java Tue Jan 22 13:17:12 2008 @@ -27,7 +27,6 @@ import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; import org.apache.pig.data.Tuple; -import org.apache.pig.data.Datum; import org.apache.pig.impl.PigContext; @@ -45,8 +44,7 @@ } public DataBag load(LoadFunc lfunc, PigContext pigContext) throws IOException { - DataBag content = - BagFactory.getInstance().getNewBag(Datum.DataType.TUPLE); + DataBag content = BagFactory.getInstance().newDefaultBag(); InputStream is = FileLocalizer.open(file, pigContext); lfunc.bindTo(file, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE); Tuple f = null; @@ -60,8 +58,8 @@ public void store(DataBag data, StoreFunc sfunc, PigContext pigContext) throws IOException { BufferedOutputStream bos = new BufferedOutputStream(FileLocalizer.create(file, append, pigContext)); sfunc.bindTo(bos); - for (Iterator it = data.content(); it.hasNext();) { - Tuple row = (Tuple)it.next(); + for (Iterator it = data.iterator(); it.hasNext();) { + Tuple row = it.next(); sfunc.putNext(row); } sfunc.finish(); Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java Tue Jan 22 13:17:12 2008 @@ -21,8 +21,8 @@ import java.util.ArrayList; import java.util.List; -import org.apache.pig.data.Datum; import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.eval.EvalSpec; import org.apache.pig.impl.logicalLayer.schema.AtomSchema; import org.apache.pig.impl.logicalLayer.schema.Schema; @@ -30,136 +30,153 @@ -public class LOCogroup extends LogicalOperator{ - private static final long serialVersionUID = 1L; - - protected ArrayList specs; - - public LOCogroup(List inputs, ArrayList specs) { - super(inputs); +public class LOCogroup extends LogicalOperator { + private static final long serialVersionUID = 1L; + + protected ArrayList specs; + + public LOCogroup(List inputs, + ArrayList specs) { + super(inputs); this.specs = specs; getOutputType(); } - - @Override - public String name() { - return "CoGroup"; - } - @Override - public String arguments() { - StringBuffer sb = new StringBuffer(); - + + @Override + public String name() { + if (inputs.size() == 1) return "Group"; + else return "CoGroup"; + } + @Override + public String arguments() { + StringBuffer sb = new StringBuffer(); + for (int i = 0; i < specs.size(); i++) { sb.append(specs.get(i)); - if (i+1 < specs.size()) sb.append(", "); + if (i + 1 < specs.size()) + sb.append(", "); } - + return sb.toString(); - } - - public static Datum[] getGroupAndTuple(Datum d){ - if (!(d instanceof Tuple)){ - throw new RuntimeException("Internal Error: Evaluation of group expression did not return a tuple"); - } - Tuple output = (Tuple)d; - if (output.arity() < 2){ - throw new RuntimeException("Internal Error: Evaluation of group expression returned a tuple with <2 fields"); - } - - Datum[] groupAndTuple = new Datum[2]; - if (output.arity() == 2){ - groupAndTuple[0] = output.getField(0); - groupAndTuple[1] = output.getField(1); - }else{ - Tuple group = new Tuple(); - for (int j=0; j fields = ((TupleSchema)groupElementSchema).getFields(); - - if (fields.size() < 2) - throw new RuntimeException("Internal Error: Schema of group expression retured <2 fields"); - - if (fields.size() == 2){ - groupElementSchema = fields.get(0); - groupElementSchema.removeAllAliases(); - groupElementSchema.setAlias("group"); - }else{ - groupElementSchema = new TupleSchema(); - groupElementSchema.setAlias("group"); - - for (int i=0; i fields = + ((TupleSchema) groupElementSchema).getFields(); + + if (fields.size() < 2) + throw new RuntimeException + ("Internal Error: Schema of group expression retured <2 fields"); + + if (fields.size() == 2) { + groupElementSchema = fields.get(0); + groupElementSchema.removeAllAliases(); + groupElementSchema.setAlias("group"); + } else { + groupElementSchema = new TupleSchema(); + groupElementSchema.setAlias("group"); + + for (int i = 0; i < fields.size() - 1; i++) { + ((TupleSchema) groupElementSchema).add(fields.get(i)); + } + } + + } + + schema.add(groupElementSchema); + + for (LogicalOperator lo:getInputs()) { + TupleSchema inputSchema = lo.outputSchema(); + if (inputSchema == null) + inputSchema = new TupleSchema(); + schema.add(inputSchema); + } + } + + schema.setAlias(alias); return schema; } - + @Override - public int getOutputType(){ - int outputType = FIXED; - for (int i=0; i getFuncs() { + public List getFuncs() { List funcs = super.getFuncs(); - for (EvalSpec spec: specs) { + for (EvalSpec spec:specs) { funcs.addAll(spec.getFuncs()); } return funcs; } - public ArrayList getSpecs() { - return specs; - } + public ArrayList getSpecs() { + return specs; + } - + public void visit(LOVisitor v) { + v.visitCogroup(this); + } } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEval.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEval.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEval.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEval.java Tue Jan 22 13:17:12 2008 @@ -24,34 +24,36 @@ -public class LOEval extends LogicalOperator{ - private static final long serialVersionUID = 1L; - - protected EvalSpec spec; +public class LOEval extends LogicalOperator { + private static final long serialVersionUID = 1L; + + protected EvalSpec spec; public LOEval(LogicalOperator input, EvalSpec specIn) { - super(input); + super(input); spec = specIn; getOutputType(); } @Override - public String name() { - return "Eval"; + public String name() { + return "Foreach"; } @Override - public String arguments() { + public String arguments() { return spec.toString(); } @Override - public TupleSchema outputSchema() { + public TupleSchema outputSchema() { if (schema == null) { //System.out.println("LOEval input: " + inputs[0].outputSchema()); //System.out.println("LOEval spec: " + spec); - schema = (TupleSchema)spec.getOutputSchemaForPipe(getInputs().get(0).outputSchema()); - + schema = + (TupleSchema) spec.getOutputSchemaForPipe(getInputs().get(0). + outputSchema()); + //System.out.println("LOEval output: " + schema); } schema.setAlias(alias); @@ -59,7 +61,7 @@ } @Override - public int getOutputType() { + public int getOutputType() { switch (getInputs().get(0).getOutputType()) { case FIXED: return FIXED; @@ -72,13 +74,17 @@ } @Override - public List getFuncs() { + public List getFuncs() { List funcs = super.getFuncs(); funcs.addAll(spec.getFuncs()); return funcs; } - public EvalSpec getSpec() { - return spec; + public EvalSpec getSpec() { + return spec; + } + + public void visit(LOVisitor v) { + v.visitEval(this); } } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java Tue Jan 22 13:17:12 2008 @@ -29,70 +29,67 @@ public class LOLoad extends LogicalOperator { - private static final long serialVersionUID = 1L; - - protected FileSpec inputFileSpec; - - protected int outputType = FIXED; + private static final long serialVersionUID = 1L; + protected FileSpec inputFileSpec; - public LOLoad(FileSpec inputFileSpec) throws IOException, ParseException{ - super(); - this.inputFileSpec = inputFileSpec; - try - { - LoadFunc storageFunc = (LoadFunc) PigContext.instantiateFuncFromSpec(inputFileSpec.getFuncSpec()); - } - catch (IOException e) - { - Throwable cause = e.getCause(); - while (cause != null && cause.getClass().getName() != "java.lang.ClassNotFoundException") - { - System.out.println("cause = " + cause.getClass().getName()); - cause = cause.getCause(); - } - - if (cause != null) - { - throw new ParseException("Load function " + inputFileSpec.getFuncSpec() + " not found"); - } - else - { - throw e; - } - - } + protected int outputType = FIXED; + + + public LOLoad(FileSpec inputFileSpec) throws IOException, ParseException { + super(); + this.inputFileSpec = inputFileSpec; + try { + LoadFunc storageFunc = + (LoadFunc) PigContext.instantiateFuncFromSpec(inputFileSpec. + getFuncSpec()); + } catch(IOException e) { + Throwable cause = e.getCause(); + while (cause != null + && cause.getClass().getName() != + "java.lang.ClassNotFoundException") { + System.out.println("cause = " + cause.getClass().getName()); + cause = cause.getCause(); + } if (cause != null) { + throw new ParseException("Load function " + + inputFileSpec.getFuncSpec() + + " not found"); + } else { + throw e; + } + + } //TODO: Handle Schemas defined by Load Functions schema = new TupleSchema(); } @Override - public String name() { + public String name() { return "Load"; } - - public FileSpec getInputFileSpec(){ - return inputFileSpec; + + public FileSpec getInputFileSpec() { + return inputFileSpec; } - + public void setInputFileSpec(FileSpec spec) { - inputFileSpec = spec; + inputFileSpec = spec; } - - @Override - public String arguments() { - return inputFileSpec.toString(); + + @Override + public String arguments() { + return inputFileSpec.toString(); } @Override - public TupleSchema outputSchema() { - schema.setAlias(alias); + public TupleSchema outputSchema() { + schema.setAlias(alias); return this.schema; } @Override - public int getOutputType() { + public int getOutputType() { return outputType; } @@ -104,18 +101,22 @@ } @Override - public String toString() { - StringBuffer result = new StringBuffer(super.toString()); - result.append(" (outputType: "); - result.append(outputType); - result.append(')'); - return result.toString(); - } + public String toString() { + StringBuffer result = new StringBuffer(super.toString()); + result.append(" (outputType: "); + result.append(outputType); + result.append(')'); + return result.toString(); + } - @Override - public List getFuncs() { + @Override + public List getFuncs() { List funcs = super.getFuncs(); funcs.add(inputFileSpec.getFuncName()); return funcs; } + + public void visit(LOVisitor v) { + v.visitLoad(this); + } } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORead.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORead.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORead.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LORead.java Tue Jan 22 13:17:12 2008 @@ -24,78 +24,80 @@ public class LORead extends LogicalOperator { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 1L; - - protected IntermedResult readFrom = null; - boolean readsFromSplit = false; - - @Override - public String toString() { - StringBuffer result = new StringBuffer(super.toString()); - result.append(" (readsFromSplit: "); - result.append(readsFromSplit); - result.append(')'); - return result.toString(); - } + protected IntermedResult readFrom = null; + boolean readsFromSplit = false; + @Override + public String toString() { + StringBuffer result = new StringBuffer(super.toString()); + result.append(" (readsFromSplit: "); + result.append(readsFromSplit); + result.append(')'); + return result.toString(); + } - //Since intermed result may have multiple outputs, which output do I read? - + //Since intermed result may have multiple outputs, which output do I read? public int splitOutputToRead = 0; - - public LORead(IntermedResult readFromIn) { - super(); + + public LORead(IntermedResult readFromIn) { + super(); readFrom = readFromIn; - } - - public LORead(IntermedResult readFromIn, int outputToRead) { - super(); - readsFromSplit = true; - this.splitOutputToRead = outputToRead; + } + + public LORead(IntermedResult readFromIn, int outputToRead) { + super(); + readsFromSplit = true; + this.splitOutputToRead = outputToRead; readFrom = readFromIn; - } - - public boolean readsFromSplit(){ - return readsFromSplit; - } - - @Override - public String name() { - return "Read"; - } - @Override - public String arguments() { - return alias; - } - + } + + public boolean readsFromSplit() { + return readsFromSplit; + } + + @Override + public String name() { + return "Read"; + } + @Override + public String arguments() { + return alias; + } + @Override - public TupleSchema outputSchema() { - if (schema == null) { - if (readFrom.lp != null && readFrom.lp.root != null && readFrom.lp.root.outputSchema() != null) { + public TupleSchema outputSchema() { + if (schema == null) { + if (readFrom.lp != null && readFrom.lp.root != null + && readFrom.lp.root.outputSchema() != null) { schema = readFrom.lp.root.outputSchema().copy(); } else { schema = new TupleSchema(); } - } - - schema.removeAllAliases(); + } + + schema.removeAllAliases(); schema.setAlias(alias); - + return schema; } - - - - @Override - public int getOutputType(){ - return readFrom.getOutputType(); - } - public IntermedResult getReadFrom() { - return readFrom; + + + @Override + public int getOutputType() { + return readFrom.getOutputType(); + } + + public IntermedResult getReadFrom() { + return readFrom; + } + + public void visit(LOVisitor v) { + v.visitRead(this); } } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java Tue Jan 22 13:17:12 2008 @@ -15,64 +15,67 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.pig.impl.logicalLayer; - - +package org.apache.pig.impl.logicalLayer; + + import org.apache.pig.impl.eval.EvalSpec; import org.apache.pig.impl.logicalLayer.schema.TupleSchema; - -public class LOSort extends LogicalOperator { - private static final long serialVersionUID = 1L; - private EvalSpec sortSpec; - - - protected EvalSpec spec; - - public EvalSpec getSpec() { - return spec; - } - - - - public LOSort( LogicalOperator input, EvalSpec sortSpec){ - super(input); - this.sortSpec = sortSpec; - getOutputType(); - } - - @Override - public String name() { - return "SORT"; - } - - @Override - public String arguments() { - return sortSpec.toString(); - } - - @Override - public int getOutputType() { - switch(getInputs().get(0).getOutputType()){ - case FIXED: - return FIXED; - default: - throw new RuntimeException("Blocking operator such as sort cannot handle streaming input"); - } - } - - @Override - public TupleSchema outputSchema() { - if (schema== null) - schema = getInputs().get(0).outputSchema().copy(); - - schema.setAlias(alias); - return schema; - - } - - public EvalSpec getSortSpec() { - return sortSpec; - } - -} + +public class LOSort extends LogicalOperator { + private static final long serialVersionUID = 1L; + private EvalSpec sortSpec; + + + protected EvalSpec spec; + + public EvalSpec getSpec() { + return spec; + } + + public LOSort(LogicalOperator input, EvalSpec sortSpec) { + super(input); + this.sortSpec = sortSpec; + getOutputType(); + } + + @Override + public String name() { + return "SORT"; + } + + @Override + public String arguments() { + return sortSpec.toString(); + } + + @Override + public int getOutputType() { + switch (getInputs().get(0).getOutputType()) { + case FIXED: + return FIXED; + default: + throw new RuntimeException + ("Blocking operator such as sort cannot handle streaming input"); + } + } + + @Override + public TupleSchema outputSchema() { + if (schema == null) + schema = getInputs().get(0).outputSchema().copy(); + + schema.setAlias(alias); + return schema; + + } + + public EvalSpec getSortSpec() { + return sortSpec; + } + + public void visit(LOVisitor v) { + v.visitSort(this); + } + +} Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplit.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplit.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplit.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSplit.java Tue Jan 22 13:17:12 2008 @@ -15,41 +15,44 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.pig.impl.logicalLayer; - -import java.util.ArrayList; -import java.util.List; +package org.apache.pig.impl.logicalLayer; + +import java.util.ArrayList; +import java.util.List; import org.apache.pig.impl.eval.cond.Cond; import org.apache.pig.impl.logicalLayer.schema.TupleSchema; - - -public class LOSplit extends LogicalOperator { - private static final long serialVersionUID = 1L; - - List conditions = new ArrayList(); - - public LOSplit(LogicalOperator input){ - super(input); - } - - public void addCond(Cond cond){ - conditions.add(cond); - } - - @Override - public int getOutputType(){ - return getInputs().get(0).getOutputType(); - } - - public ArrayList getConditions(){ - return new ArrayList(conditions); - } - - @Override - public TupleSchema outputSchema(){ - return getInputs().get(0).outputSchema().copy(); - } - - -} + + +public class LOSplit extends LogicalOperator { + private static final long serialVersionUID = 1L; + + List conditions = new ArrayList(); + + public LOSplit(LogicalOperator input) { + super(input); + } + + public void addCond(Cond cond) { + conditions.add(cond); + } + + @Override + public int getOutputType() { + return getInputs().get(0).getOutputType(); + } + + public ArrayList getConditions() { + return new ArrayList (conditions); + } + + @Override + public TupleSchema outputSchema() { + return getInputs().get(0).outputSchema().copy(); + } + + public void visit(LOVisitor v) { + v.visitSplit(this); + } + +} Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStore.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStore.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStore.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStore.java Tue Jan 22 13:17:12 2008 @@ -27,59 +27,62 @@ public class LOStore extends LogicalOperator { - private static final long serialVersionUID = 1L; - - protected FileSpec outputFileSpec; + private static final long serialVersionUID = 1L; - protected boolean append; - + protected FileSpec outputFileSpec; - public LOStore(LogicalOperator input, FileSpec fileSpec, boolean append) throws IOException{ - super(input); + protected boolean append; + + + public LOStore(LogicalOperator input, + FileSpec fileSpec, + boolean append) throws IOException { + super(input); this.outputFileSpec = fileSpec; this.append = append; //See if the store function spec is valid - try{ - StoreFunc StoreFunc = (StoreFunc) PigContext.instantiateFuncFromSpec(fileSpec.getFuncSpec()); - }catch (Exception e){ - IOException ioe = new IOException(e.getMessage()); - ioe.setStackTrace(e.getStackTrace()); - throw ioe; - } - - getOutputType(); + try { + StoreFunc StoreFunc = + (StoreFunc) PigContext.instantiateFuncFromSpec( + fileSpec.getFuncSpec()); + } catch(Exception e) { + IOException ioe = new IOException(e.getMessage()); + ioe.setStackTrace(e.getStackTrace()); + throw ioe; + } getOutputType(); } - - - public FileSpec getOutputFileSpec(){ - return outputFileSpec; - } - - - @Override - public String toString() { - - StringBuffer result = new StringBuffer(super.toString()); - result.append(" (append: "); - result.append(append); - result.append(')'); - return result.toString(); - } - @Override - public String name() { + public FileSpec getOutputFileSpec() { + return outputFileSpec; + } + + + @Override + public String toString() { + StringBuffer result = new StringBuffer(super.toString()); + result.append(" (append: "); + result.append(append); + result.append(')'); + return result.toString(); + } + + + @Override + public String name() { return "Store"; } @Override - public TupleSchema outputSchema() { - throw new RuntimeException("Internal error: Asking for schema of a store operator."); + public TupleSchema outputSchema() { + throw new + RuntimeException + ("Internal error: Asking for schema of a store operator."); } @Override - public int getOutputType() { + public int getOutputType() { switch (getInputs().get(0).getOutputType()) { case FIXED: return FIXED; @@ -91,14 +94,18 @@ } @Override - public List getFuncs() { + public List getFuncs() { List funcs = super.getFuncs(); funcs.add(outputFileSpec.getFuncName()); return funcs; } - public boolean isAppend() { - return append; + public boolean isAppend() { + return append; + } + + public void visit(LOVisitor v) { + v.visitStore(this); } } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUnion.java Tue Jan 22 13:17:12 2008 @@ -24,52 +24,59 @@ public class LOUnion extends LogicalOperator { - private static final long serialVersionUID = 1L; - + private static final long serialVersionUID = 1L; + public LOUnion(List inputsIn) { - super(inputsIn); + super(inputsIn); } - + @Override - public String name() { + public String name() { return "Union"; } @Override - public TupleSchema outputSchema() { - if (schema == null){ - TupleSchema longest = getInputs().get(0).outputSchema(); - int current = 0; - for (LogicalOperator lo : getInputs()) { - if (lo != null && lo.outputSchema() != null && lo.outputSchema().numFields() > current) { - longest = lo.outputSchema(); - current = longest.numFields(); - } - } - schema = longest.copy(); - } - - schema.setAlias(alias); + public TupleSchema outputSchema() { + if (schema == null) { + TupleSchema longest = getInputs().get(0).outputSchema(); + int current = 0; + for (LogicalOperator lo:getInputs()) { + if (lo != null && lo.outputSchema() != null + && lo.outputSchema().numFields() > current) { + longest = lo.outputSchema(); + current = longest.numFields(); + } + } + schema = longest.copy(); + } + + schema.setAlias(alias); return schema; } - - @Override - public int getOutputType(){ - int outputType = FIXED; - for (int i=0; i inputs; - - protected LogicalOperator(){ - this.inputs = new ArrayList(); - } - - protected LogicalOperator(List inputs) { - this.inputs = inputs; - } - - protected LogicalOperator(LogicalOperator input) { - this.inputs = new ArrayList(); - inputs.add(input); - } - - public String getAlias() { - return alias; - } - - public void setAlias(String newAlias) { - alias = newAlias; - } - - public int getRequestedParallelism() { - return requestedParallelism; - } - - public void setRequestedParallelism(int newRequestedParallelism) { - requestedParallelism = newRequestedParallelism; - } - - @Override - public String toString() { - StringBuffer result = new StringBuffer(super.toString()); - result.append(" (alias: "); - result.append(alias); - result.append(", requestedParallelism: "); - result.append(requestedParallelism); - result.append(')'); - return result.toString(); - } + public String alias = null; - public abstract TupleSchema outputSchema(); + public static final int FIXED = 1; + public static final int MONOTONE = 2; + public static final int UPDATABLE = 3; // Reserved for future use + public static final int AMENDABLE = 4; + + protected int requestedParallelism = -1; + protected TupleSchema schema = null; + protected List inputs; + + protected LogicalOperator() { + this.inputs = new ArrayList (); + } protected LogicalOperator(List inputs) { + this.inputs = inputs; + } + + protected LogicalOperator(LogicalOperator input) { + this.inputs = new ArrayList (); + inputs.add(input); + } + + public String getAlias() { + return alias; + } + + public void setAlias(String newAlias) { + alias = newAlias; + } + + public int getRequestedParallelism() { + return requestedParallelism; + } + + public void setRequestedParallelism(int newRequestedParallelism) { + requestedParallelism = newRequestedParallelism; + } + + @Override public String toString() { + StringBuffer result = new StringBuffer(super.toString()); + result.append(" (alias: "); + result.append(alias); + result.append(", requestedParallelism: "); + result.append(requestedParallelism); + result.append(')'); + return result.toString(); + } + + public abstract TupleSchema outputSchema(); public String name() { return "ROOT"; @@ -99,10 +96,17 @@ } return funcs; } - + public abstract int getOutputType(); - public void setSchema(TupleSchema schema) { - this.schema = schema; - } + public void setSchema(TupleSchema schema) { + this.schema = schema; + } + + /** + * Visit all of the logical operators in a tree, starting with this + * one. + * @param v LOVisitor to visit this logical plan with. + */ + public abstract void visit(LOVisitor v); } Propchange: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/ ------------------------------------------------------------------------------ --- svn:ignore (added) +++ svn:ignore Tue Jan 22 13:17:12 2008 @@ -0,0 +1,8 @@ + +TokenMgrError.java +Token.java +SimpleNode.java +SimpleCharStream.java +ParseException.java +Node.java +JJTQueryParserState.java Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Tue Jan 22 13:17:12 2008 @@ -180,7 +180,8 @@ if (spec instanceof CompositeEvalSpec) spec = ((CompositeEvalSpec)spec).getSpecs().get(0); if ( spec instanceof ConstSpec || - (spec instanceof FuncEvalSpec && ((FuncEvalSpec)spec).getReturnType() == DataAtom.class)) + (spec instanceof FuncEvalSpec && + DataType.isAtomic(DataType.findType(((FuncEvalSpec)spec).getReturnType())))) isAtomic = true; else if (spec instanceof FuncEvalSpec) isAtomic = false; @@ -515,7 +516,7 @@ } -LogicalOperator OrderClause() : {LogicalOperator op; EvalSpec sortSpec = null; ProjectSpec projSpec;} +LogicalOperator OrderClause() : {LogicalOperator op; EvalSpec sortSpec = null; ProjectSpec projSpec; String funcName;} { ( op = NestedExpr() @@ -530,6 +531,17 @@ ) | (sortSpec = Star() {sortSpec = new GenerateSpec(sortSpec);}) ) + ( + funcName = QualifiedFunction() + { + try { + sortSpec.setComparatorName(funcName); + } catch (Exception e){ + throw new ParseException(e.getMessage()); + } + } + )? + ) { return new LOSort(op, sortSpec); @@ -607,13 +619,23 @@ } EvalSpec NestedSortOrArrange(Schema over, Map specs): -{EvalSpec sortSpec; ProjectSpec projSpec; EvalSpec item; Schema subSchema = null; Token t;} +{EvalSpec sortSpec; ProjectSpec projSpec; EvalSpec item; Schema subSchema = null; Token t; String funcName;} { ( ( t = | t = ) item = BaseEvalSpec(over,specs) { subSchema = item.getOutputSchemaForPipe(over); } - ( (projSpec = SimpleProj(subSchema) {sortSpec = projSpec;}) - | sortSpec = Star() ) + ( (projSpec = SimpleProj(subSchema) {sortSpec = projSpec;}) + | sortSpec = Star() ) + ( + funcName = QualifiedFunction() + { + try { + sortSpec.setComparatorName(funcName); + } catch (Exception e){ + throw new ParseException(e.getMessage()); + } + } + )? ) { return copyItemAndAddSpec(item,new SortDistinctSpec(false, sortSpec)); } } @@ -932,6 +954,7 @@ return funcName; } } + /** * Bug 831620 - '$' support Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java Tue Jan 22 13:17:12 2008 @@ -17,8 +17,6 @@ */ package org.apache.pig.impl.mapreduceExec; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -29,19 +27,21 @@ import org.apache.log4j.Logger; import org.apache.pig.builtin.PigStorage; import org.apache.pig.data.DataBag; +import org.apache.pig.data.BagFactory; import org.apache.pig.data.IndexedTuple; import org.apache.pig.data.Tuple; -import org.apache.pig.data.Datum; -import org.apache.pig.impl.PigContext; import org.apache.pig.impl.eval.EvalSpec; import org.apache.pig.impl.io.PigFile; import org.apache.pig.impl.physicalLayer.POMapreduce; import org.apache.pig.impl.util.JarManager; +import org.apache.pig.impl.util.PigLogger; import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.PigLogger; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.UTF8; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.TaskReport; import org.apache.hadoop.mapred.JobClient; @@ -65,7 +65,17 @@ numMRJobs = numMRJobsIn; mrJobNumber = 0; } - + + public static class PigWritableComparator extends WritableComparator { + public PigWritableComparator() { + super(Tuple.class); + } + + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2){ + return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2); + } + } + static Random rand = new Random(); /** @@ -91,7 +101,7 @@ * @throws IOException */ public boolean launchPig(POMapreduce pom) throws IOException { - Logger log = PigLogger.getLogger(); + Logger log = PigLogger.getLogger(); JobConf conf = new JobConf(pom.pigContext.getConf()); conf.setJobName(pom.pigContext.getJobName()); boolean success = false; @@ -112,7 +122,7 @@ { FileOutputStream fos = new FileOutputStream(submitJarFile); JarManager.createJar(fos, funcs, pom.pigContext); - System.out.println("Job jar size = " + submitJarFile.length()); + log.debug("Job jar size = " + submitJarFile.length()); conf.setJar(submitJarFile.getPath()); String user = System.getProperty("user.name"); conf.setUser(user != null ? user : "Pigster"); @@ -133,20 +143,28 @@ conf.set("pig.pigContext", ObjectSerializer.serialize(pom.pigContext)); conf.setMapRunnerClass(PigMapReduce.class); - if (pom.toCombine != null) - conf.setCombinerClass(PigCombine.class); + if (pom.toCombine != null) { + conf.setCombinerClass(PigCombine.class); + //conf.setCombinerClass(PigMapReduce.class); + } if (pom.quantilesFile!=null){ conf.set("pig.quantilesFile", pom.quantilesFile); - } + } + else{ + // this is not a sort job - can use byte comparison to speed up processing + conf.setOutputKeyComparatorClass(PigWritableComparator.class); + } if (pom.partitionFunction!=null){ conf.setPartitionerClass(SortPartitioner.class); } conf.setReducerClass(PigMapReduce.class); conf.setInputFormat(PigInputFormat.class); conf.setOutputFormat(PigOutputFormat.class); - conf.setInputKeyClass(UTF8.class); - conf.setInputValueClass(Tuple.class); + // not used starting with 0.15 conf.setInputKeyClass(Text.class); + // not used starting with 0.15 conf.setInputValueClass(Tuple.class); conf.setOutputKeyClass(Tuple.class); + if (pom.userComparator != null) + conf.setOutputKeyComparatorClass(pom.userComparator); conf.setOutputValueClass(IndexedTuple.class); conf.set("pig.inputs", ObjectSerializer.serialize(pom.inputFileSpecs)); @@ -212,8 +230,8 @@ // create an empty output file PigFile f = new PigFile(outputFile.toString(), false); - f.store(new DataBag(Datum.DataType.TUPLE), - new PigStorage(), pom.pigContext); + f.store(BagFactory.getInstance().newDefaultBag(), + new PigStorage(), pom.pigContext); } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigCombine.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigCombine.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigCombine.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigCombine.java Tue Jan 22 13:17:12 2008 @@ -28,10 +28,10 @@ import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.pig.data.BagFactory; -import org.apache.pig.data.BigDataBag; -import org.apache.pig.data.Datum; +import org.apache.pig.data.DataBag; import org.apache.pig.data.IndexedTuple; import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.eval.EvalSpec; import org.apache.pig.impl.eval.collector.DataCollector; @@ -47,17 +47,20 @@ private OutputCollector oc; private int index; private int inputCount; - private BigDataBag bags[]; - private File tmpdir; + private DataBag bags[]; + private TupleFactory mTupleFactory = TupleFactory.getInstance(); + // private File tmpdir; public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { try { + /* tmpdir = new File(job.get("mapred.task.id")); tmpdir.mkdirs(); BagFactory.init(tmpdir); + */ PigContext pigContext = (PigContext) ObjectSerializer.deserialize(job.get("pig.pigContext")); if (evalPipe == null) { inputCount = ((ArrayList)ObjectSerializer.deserialize(job.get("pig.inputs"))).size(); @@ -69,37 +72,37 @@ evalPipe = esp.setupPipe(finalout); //throw new RuntimeException("combine spec: " + evalSpec + " combine pipe: " + esp.toString()); - bags = new BigDataBag[inputCount]; + bags = new DataBag[inputCount]; for (int i = 0; i < inputCount; i++) { - bags[i] = BagFactory.getInstance().getNewBigBag(Datum.DataType.TUPLE); + bags[i] = BagFactory.getInstance().newDefaultBag(); } } PigSplit split = PigInputFormat.PigRecordReader.getPigRecordReader().getPigFileSplit(); index = split.getIndex(); - Datum groupName = ((Tuple) key).getField(0); + String groupName = (String)((Tuple) key).get(0); finalout.group = ((Tuple) key); finalout.index = index; - Tuple t = new Tuple(1 + inputCount); - t.setField(0, groupName); + Tuple t = mTupleFactory.newTuple(1 + inputCount); + t.set(0, groupName); for (int i = 1; i < 1 + inputCount; i++) { bags[i - 1].clear(); - t.setField(i, bags[i - 1]); + t.set(i, bags[i - 1]); } while (values.hasNext()) { IndexedTuple it = (IndexedTuple) values.next(); - t.getBagField(it.index + 1).add(it); + ((DataBag)t.get(it.index + 1)).add(it.toTuple()); } for (int i = 0; i < inputCount; i++) { // XXX: shouldn't we only do this if INNER flag is set? - if (t.getBagField(1 + i).isEmpty()) + if (((DataBag)t.get(1 + i)).size() == 0) return; } // throw new RuntimeException("combine input: " + t.toString()); evalPipe.add(t); - evalPipe.add(null); // EOF marker + // evalPipe.add(null); // EOF marker } catch (Throwable tr) { tr.printStackTrace(); RuntimeException exp = new RuntimeException(tr.getMessage()); @@ -132,10 +135,11 @@ } @Override - public void add(Datum d){ + public void add(Object d){ if (d == null) return; // EOF marker from eval pipeline; ignore try{ - oc.collect(group, new IndexedTuple(((Tuple)d).getTupleField(0),index)); + // oc.collect(group, new IndexedTuple(((Tuple)d).getTupleField(0),index)); + oc.collect(group, new IndexedTuple(((Tuple)d),index)); }catch (IOException e){ throw new RuntimeException(e); } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigInputFormat.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigInputFormat.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigInputFormat.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigInputFormat.java Tue Jan 22 13:17:12 2008 @@ -24,7 +24,8 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.UTF8; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.compress.CompressionCodec; @@ -37,6 +38,7 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.pig.LoadFunc; import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.eval.EvalSpec; import org.apache.pig.impl.io.BufferedPositionedInputStream; @@ -45,7 +47,7 @@ import org.apache.tools.bzip2r.CBZip2InputStream; -public class PigInputFormat implements InputFormat, JobConfigurable { +public class PigInputFormat implements InputFormat, JobConfigurable { public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { @@ -87,15 +89,15 @@ //paths.add(path); for (int j = 0; j < paths.size(); j++) { Path fullPath = new Path(fs.getWorkingDirectory(), paths.get(j)); - if (fs.isDirectory(fullPath)) { - Path children[] = fs.listPaths(fullPath); + if (fs.getFileStatus(fullPath).isDir()) { + FileStatus children[] = fs.listStatus(fullPath); for(int k = 0; k < children.length; k++) { - paths.add(children[k]); + paths.add(children[k].getPath()); } continue; } - long bs = fs.getBlockSize(fullPath); - long size = fs.getLength(fullPath); + long bs = fs.getFileStatus(fullPath).getBlockSize(); + long size = fs.getFileStatus(fullPath).getLen(); long pos = 0; String name = paths.get(j).getName(); if (name.endsWith(".gz")) { @@ -114,7 +116,7 @@ return splits.toArray(new PigSplit[splits.size()]); } - public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { + public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { PigRecordReader r = new PigRecordReader(job, (PigSplit)split, compressionCodecs); return r; } @@ -127,7 +129,7 @@ codecList = conf.get("io.compression.codecs", "none"); } - public static class PigRecordReader implements RecordReader { + public static class PigRecordReader implements RecordReader { /** * This is a tremendously ugly hack to get around the fact that mappers do not have access * to their readers. We take advantage of the fact that RecordReader.next and Mapper.map is @@ -146,6 +148,7 @@ LoadFunc loader; CompressionCodecFactory compressionFactory; JobConf job; + TupleFactory mTupleFactory = TupleFactory.getInstance(); PigRecordReader(JobConf job, PigSplit split, CompressionCodecFactory compressionFactory) throws IOException { this.split = split; @@ -182,15 +185,15 @@ public JobConf getJobConf(){ return job; } - - public boolean next(Writable key, Writable value) throws IOException { + + public boolean next(Text key, Tuple value) throws IOException { Tuple t = loader.getNext(); if (t == null) { return false; } - ((UTF8) key).set(split.getPath().getName()); - ((Tuple)value).copyFrom(t); + key.set(split.getPath().getName()); + value.reference(t); return true; } @@ -206,19 +209,19 @@ return split; } - public WritableComparable createKey() { - return new UTF8(); + public Text createKey() { + return new Text(); } - public Writable createValue() { - return new Tuple(); + public Tuple createValue() { + return mTupleFactory.newTuple(); } - public float getProgress() throws IOException { - float progress = getPos() - split.getStart(); - float finish = split.getLength(); - return progress/finish; - } + public float getProgress() throws IOException { + float progress = getPos() - split.getStart(); + float finish = split.getLength(); + return progress/finish; + } } public void validateInput(JobConf arg0) throws IOException { Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigMapReduce.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigMapReduce.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigMapReduce.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigMapReduce.java Tue Jan 22 13:17:12 2008 @@ -36,9 +36,9 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; -import org.apache.pig.data.Datum; import org.apache.pig.data.IndexedTuple; import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.eval.EvalSpec; import org.apache.pig.impl.eval.StarSpec; @@ -87,9 +87,10 @@ private int index; private int inputCount; private boolean isInner[]; - private File tmpdir; + // private File tmpdir; private static PigContext pigContext = null; ArrayList sideFileWriters = new ArrayList(); + TupleFactory mTupleFactory = TupleFactory.getInstance(); /** * This function is called in MapTask by Hadoop as the Mapper.run() method. We basically pull @@ -100,9 +101,11 @@ PigMapReduce.reporter = reporter; oc = output; + /* tmpdir = new File(job.get("mapred.task.id")); tmpdir.mkdirs(); BagFactory.init(tmpdir); + */ setupMapPipe(reporter); @@ -125,10 +128,12 @@ PigMapReduce.reporter = reporter; try { + /* tmpdir = new File(job.get("mapred.task.id")); tmpdir.mkdirs(); BagFactory.init(tmpdir); + */ oc = output; if (evalPipe == null) { @@ -136,22 +141,21 @@ } DataBag[] bags = new DataBag[inputCount]; - Datum groupName = ((Tuple) key).getField(0); - Tuple t = new Tuple(1 + inputCount); - t.setField(0, groupName); + String groupName = (String)((Tuple) key).get(0); + Tuple t = mTupleFactory.newTuple(1 + inputCount); + t.set(0, groupName); for (int i = 1; i < 1 + inputCount; i++) { - bags[i - 1] = - BagFactory.getInstance().getNewBag(Datum.DataType.TUPLE); - t.setField(i, bags[i - 1]); + bags[i - 1] = BagFactory.getInstance().newDefaultBag(); + t.set(i, bags[i - 1]); } while (values.hasNext()) { IndexedTuple it = (IndexedTuple) values.next(); - t.getBagField(it.index + 1).add(it.toTuple()); + ((DataBag)t.get(it.index + 1)).add(it.toTuple()); } for (int i = 0; i < inputCount; i++) { - if (isInner[i] && t.getBagField(1 + i).isEmpty()) + if (isInner[i] && ((DataBag)t.get(1 + i)).size() == 0) return; } @@ -300,14 +304,15 @@ } @Override - public void add(Datum d){ + public void add(Object d){ try{ if (group == null) { oc.collect(null, (Tuple)d); } else { - Datum[] groupAndTuple = LOCogroup.getGroupAndTuple(d); + Object[] groupAndTuple = LOCogroup.getGroupAndTuple(d); // wrap group label in a tuple, so it becomes writable. - oc.collect(new Tuple(groupAndTuple[0]), new IndexedTuple((Tuple)groupAndTuple[1], index)); + oc.collect(mTupleFactory.newTuple(groupAndTuple[0]), + new IndexedTuple((Tuple)groupAndTuple[1], index)); } }catch(IOException e){ throw new RuntimeException(e); @@ -329,7 +334,7 @@ } @Override - public void add(Datum d){ + public void add(Object d){ try{ //System.out.println("Adding " + d + " to reduce output"); oc.collect(null, (Tuple)d); Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigSplit.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigSplit.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigSplit.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/PigSplit.java Tue Jan 22 13:17:12 2008 @@ -161,7 +161,9 @@ try{ return ois.readObject(); }catch (ClassNotFoundException cnfe){ - throw new IOException(cnfe); + IOException newE = new IOException(cnfe.getMessage()); + newE.initCause(cnfe); + throw newE; } } Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/SortPartitioner.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/SortPartitioner.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/SortPartitioner.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/mapreduceExec/SortPartitioner.java Tue Jan 22 13:17:12 2008 @@ -15,62 +15,66 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.pig.impl.mapreduceExec; - -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Arrays; - -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Partitioner; +package org.apache.pig.impl.mapreduceExec; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Partitioner; import org.apache.pig.builtin.BinStorage; import org.apache.pig.data.Tuple; import org.apache.pig.impl.io.BufferedPositionedInputStream; import org.apache.pig.impl.io.FileLocalizer; - - -public class SortPartitioner implements Partitioner { - Tuple[] quantiles; - - public int getPartition(WritableComparable key, Writable value, - int numPartitions) { - try{ - Tuple keyTuple = (Tuple)key; - int index = Arrays.binarySearch(quantiles, keyTuple.getTupleField(0)); - if (index < 0) - index = -index-1; - return Math.min(index, numPartitions - 1); - }catch(IOException e){ - throw new RuntimeException(e); - } - } - - public void configure(JobConf job) { - String quantilesFile = job.get("pig.quantilesFile", ""); - if (quantilesFile.length() == 0) - throw new RuntimeException("Sort paritioner used but no quantiles found"); - - try{ - InputStream is = FileLocalizer.openDFSFile(quantilesFile,job); - BinStorage loader = new BinStorage(); - loader.bindTo(quantilesFile, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE); - - Tuple t; - ArrayList quantiles = new ArrayList(); - - while(true){ - t = loader.getNext(); - if (t==null) - break; - quantiles.add(t); - } - this.quantiles = quantiles.toArray(new Tuple[0]); - }catch (IOException e){ - throw new RuntimeException(e); - } - } - -} + + +public class SortPartitioner implements Partitioner { + Tuple[] quantiles; + WritableComparator comparator; + + public int getPartition(WritableComparable key, Writable value, + int numPartitions) { + try{ + Tuple keyTuple = (Tuple)key; + int index = Arrays.binarySearch(quantiles, (Tuple)keyTuple.get(0), comparator); + if (index < 0) + index = -index-1; + return Math.min(index, numPartitions - 1); + }catch(IOException e){ + throw new RuntimeException(e); + } + } + + public void configure(JobConf job) { + String quantilesFile = job.get("pig.quantilesFile", ""); + if (quantilesFile.length() == 0) + throw new RuntimeException("Sort paritioner used but no quantiles found"); + + try{ + InputStream is = FileLocalizer.openDFSFile(quantilesFile,job); + BinStorage loader = new BinStorage(); + loader.bindTo(quantilesFile, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE); + + Tuple t; + ArrayList quantiles = new ArrayList(); + + while(true){ + t = loader.getNext(); + if (t==null) + break; + quantiles.add(t); + } + this.quantiles = quantiles.toArray(new Tuple[0]); + }catch (IOException e){ + throw new RuntimeException(e); + } + + comparator = job.getOutputKeyComparator(); + } + +} Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/IntermedResult.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/IntermedResult.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/IntermedResult.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/IntermedResult.java Tue Jan 22 13:17:12 2008 @@ -24,8 +24,8 @@ import org.apache.pig.PigServer.ExecType; import org.apache.pig.builtin.BinStorage; import org.apache.pig.data.DataBag; +import org.apache.pig.data.BagFactory; import org.apache.pig.data.Tuple; -import org.apache.pig.data.Datum; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.io.PigFile; @@ -59,7 +59,7 @@ public IntermedResult() { executed = true; - databag = new DataBag(Datum.DataType.TUPLE); + databag = BagFactory.getInstance().newDefaultBag(); } public IntermedResult(DataBag bag) { Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java?rev=614325&r1=614324&r2=614325&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java Tue Jan 22 13:17:12 2008 @@ -19,19 +19,29 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Comparator; import java.util.Map; +import java.util.Iterator; +import org.apache.hadoop.io.WritableComparator; import org.apache.pig.builtin.BinStorage; +import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.FunctionInstantiator; import org.apache.pig.impl.builtin.FindQuantiles; import org.apache.pig.impl.builtin.RandomSampleLoader; +import org.apache.pig.impl.eval.BinCondSpec; import org.apache.pig.impl.eval.ConstSpec; import org.apache.pig.impl.eval.EvalSpec; +import org.apache.pig.impl.eval.FilterSpec; import org.apache.pig.impl.eval.FuncEvalSpec; import org.apache.pig.impl.eval.GenerateSpec; import org.apache.pig.impl.eval.ProjectSpec; +import org.apache.pig.impl.eval.CompositeEvalSpec; +import org.apache.pig.impl.eval.MapLookupSpec; import org.apache.pig.impl.eval.SortDistinctSpec; import org.apache.pig.impl.eval.StarSpec; +import org.apache.pig.impl.eval.EvalSpecVisitor; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.logicalLayer.LOCogroup; @@ -194,9 +204,35 @@ } else { // push into "reduce" phase - // use combiner, if amenable - if (mro.toReduce == null && lo.getSpec().amenableToCombiner()) { - //TODO + EvalSpec spec = lo.getSpec(); + + if (mro.toReduce == null && shouldCombine(spec)) { + // Push this spec into the combiner. But we also need to + // create a new spec with a changed expected projection to + // push into the reducer. + + if (mro.toCombine != null) { + throw new AssertionError("Combiner already set."); + } + // mro.toCombine = spec; + + // Now, we need to adjust the expected projection for the + // eval spec(s) we just pushed. Also, this will change the + // function to be the final instead of general instance. + EvalSpec newSpec = spec.copy(pigContext); + newSpec.visit(new ReduceAdjuster(pigContext)); + mro.addReduceSpec(newSpec); + + // Adjust the function name for the combine spec, to set it + // to the initial function instead of the general + // instance. Make a copy of the eval spec rather than + // adjusting the existing one, to prevent breaking the + // logical plan in case another physical plan is generated + // from it later. + EvalSpec combineSpec = spec.copy(pigContext); + combineSpec.visit(new CombineAdjuster()); + mro.toCombine = combineSpec; + } else { mro.addReduceSpec(lo.getSpec()); // otherwise, don't use combiner } @@ -271,7 +307,216 @@ sortJob.addReduceSpec(new GenerateSpec(ps)); sortJob.reduceParallelism = loSort.getRequestedParallelism(); + + String comparatorFuncName = loSort.getSortSpec().getComparatorName(); + if (comparatorFuncName != null) { + try { + sortJob.userComparator = + (Class)Class.forName(comparatorFuncName); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Unable to find user comparator " + comparatorFuncName, e); + } + } + return sortJob; } - + + private boolean shouldCombine(EvalSpec spec) { + // Determine whether this something we can combine or not. + // First, it must be a generate spec. + if (!(spec instanceof GenerateSpec)) { + return false; + } + + GenerateSpec gen = (GenerateSpec)spec; + + // Second, the first immediate child of the generate spec must be + // a project with a value of 0. + Iterator i = gen.getSpecs().iterator(); + if (!i.hasNext()) return false; + EvalSpec s = i.next(); + if (!(s instanceof ProjectSpec)) { + return false; + } else { + ProjectSpec p = (ProjectSpec)s; + if (p.numCols() > 1) return false; + else if (p.getCol() != 0) return false; + } + + // Third, all subsequent immediate children of the generate spec + // must be func eval specs + while (i.hasNext()) { + s = i.next(); + if (!(s instanceof FuncEvalSpec)) return false; + } + + // Third, walk the entire tree of the generate spec and see if we + // can combine it. + CombineDeterminer cd = new CombineDeterminer(); + gen.visit(cd); + return cd.useCombiner(); + } + + private class ReduceAdjuster extends EvalSpecVisitor { + private int position = 0; + FunctionInstantiator instantiator = null; + + public ReduceAdjuster(FunctionInstantiator fi) { + instantiator = fi; + } + + public void visitGenerate(GenerateSpec g) { + Iterator i = g.getSpecs().iterator(); + for (position = 0; i.hasNext(); position++) { + i.next().visit(this); + } + } + + public void visitFuncEval(FuncEvalSpec fe) { + // Need to replace our arg spec with a project of our position. + // DON'T visit our args, they're exactly what we're trying to + // lop off. + // The first ProjectSpec in the Composite is because the tuples + // will come out of the combiner in the form (groupkey, + // {(x, y, z)}). The second ProjectSpec contains the offset of + // the projection element we're interested in. + CompositeEvalSpec cs = new CompositeEvalSpec(new ProjectSpec(1)); + cs.addSpec(new ProjectSpec(position)); + fe.setArgs(new GenerateSpec(cs)); + + + // Reset the function to call the final instance of itself + // instead of the general instance. Have to instantiate the + // function itself first so we can find out if it's algebraic + // or not. + try { + fe.instantiateFunc(instantiator); + } catch (IOException e) { + throw new RuntimeException(e); + } + fe.resetFuncToFinal(); + } + } + + private class CombineAdjuster extends EvalSpecVisitor { + private int position = 0; + + //We don't want to be performing any flattening in the combiner since the column numbers in + //the reduce spec assume that there is no combiner. If the combiner performs flattening, the column + //numbers get messed up. For now, since combiner works only with generate group, func1(), func2(),..., + //it suffices to write visitors for those eval spec types. + + public void visitFuncEval(FuncEvalSpec fe) { + // Reset the function to call the initial instance of itself + // instead of the general instance. + fe.resetFuncToInitial(); + fe.setFlatten(false); + } + + + @Override + public void visitProject(ProjectSpec p) { + p.setFlatten(false); + } + + + } + + private class CombineDeterminer extends EvalSpecVisitor { + private class FuncCombinable extends EvalSpecVisitor { + public boolean combinable = true; + + @Override + public void visitBinCond(BinCondSpec bc) { + combinable = false; + } + + @Override + public void visitFilter(FilterSpec bc) { + combinable = false; + } + + @Override + public void visitFuncEval(FuncEvalSpec bc) { + combinable = false; + } + + @Override + public void visitSortDistinct(SortDistinctSpec bc) { + combinable = false; + } + }; + + private int shouldCombine = 0; + + public boolean useCombiner() { + return shouldCombine > 0; + } + + @Override + public void visitBinCond(BinCondSpec bc) { + // TODO Could be true if both are true. But the logic in + // CombineAdjuster and ReduceAdjuster don't know how to handle + // binconds, so just do false for now. + shouldCombine = -1; + } + + @Override + public void visitCompositeEval(CompositeEvalSpec ce) { + // If we've already determined we're not combinable, stop. + if (shouldCombine < 0) return; + + for (EvalSpec spec: ce.getSpecs()) { + spec.visit(this); + } + } + + // ConstSpec is a NOP, as it neither will benefit from nor + // prevents combinability. + + @Override + public void visitFilter(FilterSpec f) { + shouldCombine = -1; + } + + @Override + public void visitFuncEval(FuncEvalSpec fe) { + // Check the functions arguments, to make sure they are + // combinable. + FuncCombinable fc = new FuncCombinable(); + fe.getArgs().visit(fc); + if (!fc.combinable) { + shouldCombine = -1; + return; + } + + if (fe.combinable()) shouldCombine = 1; + else shouldCombine = -1; + } + + @Override + public void visitGenerate(GenerateSpec g) { + // If we've already determined we're not combinable, stop. + if (shouldCombine < 0) return; + + for (EvalSpec spec: g.getSpecs()) { + spec.visit(this); + } + } + + // MapLookupSpec is a NOP, as it neither will benefit from nor + // prevents combinability. + + // ProjectSpec is a NOP, as it neither will benefit from nor + // prevents combinability. + + @Override + public void visitSortDistinct(SortDistinctSpec sd) { + shouldCombine = -1; + } + + // StarSpec is a NOP, as it neither will benefit from nor + // prevents combinability. + } + }