pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jcove...@apache.org
Subject svn commit: r1356921 [3/4] - in /pig/trunk: ./ conf/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expr...
Date Tue, 03 Jul 2012 20:36:16 GMT
Added: pig/trunk/src/org/apache/pig/data/SchemaTupleClassGenerator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SchemaTupleClassGenerator.java?rev=1356921&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SchemaTupleClassGenerator.java (added)
+++ pig/trunk/src/org/apache/pig/data/SchemaTupleClassGenerator.java Tue Jul  3 20:36:09 2012
@@ -0,0 +1,1270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.File;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.net.URI;
+import java.util.List;
+import java.util.Locale;
+import java.util.Queue;
+
+import javax.tools.Diagnostic;
+import javax.tools.DiagnosticCollector;
+import javax.tools.JavaCompiler;
+import javax.tools.JavaFileManager;
+import javax.tools.JavaFileObject;
+import javax.tools.SimpleJavaFileObject;
+import javax.tools.ToolProvider;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import com.google.common.collect.Lists;
+
+/**
+ * This class encapsulates the generation of SchemaTuples, as well as some logic
+ * around shipping code to the distributed cache.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class SchemaTupleClassGenerator {
+    private static final Log LOG = LogFactory.getLog(SchemaTupleClassGenerator.class);
+
+    private SchemaTupleClassGenerator() {}
+
+    /**
+     * The GenContext mechanism provides a level of control in where SchemaTupleFactories
+     * are used. By attaching a GenContext enum type to the registration of a Schema,
+     * the code can express the intent of where a SchemaTupleFactory is intended to be used.
+     * In this way, if a load func and a join both involve Tuples of the same Schema, it's
+     * possible to use SchemaTupleFactories in one but not in the other.
+     */
+    public static enum GenContext {
+        /**
+         * This context is used in UDF code. Currently, this is only used for
+         * the inputs to UDF's.
+         */
+        UDF ("pig.schematuple.udf", true, GenerateUdf.class),
+        /**
+         * This context is for LoadFuncs. It is currently not used,
+         * however the intent is that when a Schema is known, the
+         * LoadFunc can return typed Tuples.
+         */
+        LOAD ("pig.schematuple.load", true, GenerateLoad.class),
+        /**
+         * This context controls whether or not SchemaTuples will be used in FR joins.
+         * Currently, they will be used in the HashMap that FR Joins construct.
+         */
+        FR_JOIN ("pig.schematuple.fr_join", true, GenerateFrJoin.class),
+        /**
+         * This context controls whether or not SchemaTuples will be used in merge joins.
+         */
+        MERGE_JOIN ("pig.schematuple.merge_join", true, GenerateMergeJoin.class),
+        /**
+         * All registered Schemas will also be registered in one additional context.
+         * This context will allow users to "force" the load of a SchemaTupleFactory
+         * if one is present in any context.
+         */
+        FORCE_LOAD ("pig.schematuple.force", true, GenerateForceLoad.class);
+
+        /*
+         * The annotations below are used to mark a given SchemaTuple with
+         * the context in which it was intended to be generated. They are
+         * retained at runtime so that shouldGenerate(Class) can find them.
+         */
+
+        /** Marks a generated class as intended for the {@link #UDF} context. */
+        @Retention(RetentionPolicy.RUNTIME)
+        @Target(ElementType.TYPE)
+        public @interface GenerateUdf {}
+
+        /** Marks a generated class as intended for the {@link #LOAD} context. */
+        @Retention(RetentionPolicy.RUNTIME)
+        @Target(ElementType.TYPE)
+        public @interface GenerateLoad {}
+
+        /** Marks a generated class as intended for the {@link #FR_JOIN} context. */
+        @Retention(RetentionPolicy.RUNTIME)
+        @Target(ElementType.TYPE)
+        public @interface GenerateFrJoin {}
+
+        /** Marks a generated class as intended for the {@link #MERGE_JOIN} context. */
+        @Retention(RetentionPolicy.RUNTIME)
+        @Target(ElementType.TYPE)
+        public @interface GenerateMergeJoin {}
+
+        /** Marks a generated class as intended for the {@link #FORCE_LOAD} context. */
+        @Retention(RetentionPolicy.RUNTIME)
+        @Target(ElementType.TYPE)
+        public @interface GenerateForceLoad {}
+
+        // Enum constants are shared singletons, so their state is fixed at construction.
+        /** Configuration key whose value overrides {@link #defaultValue} when it is set. */
+        private final String key;
+        /** Result of {@link #shouldGenerate(Configuration)} when the key is not set. */
+        private final boolean defaultValue;
+        /** Marker annotation type associated with this context. */
+        private final Class<?> annotation;
+
+        GenContext(String key, boolean defaultValue, Class<?> annotation) {
+            this.key = key;
+            this.defaultValue = defaultValue;
+            this.annotation = annotation;
+        }
+
+        /**
+         * @return the configuration key associated with this context
+         */
+        public String key() {
+            return key;
+        }
+
+        /**
+         * @return the canonical name of the marker annotation associated with this context
+         */
+        public String getAnnotationCanonicalName() {
+            return annotation.getCanonicalName();
+        }
+
+        /**
+         * Checks the generated class to see if the annotation
+         * associated with this enum is present.
+         * @param clazz the class to inspect
+         * @return true if the class carries this context's annotation, false otherwise
+         */
+        @SuppressWarnings({ "unchecked", "rawtypes" })
+        public boolean shouldGenerate(Class clazz) {
+            return clazz.getAnnotation(annotation) != null;
+        }
+
+        /**
+         * Given a job configuration, this checks to see
+         * if the default value has been overridden.
+         * @param conf the configuration to read this context's key from
+         * @return the default value when the key is not set; otherwise the result of
+         *         parsing the configured value with {@link Boolean#parseBoolean(String)}
+         */
+        public boolean shouldGenerate(Configuration conf) {
+            String shouldString = conf.get(key);
+            if (shouldString == null) {
+                return defaultValue;
+            }
+            return Boolean.parseBoolean(shouldString);
+        }
+    }
+
+    /**
+     * This value is used to distinguish all of the generated code.
+     * The general naming scheme used is SchemaTuple_identifier. Note that
+     * identifiers are incremented before code is actually generated.
+     */
+    private static int nextGlobalClassIdentifier = 0;
+
+    /**
+     * Generates the source for a SchemaTuple of the given Schema under a caller-supplied
+     * identifier and compiles it. The generated class is named SchemaTuple_id.
+     * @param   s           the Schema to generate a SchemaTuple for
+     * @param   appendable  true or false depending on whether it should be appendable
+     * @param   id          the identifier used in the generated class's name
+     * @param   codeDir     the directory passed on to code generation and compilation
+     * @param   contexts    a list of contexts in which the SchemaTuple is intended to be instantiated
+     */
+    protected static void generateSchemaTuple(Schema s, boolean appendable, int id, File codeDir, GenContext... contexts) {
+        // One "@<annotation canonical name>" line per requested context.
+        StringBuilder annotationLines = new StringBuilder();
+        for (int i = 0; i < contexts.length; i++) {
+            GenContext ctx = contexts[i];
+            LOG.info("Including context: " + ctx);
+            annotationLines.append("@");
+            annotationLines.append(ctx.getAnnotationCanonicalName());
+            annotationLines.append("\n");
+        }
+
+        String source = produceCodeString(s, appendable, id, annotationLines.toString(), codeDir);
+        String className = "SchemaTuple_" + id;
+
+        LOG.info("Compiling class " + className + " for Schema: " + s + ", and appendability: " + appendable);
+        compileCodeString(className, source, codeDir);
+    }
+
+    private static int generateSchemaTuple(Schema s, boolean appendable, File codeDir, GenContext... contexts) {
+        int id = SchemaTupleClassGenerator.getNextGlobalClassIdentifier();
+
+        generateSchemaTuple(s, appendable, id, codeDir, contexts);
+
+        return id;
+    }
+
+    /**
+     * Produces the source text of the SchemaTuple for the given Schema by feeding
+     * each of the Schema's fields, in order, to a TypeInFunctionStringOutFactory.
+     * @param   s                   the Schema to generate source for
+     * @param   appendable          whether the class should be appendable
+     * @param   id                  the identifier of the class being generated
+     * @param   contextAnnotations  annotation lines handed to the factory
+     * @param   codeDir             the directory handed to the factory
+     * @return  the generated class's implementation
+     */
+    private static String produceCodeString(Schema s, boolean appendable, int id, String contextAnnotations, File codeDir) {
+        TypeInFunctionStringOutFactory factory =
+                new TypeInFunctionStringOutFactory(s, id, appendable, contextAnnotations, codeDir);
+
+        for (Schema.FieldSchema field : s.getFields()) {
+            factory.process(field);
+        }
+
+        String generated = factory.end();
+        return generated;
+    }
+
+    /**
+     * Returns the current global class identifier and advances the counter by one.
+     */
+    protected static int getNextGlobalClassIdentifier() {
+        int issued = nextGlobalClassIdentifier;
+        nextGlobalClassIdentifier = issued + 1;
+        return issued;
+    }
+
+    /**
+     * This method takes generated code, and compiles it down to a class file. It will output
+     * the generated class file to the given code directory. Note that the compiler will use
+     * the classpath that Pig is instantiated with, as well as the given code directory.
+     *
+     * @param className             name of the class being compiled
+     * @param generatedCodeString   String of generated code
+     * @param codeDir               directory that is added to the classpath and receives the class file
+     * @throws RuntimeException     if no system Java compiler is available, or if compilation fails
+     */
+    //TODO in the future, we can use ASM to generate the bytecode directly.
+    private static void compileCodeString(String className, String generatedCodeString, File codeDir) {
+        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
+        if (compiler == null) {
+            // getSystemJavaCompiler() returns null when the platform provides no compiler;
+            // fail with an explanation instead of a bare NullPointerException below.
+            throw new RuntimeException("Unable to compile " + className
+                    + ": no system Java compiler is available");
+        }
+
+        Iterable<? extends JavaFileObject> compilationUnits = Lists.newArrayList(new JavaSourceFromString(className, generatedCodeString));
+
+        DiagnosticCollector<JavaFileObject> diagnostics = new DiagnosticCollector<JavaFileObject>();
+
+        String tempDir = codeDir.getAbsolutePath();
+
+        // The classpath entry separator is platform dependent (':' vs ';'), so it must not be hard-coded.
+        String classPath = System.getProperty("java.class.path") + File.pathSeparator + tempDir;
+        LOG.debug("Compiling SchemaTuple code with classpath: " + classPath);
+
+        List<String> optionList = Lists.newArrayList();
+        // Adds the current classpath to the compiler along with our generated code
+        optionList.add("-classpath");
+        optionList.add(classPath);
+        optionList.add("-d");
+        optionList.add(tempDir);
+
+        JavaFileManager fileManager = compiler.getStandardFileManager(null, null, null);
+        try {
+            if (!compiler.getTask(null, fileManager, diagnostics, optionList, null, compilationUnits).call()) {
+                LOG.warn("Error compiling: " + className + ". Printing compilation errors and shutting down.");
+                for (Diagnostic<? extends JavaFileObject> diagnostic : diagnostics.getDiagnostics()) {
+                    LOG.warn("Error on line " + diagnostic.getLineNumber() + ": " + diagnostic.getMessage(Locale.US));
+                }
+                throw new RuntimeException("Unable to compile code string:\n" + generatedCodeString);
+            }
+        } finally {
+            // Release the file manager on success and on failure. A failure to close
+            // is only logged so that it cannot mask a compilation error.
+            try {
+                fileManager.close();
+            } catch (java.io.IOException e) {
+                LOG.warn("Unable to close the compiler file manager after compiling " + className, e);
+            }
+        }
+
+        LOG.info("Successfully compiled class: " + className);
+    }
+
+    /**
+     * This class allows code to be generated directly from a String, instead of having to be
+     * on disk.
+     */
+    private static class JavaSourceFromString extends SimpleJavaFileObject {
+        final String code;
+
+        JavaSourceFromString(String name, String code) {
+            super(URI.create("string:///" + name.replace('.','/') + Kind.SOURCE.extension), Kind.SOURCE);
+            this.code = code;
+        }
+
+        @Override
+        public CharSequence getCharContent(boolean ignoreEncodingErrors) {
+            return code;
+        }
+    }
+
+    static class CompareToSpecificString extends TypeInFunctionStringOut {
+        private int id;
+
+        public CompareToSpecificString(int id, boolean appendable) {
+            super(appendable);
+            this.id = id;
+        }
+
+        public void prepare() {
+            add("@Override");
+            add("protected int generatedCodeCompareToSpecific(SchemaTuple_"+id+" t) {");
+            add("    int i = 0;");
+        }
+
+        public void process(int fieldNum, Schema.FieldSchema fs) {
+            add("    i = compare(checkIfNull_" + fieldNum + "(), getPos_"
+                    + fieldNum + "(), t.checkIfNull_" + fieldNum + "(), t.getPos_"
+                    + fieldNum + "());");
+            add("    if (i != 0) {");
+            add("        return i;");
+            add("    }");
+        }
+
+        public void end() {
+            add("    return i;");
+            add("}");
+        }
+    }
+
+    //TODO clear up how it deals with nulls etc. IE is the logic correct
+    static class CompareToString extends TypeInFunctionStringOut {
+        private int id;
+
+        public CompareToString(int id) {
+            this.id = id;
+        }
+
+        public void prepare() {
+            add("@Override");
+            add("protected int generatedCodeCompareTo(SchemaTuple t, boolean checkType) {");
+            add("    int i;");
+        }
+
+        boolean compTup = false;
+        boolean compStr = false;
+        boolean compIsNull = false;
+        boolean compByte = false;
+
+        public void process(int fieldNum, Schema.FieldSchema fs) {
+            add("        i = compareWithElementAtPos(checkIfNull_" + fieldNum + "(), getPos_" + fieldNum + "(), t, " + fieldNum + ");");
+            add("        if (i != 0) {");
+            add("            return i;");
+            add("        }");
+        }
+
+        public void end() {
+            add("    return 0;");
+            add("}");
+        }
+    }
+
+    static class HashCode extends TypeInFunctionStringOut {
+        public void prepare() {
+            add("@Override");
+            add("public int generatedCodeHashCode() {");
+            add("    int h = 17;");
+        }
+
+        public void process(int fieldPos, Schema.FieldSchema fs) {
+            add("    h = hashCodePiece(h, getPos_" + fieldPos + "(), checkIfNull_" + fieldPos + "());");
+        }
+
+        public void end() {
+            add("    return h;");
+            add("}");
+        }
+    }
+
+    /**
+     * Emits the field declarations of the generated class, plus its static schema
+     * field and getSchema method. A tuple field causes a nested SchemaTuple to be
+     * generated, and the nested identifier is appended to every supplied queue.
+     */
+    static class FieldString extends TypeInFunctionStringOut {
+        private List<Queue<Integer>> listOfQueuesForIds;
+        private Schema schema;
+        private File codeDir;
+
+        private int primitives = 0; //primitive fields seen so far
+        private int isNulls = 0; //isNull_ bytes declared so far
+
+        private int booleans = 0; //boolean fields seen so far
+        private int booleanBytes = 0; //booleanByte_ bytes declared so far
+
+        public FieldString(File codeDir, List<Queue<Integer>> listOfQueuesForIds, Schema schema, boolean appendable) {
+            super(appendable);
+            this.codeDir = codeDir;
+            this.listOfQueuesForIds = listOfQueuesForIds;
+            this.schema = schema;
+        }
+
+        public File codeDir() {
+            return codeDir;
+        }
+
+        public void prepare() {
+            // The first and last characters of the Schema's String form are dropped
+            // before the remainder is Base64 (URL safe) encoded into the generated source.
+            String described = schema.toString();
+            String trimmed = described.substring(1, described.length() - 1);
+            String encoded = Base64.encodeBase64URLSafeString(trimmed.getBytes());
+            add("private static Schema schema = staticSchemaGen(\"" + encoded + "\");");
+        }
+
+        public void process(int fieldPos, Schema.FieldSchema fs) {
+            if (isTuple()) {
+                int nestedId = SchemaTupleClassGenerator.generateSchemaTuple(fs.schema, isAppendable(), codeDir());
+
+                for (Queue<Integer> idQueue : listOfQueuesForIds) {
+                    idQueue.add(nestedId);
+                }
+
+                add("private SchemaTuple_"+nestedId+" pos_"+fieldPos+";");
+                return;
+            }
+
+            if (isPrimitive()) {
+                // Every eighth primitive starts a new byte of null flags.
+                boolean startsNullByte = primitives % 8 == 0;
+                primitives++;
+                if (startsNullByte) {
+                    add("private byte isNull_" + isNulls + " = (byte)0xFF;");
+                    isNulls++;
+                }
+            }
+
+            if (!isBoolean()) {
+                add("private "+typeName()+" pos_"+fieldPos+";");
+                return;
+            }
+
+            // Booleans get no pos_ field; every eighth one starts a new shared byte.
+            boolean startsBooleanByte = booleans % 8 == 0;
+            booleans++;
+            if (startsBooleanByte) {
+                add("private byte booleanByte_" + booleanBytes + ";");
+                booleanBytes++;
+            }
+        }
+
+        @Override
+        public void end() {
+            addBreak();
+            add("@Override");
+            add("public Schema getSchema() {");
+            add("    return schema;");
+            add("}");
+            addBreak();
+        }
+    }
+
+    static class SetPosString extends TypeInFunctionStringOut {
+        private Queue<Integer> idQueue;
+
+        private int byteField = 0; //this is for setting booleans
+        private int byteIncr = 0; //this is for counting the booleans we've encountered
+
+        public void process(int fieldPos, Schema.FieldSchema fs) {
+            if (!isTuple()) {
+                add("public void setPos_"+fieldPos+"("+typeName()+" v) {");
+                if (isPrimitive()) {
+                    add("    setNull_"+fieldPos+"(false);");
+                }
+
+                if (!isBoolean()) {
+                    add("    pos_"+fieldPos+" = v;");
+                } else {
+                    add("    booleanByte_" + byteField + " = BytesHelper.setBitByPos(booleanByte_" + byteField + ", v, " + byteIncr++ + ");");
+
+                    if (byteIncr % 8 == 0) {
+                        byteIncr = 0;
+                        byteField++;
+                    }
+                }
+
+                add("}");
+            } else {
+                int nestedSchemaTupleId = idQueue.remove();
+                add("public void setPos_"+fieldPos+"(SchemaTuple_"+nestedSchemaTupleId+" t) {");
+                add("    pos_" + fieldPos + " = t;");
+                add("}");
+                addBreak();
+                add("public void setPos_"+fieldPos+"(SchemaTuple t) {");
+                add("    if (pos_"+fieldPos+" == null) {");
+                add("        pos_"+fieldPos+" = new SchemaTuple_"+nestedSchemaTupleId+"();");
+                add("    }");
+                add("    pos_" + fieldPos + ".setAndCatch(t);");
+                add("}");
+                addBreak();
+                add("public void setPos_"+fieldPos+"(Tuple t) {");
+                add("    if (pos_"+fieldPos+" == null) {");
+                add("        pos_"+fieldPos+" = new SchemaTuple_"+nestedSchemaTupleId+"();");
+                add("    }");
+                add("    pos_" + fieldPos + ".setAndCatch(t);");
+                add("}");
+            }
+            addBreak();
+        }
+
+        public SetPosString(Queue<Integer> idQueue) {
+            this.idQueue = idQueue;
+        }
+    }
+
+    static class ListSetString extends TypeInFunctionStringOut {
+        public void prepare() {
+            add("@Override");
+            add("public void generatedCodeSetIterator(Iterator<Object> it) throws ExecException {");
+        }
+
+        public void process(int fieldPos, Schema.FieldSchema fs) {
+            add("    setPos_"+fieldPos+"(unbox(it.next(), getDummy_"+fieldPos+"()));");
+        }
+
+        public void end() {
+            add("}");
+        }
+    }
+
+    static class GenericSetString extends TypeInFunctionStringOut {
+        public void prepare() {
+            add("@Override");
+            add("public void generatedCodeSetField(int fieldNum, Object val) throws ExecException {");
+            add("    switch (fieldNum) {");
+        }
+
+        public void process(int fieldPos, Schema.FieldSchema fs) {
+            add("    case ("+fieldPos+"):");
+            add("        if (val == null) {");
+            add("            setNull_" + fieldPos + "(true);");
+            add("            return;");
+            add("        }");
+            add("        setPos_"+fieldPos+"(unbox(val, getDummy_"+fieldPos+"()));");
+            add("        break;");
+        }
+
+        public void end() {
+            add("    default:");
+            add("        throw new ExecException(\"Invalid index given to set: \" + fieldNum);");
+            add("    }");
+            add("}");
+        }
+    }
+
+    static class GenericGetString extends TypeInFunctionStringOut {
+        public void prepare() {
+            add("@Override");
+            add("public Object generatedCodeGetField(int fieldNum) throws ExecException {");
+            add("    switch (fieldNum) {");
+        }
+
+        public void process(int fieldPos, Schema.FieldSchema fs) {
+            add("    case ("+fieldPos+"): return checkIfNull_"+fieldPos+"() ? null : box(getPos_"+fieldPos+"());");
+        }
+
+        public void end() {
+            add("    default: throw new ExecException(\"Invalid index given to get: \" + fieldNum);");
+            add("    }");
+            add("}");
+        }
+    }
+
+    static class GeneralIsNullString extends TypeInFunctionStringOut {
+        public void prepare() {
+            add("@Override");
+            add("public boolean isGeneratedCodeFieldNull(int fieldNum) throws ExecException {");
+            add("    switch (fieldNum) {");
+        }
+
+        public void process(int fieldPos, Schema.FieldSchema fs) {
+            add("    case ("+fieldPos+"): return checkIfNull_"+fieldPos+"();");
+        }
+
+        public void end() {
+            add("    default: throw new ExecException(\"Invalid index given: \" + fieldNum);");
+            add("    }");
+            add("}");
+        }
+    }
+
+    /**
+     * Emits the checkIfNull_N methods. Primitive fields read one bit of a shared
+     * isNull_ byte; every other field is compared against null.
+     */
+    static class CheckIfNullString extends TypeInFunctionStringOut {
+        private int nullByte = 0; //index of the isNull_ byte currently in use
+        private int byteIncr = 0; //bit position within that byte for the next primitive
+
+        public void process(int fieldPos, Schema.FieldSchema fs) {
+            add("public boolean checkIfNull_" + fieldPos + "() {");
+            if (!isPrimitive()) {
+               add("    return pos_" + fieldPos + " == null;");
+            } else {
+                add("    return BytesHelper.getBitByPos(isNull_" + nullByte + ", " + byteIncr + ");");
+                byteIncr++;
+                if (byteIncr % 8 == 0) {
+                    // this byte is full, move on to the next one
+                    byteIncr = 0;
+                    nullByte++;
+                }
+            }
+            add("}");
+            addBreak();
+        }
+    }
+
+   static class SetNullString extends TypeInFunctionStringOut {
+        private int nullByte = 0; //the byte_ val
+        private int byteIncr = 0; //the mask we're on
+
+        public void process(int fieldPos, Schema.FieldSchema fs) {
+            add("public void setNull_"+fieldPos+"(boolean b) {");
+            if (isPrimitive()) {
+                add("    isNull_" + nullByte + " = BytesHelper.setBitByPos(isNull_" + nullByte + ", b, " + byteIncr++ + ");");
+                if (byteIncr % 8 == 0) {
+                    byteIncr = 0;
+                    nullByte++;
+                }
+            } else {
+                add("    if (b) {");
+                add("        pos_" + fieldPos + " = null;");
+                add("    }");
+            }
+            add("}");
+            addBreak();
+        }
+    }
+
+    //TODO should this do something different if t is null?
+    static class SetEqualToSchemaTupleSpecificString extends TypeInFunctionStringOut {
+        private int id;
+
+        public void prepare() {
+            add("@Override");
+            add("protected SchemaTuple generatedCodeSetSpecific(SchemaTuple_"+id+" t) {");
+        }
+
+        public void process(int fieldPos, Schema.FieldSchema fs) {
+            add("    if (t.checkIfNull_" + fieldPos + "()) {");
+            add("        setNull_" + fieldPos + "(true);");
+            add("    } else {");
+            add("        setPos_"+fieldPos+"(t.getPos_"+fieldPos+"());");
+            add("    }");
+            addBreak();
+        }
+
+        public void end() {
+            add("    return this;");
+            add("}");
+            addBreak();
+        }
+
+        public SetEqualToSchemaTupleSpecificString(int id) {
+            this.id = id;
+        }
+    }
+
+    //this has to write the null state of all the fields, not just the null bytes, though those
+    //will have to be reconstructed
+    /**
+     * Emits the generatedCodeNullsArray method, which returns a boolean array
+     * holding the result of checkIfNull_N for every field, in field order.
+     */
+    static class WriteNullsString extends TypeInFunctionStringOut {
+        // Accumulates the array initializer; one "checkIfNull_N()," line per field.
+        String s = "    boolean[] b = {\n";
+
+        public WriteNullsString(boolean appendable) {
+            super(appendable);
+        }
+
+        public void prepare() {
+            add("@Override");
+            add("protected boolean[] generatedCodeNullsArray() throws IOException {");
+        }
+
+        public void process(int fieldPos, Schema.FieldSchema fs) {
+            s += "        checkIfNull_"+fieldPos+"(),\n";
+        }
+
+        public void end() {
+            // Drop the ",\n" left behind by the last entry. When no field was processed
+            // there is no separator to drop: stripping two characters unconditionally
+            // would remove the opening "{\n" and emit an initializer that cannot compile.
+            if (s.endsWith(",\n")) {
+                s = s.substring(0, s.length() - 2) + "\n";
+            }
+            s += "    };";
+            add(s);
+            add("    return b;");
+            add("}");
+            addBreak();
+        }
+    }
+
+   static class ReadString extends TypeInFunctionStringOut {
+        private Queue<Integer> idQueue;
+
+        private int booleans = 0;
+
+        public void prepare() {
+            add("@Override");
+            add("protected void generatedCodeReadFields(DataInput in, boolean[] b) throws IOException {");
+        }
+
+        public void process(int fieldPos, Schema.FieldSchema fs) {
+            if (isBoolean()) {
+                booleans++;
+                add("    if (b["+fieldPos+"]) {");
+                add("        setNull_"+fieldPos+"(true);");
+                add("    } else {");
+                add("        setNull_"+fieldPos+"(false);");
+                add("    }");
+            } else if (!isTuple()) {
+                add("    if (b["+fieldPos+"]) {");
+                add("        setNull_"+fieldPos+"(true);");
+                add("    } else {");
+                add("        setPos_"+fieldPos+"(read(in, pos_"+fieldPos+"));");
+                add("    }");
+                addBreak();
+            } else {
+                int nestedSchemaTupleId = idQueue.remove();
+                add("    if (b["+fieldPos+"]) {");
+                add("        setNull_"+fieldPos+"(true);");
+                add("    } else {");
+                add("        SchemaTuple_"+nestedSchemaTupleId+" st = new SchemaTuple_"+nestedSchemaTupleId+"();");
+                add("        st.readFields(in);");
+                add("        setPos_"+fieldPos+"(st);");
+                add("    }");
+                addBreak();
+            }
+        }
+
+        public void end() {
+            if (booleans > 0) {
+                int i = 0;
+                while (booleans > 0) {
+                    add("    booleanByte_"+(i++)+" = in.readByte();");
+                    booleans -= 8;
+                }
+            }
+            add("}");
+            addBreak();
+        }
+
+        public ReadString(Queue<Integer> idQueue, boolean appendable) {
+            super(appendable);
+            this.idQueue = idQueue;
+        }
+    }
+
+
+    /**
+     * Emits the generatedCodeWriteElements method. Every non-boolean field is
+     * written when it is not null; the shared boolean bytes are written last,
+     * one byte per eight booleans.
+     */
+    static class WriteString extends TypeInFunctionStringOut {
+        private int booleans = 0;
+
+        public void prepare() {
+            add("@Override");
+            add("protected void generatedCodeWriteElements(DataOutput out) throws IOException {");
+        }
+
+        public void process(int fieldPos, Schema.FieldSchema fs) {
+            if (isBoolean()) {
+                // Booleans are only counted here; their bytes are written in end().
+                booleans++;
+                return;
+            }
+            add("    if (!checkIfNull_"+fieldPos+"()) {");
+            add("        write(out, pos_"+fieldPos+");");
+            add("    }");
+            addBreak();
+        }
+
+        public void end() {
+            for (int byteIndex = 0; booleans > 0; booleans -= 8, byteIndex++) {
+                add("    out.writeByte(booleanByte_" + byteIndex + ");");
+            }
+            add("}");
+            addBreak();
+        }
+    }
+
+    //TODO need to include all of the objects from Schema (have it implement its own getMemorySize()?)
+    static class MemorySizeString extends TypeInFunctionStringOut {
+        private int size = 0;
+
+        String s = "    return SizeUtil.roundToEight(";
+
+        public void prepare() {
+            add("@Override");
+            add("public long getGeneratedCodeMemorySize() {");
+        }
+
+        private int booleans = 0;
+        private int primitives = 0;
+
+        //TODO a null array or object variable still takes up space for the pointer, yes?
+        public void process(int fieldPos, Schema.FieldSchema fs) {
+            if (isInt() || isFloat()) {
+                size += 4;
+            } else if (isLong() || isDouble()) {
+                size += 8;
+            } else if (isBytearray()) {
+                s += "(pos_"+fieldPos+" == null ? 8 : SizeUtil.roundToEight(12 + pos_"+fieldPos+".length) * 8) + ";
+            } else if (isString()) {
+                s += "(pos_"+fieldPos+" == null ? 8 : SizeUtil.getPigObjMemSize(pos_"+fieldPos+")) + ";
+            } else if (isBoolean()) {
+                if (booleans++ % 8 == 0) {
+                    size++; //accounts for the byte used to store boolean values
+                }
+            } else if (isBag()) {
+                //TODO IMPLEMENT
+            } else {
+                s += "(pos_"+fieldPos+" == null ? 8 : pos_"+fieldPos+".getMemorySize()) + ";
+            }
+
+            if (isPrimitive() && primitives++ % 8 == 0) {
+                size++; //accounts for the null byte
+            }
+        }
+
+        public void end() {
+            s += size + ");";
+            add(s);
+            add("}");
+            addBreak();
+        }
+    }
+
+    static class GetDummyString extends TypeInFunctionStringOut {
+        public void process(int fieldPos, Schema.FieldSchema fs) {
+            add("public "+typeName()+" getDummy_"+fieldPos+"() {");
+            switch (fs.type) {
+            case (DataType.INTEGER): add("    return 0;"); break;
+            case (DataType.LONG): add("    return 0L;"); break;
+            case (DataType.FLOAT): add("    return 0.0f;"); break;
+            case (DataType.DOUBLE): add("    return 0.0;"); break;
+            case (DataType.BOOLEAN): add("    return true;"); break;
+            case (DataType.BYTEARRAY): add("    return (byte[])null;"); break;
+            case (DataType.CHARARRAY): add("    return (String)null;"); break;
+            case (DataType.TUPLE): add("    return (Tuple)null;"); break;
+            case (DataType.BAG): add("    return (DataBag)null;"); break;
+            }
+            add("}");
+            addBreak();
+        }
+    }
+
+    /**
+     * Emits the getPos_N getters. Tuple fields return the nested generated type,
+     * whose identifier is taken from the supplied queue; boolean fields read one
+     * bit of a shared booleanByte_ byte.
+     */
+    static class GetPosString extends TypeInFunctionStringOut {
+        private Queue<Integer> idQueue;
+
+        private int booleanByte = 0; //index of the booleanByte_ field currently in use
+        private int booleans; //bit position within that byte for the next boolean
+
+        public GetPosString(Queue<Integer> idQueue) {
+            this.idQueue = idQueue;
+        }
+
+        public void process(int fieldPos, Schema.FieldSchema fs) {
+            if (isTuple()) {
+                int nestedSchemaTupleId = idQueue.remove();
+                add("public SchemaTuple_" + nestedSchemaTupleId + " getPos_"+fieldPos+"() {");
+            } else {
+                add("public "+typeName()+" getPos_"+fieldPos+"() {");
+            }
+
+            if (!isBoolean()) {
+                add("    return pos_"+fieldPos+";");
+            } else {
+                add("    return BytesHelper.getBitByPos(booleanByte_" + booleanByte + ", " + booleans + ");");
+                booleans++;
+                if (booleans % 8 == 0) {
+                    // this byte is exhausted, move on to the next one
+                    booleanByte++;
+                    booleans = 0;
+                }
+            }
+            add("}");
+            addBreak();
+        }
+    }
+
+    /**
+     * Emits the <code>getSchemaTupleIdentifier()</code> override, which returns the
+     * identifier this generator was constructed with.
+     */
+    static class GetSchemaTupleIdentifierString extends TypeInFunctionStringOut {
+        private int id;
+
+        public GetSchemaTupleIdentifierString(int id) {
+            this.id = id;
+        }
+
+        public void end() {
+            // add() splits on newlines and indents each piece, so one call emits the whole method
+            add("@Override\n"
+                + "public int getSchemaTupleIdentifier() {\n"
+                + "    return "+id+";\n"
+                + "}");
+            addBreak();
+        }
+    }
+
+    static class SchemaSizeString extends TypeInFunctionStringOut {
+        int i = 0;
+
+        public void process(int fieldNum, Schema.FieldSchema fS) {
+            i++;
+        }
+
+        public void end() {
+            add("@Override");
+            add("protected int schemaSize() {");
+            add("    return " + i + ";");
+            add("}");
+            addBreak();
+        }
+    }
+
+    static class SizeString extends TypeInFunctionStringOut {
+        int i = 0;
+
+        public void process(int fieldNum, Schema.FieldSchema fS) {
+            i++;
+        }
+
+        public void end() {
+            add("@Override");
+            add("protected int generatedCodeSize() {");
+            add("    return " + i + ";");
+            add("}");
+            addBreak();
+        }
+
+        public SizeString(boolean appendable) {
+            super(appendable);
+        }
+    }
+
+    static class GetTypeString extends TypeInFunctionStringOut {
+        public void prepare() {
+            add("@Override");
+            add("public byte getGeneratedCodeFieldType(int fieldNum) throws ExecException {");
+            add("    switch (fieldNum) {");
+        }
+
+        public void process(int fieldNum, Schema.FieldSchema fs) {
+            add("    case ("+fieldNum+"): return "+fs.type+";");
+        }
+
+        public void end() {
+            add("    default: throw new ExecException(\"Invalid index given: \" + fieldNum);");
+            add("    }");
+            add("}");
+            addBreak();
+        }
+    }
+
+    static class SetEqualToSchemaTupleString extends TypeInFunctionStringOut {
+        int id;
+
+        public SetEqualToSchemaTupleString(int id) {
+            this.id = id;
+        }
+
+        public void prepare() {
+            add("@Override");
+            add("protected SchemaTuple generatedCodeSet(SchemaTuple t, boolean checkClass) throws ExecException {");
+            add("    if (checkClass && t instanceof SchemaTuple_"+id+") {");
+            add("        return setSpecific((SchemaTuple_"+id+")t);");
+            add("    }");
+            addBreak();
+            add("    if (t.size() < schemaSize()) {");
+            add("        throw new ExecException(\"Given SchemaTuple does not have as many fields as \"+getClass()+\" (\"+t.size()+\" vs \"+schemaSize()+\")\");");
+            add("    }");
+            addBreak();
+            add("    List<Schema.FieldSchema> theirFS = t.getSchema().getFields();");
+            addBreak();
+        }
+
+        public void process(int fieldNum, Schema.FieldSchema fs) {
+            add("    if ("+fs.type+" != theirFS.get("+fieldNum+").type) {");
+            add("        throw new ExecException(\"Given SchemaTuple does not match current in field " + fieldNum + ". Expected type: " + fs.type + ", found: \" + theirFS.get("+fieldNum+").type);");
+            add("    }");
+            add("    if (t.isNull("+fieldNum+")) {");
+            add("        setNull_"+fieldNum+"(true);");
+            add("    } else {");
+            if (!isTuple()) {
+                add("        setPos_"+fieldNum+"(t.get" + proper(fs.type) + "("+fieldNum+"));");
+            } else {
+                add("        setPos_"+fieldNum+"((Tuple)t.get("+fieldNum+"));");
+            }
+            add("    }");
+            addBreak();
+        }
+
+        public void end() {
+            add("    return this;");
+            add("}");
+        }
+    }
+
+   static class TypeAwareGetString extends TypeAwareSetString {
+        public TypeAwareGetString(byte type) {
+            super(type);
+        }
+
+        public void prepare() {
+            add("@Override");
+            add("protected "+name()+" generatedCodeGet"+properName()+"(int fieldNum) throws ExecException {");
+            add("    switch(fieldNum) {");
+        }
+
+        public void process(int fieldNum, Schema.FieldSchema fs) {
+            if (fs.type==thisType()) {
+                add("    case ("+fieldNum+"): return returnUnlessNull(checkIfNull_"+fieldNum+"(), getPos_"+fieldNum+"());");
+            }
+        }
+
+        public void end() {
+            add("    default:");
+            add("        return unbox"+properName()+"(getTypeAwareBase(fieldNum, \""+name()+"\"));");
+            add("    }");
+            add("}");
+        }
+    }
+
+    static class TypeAwareSetString extends TypeInFunctionStringOut {
+        private byte type;
+
+        public TypeAwareSetString(byte type) {
+            this.type = type;
+        }
+
+        public byte thisType() {
+            return type;
+        }
+
+        public String name() {
+            return typeName(type);
+        }
+
+        public String properName() {
+            return proper(thisType());
+        }
+
+        public void prepare() {
+            add("@Override");
+            add("protected void generatedCodeSet"+properName()+"(int fieldNum, "+name()+" val) throws ExecException {");
+            add("    switch(fieldNum) {");
+        }
+
+        public void process(int fieldNum, Schema.FieldSchema fs) {
+            if (fs.type==thisType())
+                add("    case ("+fieldNum+"): setPos_"+fieldNum+"(val); break;");
+        }
+
+        public void end() {
+            add("    default: setTypeAwareBase(fieldNum, val, \""+name()+"\");");
+            add("    }");
+            add("}");
+        }
+    }
+
+    //TODO need to use StringBuilder for all concatenation, not +
+    static class TypeInFunctionStringOutFactory {
+        private List<TypeInFunctionStringOut> listOfFutureMethods = Lists.newArrayList();
+        private int id;
+        private boolean appendable;
+        private String contextAnnotations;
+
+        public TypeInFunctionStringOutFactory(Schema s, int id, boolean appendable, String contextAnnotations, File codeDir) {
+            this.id = id;
+            this.appendable = appendable;
+            this.contextAnnotations = contextAnnotations;
+
+            Queue<Integer> nextNestedSchemaIdForSetPos = Lists.newLinkedList();
+            Queue<Integer> nextNestedSchemaIdForGetPos = Lists.newLinkedList();
+            Queue<Integer> nextNestedSchemaIdForReadField = Lists.newLinkedList();
+
+            List<Queue<Integer>> listOfQueuesForIds = Lists.newArrayList(nextNestedSchemaIdForSetPos, nextNestedSchemaIdForGetPos, nextNestedSchemaIdForReadField);
+
+            listOfFutureMethods.add(new FieldString(codeDir, listOfQueuesForIds, s, appendable)); //has to be run first
+            listOfFutureMethods.add(new SetPosString(nextNestedSchemaIdForSetPos));
+            listOfFutureMethods.add(new GetPosString(nextNestedSchemaIdForGetPos));
+            listOfFutureMethods.add(new GetDummyString());
+            listOfFutureMethods.add(new GenericSetString());
+            listOfFutureMethods.add(new GenericGetString());
+            listOfFutureMethods.add(new GeneralIsNullString());
+            listOfFutureMethods.add(new CheckIfNullString());
+            listOfFutureMethods.add(new SetNullString());
+            listOfFutureMethods.add(new SetEqualToSchemaTupleSpecificString(id));
+            listOfFutureMethods.add(new WriteNullsString(appendable));
+            listOfFutureMethods.add(new ReadString(nextNestedSchemaIdForReadField, appendable));
+            listOfFutureMethods.add(new WriteString());
+            listOfFutureMethods.add(new SizeString(appendable));
+            listOfFutureMethods.add(new MemorySizeString());
+            listOfFutureMethods.add(new GetSchemaTupleIdentifierString(id));
+            listOfFutureMethods.add(new HashCode());
+            listOfFutureMethods.add(new SchemaSizeString());
+            listOfFutureMethods.add(new GetTypeString());
+            listOfFutureMethods.add(new CompareToString(id));
+            listOfFutureMethods.add(new CompareToSpecificString(id, appendable));
+            listOfFutureMethods.add(new SetEqualToSchemaTupleString(id));
+            listOfFutureMethods.add(new TypeAwareSetString(DataType.INTEGER));
+            listOfFutureMethods.add(new TypeAwareSetString(DataType.LONG));
+            listOfFutureMethods.add(new TypeAwareSetString(DataType.FLOAT));
+            listOfFutureMethods.add(new TypeAwareSetString(DataType.DOUBLE));
+            listOfFutureMethods.add(new TypeAwareSetString(DataType.BYTEARRAY));
+            listOfFutureMethods.add(new TypeAwareSetString(DataType.CHARARRAY));
+            listOfFutureMethods.add(new TypeAwareSetString(DataType.BOOLEAN));
+            listOfFutureMethods.add(new TypeAwareSetString(DataType.TUPLE));
+            listOfFutureMethods.add(new TypeAwareSetString(DataType.BAG));
+            listOfFutureMethods.add(new TypeAwareGetString(DataType.INTEGER));
+            listOfFutureMethods.add(new TypeAwareGetString(DataType.LONG));
+            listOfFutureMethods.add(new TypeAwareGetString(DataType.FLOAT));
+            listOfFutureMethods.add(new TypeAwareGetString(DataType.DOUBLE));
+            listOfFutureMethods.add(new TypeAwareGetString(DataType.BYTEARRAY));
+            listOfFutureMethods.add(new TypeAwareGetString(DataType.CHARARRAY));
+            listOfFutureMethods.add(new TypeAwareGetString(DataType.BOOLEAN));
+            listOfFutureMethods.add(new TypeAwareGetString(DataType.TUPLE));
+            listOfFutureMethods.add(new TypeAwareGetString(DataType.BAG));
+            listOfFutureMethods.add(new ListSetString());
+
+            for (TypeInFunctionStringOut t : listOfFutureMethods) {
+                t.prepare();
+            }
+        }
+
+        public void process(Schema.FieldSchema fs) {
+            for (TypeInFunctionStringOut t : listOfFutureMethods)
+                t.prepareProcess(fs);
+        }
+
+        public String end() {
+            StringBuilder head =
+                new StringBuilder()
+                    .append("import java.util.List;\n")
+                    .append("import java.util.Iterator;\n")
+                    .append("import java.io.DataOutput;\n")
+                    .append("import java.io.DataInput;\n")
+                    .append("import java.io.IOException;\n")
+                    .append("\n")
+                    .append("import com.google.common.collect.Lists;\n")
+                    .append("\n")
+                    .append("import org.apache.pig.data.DataType;\n")
+                    .append("import org.apache.pig.data.DataBag;\n")
+                    .append("import org.apache.pig.data.Tuple;\n")
+                    .append("import org.apache.pig.data.SchemaTuple;\n")
+                    .append("import org.apache.pig.data.AppendableSchemaTuple;\n")
+                    .append("import org.apache.pig.data.utils.SedesHelper;\n")
+                    .append("import org.apache.pig.data.utils.BytesHelper;\n")
+                    .append("import org.apache.pig.data.DataByteArray;\n")
+                    .append("import org.apache.pig.data.BinInterSedes;\n")
+                    .append("import org.apache.pig.impl.util.Utils;\n")
+                    .append("import org.apache.pig.impl.logicalLayer.schema.Schema;\n")
+                    .append("import org.apache.pig.impl.logicalLayer.FrontendException;\n")
+                    .append("import org.apache.pig.backend.executionengine.ExecException;\n")
+                    .append("import org.apache.pig.data.SizeUtil;\n")
+                    .append("import org.apache.pig.data.SchemaTuple.SchemaTupleQuickGenerator;\n")
+                    .append("\n")
+                    .append(contextAnnotations);
+
+            if (appendable) {
+                head.append("public class SchemaTuple_"+id+" extends AppendableSchemaTuple<SchemaTuple_"+id+"> {\n");
+            } else {
+                head.append("public class SchemaTuple_"+id+" extends SchemaTuple<SchemaTuple_"+id+"> {\n");
+            }
+
+            for (TypeInFunctionStringOut t : listOfFutureMethods) {
+                t.end();
+                head.append(t.getContent());
+            }
+
+            head.append("\n")
+                .append("    @Override\n")
+                .append("    public SchemaTupleQuickGenerator<SchemaTuple_" + id + "> getQuickGenerator() {\n")
+                .append("        return new SchemaTupleQuickGenerator<SchemaTuple_" + id + ">() {\n")
+                .append("            @Override\n")
+                .append("            public SchemaTuple_" + id + " make() {\n")
+                .append("                return new SchemaTuple_" + id + "();\n")
+                .append("            }\n")
+                .append("        };\n")
+                .append("    }\n");
+
+            return head.append("}").toString();
+        }
+    }
+
+    /**
+     * Base class for the generators that each produce one piece of a SchemaTuple class.
+     * Subclasses override {@link #prepare()}, {@link #process(int, Schema.FieldSchema)}
+     * and {@link #end()} and emit source lines through {@link #add(String)}; the
+     * accumulated text is read back with {@link #getContent()}.
+     */
+    static class TypeInFunctionStringOut {
+        // position of the next field handed to process()
+        private int fieldPos = 0;
+        private StringBuilder content = new StringBuilder();
+        // type of the field most recently passed to prepareProcess()
+        private byte type;
+
+        /** Called once before any field is processed. Does nothing by default. */
+        public void prepare() {}
+        /** Called once per field, in order. Does nothing by default. */
+        public void process(int fieldPos, Schema.FieldSchema fs) {}
+        /** Called once after all fields are processed. Does nothing by default. */
+        public void end() {}
+
+        // -1 means the appendable status was never supplied; otherwise 1 for true, 0 for false
+        public int appendable = -1;
+
+        /** @return the generated text accumulated so far */
+        public StringBuilder getContent() {
+            return content;
+        }
+
+        public TypeInFunctionStringOut() {
+            add("// this code generated by " + getClass());
+            addBreak();
+        }
+
+        /**
+         * @return  whether the class being generated is appendable
+         * @throws  RuntimeException if this generator was built without an appendable status
+         */
+        public boolean isAppendable() {
+            if (appendable == -1) {
+                throw new RuntimeException("Need to be given appendable status in " + getClass());
+            }
+            return appendable == 1;
+        }
+
+        public TypeInFunctionStringOut(boolean appendable) {
+            this();
+            this.appendable = appendable ? 1 : 0;
+        }
+
+        /**
+         * @param   indent  number of indentation levels
+         * @return  a new builder holding four spaces per indentation level
+         */
+        public StringBuilder spaces(int indent) {
+            StringBuilder out = new StringBuilder();
+            String space = "    ";
+            for (int i = 0; i < indent; i++) {
+                out.append(space);
+            }
+            return out;
+        }
+
+        /**
+         * Appends the given text to the content, one output line per newline-separated
+         * piece, each indented by one level and terminated with a newline.
+         * @param   s   the line or lines to append
+         */
+        public void add(String s) {
+            for (String str : s.split("\\n")) {
+                content.append(spaces(1).append(str).append("\n"));
+            }
+        }
+
+        /** Appends an empty line to the content. */
+        public void addBreak() {
+            content.append("\n");
+        }
+
+        /**
+         * Records the field's type, rejects maps, and forwards the field to
+         * {@link #process(int, Schema.FieldSchema)} with its position.
+         * @param   fs  the next field of the schema
+         * @throws  RuntimeException if the field is a map
+         */
+        public void prepareProcess(Schema.FieldSchema fs) {
+            type = fs.type;
+
+            if (type==DataType.MAP) {
+                throw new RuntimeException("Map currently not supported by SchemaTuple");
+            }
+
+            process(fieldPos, fs);
+            fieldPos++;
+        }
+
+        public boolean isInt() {
+            return type == DataType.INTEGER;
+        }
+
+        public boolean isLong() {
+            return type == DataType.LONG;
+        }
+
+        public boolean isFloat() {
+            return type == DataType.FLOAT;
+        }
+
+        public boolean isDouble() {
+            return type == DataType.DOUBLE;
+        }
+
+        public boolean isPrimitive() {
+            return isInt() || isLong() || isFloat() || isDouble() || isBoolean();
+        }
+
+        public boolean isBoolean() {
+            return type == DataType.BOOLEAN;
+        }
+
+        public boolean isString() {
+            return type == DataType.CHARARRAY;
+        }
+
+        public boolean isBytearray() {
+            return type == DataType.BYTEARRAY;
+        }
+
+        public boolean isTuple() {
+            return type == DataType.TUPLE;
+        }
+
+        public boolean isBag() {
+            return type == DataType.BAG;
+        }
+
+        public boolean isObject() {
+            return !isPrimitive();
+        }
+
+        /** @return the Java type name for the field currently being processed */
+        public String typeName() {
+            return typeName(type);
+        }
+
+        /**
+         * @param   type    a DataType byte
+         * @return  the Java type name used in generated code for that type
+         * @throws  RuntimeException if the type has no mapping
+         */
+        public String typeName(byte type) {
+            switch(type) {
+                case (DataType.INTEGER): return "int";
+                case (DataType.LONG): return "long";
+                case (DataType.FLOAT): return "float";
+                case (DataType.DOUBLE): return "double";
+                case (DataType.BYTEARRAY): return "byte[]";
+                case (DataType.CHARARRAY): return "String";
+                case (DataType.BOOLEAN): return "boolean";
+                case (DataType.TUPLE): return "Tuple";
+                case (DataType.BAG): return "DataBag";
+                default: throw new RuntimeException("Can't return String for given type " + DataType.findTypeName(type));
+            }
+        }
+
+        /**
+         * Returns the capitalized form of a type name, as used in generated method
+         * names. Byte arrays map to "Bytes".
+         * @param   type    a DataType byte
+         * @return  the type name with its first letter upper-cased
+         * @throws  RuntimeException if the type has no mapping
+         */
+        public String proper(byte type) {
+            String s = typeName(type);
+            switch (type) {
+            case DataType.BYTEARRAY: return "Bytes";
+            // Generated identifiers must not depend on the JVM's default locale: under a
+            // Turkish locale "int" would otherwise capitalize to a dotted capital I.
+            default: return s.substring(0,1).toUpperCase(Locale.ENGLISH) + s.substring(1);
+            }
+        }
+    }
+}
\ No newline at end of file

Added: pig/trunk/src/org/apache/pig/data/SchemaTupleFactory.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SchemaTupleFactory.java?rev=1356921&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SchemaTupleFactory.java (added)
+++ pig/trunk/src/org/apache/pig/data/SchemaTupleFactory.java Tue Jul  3 20:36:09 2012
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.data.SchemaTuple.SchemaTupleQuickGenerator;
+import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * This is a TupleMaker implementation that will instantiate
+ * SchemaTuple's. This class has nothing to do with the actual generation
+ * of code, and instead simply encapsulates the classes which allow
+ * for efficiently creating SchemaTuples.
+ */
+public class SchemaTupleFactory implements TupleMaker<SchemaTuple<?>> {
+    static final Log LOG = LogFactory.getLog(SchemaTupleFactory.class);
+
+    private final SchemaTupleQuickGenerator<? extends SchemaTuple<?>> generator;
+    private final Class<SchemaTuple<?>> clazz;
+    // size reported by a freshly made tuple; newTuple(int) only accepts this size
+    private final int tupleSize;
+
+    /**
+     * @param   clazz       the generated SchemaTuple class this factory produces
+     * @param   generator   the generator used to instantiate that class
+     */
+    protected SchemaTupleFactory(Class<SchemaTuple<?>> clazz,
+            SchemaTupleQuickGenerator<? extends SchemaTuple<?>> generator) {
+        this.clazz = clazz;
+        this.generator = generator;
+        tupleSize = generator.make().size();
+    }
+
+    /**
+     * This method inspects a Schema to see whether or
+     * not a SchemaTuple implementation can be generated
+     * for the types present. Currently, maps are not
+     * supported, and a nested tuple is only supported if
+     * its own Schema is generatable.
+     * @param   s   the Schema to inspect, may be null
+     * @return  true if it is generatable, false otherwise (including for a null Schema)
+     */
+    public static boolean isGeneratable(Schema s) {
+        if (s == null) {
+            return false;
+        }
+
+        for (Schema.FieldSchema fs : s.getFields()) {
+            if (fs.type == DataType.MAP) {
+                return false;
+            }
+
+            if (fs.type == DataType.TUPLE && !isGeneratable(fs.schema)) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    @Override
+    public SchemaTuple<?> newTuple() {
+        return generator.make();
+    }
+
+    /**
+     * @param   size    must equal the size of the SchemaTuples this factory makes
+     * @return  a new SchemaTuple
+     * @throws  RuntimeException if the requested size differs from the generated size
+     */
+    @Override
+    public SchemaTuple<?> newTuple(int size) {
+        if (size != tupleSize) {
+            throw new RuntimeException("Request a SchemaTuple of the wrong size! Requested ["
+                    + size + "], can only be [" + tupleSize + "]" );
+        }
+        return generator.make();
+    }
+
+    public Class<SchemaTuple<?>> tupleClass() {
+        return clazz;
+    }
+
+    /**
+     * Reports whether the produced tuples have a fixed size, i.e. whether the
+     * generated class is not an AppendableSchemaTuple.
+     * @return  true if the tuple class does not extend AppendableSchemaTuple
+     */
+    // We could make this faster by caching the result, but I doubt it will be called
+    // in any great volume.
+    public boolean isFixedSize() {
+        // The previous check, clazz.isAssignableFrom(AppendableSchemaTuple.class), asked
+        // whether clazz is a supertype of AppendableSchemaTuple, which a generated
+        // subclass never is.
+        return !AppendableSchemaTuple.class.isAssignableFrom(clazz);
+    }
+
+    /**
+     * This method returns a SchemaTupleFactory which will generate the SchemaTuple
+     * associated with the given identifier. This method is primarily for internal use
+     * in cases where the proper SchemaTuple is known based on the identifier associated
+     * with it (such as when deserializing).
+     * @param   id  the identifier of the generated SchemaTuple
+     * @return  a SchemaTupleFactory which will return SchemaTuple's of the given identifier
+     */
+    protected static SchemaTupleFactory getInstance(int id) {
+        return SchemaTupleBackend.newSchemaTupleFactory(id);
+    }
+
+    /**
+     * This method is the publicly facing method which returns a SchemaTupleFactory
+     * which will generate SchemaTuples of the given Schema. Note that this method
+     * returns null if such a given SchemaTupleFactory does not exist, instead of
+     * throwing an error. The GenContext is used to specify the context in which we
+     * are requesting a SchemaTupleFactory. This is necessary so that the use
+     * of SchemaTuple can be controlled -- it is possible that someone wants a
+     * factory that generates code in the context of joins, but wants to disable such
+     * use for udfs.
+     * @param   s               the Schema generated
+     * @param   isAppendable    whether or not the SchemaTuple should be appendable
+     * @param   context         the context in which we want a SchemaTupleFactory
+     * @return  a SchemaTupleFactory which will return SchemaTuple's of the desired Schema
+     */
+    public static SchemaTupleFactory getInstance(Schema s, boolean isAppendable, GenContext context) {
+        return SchemaTupleBackend.newSchemaTupleFactory(s, isAppendable, context);
+    }
+
+    /**
+     * Same as {@link #getInstance(Schema, boolean, GenContext)} with the
+     * {@link GenContext#FORCE_LOAD} context.
+     */
+    public static SchemaTupleFactory getInstance(Schema s, boolean isAppendable) {
+        return getInstance(s, isAppendable, GenContext.FORCE_LOAD);
+    }
+
+    /**
+     * Same as {@link #getInstance(Schema, boolean)} for a non-appendable SchemaTuple.
+     */
+    public static SchemaTupleFactory getInstance(Schema s) {
+        return getInstance(s, false);
+    }
+}

Added: pig/trunk/src/org/apache/pig/data/SchemaTupleFrontend.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SchemaTupleFrontend.java?rev=1356921&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SchemaTupleFrontend.java (added)
+++ pig/trunk/src/org/apache/pig/data/SchemaTupleFrontend.java Tue Jul  3 20:36:09 2012
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pig.ExecType;
+import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
+import org.apache.pig.data.utils.StructuresHelper.Pair;
+import org.apache.pig.data.utils.StructuresHelper.SchemaKey;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+
+/**
+ * This class is to be used at job creation time. It provides the API that lets code
+ * register Schemas with pig to be generated. It is necessary to register these Schemas
+ * so that the generated code can be made on the client side, and shipped to the mappers
+ * and reducers.
+ */
+public class SchemaTupleFrontend {
+    private static final Log LOG = LogFactory.getLog(SchemaTupleFrontend.class);
+
+    private static SchemaTupleFrontend stf;
+
+    /**
+     * Schemas registered for generation are held here.
+     */
+    private static Map<Pair<SchemaKey, Boolean>, Pair<Integer, Set<GenContext>>> schemasToGenerate = Maps.newHashMap();
+
+    /**
+     * Registers the Schema for generation under the given context. A Schema/appendable
+     * pair that is already registered just gains the context and keeps its id.
+     * @param   udfSchema       the Schema to register
+     * @param   isAppendable    whether the appendable variant is wanted
+     * @param   type            the context to register the Schema under
+     * @return  the id associated with the Schema, or -1 if it is not generatable
+     */
+    private int internalRegisterToGenerateIfPossible(Schema udfSchema, boolean isAppendable, GenContext type) {
+        Pair<SchemaKey, Boolean> lookupKey = Pair.make(new SchemaKey(udfSchema), isAppendable);
+
+        Pair<Integer, Set<GenContext>> existing = schemasToGenerate.get(lookupKey);
+        if (existing != null) {
+            existing.getSecond().add(type);
+            return existing.getFirst();
+        }
+
+        if (!SchemaTupleFactory.isGeneratable(udfSchema)) {
+            LOG.warn("Given Schema is not generatable: " + udfSchema);
+            return -1;
+        }
+
+        int id = SchemaTupleClassGenerator.getNextGlobalClassIdentifier();
+
+        // every new registration carries FORCE_LOAD in addition to the requested context
+        Set<GenContext> contexts = Sets.newHashSet();
+        contexts.add(GenContext.FORCE_LOAD);
+        contexts.add(type);
+        schemasToGenerate.put(lookupKey, Pair.make(Integer.valueOf(id), contexts));
+
+        String variant = isAppendable ? "Appendable" : "";
+        LOG.debug("Registering " + variant + "Schema for generation ["
+                + udfSchema + "] with id [" + id + "] and context: " + type);
+        return id;
+    }
+
+    /**
+     * @return  the shared map of registered Schemas (the live map, not a copy)
+     */
+    private Map<Pair<SchemaKey, Boolean>, Pair<Integer, Set<GenContext>>> getSchemasToGenerate() {
+        return SchemaTupleFrontend.schemasToGenerate;
+    }
+
+    /**
+     * Generates the registered SchemaTuple classes into a local temporary directory and
+     * makes them available to the job, either through the distributed cache or, in local
+     * mode, by recording the directory in the configuration.
+     */
+    private static class SchemaTupleFrontendGenHelper {
+        private File codeDir;
+        private PigContext pigContext;
+        private Configuration conf;
+
+        /**
+         * Creates the temporary directory that will hold the generated code.
+         * @param pigContext    used for the execution type and for temporary HDFS paths
+         * @param conf          the configuration that receives the resulting keys
+         */
+        public SchemaTupleFrontendGenHelper(PigContext pigContext, Configuration conf) {
+            codeDir = Files.createTempDir();
+            codeDir.deleteOnExit();
+            LOG.debug("Temporary directory for generated code created: "
+                    + codeDir.getAbsolutePath());
+            this.pigContext = pigContext;
+            this.conf = conf;
+        }
+
+        /**
+         * This method copies all class files present in the local temp directory to the distributed cache.
+         * All copied files will have a symlink of their name. No files will be copied if the current
+         * job is being run from local mode; the temp directory is recorded in the configuration instead.
+         * @throws RuntimeException if the temp directory cannot be listed or a file cannot be shipped
+         */
+        private void internalCopyAllGeneratedToDistributedCache() {
+            LOG.info("Starting process to move generated code to distributed cacche");
+            if (pigContext.getExecType() == ExecType.LOCAL) {
+                String codePath = codeDir.getAbsolutePath();
+                LOG.info("Distributed cache not supported or needed in local mode. Setting key ["
+                        + LOCAL_CODE_DIR + "] with code temp directory: " + codePath);
+                conf.set(LOCAL_CODE_DIR, codePath);
+                return;
+            }
+            DistributedCache.createSymlink(conf); // we will read using symlinks
+
+            // listFiles() returns null when the directory cannot be read; fail with context
+            // instead of a bare NullPointerException in the loop below.
+            File[] generatedFiles = codeDir.listFiles();
+            if (generatedFiles == null) {
+                throw new RuntimeException("Unable to list generated code in temp directory: "
+                        + codeDir.getAbsolutePath());
+            }
+
+            StringBuilder serialized = new StringBuilder();
+            // We attempt to copy over every file in the generated code temp directory
+            for (File f : generatedFiles) {
+                if (serialized.length() > 0) {
+                    serialized.append(",");
+                }
+                String symlink = f.getName(); //the class name will also be the symlink
+                serialized.append(symlink);
+                shipToDistributedCache(f, symlink);
+                LOG.info("File successfully added to the distributed cache: " + symlink);
+            }
+            String toSer = serialized.toString();
+            LOG.info("Setting key [" + SchemaTupleBackend.GENERATED_CLASSES_KEY + "] with classes to deserialize [" + toSer + "]");
+            // we must set a key in the job conf so individual jobs know to resolve the shipped classes
+            conf.set(SchemaTupleBackend.GENERATED_CLASSES_KEY, toSer);
+        }
+
+        /**
+         * Copies one local file to a temporary path and registers it in the distributed
+         * cache under the given symlink name.
+         * @param f         the local file to ship
+         * @param symlink   the symlink name the file will be reachable under
+         * @throws RuntimeException wrapping any failure to obtain a path, copy, or register
+         */
+        private void shipToDistributedCache(File f, String symlink) {
+            Path src = new Path(f.toURI());
+            Path dst;
+            try {
+                dst = FileLocalizer.getTemporaryPath(pigContext);
+            } catch (IOException e) {
+                throw new RuntimeException("Error getting temporary path in HDFS", e);
+            }
+            FileSystem fs;
+            try {
+                fs = dst.getFileSystem(conf);
+            } catch (IOException e) {
+                throw new RuntimeException("Unable to get FileSystem", e);
+            }
+            try {
+                fs.copyFromLocalFile(src, dst);
+            } catch (IOException e) {
+                throw new RuntimeException("Unable to copy from local filesystem to HDFS, src = "
+                        + src + ", dst = " + dst, e);
+            }
+
+            String destination = dst.toString() + "#" + symlink;
+
+            try {
+                DistributedCache.addCacheFile(new URI(destination), conf);
+            } catch (URISyntaxException e) {
+                throw new RuntimeException("Unable to add file to distributed cache: " + destination, e);
+            }
+        }
+
+        /**
+         * This sets into motion the generation of all "registered" Schemas. All code will be generated
+         * into the temporary directory. Nothing is generated unless the should-generate key is set to
+         * true, and a Schema is only generated for the contexts that are enabled in the configuration.
+         * @param schemasToGenerate the registered Schemas with their ids and contexts
+         * @return true or false depending on if there are any files to copy to the distributed cache
+         */
+        private boolean generateAll(Map<Pair<SchemaKey, Boolean>, Pair<Integer, Set<GenContext>>> schemasToGenerate) {
+            boolean filesToShip = false;
+            String shouldString = conf.get(SchemaTupleBackend.SHOULD_GENERATE_KEY);
+            if (shouldString == null || !Boolean.parseBoolean(shouldString)) {
+                LOG.info("Key ["+SchemaTupleBackend.SHOULD_GENERATE_KEY+"] is false, aborting generation.");
+                return false;
+            }
+            LOG.info("Generating all registered Schemas.");
+            for (Map.Entry<Pair<SchemaKey, Boolean>, Pair<Integer, Set<GenContext>>> entry : schemasToGenerate.entrySet()) {
+                Pair<SchemaKey, Boolean> keyPair = entry.getKey();
+                Schema s = keyPair.getFirst().get();
+                Pair<Integer, Set<GenContext>> valuePair = entry.getValue();
+                Set<GenContext> contextsToInclude = Sets.newHashSet();
+                for (GenContext context : valuePair.getSecond()) {
+                    if (!context.shouldGenerate(conf)) {
+                        LOG.info("Skipping generation of Schema [" + s + "], as key value [" + context.key() + "] was false.");
+                    } else {
+                        contextsToInclude.add(context);
+                    }
+                }
+                // no enabled context means there is nothing to generate for this Schema
+                if (contextsToInclude.isEmpty()) {
+                    continue;
+                }
+                int id = valuePair.getFirst();
+                boolean isAppendable = keyPair.getSecond();
+                SchemaTupleClassGenerator.generateSchemaTuple(s, isAppendable, id, codeDir, contextsToInclude.toArray(new GenContext[0]));
+                filesToShip = true;
+            }
+            return filesToShip;
+        }
+    }
+
+    /**
+     * This allows the frontend/backend process to be repeated if on the same
+     * JVM (as in testing).
+     */
+    public static void reset() {
+        stf = null; // registerToGenerateIfPossible creates a fresh frontend when it finds stf null
+    }
+
+    /**
+     * This method "registers" a Schema to be generated. It allows portions of the code
+     * to register a Schema for generation without knowing whether code generation is enabled.
+     * A unique ID will be passed back that can be used internally to refer to generated SchemaTuples
+     * (such as in the case of serialization and deserialization). The context is necessary to allow
+     * the client to restrict where generated code can be used.
+     * @param   udfSchema       This is the Schema of a Tuple that we will potentially generate
+     * @param   isAppendable    This specifies whether or not we want the SchemaTuple to be appendable
+     * @param   context         This is the context in which users should be able to access the SchemaTuple
+     * @return  identifier
+     */
+    public static int registerToGenerateIfPossible(Schema udfSchema, boolean isAppendable, GenContext context) {
+        // The frontend is created lazily, even when there turns out to be nothing to register.
+        if (stf == null) {
+            stf = new SchemaTupleFrontend();
+        }
+        if (udfSchema == null) {
+            return -1;
+        }
+        // Aliases are stripped from a clone rather than from the Schema object passed in.
+        Schema aliasFree;
+        try {
+            aliasFree = udfSchema.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeException("Unable to clone Schema: " + udfSchema, e);
+        }
+        stripAliases(aliasFree);
+        return stf.internalRegisterToGenerateIfPossible(aliasFree, isAppendable, context);
+    }
+
+    private static void stripAliases(Schema s) {
+        for (Schema.FieldSchema field : s.getFields()) {
+            field.alias = null; // the alias is cleared at every nesting level
+            if (null != field.schema) {
+                stripAliases(field.schema); // recurse into the field's nested schema
+            }
+        }
+    }
+
+    /**
+     * This key is used when a job is run in local mode to pass the location of the generated code
+     * from the frontend to the "backend."
+     */
+    protected static final String LOCAL_CODE_DIR = "pig.schematuple.local.dir";
+
+    /**
+     * This must be called when the code has been generated and the generated code needs to be shipped
+     * to the cluster, so that it may be used by the mappers and reducers.
+     * @param pigContext
+     * @param conf
+     */
+    public static void copyAllGeneratedToDistributedCache(PigContext pigContext, Configuration conf) {
+        if (stf != null) {
+            SchemaTupleFrontendGenHelper helper = new SchemaTupleFrontendGenHelper(pigContext, conf);
+            helper.generateAll(stf.getSchemasToGenerate()); // result ignored: the copy step always runs
+            helper.internalCopyAllGeneratedToDistributedCache();
+        } else {
+            LOG.debug("Nothing registered to generate.");
+        }
+    }
+}

Modified: pig/trunk/src/org/apache/pig/data/TupleFactory.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/TupleFactory.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/TupleFactory.java (original)
+++ pig/trunk/src/org/apache/pig/data/TupleFactory.java Tue Jul  3 20:36:09 2012
@@ -17,8 +17,6 @@
  */
 package org.apache.pig.data;
 
-import java.lang.Class;
-import java.lang.ClassLoader;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.List;
@@ -26,8 +24,6 @@ import java.util.List;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTupleDefaultRawComparator;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 
 /**
  * A factory to construct tuples.  This class is abstract so that users can
@@ -41,7 +37,7 @@ import org.apache.pig.impl.logicalLayer.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public abstract class TupleFactory {
+public abstract class TupleFactory implements TupleMaker<Tuple> {
     private static TupleFactory gSelf = null;
 
     /**
@@ -144,62 +140,6 @@ public abstract class TupleFactory {
     public abstract Tuple newTuple(Object datum);
 
     /**
-     * Create a tuple optimized for a provided schema.
-     * <p>
-     * Note: chances are {@link TupleFactory#newTupleForSchema(byte...)} is slightly
-     * more efficient in most implementations.
-     *
-     * @param schema  Pig Schema of the tuple we want to create.
-     * @return A tuple optimized for the schema
-     */
-    public Tuple newTupleForSchema(Schema schema) {
-        List<FieldSchema> fieldSchemas = schema.getFields();
-        byte[] types = new byte[fieldSchemas.size()];
-        for (int i = 0; i < fieldSchemas.size(); i++) {
-            FieldSchema fs = fieldSchemas.get(i);
-            types[i] = fs.type;
-        }
-        return newTupleForSchema(types);
-    }
-
-    /**
-     * Create a tuple optimized for a provided schema.
-     *
-     * @param dataTypes Schema of the desired Tuple, represented as bytes from {@link DataType}
-     * @return A tuple optimized for the schema.
-     */
-    public Tuple newTupleForSchema(byte... dataTypes) {
-        if (dataTypes == null || dataTypes.length == 0) {
-            return this.newTuple();
-        } else if (dataTypes.length == 1 && DataType.isAtomic(dataTypes[0])) {
-            switch (dataTypes[0]) {
-            case DataType.INTEGER:
-                return new PIntTuple();
-            case DataType.FLOAT:
-                return new PFloatTuple();
-            case DataType.LONG:
-                return new PLongTuple();
-            case DataType.DOUBLE:
-                return new PDoubleTuple();
-            case DataType.CHARARRAY:
-                return new PStringTuple();
-            case DataType.BOOLEAN:
-                return new PBooleanTuple();
-            default:
-                return this.newTuple(1);
-            }
-        } else if (dataTypes.length > 1) {
-            boolean allNumbers = true;
-            for (byte type : dataTypes) {
-                allNumbers &= DataType.isNumberType(type);
-            }
-            return allNumbers ? new PrimitiveTuple(dataTypes) : this.newTuple(dataTypes.length);
-        } else {
-            return this.newTuple(dataTypes.length);
-        }
-    }
-
-    /**
      * Return the actual class representing a tuple that the implementing
      * factory will be returning.  This is needed because Hadoop needs
      * to know the exact class we will be using for input and output.
@@ -230,5 +170,13 @@ public abstract class TupleFactory {
         return PigTupleDefaultRawComparator.class;
     }
 
+    /**
+     * This method is used to inspect whether the Tuples created by this factory
+     * will be of a fixed size when they are created. In practical terms, this means
+     * whether they support append or not.
+     * @return whether the Tuple is of a fixed size or not
+     */
+    public abstract boolean isFixedSize();
+
 }
 

Added: pig/trunk/src/org/apache/pig/data/TupleMaker.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/TupleMaker.java?rev=1356921&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/data/TupleMaker.java (added)
+++ pig/trunk/src/org/apache/pig/data/TupleMaker.java Tue Jul  3 20:36:09 2012
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+public interface TupleMaker<A extends Tuple> {
+    A newTuple();         // no-argument factory method
+    A newTuple(int size); // factory method that is given the requested size
+}

Modified: pig/trunk/src/org/apache/pig/data/TypeAwareTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/TypeAwareTuple.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/TypeAwareTuple.java (original)
+++ pig/trunk/src/org/apache/pig/data/TypeAwareTuple.java Tue Jul  3 20:36:09 2012
@@ -1,7 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.pig.data;
 
+import java.util.Map;
+
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 
 public interface TypeAwareTuple extends Tuple {
 
@@ -11,13 +30,22 @@ public interface TypeAwareTuple extends 
     public void setLong(int idx, long val) throws ExecException;
     public void setString(int idx, String val) throws ExecException;
     public void setBoolean(int idx, boolean val) throws ExecException;
+    public void setBytes(int idx, byte[] val) throws ExecException;
+    public void setTuple(int idx, Tuple val) throws ExecException;
+    public void setDataBag(int idx, DataBag val) throws ExecException;
+    public void setMap(int idx, Map<String,Object> val) throws ExecException;
+
+    public int getInt(int idx) throws ExecException, FieldIsNullException;
+    public float getFloat(int idx) throws ExecException, FieldIsNullException;
+    public double getDouble(int idx) throws ExecException, FieldIsNullException;
+    public long getLong(int idx) throws ExecException, FieldIsNullException;
+    public String getString(int idx) throws ExecException, FieldIsNullException;
+    public boolean getBoolean(int idx) throws ExecException, FieldIsNullException;
+    public byte[] getBytes(int idx) throws ExecException, FieldIsNullException;
+    public Tuple getTuple(int idx) throws ExecException;
+    public DataBag getDataBag(int idx) throws ExecException, FieldIsNullException;
+    public Map<String,Object> getMap(int idx) throws ExecException, FieldIsNullException;
 
-    public Integer getInteger(int idx) throws ExecException;
-    public Float getFloat(int idx) throws ExecException;
-    public Double getDouble(int idx) throws ExecException;
-    public Long getLong(int idx) throws ExecException;
-    public String getString(int idx) throws ExecException;
-    public Boolean getBoolean(int idx) throws ExecException;
-
+    public Schema getSchema();
 
 }

Added: pig/trunk/src/org/apache/pig/data/utils/BytesHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/utils/BytesHelper.java?rev=1356921&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/data/utils/BytesHelper.java (added)
+++ pig/trunk/src/org/apache/pig/data/utils/BytesHelper.java Tue Jul  3 20:36:09 2012
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data.utils;
+
+/** Static helpers for reading and writing a single bit of a byte by position. */
+public class BytesHelper {
+    private static final int[] SET_MASKS = {0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40, 0x80};
+    private static final int[] CLEAR_MASKS = {0xFE, 0xFD, 0xFB, 0xF7, 0xEF, 0xDF, 0xBF, 0x7F};
+
+    /** Returns true if bit {@code pos} (0 = least significant, 7 = most significant) of {@code byt} is set. */
+    public static boolean getBitByPos(byte byt, int pos) {
+        return (SET_MASKS[pos] & byt) != 0;
+    }
+
+    /** Returns {@code byt} with bit {@code pos} (0-7) set when {@code bool} is true, and cleared otherwise. */
+    public static byte setBitByPos(byte byt, boolean bool, int pos) {
+        int updated = bool ? (byt | SET_MASKS[pos]) : (byt & CLEAR_MASKS[pos]);
+        return (byte) updated;
+    }
+}

Added: pig/trunk/src/org/apache/pig/data/utils/MethodHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/utils/MethodHelper.java?rev=1356921&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/data/utils/MethodHelper.java (added)
+++ pig/trunk/src/org/apache/pig/data/utils/MethodHelper.java Tue Jul  3 20:36:09 2012
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data.utils;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.lang.reflect.Method;
+
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class MethodHelper {
+    private MethodHelper() {
+    }
+
+    /**
+     * This is an annotation which allows a class to signal that while it is "implementing"
+     * a method because it is specified by a parent class or interface, the implementation
+     * just throws an exception, because it is not implemented.
+     */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.METHOD)
+    public @interface NotImplemented {}
+
+    /**
+     * Searches clazz and then its superclasses for the first class that declares a method with the
+     * same signature as m, and returns whether that declaration carries the NotImplemented annotation.
+     * Returns false when no declaration is found before Object is reached or the superclass chain
+     * ends (getSuperclass() is null for interfaces), instead of failing with a NullPointerException.
+     */
+    public static boolean isNotImplementedAnnotationPresent(Method m, Class<?> clazz) {
+        if (clazz == null || clazz.equals(Object.class)) {
+            return false;
+        }
+        for (Method clazzMethod : clazz.getDeclaredMethods()) {
+            if (MethodHelper.methodSignatureEqual(m, clazzMethod)) {
+                return clazzMethod.getAnnotation(NotImplemented.class) != null;
+            }
+        }
+        return isNotImplementedAnnotationPresent(m, clazz.getSuperclass());
+    }
+
+    /**
+     * Builds the exception to throw from a method that is deliberately left unimplemented.
+     * @return an UnsupportedOperationException naming the method that called this helper and its class
+     */
+    public static RuntimeException methodNotImplemented() {
+        // Frame 0 of a new Throwable's trace is this method; frame 1 is the method that called it.
+        StackTraceElement[] ste = new Throwable().getStackTrace();
+        // A JVM may omit frames; without a caller frame fall back to a generic message.
+        String what = ste.length > 1 ? ste[1].getMethodName() + " not implemented in " + ste[1].getClassName() : "Method not implemented";
+        return new UnsupportedOperationException(what);
+    }
+
+    /**
+     * This implements a stripped down version of method equality.
+     * method.equals(method) checks to see whether the declaring classes
+     * are equal, which we do not want. Instead, we just want to know
+     * if the methods are equal assuming that they come from the same
+     * class hierarchy (ie generated code which extends SchemaTuple).
+     */
+    public static boolean methodSignatureEqual(Method m1, Method m2) {
+        if (!m1.getName().equals(m2.getName()) || !m1.getReturnType().equals(m2.getReturnType())) {
+            return false;
+        }
+        Class<?>[] params1 = m1.getParameterTypes();
+        Class<?>[] params2 = m2.getParameterTypes();
+        if (params1.length != params2.length) {
+            return false;
+        }
+        for (int i = 0; i < params1.length; i++) {
+            if (!params1[i].equals(params2[i])) {
+                return false;
+            }
+        }
+        return true;
+    }
+}



Mime
View raw message