pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From z..@apache.org
Subject svn commit: r1784237 [12/22] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachel...
Date Fri, 24 Feb 2017 08:19:46 GMT
Added: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java (added)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java Fri Feb 24 08:19:42 2017
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.visitor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.DependencyOrderWalker;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.expression.CastExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOInnerLoad;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+
/**
 * Visitor that reconciles the schema a user declares on
 * {@code FOREACH ... GENERATE ... AS (...)} with the schema actually produced
 * by the generate expressions.  Whenever the user-declared type of a field
 * differs from the expression's type (and the user side is not merely
 * NULL/BYTEARRAY), a second FOREACH containing explicit casts is inserted
 * into the plan directly after the original one.  The user-defined schema
 * still referenced by the original LOGenerate is then stripped of its types
 * (leaf types reset to NULL) so that only the user-supplied aliases remain
 * there; the inserted FOREACH carries the casts to the declared types.
 */
public class ForEachUserSchemaVisitor extends LogicalRelationalNodesVisitor {
    public ForEachUserSchemaVisitor(OperatorPlan plan) throws FrontendException {
        // Dependency order guarantees a foreach is visited after its inputs.
        super(plan, new DependencyOrderWalker(plan));
    }

    /**
     * Merges an expression schema with a user-defined schema field by field
     * via {@link #replaceNullByteArrayFieldSchema}.  Either argument may be
     * null, in which case a deep copy of the other side is returned.
     *
     * NOTE(review): the loop is bounded by originalSchema.size() and indexes
     * into userSchema with the same i.  The top-level caller (visit) checks
     * the two schemas have equal size, but nested schemas reached through
     * recursion are not size-checked here — confirm they are guaranteed to
     * match upstream.
     */
    private static LogicalSchema replaceNullByteArraySchema(
                         LogicalSchema originalSchema,
                         LogicalSchema userSchema) throws FrontendException {
        if( originalSchema == null && userSchema == null ) {
            return null;
        } else if ( originalSchema == null ) {
            return userSchema.deepCopy();
        } else if ( userSchema == null ) {
            return originalSchema.deepCopy();
        }

        LogicalSchema replacedSchema = new LogicalSchema();
        for (int i=0;i<originalSchema.size();i++) {
            LogicalFieldSchema replacedFS = replaceNullByteArrayFieldSchema(originalSchema.getField(i), userSchema.getField(i));
            replacedSchema.addField(replacedFS);
        }
        return replacedSchema;
    }

    /**
     * Merges one expression field schema with the corresponding user-defined
     * field schema:
     * <ul>
     *   <li>expression side is NULL/BYTEARRAY: the user's field wins;</li>
     *   <li>user side is NULL/BYTEARRAY: the expression's type and inner
     *       schema win, but the user's alias is kept;</li>
     *   <li>both concrete, scalar (non-schema) type: the user's field wins;</li>
     *   <li>both concrete, complex type: inner schemas are merged recursively,
     *       keeping the user's alias and type.</li>
     * </ul>
     * Either argument may be null, in which case a deep copy of the other
     * side is returned.
     */
    private static LogicalFieldSchema replaceNullByteArrayFieldSchema(
                         LogicalFieldSchema originalFS,
                         LogicalFieldSchema userFS) throws FrontendException {
        if( originalFS == null && userFS == null ) {
            return null;
        } else if ( originalFS == null ) {
            return userFS.deepCopy();
        } else if ( userFS == null ) {
            return originalFS.deepCopy();
        }
        if ( originalFS.type==DataType.NULL
            || originalFS.type==DataType.BYTEARRAY ) {
            return userFS.deepCopy();
        } else if ( userFS.type==DataType.NULL
            || userFS.type==DataType.BYTEARRAY ) {
            // Use originalFS schema but keep the alias from userFS
            return new LogicalFieldSchema(userFS.alias, originalFS.schema,  originalFS.type);
        }

        if ( !DataType.isSchemaType(originalFS.type) ) {
            return userFS.deepCopy();
        } else {
            // Complex type on both sides: merge children recursively.
            LogicalSchema replacedSchema = replaceNullByteArraySchema(originalFS.schema, userFS.schema);
            return new LogicalFieldSchema(userFS.alias, replacedSchema, userFS.type);
        }
    }

    /**
     * Returns true iff every leaf type reachable from this field schema is
     * NULL or BYTEARRAY — i.e. the field declares no concrete type that
     * would require an explicit cast.
     */
    private static boolean hasOnlyNullOrByteArraySchema (LogicalFieldSchema fs) {
        if( DataType.isSchemaType(fs.type) ) {
            if( fs.schema != null ) {
                for (LogicalFieldSchema sub_fs : fs.schema.getFields() ) {
                    if( !hasOnlyNullOrByteArraySchema(sub_fs)  ) {
                        return false;
                    }
                }
            }
        } else if( fs.type != DataType.NULL && fs.type != DataType.BYTEARRAY )  {
            return false;
        }
        return true;
    }

    /**
     * Inspects the LOGenerate sitting at the sink of this foreach's inner
     * plan.  For every output column whose user-defined schema declares a
     * concrete type not matched by the expression schema, a casting
     * projection is built; if at least one cast is needed, a new foreach
     * holding those casts is spliced into the plan directly after the
     * original, and the types in the original user-defined schemas are
     * cleared (aliases preserved).
     */
    @Override
    public void visit(LOForEach foreach) throws FrontendException {
        LOGenerate generate = (LOGenerate)foreach.getInnerPlan().getSinks().get(0);
        List<LogicalSchema> mExpSchemas = generate.getExpSchemas();
        List<LogicalSchema> mUserDefinedSchemas = generate.getUserDefinedSchema();

        // Skip if no way to figure out schema (usually both expression schema and
        // user defined schema are null)
        if (foreach.getSchema()==null) {
            return;
        }

        if (mUserDefinedSchemas==null) {
            return;
        }

        // Without at least one non-null user-defined schema there is nothing
        // to reconcile.
        boolean hasUserDefinedSchema = false;
        for (LogicalSchema mUserDefinedSchema : mUserDefinedSchemas) {
            if (mUserDefinedSchema!=null) {
                hasUserDefinedSchema = true;
                break;
            }
        }

        if (!hasUserDefinedSchema) {
            return;
        }

        if (mExpSchemas.size()!=mUserDefinedSchemas.size()) {
            throw new FrontendException("Size mismatch: Get " + mExpSchemas.size() +
                    " mExpSchemas, but " + mUserDefinedSchemas.size() + " mUserDefinedSchemas",
                    0, generate.getLocation());
        }

        // Build the candidate casting foreach up front; it is only wired into
        // the plan at the end if a cast turns out to be needed.
        LogicalPlan innerPlan = new LogicalPlan();
        LOForEach casterForEach = new LOForEach(plan);
        casterForEach.setInnerPlan(innerPlan);
        casterForEach.setAlias(foreach.getAlias());

        List<LogicalExpressionPlan> exps = new ArrayList<LogicalExpressionPlan>();
        LOGenerate gen = new LOGenerate(innerPlan, exps, null);
        innerPlan.add(gen);

        // index counts flattened output columns across all generate expressions.
        int index = 0;
        boolean needCast = false;
        for(int i=0;i<mExpSchemas.size();i++) {
            LogicalSchema mExpSchema = mExpSchemas.get(i);
            LogicalSchema mUserDefinedSchema = mUserDefinedSchemas.get(i);

            // Use user defined schema to cast, this is the prevailing use case
            if (mExpSchema==null) {
                for (LogicalFieldSchema fs : mUserDefinedSchema.getFields()) {
                    if (hasOnlyNullOrByteArraySchema(fs)) {
                        addToExps(casterForEach, innerPlan, gen, exps, index, false, null);
                    } else {
                        addToExps(casterForEach, innerPlan, gen, exps, index, true, fs);
                        needCast = true;
                    }
                    index++;
                }
                continue;
            }

            // No user defined schema, no need to cast
            if (mUserDefinedSchema==null) {
                for (int j=0;j<mExpSchema.size();j++) {
                    addToExps(casterForEach, innerPlan, gen, exps, index, false, null);
                    index++;
                }
                continue;
            }

            // Expression has schema, but user also define schema, need cast only
            // when there is a mismatch
            if (mExpSchema.size()!=mUserDefinedSchema.size()) {
                throw new FrontendException("Size mismatch: Cannot cast " + mExpSchema.size() +
                        " fields to " + mUserDefinedSchema.size(), 0, foreach.getLocation());
            }

            LogicalSchema replacedSchema = replaceNullByteArraySchema(mExpSchema,mUserDefinedSchema);
            for (int j=0;j<mExpSchema.size();j++) {
                LogicalFieldSchema mExpFieldSchema = mExpSchema.getField(j);
                LogicalFieldSchema mUserDefinedFieldSchema = replacedSchema.getField(j);

                if (hasOnlyNullOrByteArraySchema(mUserDefinedFieldSchema) ||
                    LogicalFieldSchema.typeMatch(mExpFieldSchema, mUserDefinedFieldSchema)) {
                    addToExps(casterForEach, innerPlan, gen, exps, index, false, null);
                } else {
                    addToExps(casterForEach, innerPlan, gen, exps, index, true, mUserDefinedFieldSchema);
                    needCast = true;
                }
                index++;
            }
        }

        gen.setFlattenFlags(new boolean[index]);
        if (needCast) {
            // Insert the casterForEach into the plan and patch up the plan.
            List <Operator> successorOps = plan.getSuccessors(foreach);
            if (successorOps != null && successorOps.size() > 0){
                Operator next = plan.getSuccessors(foreach).get(0);
                plan.insertBetween(foreach, casterForEach, next);
            }else{
                plan.add(casterForEach);
                plan.connect(foreach,casterForEach);
            }

            // Since the explicit cast is now inserted after the original foreach,
            // throw away the user defined "types" but keep the user
            // defined names from the original foreach.
            // 'generate' (LOGenerate) still holds the reference to this
            // mUserDefinedSchemas
            for( LogicalSchema mUserDefinedSchema : mUserDefinedSchemas ) {
                resetTypeToNull( mUserDefinedSchema );
            }
        }
    }

    /**
     * Recursively resets every leaf field type in the given schema to NULL,
     * leaving aliases intact.  Complex (schema-typed) fields keep their own
     * type; only their children are reset.
     */
    private void resetTypeToNull (LogicalSchema s1) {
        if( s1 != null ) {
            for (LogicalFieldSchema fs : s1.getFields()) {
                if( DataType.isSchemaType(fs.type) ) {
                    resetTypeToNull(fs.schema);
                } else {
                    fs.type = DataType.NULL;
                }
            }
        }
    }

    /**
     * Adds one output column to the caster foreach being built: an inner
     * load of column {@code index} feeding {@code gen}, plus a projection
     * expression plan, optionally wrapped in a cast to {@code fs} when
     * {@code needCaster} is true ({@code fs} may be null otherwise).
     */
    private void addToExps(LOForEach casterForEach, LogicalPlan innerPlan, LOGenerate gen,
            List<LogicalExpressionPlan> exps, int index, boolean needCaster, LogicalFieldSchema fs) {

        LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, casterForEach, index);
        innerPlan.add(innerLoad);
        innerPlan.connect(innerLoad, gen);

        LogicalExpressionPlan exp = new LogicalExpressionPlan();

        ProjectExpression prj = new ProjectExpression(exp, index, 0, gen);
        exp.add(prj);

        if (needCaster) {
            CastExpression cast = new CastExpression(exp, prj, new LogicalSchema.LogicalFieldSchema(fs));
            exp.add(cast);
        }
        exps.add(exp);
    }
}

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java Fri Feb 24 08:19:42 2017
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;
@@ -729,6 +730,44 @@ public class LineageFindRelVisitor exten
             }
         }
 
        /**
         * Tracks caster lineage for a UDF's output field: decides which
         * FuncSpec (and therefore which LoadCaster) should be associated
         * with the values this UDF produces.
         *
         * If the UDF itself supplies a LoadCaster (instantiateCaster returns
         * non-null for the UDF's FuncSpec), the UDF's own FuncSpec is used.
         * Otherwise, when the UDF has arguments, the first argument's mapped
         * FuncSpec is inherited — but only if every remaining argument maps
         * to an identical caster; any mismatch or unknown argument schema
         * leaves the output unmapped.  The chosen FuncSpec is registered for
         * the output field's uid and, when the output schema is nested, for
         * all inner uids as well.
         *
         * NOTE(review): the local is declared as raw {@code Class} named
         * "loader" but only its null-ness is consulted — presumably a
         * stand-in for "the UDF provides a caster"; confirm against
         * instantiateCaster's return type.
         */
        @Override
        public void visit(UserFuncExpression op) throws FrontendException {

            // No output field schema means there is no uid to map.
            if( op.getFieldSchema() == null ) {
                return;
            }

            FuncSpec funcSpec = null;
            Class loader = instantiateCaster(op.getFuncSpec());
            List<LogicalExpression> arguments = op.getArguments();
            if ( loader != null ) {
                // if evalFunc.getLoadCaster() returns, simply use that.
                funcSpec = op.getFuncSpec();
            } else if (arguments.size() != 0 ) {
                // Fall back to the caster of the UDF's inputs, provided they
                // all agree.
                FuncSpec baseFuncSpec = null;
                LogicalFieldSchema fs = arguments.get(0).getFieldSchema();
                if ( fs != null ) {
                    baseFuncSpec = uid2LoadFuncMap.get(fs.uid);
                    if( baseFuncSpec != null ) {
                        funcSpec = baseFuncSpec;
                        for(int i = 1; i < arguments.size(); i++) {
                            fs = arguments.get(i).getFieldSchema();
                            if( fs == null || !haveIdenticalCasters(baseFuncSpec, uid2LoadFuncMap.get(fs.uid)) ) {
                                // Mixed or unknown casters: cannot pick one safely.
                                funcSpec = null;
                                break;
                            }
                        }
                    }
                }
            }

            if( funcSpec != null ) {
                addUidLoadFuncToMap(op.getFieldSchema().uid, funcSpec);
                // in case schema is nested, set funcSpec for all
                setLoadFuncForUids(op.getFieldSchema().schema, funcSpec);
            }
        }
+
         /**
          * if there is a null constant under casts, return it
          * @param rel
@@ -770,6 +809,8 @@ public class LineageFindRelVisitor exten
                 caster = ((LoadFunc)obj).getLoadCaster();
             } else if (obj instanceof StreamToPig) {
                 caster = ((StreamToPig)obj).getLoadCaster();
+            } else if (obj instanceof EvalFunc) {
+                caster = ((EvalFunc)obj).getLoadCaster();
             } else {
                 throw new VisitorException("Invalid class type " + funcSpec.getClassName(),
                                            2270, PigException.BUG );

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java Fri Feb 24 08:19:42 2017
@@ -458,6 +458,7 @@ public class TypeCheckingExpVisitor exte
         collectCastWarning(node, arg.getType(), toFs.type, msgCollector);
 
         CastExpression cast = new CastExpression(plan, arg, toFs);
+        cast.setLocation(node.getLocation());
         try {
             // disconnect cast and arg because the connection is already
             // added by cast constructor and insertBetween call is going
@@ -490,7 +491,7 @@ public class TypeCheckingExpVisitor exte
         byte outType = cast.getType();
         if(outType == DataType.BYTEARRAY && inType != outType) {
             int errCode = 1051;
-            String msg = "Cannot cast to bytearray";
+            String msg = "Cannot cast from " + DataType.findTypeName(inType) + " to bytearray";
             msgCollector.collect(msg, MessageType.Error) ;
             throw new TypeCheckerException(cast, msg, errCode, PigException.INPUT) ;
         }
@@ -607,7 +608,7 @@ public class TypeCheckingExpVisitor exte
             // Matching schemas if we're working with tuples/bags
             if (DataType.isSchemaType(lhsType)) {
                 try {
-                    if(! binCond.getLhs().getFieldSchema().isEqual(binCond.getRhs().getFieldSchema())){
+                    if(!LogicalFieldSchema.isEqualUnlessUnknown(binCond.getLhs().getFieldSchema(), binCond.getRhs().getFieldSchema())){
                         int errCode = 1048;
                         String msg = "Two inputs of BinCond must have compatible schemas."
                             + " left hand side: " + binCond.getLhs().getFieldSchema()

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java Fri Feb 24 08:19:42 2017
@@ -351,7 +351,8 @@ public class TypeCheckingRelVisitor exte
 
             if (outFieldSchema.type != fs.type) {
                 castNeededCounter++ ;
-                new CastExpression(genPlan, project, outFieldSchema);
+                CastExpression castexp = new CastExpression(genPlan, project, outFieldSchema);
+                castexp.setLocation(toOp.getLocation());
             }
 
             generatePlans.add(genPlan) ;

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java Fri Feb 24 08:19:42 2017
@@ -21,6 +21,7 @@ package org.apache.pig.newplan.logical.v
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.pig.PigException;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.Pair;
@@ -110,9 +111,20 @@ public class UnionOnSchemaSetter extends
                 } else {
                     ProjectExpression projExpr = 
                         new ProjectExpression( exprPlan, genInputs.size(), 0, gen );
-                    if( fs.type != DataType.BYTEARRAY
-                        && opSchema.getField( pos ).type != fs.type ) {
-                        new CastExpression( exprPlan, projExpr, fs );
+                    if( opSchema.getField( pos ).type != fs.type ) {
+                        if( fs.type != DataType.BYTEARRAY ) {
+                            CastExpression castexpr = new CastExpression( exprPlan, projExpr, fs );
+                            castexpr.setLocation(union.getLocation());
+                        } else {
+                            int errCode = 1056;
+                            String msg = "Union of incompatible types not allowed. "
+                                         + "Cannot cast from "
+                                         + DataType.findTypeName(opSchema.getField( pos ).type)
+                                         + " to bytearray for '"
+                                         + opSchema.getField( pos ).alias
+                                         + "'. Please typecast to compatible types before union." ;
+                            throw new FrontendException(union, msg, errCode, PigException.INPUT) ;
+                        }
                     }
                     genInputs.add( new LOInnerLoad( innerPlan, foreach, pos ) );
                 }

Modified: pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java Fri Feb 24 08:19:42 2017
@@ -34,6 +34,7 @@ import org.antlr.runtime.RecognitionExce
 import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.NonFSLoadFunc;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -888,7 +889,7 @@ public class LogicalPlanBuilder {
             if (absolutePath == null) {
                 absolutePath = loFunc.relativeToAbsolutePath( filename, QueryParserUtils.getCurrentDir( pigContext ) );
 
-                if (absolutePath!=null) {
+                if (absolutePath!=null && !(loFunc instanceof NonFSLoadFunc)) {
                     QueryParserUtils.setHdfsServers( absolutePath, pigContext );
                 }
                 fileNameMap.put( fileNameKey, absolutePath );
@@ -1357,13 +1358,19 @@ public class LogicalPlanBuilder {
         return Long.parseLong( num );
     }
 
+    /**
+     * Parse big integer formatted string (e.g. "1234567890123BI") into BigInteger object
+     */
     static BigInteger parseBigInteger(String s) {
-        String num = s.substring( 0, s.length() - 1 );
+        String num = s.substring( 0, s.length() - 2 );
         return new BigInteger( num );
     }
 
+    /**
+     * Parse big decimal formatted string (e.g. "123456.7890123BD") into BigDecimal object
+     */
     static BigDecimal parseBigDecimal(String s) {
-        String num = s.substring( 0, s.length() - 1 );
+        String num = s.substring( 0, s.length() - 2 );
         return new BigDecimal( num );
     }
 
@@ -1781,6 +1788,8 @@ public class LogicalPlanBuilder {
             return JOINTYPE.REPLICATED;
          } else if( modifier.equalsIgnoreCase( "hash" ) || modifier.equalsIgnoreCase( "default" ) ) {
              return LOJoin.JOINTYPE.HASH;
+         } else if( modifier.equalsIgnoreCase( "bloom" ) ) {
+             return LOJoin.JOINTYPE.BLOOM;
          } else if( modifier.equalsIgnoreCase( "skewed" ) ) {
              return JOINTYPE.SKEWED;
          } else if (modifier.equalsIgnoreCase("merge")) {
@@ -1789,7 +1798,7 @@ public class LogicalPlanBuilder {
              return JOINTYPE.MERGESPARSE;
          } else {
              throw new ParserValidationException( intStream, loc,
-                      "Only REPL, REPLICATED, HASH, SKEWED, MERGE, and MERGE-SPARSE are vaild JOIN modifiers." );
+                      "Only REPL, REPLICATED, HASH, BLOOM, SKEWED, MERGE, and MERGE-SPARSE are vaild JOIN modifiers." );
          }
     }
 

Modified: pig/branches/spark/src/org/apache/pig/parser/PigMacro.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/PigMacro.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/PigMacro.java (original)
+++ pig/branches/spark/src/org/apache/pig/parser/PigMacro.java Fri Feb 24 08:19:42 2017
@@ -168,14 +168,9 @@ class PigMacro {
 
             Map<String, String> paramVal = pc.getParamVal();
             for (Map.Entry<String, String> e : pigContext.getParamVal().entrySet()) {
-                if (paramVal.containsKey(e.getKey())) {
-                    throw new ParserException(
-                        "Macro contains argument or return value " + e.getKey() + " which conflicts " +
-                        "with a Pig parameter of the same name."
-                    );
-                } else {
-                    paramVal.put(e.getKey(), e.getValue());
-                }
+                // overwrite=false since macro parameters should have precedence
+                // over commandline parameters (if keys overlap)
+                pc.processOrdLine(e.getKey(), e.getValue(), false);
             }
             
             ParameterSubstitutionPreprocessor psp = new ParameterSubstitutionPreprocessor(pc);
@@ -219,6 +214,7 @@ class PigMacro {
         try {
             result = parser.query();
         } catch (RecognitionException e) {
+            e.line += startLine -1;
             String msg = (fileName == null) ? parser.getErrorHeader(e)
                     : QueryParserUtils.generateErrorHeader(e, fileName);
             msg += " " + parser.getErrorMessage(e, parser.getTokenNames());
@@ -236,7 +232,7 @@ class PigMacro {
         if (!macroDefNodes.isEmpty()) {
             String fname = ((PigParserNode)ast).getFileName();
             String msg = getErrorMessage(fname, ast.getLine(),
-                    "Invalide macro definition", "macro '" + name
+                    "Invalid macro definition", "macro '" + name
                             + "' contains macro definition.\nmacro content: "
                             + body);
             throw new ParserException(msg);
@@ -273,6 +269,7 @@ class PigMacro {
         try {
             result2 = walker.query();
         } catch (RecognitionException e) {
+            e.line += startLine - 1;
             String msg = walker.getErrorHeader(e) + " "
                     + walker.getErrorMessage(e, walker.getTokenNames());
             String msg2 = getErrorMessage(file, line, "Failed to mask macro '"

Modified: pig/branches/spark/src/org/apache/pig/parser/QueryParser.g
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/QueryParser.g?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/QueryParser.g (original)
+++ pig/branches/spark/src/org/apache/pig/parser/QueryParser.g Fri Feb 24 08:19:42 2017
@@ -889,6 +889,8 @@ scalar : INTEGER
        | LONGINTEGER
        | FLOATNUMBER
        | DOUBLENUMBER
+       | BIGINTEGERNUMBER
+       | BIGDECIMALNUMBER
        | QUOTEDSTRING
        | NULL
        | TRUE

Modified: pig/branches/spark/src/org/apache/pig/parser/RegisterResolver.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/RegisterResolver.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/RegisterResolver.java (original)
+++ pig/branches/spark/src/org/apache/pig/parser/RegisterResolver.java Fri Feb 24 08:19:42 2017
@@ -23,6 +23,10 @@ import java.net.URISyntaxException;
 
 import org.apache.pig.PigServer;
 import org.apache.pig.tools.DownloadResolver;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.hadoop.fs.Path;
 
 public class RegisterResolver {
 
@@ -66,15 +70,24 @@ public class RegisterResolver {
         String scheme = uri.getScheme();
         if (scheme != null) {
             scheme = scheme.toLowerCase();
+            if (scheme.equals("ivy")) {
+                DownloadResolver downloadResolver = DownloadResolver.getInstance();
+                return downloadResolver.downloadArtifact(uri, pigServer);
+            }
+            if (!hasFileSystemImpl(uri)) {
+                throw new ParserException("Invalid Scheme: " + uri.getScheme());
+            }
         }
-        if (scheme == null || scheme.equals("file") || scheme.equals("hdfs")) {
-            return new URI[] { uri };
-        } else if (scheme.equals("ivy")) {
-            DownloadResolver downloadResolver = DownloadResolver.getInstance();
-            return downloadResolver.downloadArtifact(uri, pigServer);
-        } else {
-            throw new ParserException("Invalid Scheme: " + uri.getScheme());
-        }
+        return new URI[] { uri };
+    }
+   
+    /**
+     * @param uri
+     * @return True if the uri has valid file system implementation
+     */ 
+    private boolean hasFileSystemImpl(URI uri) {
+      Configuration conf = ConfigurationUtil.toConfiguration(pigServer.getPigContext().getProperties(), true);
+      return HadoopShims.hasFileSystemImpl(new Path(uri), conf);
     }
 
     /**

Modified: pig/branches/spark/src/org/apache/pig/parser/SourceLocation.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/SourceLocation.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/SourceLocation.java (original)
+++ pig/branches/spark/src/org/apache/pig/parser/SourceLocation.java Fri Feb 24 08:19:42 2017
@@ -75,12 +75,11 @@ public class SourceLocation {
         if (node != null) {
             InvocationPoint pt = node.getNextInvocationPoint();
             while (pt != null) {
-                sb.append("\n");
                 sb.append("at expanding macro '" + pt.getMacro() + "' ("
                         + pt.getFile() + ":" + pt.getLine() + ")");
                 pt = node.getNextInvocationPoint();
+                sb.append("\n");
             }
-            sb.append("\n");
         }
         sb.append( "<" );
         if( file != null && !file.isEmpty() )

Modified: pig/branches/spark/src/org/apache/pig/pen/LocalMapReduceSimulator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/pen/LocalMapReduceSimulator.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/pen/LocalMapReduceSimulator.java (original)
+++ pig/branches/spark/src/org/apache/pig/pen/LocalMapReduceSimulator.java Fri Feb 24 08:19:42 2017
@@ -27,6 +27,7 @@ import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.TaskID;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -35,6 +36,7 @@ import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase;
@@ -75,9 +77,9 @@ import org.apache.pig.pen.util.LineageTr
  *
  */
 public class LocalMapReduceSimulator {
-    
+
     private MapReduceLauncher launcher = new MapReduceLauncher();
-    
+
     private Map<PhysicalOperator, PhysicalOperator> phyToMRMap = new HashMap<PhysicalOperator, PhysicalOperator>();;
 
     @SuppressWarnings("unchecked")
@@ -88,12 +90,12 @@ public class LocalMapReduceSimulator {
                               PigContext pc) throws PigException, IOException, InterruptedException {
         phyToMRMap.clear();
         MROperPlan mrp = launcher.compile(php, pc);
-                
+
         ConfigurationValidator.validatePigProperties(pc.getProperties());
         Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
-        
+
         JobControlCompiler jcc = new JobControlCompiler(pc, conf);
-        
+
         JobControl jc;
         int numMRJobsCompl = 0;
         DataBag input;
@@ -106,6 +108,8 @@ public class LocalMapReduceSimulator {
         boolean needFileInput;
         final ArrayList<OperatorKey> emptyInpTargets = new ArrayList<OperatorKey>();
         pc.getProperties().setProperty("pig.illustrating", "true");
+        String jtIdentifier = "" + System.currentTimeMillis();
+        int jobId = 0;
         while(mrp.size() != 0) {
             jc = jcc.compile(mrp, "Illustrator");
             if(jc == null) {
@@ -113,6 +117,7 @@ public class LocalMapReduceSimulator {
             }
             List<Job> jobs = jc.getWaitingJobs();
             for (Job job : jobs) {
+                jobId++;
                 jobConf = job.getJobConf();
                 FileLocalizer.setInitialized(false);
                 ArrayList<ArrayList<OperatorKey>> inpTargets =
@@ -123,14 +128,14 @@ public class LocalMapReduceSimulator {
                 PigSplit split = null;
                 List<POStore> stores = null;
                 PhysicalOperator pack = null;
-                // revisit as there are new physical operators from MR compilation 
+                // revisit as there are new physical operators from MR compilation
                 if (!mro.mapPlan.isEmpty())
                     attacher.revisit(mro.mapPlan);
                 if (!mro.reducePlan.isEmpty()) {
                     attacher.revisit(mro.reducePlan);
                     pack = mro.reducePlan.getRoots().get(0);
                 }
-                
+
                 List<POLoad> lds = PlanHelper.getPhysicalOperators(mro.mapPlan, POLoad.class);
                 if (!mro.mapPlan.isEmpty()) {
                     stores = PlanHelper.getPhysicalOperators(mro.mapPlan, POStore.class);
@@ -145,10 +150,10 @@ public class LocalMapReduceSimulator {
                 for (POStore store : stores) {
                     output.put(store.getSFile().getFileName(), attacher.getDataMap().get(store));
                 }
-               
+
                 OutputAttacher oa = new OutputAttacher(mro.mapPlan, output);
                 oa.visit();
-                
+
                 if (!mro.reducePlan.isEmpty()) {
                     oa = new OutputAttacher(mro.reducePlan, output);
                     oa.visit();
@@ -168,6 +173,7 @@ public class LocalMapReduceSimulator {
                     if (input != null)
                         mro.mapPlan.remove(ld);
                 }
+                int mapTaskId = 0;
                 for (POLoad ld : lds) {
                     // check newly generated data first
                     input = output.get(ld.getLFile().getFileName());
@@ -180,7 +186,7 @@ public class LocalMapReduceSimulator {
                                      break;
                                 }
                             }
-                        } 
+                        }
                     }
                     needFileInput = (input == null);
                     split = new PigSplit(null, index, needFileInput ? emptyInpTargets : inpTargets.get(index), 0);
@@ -199,6 +205,7 @@ public class LocalMapReduceSimulator {
                             context = ((PigMapReduceCounter.PigMapCounter) map).getIllustratorContext(jobConf, input, intermediateData, split);
                         }
                         ((PigMapBase) map).setMapPlan(mro.mapPlan);
+                        context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, true, mapTaskId++).toString());
                         map.run(context);
                     } else {
                         if ("true".equals(jobConf.get("pig.usercomparator")))
@@ -210,10 +217,11 @@ public class LocalMapReduceSimulator {
                         Mapper<Text, Tuple, PigNullableWritable, Writable>.Context context = ((PigMapBase) map)
                           .getIllustratorContext(jobConf, input, intermediateData, split);
                         ((PigMapBase) map).setMapPlan(mro.mapPlan);
+                        context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, true, mapTaskId++).toString());
                         map.run(context);
                     }
                 }
-                
+
                 if (!mro.reducePlan.isEmpty())
                 {
                     if (pack instanceof POPackage)
@@ -233,19 +241,20 @@ public class LocalMapReduceSimulator {
                     }
 
                     ((PigMapReduce.Reduce) reduce).setReducePlan(mro.reducePlan);
+                    context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, false, 0).toString());
                     reduce.run(context);
                 }
                 for (PhysicalOperator key : mro.phyToMRMap.keySet())
                     for (PhysicalOperator value : mro.phyToMRMap.get(key))
                         phyToMRMap.put(key, value);
             }
-            
-            
+
+
             int removedMROp = jcc.updateMROpPlan(new LinkedList<Job>());
-            
+
             numMRJobsCompl += removedMROp;
         }
-                
+
         jcc.reset();
     }
 
@@ -256,7 +265,7 @@ public class LocalMapReduceSimulator {
                     plan));
             this.outputBuffer = output;
         }
-        
+
         @Override
         public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
             if (userFunc.getFunc() != null && userFunc.getFunc() instanceof ReadScalars) {

Modified: pig/branches/spark/src/org/apache/pig/scripting/ScriptEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/scripting/ScriptEngine.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/scripting/ScriptEngine.java (original)
+++ pig/branches/spark/src/org/apache/pig/scripting/ScriptEngine.java Fri Feb 24 08:19:42 2017
@@ -38,6 +38,7 @@ import java.util.regex.Pattern;
 import org.apache.hadoop.util.Shell;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.tools.pigstats.PigStats;
 
 /**
@@ -127,7 +128,9 @@ public abstract class ScriptEngine {
     //protected static InputStream getScriptAsStream(String scriptPath) {
         InputStream is = null;
         File file = new File(scriptPath);
-        if (file.exists()) {
+        // In the frontend give preference to the local file.
+        // In the backend, try the jar first
+        if (UDFContext.getUDFContext().isFrontend() && file.exists()) {
             try {
                 is = new FileInputStream(file);
             } catch (FileNotFoundException e) {
@@ -156,7 +159,14 @@ public abstract class ScriptEngine {
                 }
             }
         }
-        
+        if (is == null && file.exists()) {
+            try {
+                is = new FileInputStream(file);
+            } catch (FileNotFoundException e) {
+                throw new IllegalStateException("could not find existing file "+scriptPath, e);
+            }
+        }
+
         // TODO: discuss if we want to add logic here to load a script from HDFS
 
         if (is == null) {

Modified: pig/branches/spark/src/org/apache/pig/scripting/js/JsFunction.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/scripting/js/JsFunction.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/scripting/js/JsFunction.java (original)
+++ pig/branches/spark/src/org/apache/pig/scripting/js/JsFunction.java Fri Feb 24 08:19:42 2017
@@ -95,7 +95,7 @@ public class JsFunction extends EvalFunc
 
     private void debugConvertPigToJS(int depth, String pigType, Object value, Schema schema) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug(indent(depth)+"converting from Pig " + pigType + " " + value + " using " + stringify(schema));
+            LOG.debug(indent(depth)+"converting from Pig " + pigType + " " + toString(value) + " using " + stringify(schema));
         }
     }
 

Modified: pig/branches/spark/src/org/apache/pig/scripting/jython/JythonFunction.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/scripting/jython/JythonFunction.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/scripting/jython/JythonFunction.java (original)
+++ pig/branches/spark/src/org/apache/pig/scripting/jython/JythonFunction.java Fri Feb 24 08:19:42 2017
@@ -54,7 +54,7 @@ public class JythonFunction extends Eval
         try {
             f = JythonScriptEngine.getFunction(filename, functionName);
             this.function = f;
-            num_parameters = ((PyBaseCode) f.func_code).co_argcount;
+            num_parameters = ((PyBaseCode) f.__code__).co_argcount;
             PyObject outputSchemaDef = f.__findattr__("outputSchema".intern());
             if (outputSchemaDef != null) {
                 this.schema = Utils.getSchemaFromString(outputSchemaDef.toString());
@@ -105,7 +105,7 @@ public class JythonFunction extends Eval
     @Override
     public Object exec(Tuple tuple) throws IOException {
         try {
-            if (tuple == null || (num_parameters == 0 && !((PyTableCode)function.func_code).varargs)) {
+            if (tuple == null || (num_parameters == 0 && !((PyTableCode)function.__code__).varargs)) {
                 // ignore input tuple
                 PyObject out = function.__call__();
                 return JythonUtils.pythonToPig(out);

Modified: pig/branches/spark/src/org/apache/pig/tools/DownloadResolver.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/DownloadResolver.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/DownloadResolver.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/DownloadResolver.java Fri Feb 24 08:19:42 2017
@@ -44,8 +44,6 @@ public class DownloadResolver {
     private static DownloadResolver downloadResolver = new DownloadResolver();
 
     private DownloadResolver() {
-        System.setProperty("groovy.grape.report.downloads", "true");
-
         if (System.getProperty("grape.config") != null) {
             LOG.info("Using ivysettings file from " + System.getProperty("grape.config"));
         } else {

Added: pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java (added)
+++ pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java Fri Feb 24 08:19:42 2017
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tools.grunt;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.SequenceInputStream;
+import java.util.Enumeration;
+
+import jline.console.ConsoleReader;
+
+/** Borrowed from jline.console.internal.ConsoleReaderInputStream. However,
+ *  we cannot use ConsoleReaderInputStream directly since:
+ *  1. ConsoleReaderInputStream is not public
+ *  2. ConsoleReaderInputStream has a bug: it does not handle UTF-8 correctly
+ */
+public class ConsoleReaderInputStream extends SequenceInputStream {
+    private static InputStream systemIn = System.in;
+
+    public static void setIn() throws IOException {
+        setIn(new ConsoleReader());
+    }
+
+    public static void setIn(final ConsoleReader reader) {
+        System.setIn(new ConsoleReaderInputStream(reader));
+    }
+
+    /**
+     * Restore the original {@link System#in} input stream.
+     */
+    public static void restoreIn() {
+        System.setIn(systemIn);
+    }
+
+    public ConsoleReaderInputStream(final ConsoleReader reader) {
+        super(new ConsoleEnumeration(reader));
+    }
+
+    private static class ConsoleEnumeration implements Enumeration {
+        private final ConsoleReader reader;
+        private ConsoleLineInputStream next = null;
+        private ConsoleLineInputStream prev = null;
+
+        public ConsoleEnumeration(final ConsoleReader reader) {
+            this.reader = reader;
+        }
+
+        public Object nextElement() {
+            if (next != null) {
+                InputStream n = next;
+                prev = next;
+                next = null;
+
+                return n;
+            }
+
+            return new ConsoleLineInputStream(reader);
+        }
+
+        public boolean hasMoreElements() {
+            // the last line was null
+            if ((prev != null) && (prev.wasNull == true)) {
+                return false;
+            }
+
+            if (next == null) {
+                next = (ConsoleLineInputStream) nextElement();
+            }
+
+            return next != null;
+        }
+    }
+
+    private static class ConsoleLineInputStream extends InputStream {
+        private final ConsoleReader reader;
+        private byte[] buffer = null;
+        private int index = 0;
+        private boolean eol = false;
+        protected boolean wasNull = false;
+
+        public ConsoleLineInputStream(final ConsoleReader reader) {
+            this.reader = reader;
+        }
+
+        public int read() throws IOException {
+            if (eol) {
+                return -1;
+            }
+
+            if (buffer == null) {
+                buffer = reader.readLine().getBytes();
+            }
+
+            if (buffer == null) {
+                wasNull = true;
+                return -1;
+            }
+
+            if (index >= buffer.length) {
+                eol = true;
+                return '\n'; // lines are ended with a newline
+            }
+
+            return buffer[index++];
+        }
+    }
+}
\ No newline at end of file

Modified: pig/branches/spark/src/org/apache/pig/tools/grunt/Grunt.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/Grunt.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/grunt/Grunt.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/grunt/Grunt.java Fri Feb 24 08:19:42 2017
@@ -20,7 +20,7 @@ package org.apache.pig.tools.grunt;
 import java.io.BufferedReader;
 import java.util.ArrayList;
 
-import jline.ConsoleReader;
+import jline.console.ConsoleReader;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -52,8 +52,8 @@ public class Grunt
 
     public void setConsoleReader(ConsoleReader c)
     {
-        c.addCompletor(new PigCompletorAliases(pig));
-        c.addCompletor(new PigCompletor());
+        c.addCompleter(new PigCompletorAliases(pig));
+        c.addCompleter(new PigCompletor());
         parser.setConsoleReader(c);
     }
 

Modified: pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java Fri Feb 24 08:19:42 2017
@@ -26,7 +26,6 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
 import java.io.PrintStream;
 import java.io.Reader;
 import java.io.StringReader;
@@ -42,8 +41,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
-import jline.ConsoleReader;
-import jline.ConsoleReaderInputStream;
+import jline.console.ConsoleReader;
 
 import org.apache.commons.io.output.NullOutputStream;
 import org.apache.commons.logging.Log;
@@ -264,7 +262,7 @@ public class GruntParser extends PigScri
     public void prompt()
     {
         if (mInteractive) {
-            mConsoleReader.setDefaultPrompt("grunt> ");
+            mConsoleReader.setPrompt("grunt> ");
         }
     }
 
@@ -516,8 +514,13 @@ public class GruntParser extends PigScri
         ConsoleReader reader;
         boolean interactive;
 
-        mPigServer.getPigContext().setParams(params);
-        mPigServer.getPigContext().setParamFiles(files);
+        PigContext pc = mPigServer.getPigContext();
+
+        if( !loadOnly ) {
+          pc.getPreprocessorContext().paramScopePush();
+        }
+        pc.setParams(params);
+        pc.setParamFiles(files);
 
         try {
             FetchFileRet fetchFile = FileLocalizer.fetchFile(mConf, script);
@@ -528,7 +531,7 @@ public class GruntParser extends PigScri
                 cmds = cmds.replaceAll("\t","    ");
 
                 reader = new ConsoleReader(new ByteArrayInputStream(cmds.getBytes()),
-                                           new OutputStreamWriter(System.out));
+                                           System.out);
                 reader.setHistory(mConsoleReader.getHistory());
                 InputStream in = new ConsoleReaderInputStream(reader);
                 inputReader = new BufferedReader(new InputStreamReader(in));
@@ -560,6 +563,9 @@ public class GruntParser extends PigScri
         if (interactive) {
             System.out.println("");
         }
+        if( ! loadOnly ) {
+          pc.getPreprocessorContext().paramScopePop();
+        }
     }
 
     @Override

Modified: pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletor.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletor.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletor.java Fri Feb 24 08:19:42 2017
@@ -33,9 +33,9 @@ import java.util.TreeSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import jline.Completor;
+import jline.console.completer.Completer;
 
-public class PigCompletor implements Completor {
+public class PigCompletor implements Completer {
     private final Log log = LogFactory.getLog(getClass());
     Set<String> candidates;
     static final String AUTOCOMPLETE_FILENAME = "autocomplete";

Modified: pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletorAliases.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletorAliases.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletorAliases.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletorAliases.java Fri Feb 24 08:19:42 2017
@@ -26,12 +26,11 @@ import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.pig.PigServer;
 
-import jline.Completor;
+import jline.console.completer.Completer;
 
-public class PigCompletorAliases implements Completor {
+public class PigCompletorAliases implements Completer {
     private final Log log = LogFactory.getLog(getClass());
     Set<String> keywords;
     PigServer pig;

Modified: pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj (original)
+++ pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj Fri Feb 24 08:19:42 2017
@@ -259,8 +259,11 @@ TOKEN :
     <PIGDEFAULT: "%default" > 
 }
 
+
 TOKEN : 
 {
+    <REGISTER: "register"> : IN_REGISTER
+    |
     <IDENTIFIER: (<SPECIALCHAR>)*<LETTER>(<DIGIT> | <LETTER> | <SPECIALCHAR>)*>
     |
     <LITERAL: ("\"" ((~["\""])*("\\\"")?)* "\"")|("'" ((~["'"])*("\\\'")?)* "'") >
@@ -276,7 +279,14 @@ TOKEN :
     <OTHER: (~["\"" , "'" , "`" , "a"-"z" , "A"-"Z" , "_" , "#" , "=" , " " , "\n" , "\t" , "\r", "%", "/", "-", "$"])+ >
     |
     <NOT_OTHER_CHAR: ["\"" , "'" , "`" , "a"-"z" , "A"-"Z" , "_" , "#" , "=" , " " , "\n" , "\t" , "\r", "%", "/", "-", "$"] >
- 
+}
+
+<IN_REGISTER> MORE : { " " |  "\t" | "\r" | "\n"}
+
+<IN_REGISTER> TOKEN: {
+  <PATH: (~["(", ")", ";", "\r", " ", "\t", "\n"])+> {
+        matchedToken.image = image.toString();
+    }: DEFAULT
 }
 
 void Parse() throws IOException : {}
@@ -288,6 +298,7 @@ void input() throws IOException  :
 {
     String s;
     Token strTok = null;
+    Token strTok2 = null;
 }
 {
     strTok = <PIG>
@@ -308,6 +319,20 @@ void input() throws IOException  :
         { pc.validate(strTok.toString()); }
     )
     |
+    strTok = <REGISTER>
+    strTok2 = <PATH> {}
+    {
+        // Adding a special case for register since it handles "/*" globbing
+        // and this conflicts with general multi-line comment "/*   */".
+        // See the comment above on OTHERS on how tokenizer matches the longest
+        // match.  Here, the string next to "register" is treated as a PATH TOKEN
+        // and therefore "/*" is not considered part of a comment
+        // (and avoid the longest match problem).
+        out.append(strTok.image);
+        String sub_line = pc.substitute(strTok2.image);
+        out.append(sub_line);
+    }
+    |
     s = paramString(){}
     {
         //process an ordinary pig line - perform substitution

Modified: pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java Fri Feb 24 08:19:42 2017
@@ -27,6 +27,8 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.StringReader;
+import java.util.ArrayDeque;
+import java.util.Deque;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
@@ -40,20 +42,26 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.validator.BlackAndWhitelistFilter;
 import org.apache.pig.validator.PigCommandFilter;
-import org.python.google.common.base.Preconditions;
 
 public class PreprocessorContext {
 
-    private Map<String, String> param_val;
+    private int tableinitsize = 10;
+    private Deque<Map<String,String>> param_val_stack;
 
-    // used internally to detect when a param is set multiple times,
-    // but it set with the same value so it's ok not to log a warning
-    private Map<String, String> param_source;
-    
     private PigContext pigContext;
 
     public Map<String, String> getParamVal() {
-        return param_val;
+        Map <String, String> ret = new Hashtable <String, String>(tableinitsize);
+
+        //stack (deque) iterates LIFO
+        for (Map <String, String> map : param_val_stack ) {
+            for (Map.Entry<String, String> entry : map.entrySet()) {
+                if( ! ret.containsKey(entry.getKey()) ) {
+                    ret.put(entry.getKey(), entry.getValue());
+                }
+            }
+        }
+        return ret;
     }
 
     private final Log log = LogFactory.getLog(getClass());
@@ -63,24 +71,15 @@ public class PreprocessorContext {
      *                smaller number only impacts performance
      */
     public PreprocessorContext(int limit) {
-        param_val = new Hashtable<String, String> (limit);
-        param_source = new Hashtable<String, String> (limit);
-    }
-
-    public PreprocessorContext(Map<String, String> paramVal) {
-        param_val = paramVal;
-        param_source = new Hashtable<String, String>(paramVal);
+        tableinitsize = limit;
+        param_val_stack = new ArrayDeque<Map<String,String>> ();
+        param_val_stack.push(new Hashtable<String, String> (tableinitsize));
     }
 
     public void setPigContext(PigContext context) {
         this.pigContext = context;
     }
 
-    /*
-    public  void processLiteral(String key, String val) {
-        processLiteral(key, val, true);
-    } */
-
     /**
      * This method generates parameter value by running specified command
      *
@@ -102,20 +101,35 @@ public class PreprocessorContext {
         processOrdLine(key, val, true);
     }
 
-    /*
-    public  void processLiteral(String key, String val, Boolean overwrite) {
+    public void paramScopePush() {
+        param_val_stack.push( new Hashtable<String, String> (tableinitsize) );
+    }
 
-        if (param_val.containsKey(key)) {
-            if (overwrite) {
-                log.warn("Warning : Multiple values found for " + key + ". Using value " + val);
-            } else {
-                return;
+    public void paramScopePop() {
+        param_val_stack.pop();
+    }
+
+    public boolean paramval_containsKey(String key) {
+        for (Map <String, String> map : param_val_stack ) {
+            if( map.containsKey(key) ) {
+                return true;
             }
         }
+        return false;
+    }
 
-        String sub_val = substitute(val);
-        param_val.put(key, sub_val);
-    } */
+    public String paramval_get(String key) {
+        for (Map <String, String> map : param_val_stack ) {
+            if( map.containsKey(key) ) {
+                return map.get(key);
+            }
+        }
+        return null;
+    }
+
+    public void paramval_put(String key, String value) {
+        param_val_stack.peek().put(key, value);
+    }
 
     /**
      * This method generates parameter value by running specified command
@@ -129,21 +143,21 @@ public class PreprocessorContext {
             filter.validate(PigCommandFilter.Command.SH);
         }
 
-        if (param_val.containsKey(key)) {
-            if (param_source.get(key).equals(val) || !overwrite) {
-                return;
-            } else {
-                log.warn("Warning : Multiple values found for " + key
-                        + ". Using value " + val);
-            }
+        if (paramval_containsKey(key) && !overwrite) {
+            return;
         }
 
-        param_source.put(key, val);
-
         val = val.substring(1, val.length()-1); //to remove the backticks
         String sub_val = substitute(val);
         sub_val = executeShellCommand(sub_val);
-        param_val.put(key, sub_val);
+
+        if (paramval_containsKey(key) && !paramval_get(key).equals(sub_val) ) {
+            //(boolean overwrite is always true here)
+            log.warn("Warning : Multiple values found for " + key + " command `" + val + "`. "
+                     + "Previous value " + paramval_get(key) + ", now using value " + sub_val);
+        }
+
+        paramval_put(key, sub_val);
     }
 
     public void validate(String preprocessorCmd) throws FrontendException {
@@ -175,18 +189,18 @@ public class PreprocessorContext {
      */
     public  void processOrdLine(String key, String val, Boolean overwrite)  throws ParameterSubstitutionException {
 
-        if (param_val.containsKey(key)) {
-            if (param_source.get(key).equals(val) || !overwrite) {
+        String sub_val = substitute(val, key);
+        if (paramval_containsKey(key)) {
+            if (paramval_get(key).equals(sub_val) || !overwrite) {
                 return;
             } else {
-                log.warn("Warning : Multiple values found for " + key + ". Using value " + val);
+                log.warn("Warning : Multiple values found for " + key
+                         + ". Previous value " + paramval_get(key)
+                         + ", now using value " + sub_val);
             }
         }
 
-        param_source.put(key, val);
-
-        String sub_val = substitute(val, key);
-        param_val.put(key, sub_val);
+        paramval_put(key, sub_val);
     }
 
 
@@ -318,7 +332,7 @@ public class PreprocessorContext {
         while (bracketKeyMatcher.find()) {
             if ( (bracketKeyMatcher.start() == 0) || (line.charAt( bracketKeyMatcher.start() - 1)) != '\\' ) {
                 key = bracketKeyMatcher.group(1);
-                if (!(param_val.containsKey(key))) {
+                if (!(paramval_containsKey(key))) {
                     String message;
                     if (parentKey == null) {
                         message = "Undefined parameter : " + key;
@@ -327,7 +341,7 @@ public class PreprocessorContext {
                     }
                     throw new ParameterSubstitutionException(message);
                 }
-                val = param_val.get(key);
+                val = paramval_get(key);
                 if (val.contains("$")) {
                     val = val.replaceAll("(?<!\\\\)\\$", "\\\\\\$");
                 }
@@ -345,7 +359,7 @@ public class PreprocessorContext {
             // for escaped vars of the form \$<id>
             if ( (keyMatcher.start() == 0) || (line.charAt( keyMatcher.start() - 1)) != '\\' ) {
                 key = keyMatcher.group(1);
-                if (!(param_val.containsKey(key))) {
+                if (!(paramval_containsKey(key))) {
                     String message;
                     if (parentKey == null) {
                         message = "Undefined parameter : " + key;
@@ -354,7 +368,7 @@ public class PreprocessorContext {
                     }
                     throw new ParameterSubstitutionException(message);
                 }
-                val = param_val.get(key);
+                val = paramval_get(key);
                 if (val.contains("$")) {
                     val = val.replaceAll("(?<!\\\\)\\$", "\\\\\\$");
                 }

Modified: pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj Fri Feb 24 08:19:42 2017
@@ -23,6 +23,7 @@ options {
   STATIC = false;
   // Case is ignored in keywords
   IGNORE_CASE = true;
+  // DEBUG_PARSER = true;
   JAVA_UNICODE_ESCAPE = true;
 }
 
@@ -36,7 +37,7 @@ import java.util.List;
 import java.util.ArrayList;
 import org.apache.pig.impl.util.StringUtils;
 
-import jline.ConsoleReader;
+import jline.console.ConsoleReader;
 
 public abstract class PigScriptParser
 {
@@ -217,7 +218,7 @@ TOKEN_MGR_DECLS : {
 		{
 			/*System.err.print(">> ");
 			System.err.flush();*/
-		    consoleReader.setDefaultPrompt(">> ");
+		    consoleReader.setPrompt(">> ");
 		}
 	}
 
@@ -267,7 +268,7 @@ TOKEN_MGR_DECLS : {
 	<"'"> {prevState = PIG_START;} : IN_STRING
 |	<"`"> {prevState = PIG_START;} : IN_COMMAND
 |	<(" " | "\t")+["A","a"]["S","s"](" " | "\t")+ > {prevState = PIG_START;} : SCHEMA_DEFINITION
-|   <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t")+ > {prevState = PIG_START;} : GENERATE
+|   <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t" | "\r" | "\n")+ > {prevState = PIG_START;} : GENERATE
 |       <"{"> {pigBlockLevel = 1;} : IN_BLOCK
 |       <"}"> {if (true) throw new TokenMgrError("Unmatched '}'", TokenMgrError.LEXICAL_ERROR);}
 |       <";"> : PIG_END
@@ -292,7 +293,8 @@ TOKEN_MGR_DECLS : {
 
 <IN_STRING> MORE :
 {
-	<"\\'">
+	<"\\\\">
+|	<"\\'">
 |	<"'"> { SwitchTo(prevState);}
 |	<("\n" | "\r" | "\r\n")> {secondary_prompt();}
 |	<(~[])>
@@ -395,7 +397,7 @@ TOKEN_MGR_DECLS : {
 {
 	<"\""> {prevState = IN_BLOCK;} : IN_DOUBLE_QUOTED_STRING
 |	<(" " | "\t")+["A","a"]["S","s"](" " | "\t")+ > {prevState = IN_BLOCK;} : SCHEMA_DEFINITION
-|   <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t")+> {prevState = IN_BLOCK;} : GENERATE
+|   <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t" | "\r" | "\n")+> {prevState = IN_BLOCK;} : GENERATE
 |	<"{"> {pigBlockLevel++;}
 |       <"}"(";")?> {pigBlockLevel--; if (pigBlockLevel == 0) SwitchTo(PIG_END);}
 |	<"'"> {prevState = IN_BLOCK;} : IN_STRING

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/EmbeddedPigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/EmbeddedPigStats.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/EmbeddedPigStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/EmbeddedPigStats.java Fri Feb 24 08:19:42 2017
@@ -147,6 +147,11 @@ final class EmbeddedPigStats extends Pig
     }
 
     @Override
+    public String getDisplayString() {
+        return null;
+    }
+
+    @Override
     public long getProactiveSpillCountObjects() {
         throw new UnsupportedOperationException();
     }

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/EmptyPigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/EmptyPigStats.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/EmptyPigStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/EmptyPigStats.java Fri Feb 24 08:19:42 2017
@@ -87,6 +87,11 @@ public class EmptyPigStats extends PigSt
     }
 
     @Override
+    public String getDisplayString() {
+        return null;
+    }
+
+    @Override
     public JobGraph getJobGraph() {
        return emptyJobPlan;
     }

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStats.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStats.java Fri Feb 24 08:19:42 2017
@@ -134,6 +134,11 @@ public abstract class PigStats {
     }
 
     /**
+     * Returns the display message in pig grunt
+     */
+    public abstract String getDisplayString();
+
+    /**
      * Returns the DAG of jobs spawned by the script
      */
     public JobGraph getJobGraph() {
@@ -265,6 +270,13 @@ public abstract class PigStats {
         return ScriptState.get().getPigVersion();
     }
 
+    /**
+     *  Returns the contents of the script that was run.
+     */
+    public String getScript() {
+        return ScriptState.get().getScript();
+    }
+
     public String getScriptId() {
         return ScriptState.get().getId();
     }

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Fri Feb 24 08:19:42 2017
@@ -24,7 +24,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
 import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
 
 /**
@@ -71,7 +71,7 @@ public class PigStatsUtil {
      */
     @Deprecated
     public static final String FS_COUNTER_GROUP
-            = HadoopShims.getFsCounterGroupName();
+            = MRPigStatsUtil.FS_COUNTER_GROUP;
 
     /**
      * Returns an empty PigStats object Use of this method is not advised as it

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java Fri Feb 24 08:19:42 2017
@@ -133,6 +133,8 @@ public abstract class ScriptState {
         MERGE_SPARSE_JOIN,
         REPLICATED_JOIN,
         SKEWED_JOIN,
+        BUILD_BLOOM,
+        FILTER_BLOOM,
         HASH_JOIN,
         COLLECTED_GROUP,
         MERGE_COGROUP,
@@ -312,7 +314,7 @@ public abstract class ScriptState {
                 maxScriptSize = Integer.valueOf(prop);
             }
         }
-       
+
         this.truncatedScript = (script.length() > maxScriptSize) ? script.substring(0, maxScriptSize)
                                                    : script;
 
@@ -485,6 +487,10 @@ public abstract class ScriptState {
         public void visit(LOJoin op) {
             if (op.getJoinType() == JOINTYPE.HASH) {
                 feature.set(PIG_FEATURE.HASH_JOIN.ordinal());
+            } else if (op.getJoinType() == JOINTYPE.BLOOM) {
+                feature.set(PIG_FEATURE.HASH_JOIN.ordinal());
+                feature.set(PIG_FEATURE.BUILD_BLOOM.ordinal());
+                feature.set(PIG_FEATURE.FILTER_BLOOM.ordinal());
             } else if (op.getJoinType() == JOINTYPE.MERGE) {
                 feature.set(PIG_FEATURE.MERGE_JOIN.ordinal());
             } else if (op.getJoinType() == JOINTYPE.MERGESPARSE) {
@@ -506,6 +512,7 @@ public abstract class ScriptState {
             feature.set(PIG_FEATURE.RANK.ordinal());
         }
 
+        @Override
         public void visit(LOSort op) {
             feature.set(PIG_FEATURE.ORDER_BY.ordinal());
         }

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java Fri Feb 24 08:19:42 2017
@@ -32,15 +32,16 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.TaskReport;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigCounters;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 import org.apache.pig.impl.io.FileSpec;
@@ -53,6 +54,8 @@ import org.apache.pig.tools.pigstats.Out
 import org.apache.pig.tools.pigstats.PigStats.JobGraph;
 import org.apache.pig.tools.pigstats.PigStats.JobGraphPrinter;
 
+import org.python.google.common.collect.Lists;
+
 
 /**
  * This class encapsulates the runtime statistics of a MapReduce job.
@@ -281,7 +284,7 @@ public final class MRJobStats extends Jo
 
     void addCounters(Job job) {
         try {
-            counters = HadoopShims.getCounters(job);
+            counters = getCounters(job);
         } catch (IOException e) {
             LOG.warn("Unable to get job counters", e);
         }
@@ -349,13 +352,13 @@ public final class MRJobStats extends Jo
     void addMapReduceStatistics(Job job) {
         Iterator<TaskReport> maps = null;
         try {
-            maps = HadoopShims.getTaskReports(job, TaskType.MAP);
+            maps = getTaskReports(job, TaskType.MAP);
         } catch (IOException e) {
             LOG.warn("Failed to get map task report", e);
         }
         Iterator<TaskReport> reduces = null;
         try {
-            reduces = HadoopShims.getTaskReports(job, TaskType.REDUCE);
+            reduces = getTaskReports(job, TaskType.REDUCE);
         } catch (IOException e) {
             LOG.warn("Failed to get reduce task report", e);
         }
@@ -515,4 +518,35 @@ public final class MRJobStats extends Jo
         inputs.add(is);
     }
 
+    public static Iterator<TaskReport> getTaskReports(Job job, TaskType type) throws IOException {
+        if (job.getJobConf().getBoolean(PigConfiguration.PIG_NO_TASK_REPORT, false)) {
+            LOG.info("TaskReports are disabled for job: " + job.getAssignedJobID());
+            return null;
+        }
+        Cluster cluster = new Cluster(job.getJobConf());
+        try {
+            org.apache.hadoop.mapreduce.Job mrJob = cluster.getJob(job.getAssignedJobID());
+            if (mrJob == null) { // In local mode, mrJob will be null
+                mrJob = job.getJob();
+            }
+            org.apache.hadoop.mapreduce.TaskReport[] reports = mrJob.getTaskReports(type);
+            return Lists.newArrayList(reports).iterator();
+        } catch (InterruptedException ir) {
+            throw new IOException(ir);
+        }
+    }
+
+    public static Counters getCounters(Job job) throws IOException {
+        try {
+            Cluster cluster = new Cluster(job.getJobConf());
+            org.apache.hadoop.mapreduce.Job mrJob = cluster.getJob(job.getAssignedJobID());
+            if (mrJob == null) { // In local mode, mrJob will be null
+                mrJob = job.getJob();
+            }
+            return new Counters(mrJob.getCounters());
+        } catch (Exception ir) {
+            throw new IOException(ir);
+        }
+    }
+
 }

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java Fri Feb 24 08:19:42 2017
@@ -33,7 +33,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NativeMapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.classification.InterfaceAudience.Private;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.tools.pigstats.JobStats;
@@ -51,7 +50,7 @@ public class MRPigStatsUtil extends PigS
     public static final String TASK_COUNTER_GROUP
             = "org.apache.hadoop.mapred.Task$Counter";
     public static final String FS_COUNTER_GROUP
-            = HadoopShims.getFsCounterGroupName();
+            = "org.apache.hadoop.mapreduce.FileSystemCounter";
 
     private static final Log LOG = LogFactory.getLog(MRPigStatsUtil.class);
 

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java Fri Feb 24 08:19:42 2017
@@ -207,13 +207,18 @@ public final class SimplePigStats extend
     }
 
     void display() {
+        LOG.info(getDisplayString());
+    }
+
+    @Override
+    public String getDisplayString() {
         if (returnCode == ReturnCode.UNKNOWN) {
             LOG.warn("unknown return code, can't display the results");
-            return;
+            return "";
         }
         if (pigContext == null) {
             LOG.warn("unknown exec type, don't display the results");
-            return;
+            return "";
         }
 
         SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
@@ -276,7 +281,7 @@ public final class SimplePigStats extend
 
         sb.append("\nJob DAG:\n").append(jobPlan.toString());
 
-        LOG.info("Script Statistics: \n" + sb.toString());
+        return "Script Statistics: \n" + sb.toString();
     }
 
     void mapMROperToJob(MapReduceOper mro, Job job) {

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java Fri Feb 24 08:19:42 2017
@@ -115,6 +115,12 @@ public class SparkPigStats extends PigSt
     }
 
     private void display() {
+       LOG.info(getDisplayString());
+    }
+
+    @Override
+    public String getDisplayString() {
+        StringBuilder sb = new StringBuilder();
         Iterator<JobStats> iter = jobPlan.iterator();
         while (iter.hasNext()) {
             SparkJobStats js = (SparkJobStats)iter.next();
@@ -122,22 +128,23 @@ public class SparkPigStats extends PigSt
                 SparkOperator sparkOperator = jobSparkOperatorMap.get(js);
                 js.setAlias(sparkOperator);
             }
-            LOG.info( "Spark Job [" + js.getJobId() + "] Metrics");
+            sb.append("Spark Job [" + js.getJobId() + "] Metrics");
             Map<String, Long> stats = js.getStats();
             if (stats == null) {
-                LOG.info("No statistics found for job " + js.getJobId());
-                return;
+                sb.append("No statistics found for job " + js.getJobId());
+                return sb.toString();
             }
 
             Iterator statIt = stats.entrySet().iterator();
             while (statIt.hasNext()) {
                 Map.Entry pairs = (Map.Entry)statIt.next();
-                LOG.info("\t" + pairs.getKey() + " : " + pairs.getValue());
+                sb.append("\t" + pairs.getKey() + " : " + pairs.getValue());
             }
             for (InputStats inputStat : js.getInputs()){
-                LOG.info("\t"+inputStat.getDisplayString());
+                sb.append("\t"+inputStat.getDisplayString());
             }
         }
+        return sb.toString();
     }
 
     @Override

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java Fri Feb 24 08:19:42 2017
@@ -245,7 +245,11 @@ public class TezDAGStats extends JobStat
                             OutputStats existingOut = outputsByLocation.get(output.getLocation());
                             // In case of multistore, bytesWritten is already calculated
                             // from size of all the files in the output directory.
-                            if (!output.getPOStore().isMultiStore() && output.getBytes() > -1) {
+                            // So use that if there is a combination of multistore and single store
+                            if (output.getPOStore().isMultiStore()) {
+                                existingOut.setBytes(output.getBytes());
+                                existingOut.setPOStore(output.getPOStore());
+                            } else if (!existingOut.getPOStore().isMultiStore() && output.getBytes() > -1) {
                                 long bytes = existingOut.getBytes() > -1
                                         ? (existingOut.getBytes() + output.getBytes())
                                         : output.getBytes();

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java Fri Feb 24 08:19:42 2017
@@ -117,6 +117,11 @@ public class TezPigScriptStats extends P
     }
 
     private void display() {
+        LOG.info(getDisplayString());
+    }
+
+    @Override
+    public String getDisplayString() {
         SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
         StringBuilder sb = new StringBuilder();
         sb.append("\n");
@@ -170,7 +175,7 @@ public class TezPigScriptStats extends P
         for (OutputStats os : getOutputStats()) {
             sb.append(os.getDisplayString().trim()).append("\n");
         }
-        LOG.info("Script Statistics:\n" + sb.toString());
+        return "Script Statistics:\n" + sb.toString();
     }
 
     /**

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java Fri Feb 24 08:19:42 2017
@@ -275,6 +275,12 @@ public class TezScriptState extends Scri
                 if (tezOp.isRegularJoin()) {
                     feature.set(PIG_FEATURE.HASH_JOIN.ordinal());
                 }
+                if (tezOp.isBuildBloom()) {
+                    feature.set(PIG_FEATURE.BUILD_BLOOM.ordinal());
+                }
+                if (tezOp.isFilterBloom()) {
+                    feature.set(PIG_FEATURE.FILTER_BLOOM.ordinal());
+                }
                 if (tezOp.isUnion()) {
                     feature.set(PIG_FEATURE.UNION.ordinal());
                 }

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java Fri Feb 24 08:19:42 2017
@@ -22,6 +22,7 @@ import static org.apache.pig.tools.pigst
 import static org.apache.pig.tools.pigstats.tez.TezDAGStats.TASK_COUNTER_GROUP;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -289,13 +290,19 @@ public class TezVertexStats extends JobS
         }
 
         // Split followed by union will have multiple stores writing to same location
-        Map<String, POStore> uniqueOutputs = new HashMap<String, POStore>();
+        Map<String, List<POStore>> uniqueOutputs = new HashMap<String, List<POStore>>();
         for (POStore sto : stores) {
             POStoreTez store = (POStoreTez) sto;
-            uniqueOutputs.put(store.getOutputKey(), store);
+            List<POStore> stores = uniqueOutputs.get(store.getOutputKey());
+            if (stores == null) {
+                stores = new ArrayList<POStore>();
+            }
+            stores.add(store);
+            uniqueOutputs.put(store.getOutputKey(), stores);
         }
 
-        for (POStore sto : uniqueOutputs.values()) {
+        for (List<POStore> stores : uniqueOutputs.values()) {
+            POStore sto = stores.get(0);
             if (sto.isTmpStore()) {
                 continue;
             }
@@ -304,11 +311,16 @@ public class TezVertexStats extends JobS
             String filename = sto.getSFile().getFileName();
             if (counters != null) {
                 if (msGroup != null) {
-                    Long n = msGroup.get(PigStatsUtil.getMultiStoreCounterName(sto));
-                    if (n != null) records = n;
-                }
-                if (records == -1) {
-                    records = outputRecords;
+                    long n = 0;
+                    Long val = null;
+                    for (POStore store : stores) {
+                        val = msGroup.get(PigStatsUtil.getMultiStoreCounterName(store));
+                        // Tez removes 0 value counters for efficiency.
+                        if (val != null) {
+                            n += val;
+                        };
+                    }
+                    records = n;
                 }
                 if (isSuccessful() && records == -1) {
                     // Tez removes 0 value counters for efficiency.
@@ -338,13 +350,13 @@ public class TezVertexStats extends JobS
     @Override
     @Deprecated
     public int getNumberMaps() {
-        return this.isMapOpts ? numTasks : -1;
+        return this.isMapOpts ? numTasks : 0;
     }
 
     @Override
     @Deprecated
     public int getNumberReduces() {
-        return this.isMapOpts ? -1 : numTasks;
+        return this.isMapOpts ? 0 : numTasks;
     }
 
     @Override
@@ -386,25 +398,25 @@ public class TezVertexStats extends JobS
     @Override
     @Deprecated
     public long getMapInputRecords() {
-        return this.isMapOpts ? numInputRecords : -1;
+        return this.isMapOpts ? numInputRecords : 0;
     }
 
     @Override
     @Deprecated
     public long getMapOutputRecords() {
-        return this.isMapOpts ? numOutputRecords : -1;
+        return this.isMapOpts ? numOutputRecords : 0;
     }
 
     @Override
     @Deprecated
     public long getReduceInputRecords() {
-        return this.isMapOpts ? -1 : numInputRecords;
+        return numReduceInputRecords;
     }
 
     @Override
     @Deprecated
     public long getReduceOutputRecords() {
-        return this.isMapOpts ? -1 : numOutputRecords;
+        return this.isMapOpts ? 0 : numOutputRecords;
     }
 
     @Override



Mime
View raw message