pig-commits mailing list archives

From z..@apache.org
Subject svn commit: r1783988 [10/24] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachel...
Date Wed, 22 Feb 2017 09:43:46 GMT
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Wed Feb 22 09:43:41 2017
@@ -22,6 +22,7 @@ import java.io.PrintStream;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -166,7 +167,7 @@ public class TezLauncher extends Launche
         tezStats = new TezPigScriptStats(pc);
         PigStats.start(tezStats);
 
-        conf.set(TezConfiguration.TEZ_USE_CLUSTER_HADOOP_LIBS, "true");
+        conf.setIfUnset(TezConfiguration.TEZ_USE_CLUSTER_HADOOP_LIBS, "true");
         TezJobCompiler jc = new TezJobCompiler(pc, conf);
         TezPlanContainer tezPlanContainer = compile(php, pc);
 
@@ -174,6 +175,10 @@ public class TezLauncher extends Launche
         tezScriptState.emitInitialPlanNotification(tezPlanContainer);
         tezScriptState.emitLaunchStartedNotification(tezPlanContainer.size()); //number of DAGs to Launch
 
+        boolean stop_on_failure =
+                Boolean.valueOf(pc.getProperties().getProperty("stop.on.failure", "false"));
+        boolean stoppedOnFailure = false;
+
         TezPlanContainerNode tezPlanContainerNode;
         TezOperPlan tezPlan;
         int processedDAGs = 0;
@@ -252,7 +257,18 @@ public class TezLauncher extends Launche
                     ((tezPlanContainer.size() - processedDAGs)/tezPlanContainer.size()) * 100);
             }
             handleUnCaughtException(pc);
-            tezPlanContainer.updatePlan(tezPlan, reporter.notifyFinishedOrFailed());
+            boolean tezDAGSucceeded = reporter.notifyFinishedOrFailed();
+            tezPlanContainer.updatePlan(tezPlan, tezDAGSucceeded);
+            // If stop_on_failure is enabled, stop immediately when any job fails
+            if (!tezDAGSucceeded) {
+                if (stop_on_failure) {
+                    stoppedOnFailure = true;
+                    break;
+                } else {
+                    log.warn("Oops! Some job has failed! Specify -stop_on_failure if you "
+                            + "want Pig to stop immediately on failure.");
+                }
+            }
         }
 
         tezStats.finish();
@@ -279,6 +295,11 @@ public class TezLauncher extends Launche
             }
         }
 
+        if (stoppedOnFailure) {
+            throw new ExecException("Stopping execution on job failure with -stop_on_failure option", 6017,
+                    PigException.REMOTE_ENVIRONMENT);
+        }
+
         return tezStats;
     }
 
@@ -402,9 +423,11 @@ public class TezLauncher extends Launche
         TezCompiler comp = new TezCompiler(php, pc);
         comp.compile();
         TezPlanContainer planContainer = comp.getPlanContainer();
-        for (Map.Entry<OperatorKey, TezPlanContainerNode> entry : planContainer
-                .getKeys().entrySet()) {
-            TezOperPlan tezPlan = entry.getValue().getTezOperPlan();
+        // Sort so that the printed test plan remains the same between JDK 7 and JDK 8
+        List<OperatorKey> opKeys = new ArrayList<>(planContainer.getKeys().keySet());
+        Collections.sort(opKeys);
+        for (OperatorKey opKey : opKeys) {
+            TezOperPlan tezPlan = planContainer.getOperator(opKey).getTezOperPlan();
             optimize(tezPlan, pc);
         }
         return planContainer;
@@ -499,7 +522,7 @@ public class TezLauncher extends Launche
 
     @Override
     public void killJob(String jobID, Configuration conf) throws BackendException {
-        if (runningJob != null && runningJob.getApplicationId().toString() == jobID) {
+        if (runningJob != null && runningJob.getApplicationId().toString().equals(jobID)) {
             try {
                 runningJob.killJob();
             } catch (Exception e) {
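
The killJob() change above replaces reference equality with value equality, and the stop_on_failure handling mirrors what the MapReduce launcher already does when Pig is run with -stop_on_failure. As a minimal illustration of why the old == check could never match an externally supplied job id (ApplicationId.toString() builds a fresh String instance):

    public class StringEqualityDemo {
        public static void main(String[] args) {
            String jobID = "application_1487756626625_0001";
            // Simulates ApplicationId.toString(), which returns a new String instance
            String fromApp = new StringBuilder("application_")
                    .append("1487756626625_0001").toString();
            System.out.println(fromApp == jobID);      // false: different instances
            System.out.println(fromApp.equals(jobID)); // true: same characters
        }
    }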

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java Wed Feb 22 09:43:41 2017
@@ -39,6 +39,8 @@ import org.apache.pig.PigConfiguration;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class TezResourceManager {
     private static TezResourceManager instance = null;
     private boolean inited = false;
@@ -59,6 +61,7 @@ public class TezResourceManager {
     /**
      * This method is only used by test code to reset state.
      */
+    @VisibleForTesting
     public static void dropInstance() {
         instance = null;
     }
@@ -66,7 +69,7 @@ public class TezResourceManager {
     public void init(PigContext pigContext, Configuration conf) throws IOException {
         if (!inited) {
             this.resourcesDir = FileLocalizer.getTemporaryResourcePath(pigContext);
-            this.remoteFs = FileSystem.get(conf);
+            this.remoteFs = resourcesDir.getFileSystem(conf);
             this.conf = conf;
             this.pigContext = pigContext;
             this.inited = true;
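
The init() change matters when Pig's temporary resource path is not on the default filesystem (fs.defaultFS): Path.getFileSystem() resolves the filesystem that actually owns the path, while FileSystem.get(conf) always returns the default one. A minimal sketch, assuming a Hadoop client on the classpath and a hypothetical hdfs://nn:8020 namenode:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class FsForPath {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Hypothetical resource dir living on a cluster other than fs.defaultFS
            Path resourcesDir = new Path("hdfs://nn:8020/tmp/pig-resources");
            FileSystem defaultFs = FileSystem.get(conf);          // always fs.defaultFS
            FileSystem pathFs = resourcesDir.getFileSystem(conf); // FS owning the path's scheme
            System.out.println(defaultFs.getUri());
            System.out.println(pathFs.getUri());
        }
    }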

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java Wed Feb 22 09:43:41 2017
@@ -18,7 +18,9 @@
 package org.apache.pig.backend.hadoop.executionengine.tez;
 
 import java.io.IOException;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Calendar;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -29,9 +31,11 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezJob.TezJobConfig;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.pigstats.tez.TezScriptState;
 import org.apache.tez.client.TezAppMasterStatus;
@@ -46,13 +50,13 @@ public class TezSessionManager {
     private static final Log log = LogFactory.getLog(TezSessionManager.class);
 
     static {
-        Runtime.getRuntime().addShutdownHook(new Thread() {
+        Utils.addShutdownHookWithPriority(new Runnable() {
 
             @Override
             public void run() {
                 TezSessionManager.shutdown();
             }
-        });
+        }, PigImplConstants.SHUTDOWN_HOOK_JOB_KILL_PRIORITY);
     }
 
     private static ReentrantReadWriteLock sessionPoolLock = new ReentrantReadWriteLock();
@@ -61,11 +65,17 @@ public class TezSessionManager {
     private TezSessionManager() {
     }
 
-    public static class SessionInfo {
-        SessionInfo(TezClient session, Map<String, LocalResource> resources) {
+    private static class SessionInfo {
+
+        public SessionInfo(TezClient session, TezConfiguration config, Map<String, LocalResource> resources) {
             this.session = session;
+            this.config = config;
             this.resources = resources;
         }
+
+        public TezConfiguration getConfig() {
+            return config;
+        }
         public Map<String, LocalResource> getResources() {
             return resources;
         }
@@ -77,20 +87,23 @@ public class TezSessionManager {
         }
         private TezClient session;
         private Map<String, LocalResource> resources;
+        private TezConfiguration config;
         private boolean inUse = false;
     }
 
     private static List<SessionInfo> sessionPool = new ArrayList<SessionInfo>();
 
-    private static SessionInfo createSession(Configuration conf,
+    private static SessionInfo createSession(TezConfiguration amConf,
             Map<String, LocalResource> requestedAMResources, Credentials creds,
             TezJobConfig tezJobConf) throws TezException, IOException,
             InterruptedException {
-        TezConfiguration amConf = MRToTezHelper.getDAGAMConfFromMRConf(conf);
+        MRToTezHelper.translateMRSettingsForTezAM(amConf);
         TezScriptState ss = TezScriptState.get();
         ss.addDAGSettingsToConf(amConf);
-        adjustAMConfig(amConf, tezJobConf);
-        String jobName = conf.get(PigContext.JOB_NAME, "pig");
+        if (amConf.getBoolean(PigConfiguration.PIG_TEZ_CONFIGURE_AM_MEMORY, true)) {
+            adjustAMConfig(amConf, tezJobConf);
+        }
+        String jobName = amConf.get(PigContext.JOB_NAME, "pig");
         TezClient tezClient = TezClient.create(jobName, amConf, true, requestedAMResources, creds);
         try {
             tezClient.start();
@@ -104,12 +117,10 @@ public class TezSessionManager {
             tezClient.stop();
             throw new RuntimeException(e);
         }
-        return new SessionInfo(tezClient, requestedAMResources);
+        return new SessionInfo(tezClient, amConf, requestedAMResources);
     }
 
     private static void adjustAMConfig(TezConfiguration amConf, TezJobConfig tezJobConf) {
-        int requiredAMMaxHeap = -1;
-        int requiredAMResourceMB = -1;
         String amLaunchOpts = amConf.get(
                 TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
                 TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT);
@@ -122,8 +133,10 @@ public class TezSessionManager {
 
             // Need more room for native memory/virtual address space
             // when close to 4G due to 32-bit jvm 4G limit
-            int minAMMaxHeap = 3200;
-            int minAMResourceMB = 4096;
+            int maxAMHeap = Utils.is64bitJVM() ? 3584 : 3200;
+            int maxAMResourceMB = 4096;
+            int requiredAMResourceMB = maxAMResourceMB;
+            int requiredAMMaxHeap = maxAMHeap;
 
             // Rough estimation. For 5K tasks 1G Xmx and 1.5G resource.mb
             // Increment container size by 512 mb for every additional 5K tasks.
@@ -135,22 +148,38 @@ public class TezSessionManager {
             //     5000 and above  - 1024Xmx, 1536 (512 native memory)
             for (int taskCount = 30000; taskCount >= 5000; taskCount-=5000) {
                 if (tezJobConf.getEstimatedTotalParallelism() >= taskCount) {
-                    requiredAMMaxHeap = minAMMaxHeap;
-                    requiredAMResourceMB = minAMResourceMB;
                     break;
                 }
-                minAMResourceMB = minAMResourceMB - 512;
-                minAMMaxHeap = minAMResourceMB - 512;
+                requiredAMResourceMB = requiredAMResourceMB - 512;
+                requiredAMMaxHeap = requiredAMResourceMB - 512;
+            }
+
+            if (tezJobConf.getTotalVertices() > 30) {
+                //Add 512 mb per 30 vertices
+                int additionalMem = 512 * (tezJobConf.getTotalVertices() / 30);
+                requiredAMResourceMB = requiredAMResourceMB + additionalMem;
+                requiredAMMaxHeap = requiredAMResourceMB - 512;
+            }
+
+            if (tezJobConf.getMaxOutputsinSingleVertex() > 10) {
+                //Add 256 mb per 5 outputs if a vertex has more than 10 outputs
+                int additionalMem = 256 * (tezJobConf.getMaxOutputsinSingleVertex() / 5);
+                requiredAMResourceMB = requiredAMResourceMB + additionalMem;
+                requiredAMMaxHeap = requiredAMResourceMB - 512;
             }
 
+            requiredAMResourceMB = Math.min(maxAMResourceMB, requiredAMResourceMB);
+            requiredAMMaxHeap = Math.min(maxAMHeap, requiredAMMaxHeap);
+
             if (requiredAMResourceMB > -1 && configuredAMResourceMB < requiredAMResourceMB) {
                 amConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, requiredAMResourceMB);
                 log.info("Increasing "
                         + TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB + " from "
                         + configuredAMResourceMB + " to "
                         + requiredAMResourceMB
-                        + " as the number of total estimated tasks is "
-                        + tezJobConf.getEstimatedTotalParallelism());
+                        + " as total estimated tasks = " + tezJobConf.getEstimatedTotalParallelism()
+                        + ", total vertices = " + tezJobConf.getTotalVertices()
+                        + ", max outputs = " + tezJobConf.getMaxOutputsinSingleVertex());
 
                 if (requiredAMMaxHeap > -1 && configuredAMMaxHeap < requiredAMMaxHeap) {
                     amConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
@@ -158,8 +187,9 @@ public class TezSessionManager {
                     log.info("Increasing Tez AM Heap Size from "
                             + configuredAMMaxHeap + "M to "
                             + requiredAMMaxHeap
-                            + "M as the number of total estimated tasks is "
-                            + tezJobConf.getEstimatedTotalParallelism());
+                            + "M as total estimated tasks = " + tezJobConf.getEstimatedTotalParallelism()
+                            + ", total vertices = " + tezJobConf.getTotalVertices()
+                            + ", max outputs = " + tezJobConf.getMaxOutputsinSingleVertex());
                     log.info("Value of " + TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS + " is now "
                             + amConf.get(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS));
                 }
@@ -178,7 +208,22 @@ public class TezSessionManager {
         return true;
     }
 
-    static TezClient getClient(Configuration conf, Map<String, LocalResource> requestedAMResources,
+    private static boolean validateSessionConfig(SessionInfo currentSession,
+            Configuration newSessionConfig)
+            throws TezException, IOException {
+        // If DAG recovery is disabled for one and enabled for another, do not reuse
+        if (currentSession.getConfig().getBoolean(
+                    TezConfiguration.DAG_RECOVERY_ENABLED,
+                    TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)
+                != newSessionConfig.getBoolean(
+                        TezConfiguration.DAG_RECOVERY_ENABLED,
+                        TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)) {
+            return false;
+        }
+        return true;
+    }
+
+    static TezClient getClient(TezConfiguration conf, Map<String, LocalResource> requestedAMResources,
             Credentials creds, TezJobConfig tezJobConf) throws TezException, IOException, InterruptedException {
         List<SessionInfo> sessionsToRemove = new ArrayList<SessionInfo>();
         SessionInfo newSession = null;
@@ -196,7 +241,8 @@ public class TezSessionManager {
                         sessionsToRemove.add(sessionInfo);
                     } else if (!sessionInfo.inUse
                             && appMasterStatus.equals(TezAppMasterStatus.READY)
-                            && validateSessionResources(sessionInfo,requestedAMResources)) {
+                            && validateSessionResources(sessionInfo,requestedAMResources)
+                            && validateSessionConfig(sessionInfo, conf)) {
                         sessionInfo.inUse = true;
                         return sessionInfo.session;
                     }
@@ -253,6 +299,11 @@ public class TezSessionManager {
                 synchronized (sessionInfo) {
                     if (sessionInfo.session == session) {
                         log.info("Stopping Tez session " + session);
+                        String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+                                    .format(Calendar.getInstance().getTime());
+                        System.err.println(timeStamp + " Shutting down Tez session "
+                                + ", sessionName=" + session.getClientName()
+                                + ", applicationId=" + session.getAppMasterApplicationId());
                         session.stop();
                         sessionToRemove = sessionInfo;
                         break;
@@ -279,19 +330,30 @@ public class TezSessionManager {
             shutdown = true;
             for (SessionInfo sessionInfo : sessionPool) {
                 synchronized (sessionInfo) {
+                    TezClient session = sessionInfo.session;
                     try {
-                        if (sessionInfo.session.getAppMasterStatus().equals(
+                        String timeStamp = new SimpleDateFormat(
+                                "yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime());
+                        if (session.getAppMasterStatus().equals(
                                 TezAppMasterStatus.SHUTDOWN)) {
                             log.info("Tez session is already shutdown "
-                                    + sessionInfo.session);
+                                    + session);
+                            System.err.println(timeStamp
+                                    + " Tez session is already shutdown " + session
+                                    + ", sessionName=" + session.getClientName()
+                                    + ", applicationId=" + session.getAppMasterApplicationId());
                             continue;
                         }
-                        log.info("Shutting down Tez session "
-                                + sessionInfo.session);
-                        sessionInfo.session.stop();
+                        log.info("Shutting down Tez session " + session);
+                        // Since Hadoop calls org.apache.log4j.LogManager.shutdown(),
+                        // the log.info message is not displayed from a shutdown hook in Oozie
+                        System.err.println(timeStamp + " Shutting down Tez session "
+                                + ", sessionName=" + session.getClientName()
+                                + ", applicationId=" + session.getAppMasterApplicationId());
+                        session.stop();
                     } catch (Exception e) {
                         log.error("Error shutting down Tez session "
-                                + sessionInfo.session, e);
+                                + session, e);
                     }
                 }
             }
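
To make the revised adjustAMConfig() arithmetic concrete, here is a standalone sketch of the sizing loop under the assumptions of a 64-bit JVM (3584 MB heap cap, 4096 MB container cap) and no extra memory added for vertex count or outputs. For an estimated parallelism of 12000 it yields a 2048 MB container with a 1536 MB heap:

    public class AmSizingSketch {
        // Mirrors the loop in adjustAMConfig: start from the caps and subtract
        // 512 MB for every 5K tasks the estimate falls short of 30K
        static int[] requiredAm(int estimatedTotalParallelism) {
            int requiredAMResourceMB = 4096; // maxAMResourceMB
            int requiredAMMaxHeap = 3584;    // maxAMHeap on a 64-bit JVM
            for (int taskCount = 30000; taskCount >= 5000; taskCount -= 5000) {
                if (estimatedTotalParallelism >= taskCount) {
                    break;
                }
                requiredAMResourceMB = requiredAMResourceMB - 512;
                requiredAMMaxHeap = requiredAMResourceMB - 512;
            }
            return new int[] { requiredAMResourceMB, requiredAMMaxHeap };
        }

        public static void main(String[] args) {
            int[] r = requiredAm(12000);
            System.out.println("resource.mb=" + r[0] + ", Xmx=" + r[1] + "M"); // 2048, 1536M
        }
    }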

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Wed Feb 22 09:43:41 2017
@@ -32,10 +32,12 @@ import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.hash.Hash;
 import org.apache.pig.CollectableLoadFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.IndexableLoadFunc;
@@ -44,8 +46,10 @@ import org.apache.pig.OrderedLoadFunc;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigTupleWritableComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigWritableComparators;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -82,7 +86,10 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager.PackageType;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.NativeTezOper;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBloomFilterRearrangeTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBuildBloomRearrangeTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POCounterStatsTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POCounterTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POFRJoinTez;
@@ -110,6 +117,7 @@ import org.apache.pig.impl.builtin.GetMe
 import org.apache.pig.impl.builtin.PartitionSkewedKeys;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.NullableIntWritable;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.Operator;
@@ -167,6 +175,10 @@ public class TezCompiler extends PhyPlan
 
     private Map<PhysicalOperator, TezOperator> phyToTezOpMap;
 
+    // Contains the inputs to operators like join, with the list maintaining
+    // the join order from left to right
+    private Map<TezOperator, List<TezOperator>> inputsMap;
+
     public static final String USER_COMPARATOR_MARKER = "user.comparator.func:";
     public static final String FILE_CONCATENATION_THRESHOLD = "pig.files.concatenation.threshold";
     public static final String OPTIMISTIC_FILE_CONCATENATION = "pig.optimistic.files.concatenation";
@@ -175,6 +187,8 @@ public class TezCompiler extends PhyPlan
     private boolean optimisticFileConcatenation = false;
     private List<String> readOnceLoadFuncs = null;
 
+    private Configuration conf;
+
     private POLocalRearrangeTezFactory localRearrangeFactory;
 
     public TezCompiler(PhysicalPlan plan, PigContext pigContext)
@@ -184,6 +198,7 @@ public class TezCompiler extends PhyPlan
         this.pigContext = pigContext;
 
         pigProperties = pigContext.getProperties();
+        conf = ConfigurationUtil.toConfiguration(pigProperties, false);
         splitsSeen = Maps.newHashMap();
         tezPlan = new TezOperPlan();
         nig = NodeIdGenerator.getGenerator();
@@ -197,6 +212,7 @@ public class TezCompiler extends PhyPlan
         scope = roots.get(0).getOperatorKey().getScope();
         localRearrangeFactory = new POLocalRearrangeTezFactory(scope, nig);
         phyToTezOpMap = Maps.newHashMap();
+        inputsMap = Maps.newHashMap();
 
         fileConcatenationThreshold = Integer.parseInt(pigProperties
                 .getProperty(FILE_CONCATENATION_THRESHOLD, "100"));
@@ -655,15 +671,8 @@ public class TezCompiler extends PhyPlan
             blocking();
             TezCompilerUtil.setCustomPartitioner(op.getCustomPartitioner(), curTezOp);
 
-            // Add the DISTINCT plan as the combine plan. In MR Pig, the combiner is implemented
-            // with a global variable and a specific DistinctCombiner class. This seems better.
-            PhysicalPlan combinePlan = curTezOp.inEdges.get(lastOp.getOperatorKey()).combinePlan;
-            addDistinctPlan(combinePlan, 1);
-
-            POLocalRearrangeTez clr = localRearrangeFactory.create();
-            clr.setOutputKey(curTezOp.getOperatorKey().toString());
-            clr.setDistinct(true);
-            combinePlan.addAsLeaf(clr);
+            TezEdgeDescriptor edge = curTezOp.inEdges.get(lastOp.getOperatorKey());
+            edge.setNeedsDistinctCombiner(true);
 
             curTezOp.markDistinct();
             addDistinctPlan(curTezOp.plan, op.getRequestedParallelism());
@@ -856,6 +865,7 @@ public class TezCompiler extends PhyPlan
             } else {
                 curTezOp.plan.addAsLeaf(op);
             }
+            phyToTezOpMap.put(op, curTezOp);
 
         } catch (Exception e) {
             int errCode = 2034;
@@ -900,6 +910,7 @@ public class TezCompiler extends PhyPlan
     public void visitGlobalRearrange(POGlobalRearrange op) throws VisitorException {
         try {
             blocking();
+            inputsMap.put(curTezOp, new ArrayList<>(Arrays.asList(compiledInputs)));
             TezCompilerUtil.setCustomPartitioner(op.getCustomPartitioner(), curTezOp);
             curTezOp.setRequestedParallelism(op.getRequestedParallelism());
             if (op.isCross()) {
@@ -1088,7 +1099,7 @@ public class TezCompiler extends PhyPlan
         indexerTezOp.setDontEstimateParallelism(true);
 
         POStore st = TezCompilerUtil.getStore(scope, nig);
-        FileSpec strFile = getTempFileSpec();
+        FileSpec strFile = getTempFileSpec(pigContext);
         st.setSFile(strFile);
         indexAggrOper.plan.addAsLeaf(st);
         indexAggrOper.setClosed(true);
@@ -1255,7 +1266,7 @@ public class TezCompiler extends PhyPlan
                 rightTezOprAggr.setDontEstimateParallelism(true);
 
                 POStore st = TezCompilerUtil.getStore(scope, nig);
-                FileSpec strFile = getTempFileSpec();
+                FileSpec strFile = getTempFileSpec(pigContext);
                 st.setSFile(strFile);
                 rightTezOprAggr.plan.addAsLeaf(st);
                 rightTezOprAggr.setClosed(true);
@@ -1346,6 +1357,9 @@ public class TezCompiler extends PhyPlan
                 } else if (op.getNumInps() > 1) {
                     curTezOp.markCogroup();
                 }
+            } else if (op.getPkgr().getPackageType() == PackageType.BLOOMJOIN) {
+                curTezOp.markRegularJoin();
+                addBloomToJoin(op, curTezOp);
             }
         } catch (Exception e) {
             int errCode = 2034;
@@ -1354,6 +1368,132 @@ public class TezCompiler extends PhyPlan
         }
     }
 
+    private void addBloomToJoin(POPackage op, TezOperator curTezOp) throws PlanException {
+
+        List<TezOperator> inputs = inputsMap.get(curTezOp);
+        TezOperator buildBloomOp;
+        List<TezOperator> applyBloomOps = new ArrayList<>();
+
+        String strategy = conf.get(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, POBuildBloomRearrangeTez.DEFAULT_BLOOM_STRATEGY);
+        boolean createBloomInMap = "map".equals(strategy);
+        if (!createBloomInMap && !strategy.equals("reduce")) {
+            throw new PlanException(new IllegalArgumentException(
+                    "Invalid value for "
+                            + PigConfiguration.PIG_BLOOMJOIN_STRATEGY + " -  "
+                            + strategy + ". Valid values are map and reduce"));
+        }
+        int numHash = conf.getInt(PigConfiguration.PIG_BLOOMJOIN_HASH_FUNCTIONS, POBuildBloomRearrangeTez.DEFAULT_NUM_BLOOM_HASH_FUNCTIONS);
+        int vectorSizeBytes =  conf.getInt(PigConfiguration.PIG_BLOOMJOIN_VECTORSIZE_BYTES, POBuildBloomRearrangeTez.DEFAULT_BLOOM_VECTOR_SIZE_BYTES);
+        int numBloomFilters = POBuildBloomRearrangeTez.getNumBloomFilters(conf);
+        int hashType = Hash.parseHashType(conf.get(PigConfiguration.PIG_BLOOMJOIN_HASH_TYPE, POBuildBloomRearrangeTez.DEFAULT_BLOOM_HASH_TYPE));
+
+        // By default we build the bloom filter from the rightmost input and apply it to the left inputs.
+        // But in the case of a left outer join we build the bloom filter from the leftmost input and apply it to the right inputs
+        boolean[] inner = op.getPkgr().getInner();
+        boolean skipNullKeys = true;
+        if (inner[inner.length - 1]) {  // inner is ordered right to left while inputs is ordered left to right
+            buildBloomOp = inputs.get(inputs.size() - 1); // Bloom filter is built from right most input
+            for (int i = 0; i < (inner.length - 1); i++) {
+                applyBloomOps.add(inputs.get(i));
+            }
+            skipNullKeys = inner[0];
+        } else {
+            // Left outer join
+            skipNullKeys = false;
+            buildBloomOp = inputs.get(0); // Bloom filter is built from left most input
+            for (int i = 1; i < inner.length; i++) {
+                applyBloomOps.add(inputs.get(i));
+            }
+        }
+
+        // Add BuildBloom operator to the input
+        POLocalRearrangeTez lr = (POLocalRearrangeTez) buildBloomOp.plan.getLeaves().get(0);
+        POBuildBloomRearrangeTez bbr = new POBuildBloomRearrangeTez(lr, createBloomInMap, numBloomFilters, vectorSizeBytes, numHash, hashType);
+        bbr.setSkipNullKeys(skipNullKeys);
+        buildBloomOp.plan.remove(lr);
+        buildBloomOp.plan.addAsLeaf(bbr);
+
+        // Add a new reduce vertex that will construct the final bloom filter
+        //    - by combining the bloom filters from the buildBloomOp input tasks in the map strategy
+        //    - or directly from the keys from the buildBloomOp input tasks in the reduce strategy
+        TezOperator combineBloomOp = getTezOp();
+        tezPlan.add(combineBloomOp);
+        combineBloomOp.markBuildBloom();
+        // Explicitly set the parallelism for the new vertex to number of bloom filters.
+        // Auto parallelism will bring it down based on the actual output size
+        combineBloomOp.setEstimatedParallelism(numBloomFilters);
+        // We don't want the parallelism to be changed during the run by grace auto parallelism,
+        // which would take the whole input size and produce a much higher estimate
+        combineBloomOp.setDontEstimateParallelism(true);
+
+        String combineBloomOpKey = combineBloomOp.getOperatorKey().toString();
+        TezEdgeDescriptor edge = new TezEdgeDescriptor();
+        TezCompilerUtil.connect(tezPlan, buildBloomOp, combineBloomOp, edge);
+        bbr.setBloomOutputKey(combineBloomOpKey);
+
+
+        POPackage pkg = new POPackage(OperatorKey.genOpKey(scope));
+        pkg.setNumInps(1);
+        BloomPackager pkgr = new BloomPackager(createBloomInMap, vectorSizeBytes, numHash, hashType);
+        pkgr.setKeyType(DataType.INTEGER);
+        pkg.setPkgr(pkgr);
+        POValueOutputTez combineBloomOutput = new POValueOutputTez(OperatorKey.genOpKey(scope));
+        combineBloomOp.plan.addAsLeaf(pkg);
+        combineBloomOp.plan.addAsLeaf(combineBloomOutput);
+
+        edge.setIntermediateOutputKeyClass(NullableIntWritable.class.getName());
+        edge.setIntermediateOutputKeyComparatorClass(PigWritableComparators.PigIntRawBytesComparator.class.getName());
+
+        // Add combiner as well.
+        POPackage pkg_c = new POPackage(OperatorKey.genOpKey(scope));
+        BloomPackager combinerPkgr = new BloomPackager(createBloomInMap, vectorSizeBytes, numHash, hashType);
+        combinerPkgr.setCombiner(true);
+        combinerPkgr.setKeyType(DataType.INTEGER);
+        pkg_c.setPkgr(combinerPkgr);
+        pkg_c.setNumInps(1);
+        edge.combinePlan.addAsLeaf(pkg_c);
+        POProject prjKey = new POProject(OperatorKey.genOpKey(scope));
+        prjKey.setResultType(DataType.INTEGER);
+        List<PhysicalPlan> clrInps = new ArrayList<PhysicalPlan>();
+        PhysicalPlan pp = new PhysicalPlan();
+        pp.add(prjKey);
+        clrInps.add(pp);
+        POLocalRearrangeTez clr = localRearrangeFactory.create(0, LocalRearrangeType.WITHPLAN, clrInps, DataType.INTEGER);
+        clr.setOutputKey(combineBloomOpKey);
+        edge.combinePlan.addAsLeaf(clr);
+
+        if (createBloomInMap) {
+            // No combiner is needed on the map side as there will be only one bloom filter per map for each partition
+            // In the reducer, the bloom filters will be combined using the same reduce logic in BloomPackager
+            edge.setCombinerInMap(false);
+            edge.setCombinerInReducer(true);
+        } else {
+            pkgr.setBloomKeyType(op.getPkgr().getKeyType());
+            // Deduplicate the keys on the map side to reduce the data sent to the reducers.
+            // For the reduce strategy, no combiner is added and the distinct is done during the reduce itself.
+            // One can be added later if needed
+            edge.setCombinerInMap(true);
+            edge.setCombinerInReducer(false);
+        }
+
+        // Broadcast the final bloom filter to other inputs
+        for (TezOperator applyBloomOp : applyBloomOps) {
+            applyBloomOp.markFilterBloom();
+            lr = (POLocalRearrangeTez) applyBloomOp.plan.getLeaves().get(0);
+            POBloomFilterRearrangeTez bfr = new POBloomFilterRearrangeTez(lr, numBloomFilters);
+            applyBloomOp.plan.remove(lr);
+            applyBloomOp.plan.addAsLeaf(bfr);
+            bfr.setInputKey(combineBloomOpKey);
+            edge = new TezEdgeDescriptor();
+            edge.setIntermediateOutputKeyClass(NullableIntWritable.class.getName());
+            edge.setIntermediateOutputKeyComparatorClass(PigWritableComparators.PigIntRawBytesComparator.class.getName());
+            TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST);
+            TezCompilerUtil.connect(tezPlan, combineBloomOp, applyBloomOp, edge);
+            combineBloomOutput.addOutputKey(applyBloomOp.getOperatorKey().toString());
+        }
+
+    }
+
     @Override
     public void visitPOForEach(POForEach op) throws VisitorException{
         try{
@@ -1513,7 +1653,7 @@ public class TezCompiler extends PhyPlan
 
             for (int i=0; i<transformPlans.size(); i++) {
                 eps1.add(transformPlans.get(i));
-                flat1.add(true);
+                flat1.add(i == transformPlans.size() - 1);
             }
 
             // This foreach will pick the sort key columns from the POPoissonSample output
@@ -1722,7 +1862,7 @@ public class TezCompiler extends PhyPlan
      * @return
      * @throws IOException
      */
-    private FileSpec getTempFileSpec() throws IOException {
+    public static FileSpec getTempFileSpec(PigContext pigContext) throws IOException {
         return new FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(),
                 new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
     }
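
For reference, the new bloom join path is driven entirely by configuration. A minimal sketch of the strategy validation performed in addBloomToJoin(), assuming the literal key "pig.bloomjoin.strategy" for PigConfiguration.PIG_BLOOMJOIN_STRATEGY and "map" as the default strategy (both are assumptions, not shown in this diff):

    import java.util.Properties;

    public class BloomStrategyCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Assumed key and default; the diff reads them via PigConfiguration
            // and POBuildBloomRearrangeTez.DEFAULT_BLOOM_STRATEGY
            String strategy = props.getProperty("pig.bloomjoin.strategy", "map");
            boolean createBloomInMap = "map".equals(strategy);
            if (!createBloomInMap && !strategy.equals("reduce")) {
                throw new IllegalArgumentException("Invalid value for pig.bloomjoin.strategy - "
                        + strategy + ". Valid values are map and reduce");
            }
            System.out.println("bloom filters built in " + strategy + " tasks");
        }
    }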

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java Wed Feb 22 09:43:41 2017
@@ -31,8 +31,13 @@ import org.apache.tez.runtime.library.ou
  * Descriptor for Tez edge. It holds combine plan as well as edge properties.
  */
 public class TezEdgeDescriptor implements Serializable {
-    // Combiner runs on both input and output of Tez edge.
-    transient public PhysicalPlan combinePlan;
+
+    public transient PhysicalPlan combinePlan;
+    private boolean needsDistinctCombiner;
+    // The combiner runs on both the input and output of a Tez edge by default.
+    // It can be configured to run only on the output (map) or the input (reduce)
+    private Boolean combinerInMap;
+    private Boolean combinerInReducer;
 
     public String inputClassName;
     public String outputClassName;
@@ -65,6 +70,30 @@ public class TezEdgeDescriptor implement
         dataMovementType = DataMovementType.SCATTER_GATHER;
     }
 
+    public boolean needsDistinctCombiner() {
+        return needsDistinctCombiner;
+    }
+
+    public void setNeedsDistinctCombiner(boolean nic) {
+        needsDistinctCombiner = nic;
+    }
+
+    public Boolean getCombinerInMap() {
+        return combinerInMap;
+    }
+
+    public void setCombinerInMap(Boolean combinerInMap) {
+        this.combinerInMap = combinerInMap;
+    }
+
+    public Boolean getCombinerInReducer() {
+        return combinerInReducer;
+    }
+
+    public void setCombinerInReducer(Boolean combinerInReducer) {
+        this.combinerInReducer = combinerInReducer;
+    }
+
     public boolean isUseSecondaryKey() {
         return useSecondaryKey;
     }
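
A short usage sketch of the new setters, following how the bloom join wires them above (a sketch against the classes in this diff, not additional committed code):

    import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;

    public class EdgeCombinerSketch {
        public static void main(String[] args) {
            // "map" bloom strategy: one partial bloom filter per map task, so there
            // is nothing to combine map-side; partial filters are merged in the reducer
            TezEdgeDescriptor bloomEdge = new TezEdgeDescriptor();
            bloomEdge.setCombinerInMap(false);
            bloomEdge.setCombinerInReducer(true);

            // DISTINCT: let the framework supply the distinct combiner instead of
            // building the combine plan by hand, as TezCompiler.visitDistinct() now does
            TezEdgeDescriptor distinctEdge = new TezEdgeDescriptor();
            distinctEdge.setNeedsDistinctCombiner(true);
        }
    }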

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java Wed Feb 22 09:43:41 2017
@@ -25,8 +25,9 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -217,8 +218,12 @@ public class TezOperPlan extends Operato
             newPlan.add(node);
         }
 
-        Set<Pair<TezOperator, TezOperator>> toReconnect = new HashSet<Pair<TezOperator, TezOperator>>();
-        for (TezOperator from : mFromEdges.keySet()) {
+        // Use a LinkedHashSet and sort so that the printed test plan
+        // remains the same between JDK 7 and JDK 8
+        Set<Pair<TezOperator, TezOperator>> toReconnect = new LinkedHashSet<Pair<TezOperator, TezOperator>>();
+        List<TezOperator> fromEdges = new ArrayList<>(mFromEdges.keySet());
+        Collections.sort(fromEdges);
+        for (TezOperator from : fromEdges) {
             List<TezOperator> tos = mFromEdges.get(from);
             for (TezOperator to : tos) {
                 if (list.contains(from) || list.contains(to)) {
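
This and the similar compile() change in TezLauncher exist because iteration order over hash-based collections is unspecified and differs between JDK 7 and JDK 8, which made golden-plan test output unstable. A minimal illustration, independent of Pig:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class StableIteration {
        public static void main(String[] args) {
            Map<String, Integer> plans = new HashMap<>();
            plans.put("scope-12", 1);
            plans.put("scope-3", 2);
            plans.put("scope-7", 3);

            // Unspecified order; may differ across JDK versions
            System.out.println(plans.keySet());

            // Deterministic order, as the diff does with OperatorKeys
            List<String> keys = new ArrayList<>(plans.keySet());
            Collections.sort(keys);
            System.out.println(keys);
        }
    }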

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java Wed Feb 22 09:43:41 2017
@@ -181,7 +181,11 @@ public class TezOperator extends Operato
         // Indicate if this job is a native job
         NATIVE,
         // Indicate if this job does rank counter
-        RANK_COUNTER;
+        RANK_COUNTER,
+        // Indicate if this job constructs bloom filter
+        BUILDBLOOM,
+        // Indicate if this job applies bloom filter
+        FILTERBLOOM;
     };
 
     // Features in the job/vertex. Mostly will be only one feature.
@@ -235,6 +239,7 @@ public class TezOperator extends Operato
     }
 
     private LoaderInfo loaderInfo = new LoaderInfo();
+    private long totalInputFilesSize = -1;
 
     public TezOperator(OperatorKey k) {
         super(k);
@@ -452,6 +457,22 @@ public class TezOperator extends Operato
         feature.set(OPER_FEATURE.RANK_COUNTER.ordinal());
     }
 
+    public boolean isBuildBloom() {
+        return feature.get(OPER_FEATURE.BUILDBLOOM.ordinal());
+    }
+
+    public void markBuildBloom() {
+        feature.set(OPER_FEATURE.BUILDBLOOM.ordinal());
+    }
+
+    public boolean isFilterBloom() {
+        return feature.get(OPER_FEATURE.FILTERBLOOM.ordinal());
+    }
+
+    public void markFilterBloom() {
+        feature.set(OPER_FEATURE.FILTERBLOOM.ordinal());
+    }
+
     public void copyFeatures(TezOperator copyFrom, List<OPER_FEATURE> excludeFeatures) {
         for (OPER_FEATURE opf : OPER_FEATURE.values()) {
             if (excludeFeatures != null && excludeFeatures.contains(opf)) {
@@ -651,6 +672,14 @@ public class TezOperator extends Operato
         return loaderInfo;
     }
 
+    public long getTotalInputFilesSize() {
+        return totalInputFilesSize;
+    }
+
+    public void setTotalInputFilesSize(long totalInputFilesSize) {
+        this.totalInputFilesSize = totalInputFilesSize;
+    }
+
     public void setUseGraceParallelism(boolean useGraceParallelism) {
         this.useGraceParallelism = useGraceParallelism;
     }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java Wed Feb 22 09:43:41 2017
@@ -31,6 +31,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;
@@ -161,7 +162,7 @@ public class TezPOPackageAnnotator exten
         @Override
         public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException {
             POLocalRearrangeTez lr = (POLocalRearrangeTez) lrearrange;
-            if (!(lr.isConnectedToPackage() && lr.getOutputKey().equals(pkgTezOp.getOperatorKey().toString()))) {
+            if (!(lr.isConnectedToPackage() && lr.containsOutputKey(pkgTezOp.getOperatorKey().toString()))) {
                 return;
             }
             loRearrangeFound++;
@@ -180,7 +181,9 @@ public class TezPOPackageAnnotator exten
             if(keyInfo == null)
                 keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
 
-            Integer index = Integer.valueOf(lrearrange.getIndex());
+            // For BloomPackager there is only one input, but the
+            // POBuildBloomRearrangeTez index is the join's input index and can be non-zero
+            Integer index = (pkg.getPkgr() instanceof BloomPackager) ? 0 : Integer.valueOf(lrearrange.getIndex());
             if(keyInfo.get(index) != null) {
                 if (isPOSplit) {
                     // Case of POSplit having more than one input in case of self join or union
@@ -197,12 +200,20 @@ public class TezPOPackageAnnotator exten
 
             }
 
-            keyInfo.put(index,
-                    new Pair<Boolean, Map<Integer, Integer>>(
-                            lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
-            pkg.getPkgr().setKeyInfo(keyInfo);
-            pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
-            pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
+            if (pkg.getPkgr() instanceof BloomPackager) {
+                keyInfo.put(index,
+                        new Pair<Boolean, Map<Integer, Integer>>(
+                                Boolean.FALSE, new HashMap<Integer, Integer>()));
+                pkg.getPkgr().setKeyInfo(keyInfo);
+            } else {
+                keyInfo.put(index,
+                        new Pair<Boolean, Map<Integer, Integer>>(
+                                lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
+                pkg.getPkgr().setKeyInfo(keyInfo);
+                pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
+                pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
+            }
+
         }
 
         /**

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java Wed Feb 22 09:43:41 2017
@@ -29,9 +29,14 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezResourceManager;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.impl.plan.PlanException;
@@ -160,100 +165,178 @@ public class TezPlanContainer extends Op
             return;
         }
 
-        TezOperator operToSegment = null;
-        List<TezOperator> succs = new ArrayList<TezOperator>();
+        List<TezOperator> opersToSegment = null;
         try {
             // Split top down from root to leaves
-            SegmentOperatorFinder finder = new SegmentOperatorFinder(tezOperPlan);
+            // Get list of operators closer to the root that can be segmented together
+            FirstLevelSegmentOperatorsFinder finder = new FirstLevelSegmentOperatorsFinder(tezOperPlan);
             finder.visit();
-            operToSegment = finder.getOperatorToSegment();
+            opersToSegment = finder.getOperatorsToSegment();
         } catch (VisitorException e) {
             throw new PlanException(e);
         }
+        if (!opersToSegment.isEmpty()) {
+            Set<TezOperator> commonSplitterPredecessors = new HashSet<>();
+            for (TezOperator operToSegment : opersToSegment) {
+                for (TezOperator succ : tezOperPlan.getSuccessors(operToSegment)) {
+                    commonSplitterPredecessors
+                            .addAll(getCommonSplitterPredecessors(tezOperPlan,
+                                    operToSegment, succ));
+                }
+            }
 
-        if (operToSegment != null && tezOperPlan.getSuccessors(operToSegment) != null) {
-            succs.addAll(tezOperPlan.getSuccessors(operToSegment));
-            for (TezOperator succ : succs) {
-                tezOperPlan.disconnect(operToSegment, succ);
-            }
-            for (TezOperator succ : succs) {
-                try {
-                    if (tezOperPlan.getOperator(succ.getOperatorKey()) == null) {
-                        // Has already been moved to a new plan by previous successor
-                        // as part of dependency. It could have been further split.
-                        // So walk the full plan to find the new plan and connect
-                        TezOperatorFinder finder = new TezOperatorFinder(this, succ);
-                        finder.visit();
-                        connect(planNode, finder.getPlanContainerNode());
-                        continue;
+            if (commonSplitterPredecessors.isEmpty()) {
+                List<TezOperator> allSuccs = new ArrayList<TezOperator>();
+                // Disconnect all the successors and move them to a new plan
+                for (TezOperator operToSegment : opersToSegment) {
+                    List<TezOperator> succs = new ArrayList<TezOperator>();
+                    succs.addAll(tezOperPlan.getSuccessors(operToSegment));
+                    allSuccs.addAll(succs);
+                    for (TezOperator succ : succs) {
+                        tezOperPlan.disconnect(operToSegment, succ);
                     }
-                    TezOperPlan newOperPlan = new TezOperPlan();
+                }
+                TezOperPlan newOperPlan = new TezOperPlan();
+                for (TezOperator succ : allSuccs) {
                     tezOperPlan.moveTree(succ, newOperPlan);
-                    TezPlanContainerNode newPlanNode = new TezPlanContainerNode(
-                            generateNodeOperatorKey(), newOperPlan);
-                    add(newPlanNode);
-                    connect(planNode, newPlanNode);
-                    split(newPlanNode);
-                    if (newPlanNode.getTezOperPlan().getOperator(succ.getOperatorKey()) == null) {
-                        // On further split, the successor moved to a new plan container.
-                        // Connect to that
-                        TezOperatorFinder finder = new TezOperatorFinder(this, succ);
-                        finder.visit();
-                        disconnect(planNode, newPlanNode);
-                        connect(planNode, finder.getPlanContainerNode());
+                }
+                TezPlanContainerNode newPlanNode = new TezPlanContainerNode(
+                        generateNodeOperatorKey(), newOperPlan);
+                add(newPlanNode);
+                connect(planNode, newPlanNode);
+                split(newPlanNode);
+            } else {
+                // If there is a common splitter predecessor between operToSegment and the successor,
+                // we have to separate out that split to be able to segment.
+                // So we store the output of the split to a temp store and then change
+                // the splittees to load from it.
+                String scope = opersToSegment.get(0).getOperatorKey().getScope();
+                for (TezOperator splitter : commonSplitterPredecessors) {
+                    try {
+                        List<TezOperator> succs = new ArrayList<TezOperator>();
+                        succs.addAll(tezOperPlan.getSuccessors(splitter));
+                        FileSpec fileSpec = TezCompiler.getTempFileSpec(pigContext);
+                        POStore tmpStore = getTmpStore(scope, fileSpec);
+                        // Replace POValueOutputTez with POStore
+                        splitter.plan.remove(splitter.plan.getLeaves().get(0));
+                        splitter.plan.addAsLeaf(tmpStore);
+                        splitter.segmentBelow = true;
+                        splitter.setSplitter(false);
+                        for (TezOperator succ : succs) {
+                            // Replace POValueInputTez with POLoad
+                            POLoad tmpLoad = getTmpLoad(scope, fileSpec);
+                            succ.plan.replace(succ.plan.getRoots().get(0), tmpLoad);
+                        }
+                    } catch (Exception e) {
+                        throw new PlanException(e);
                     }
-                } catch (VisitorException e) {
-                    throw new PlanException(e);
                 }
             }
             split(planNode);
         }
     }
 
-    private static class SegmentOperatorFinder extends TezOpPlanVisitor {
+    private static class FirstLevelSegmentOperatorsFinder extends TezOpPlanVisitor {
 
-        private TezOperator operToSegment;
+        private List<TezOperator> opersToSegment = new ArrayList<>();
 
-        public SegmentOperatorFinder(TezOperPlan plan) {
+        public FirstLevelSegmentOperatorsFinder(TezOperPlan plan) {
             super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
         }
 
-        public TezOperator getOperatorToSegment() {
-            return operToSegment;
+        public List<TezOperator> getOperatorsToSegment() {
+            return opersToSegment;
         }
 
         @Override
-        public void visitTezOp(TezOperator tezOperator) throws VisitorException {
-            if (tezOperator.needSegmentBelow() && operToSegment == null) {
-                operToSegment = tezOperator;
+        public void visitTezOp(TezOperator tezOp) throws VisitorException {
+            if (tezOp.needSegmentBelow() && getPlan().getSuccessors(tezOp) != null) {
+                if (opersToSegment.isEmpty()) {
+                    opersToSegment.add(tezOp);
+                } else {
+                    // If the operator does not have a dependency on previous
+                    // operators chosen for segmenting, then add it to the
+                    // operators to be segmented together
+                    if (!hasPredecessor(tezOp, opersToSegment)) {
+                        opersToSegment.add(tezOp);
+                    }
+                }
             }
         }
 
-    }
-
-    private static class TezOperatorFinder extends TezPlanContainerVisitor {
+        /**
+         * Check if the tezOp has one of the opsToCheck as a predecessor.
+         * It can be an immediate predecessor or multiple levels up.
+         */
+        private boolean hasPredecessor(TezOperator tezOp, List<TezOperator> opsToCheck) {
+            List<TezOperator> predecessors = getPlan().getPredecessors(tezOp);
+            if (predecessors != null) {
+                for (TezOperator pred : predecessors) {
+                    if (opsToCheck.contains(pred)) {
+                        return true;
+                    } else {
+                        if (hasPredecessor(pred, opsToCheck)) {
+                            return true;
+                        }
+                    }
+                }
+            }
+            return false;
+        }
 
-        private TezPlanContainerNode planContainerNode;
-        private TezOperator operatorToFind;
+    }
 
-        public TezOperatorFinder(TezPlanContainer plan, TezOperator operatorToFind) {
-            super(plan, new DependencyOrderWalker<TezPlanContainerNode, TezPlanContainer>(plan));
-            this.operatorToFind = operatorToFind;
+    private Set<TezOperator> getCommonSplitterPredecessors(TezOperPlan plan, TezOperator operToSegment, TezOperator successor) {
+        Set<TezOperator> splitters1 = new HashSet<>();
+        Set<TezOperator> splitters2 = new HashSet<>();
+        Set<TezOperator> processedPredecessors = new HashSet<>();
+        // Find predecessors which are splitters
+        fetchSplitterPredecessors(plan, operToSegment, processedPredecessors, splitters1);
+        if (!splitters1.isEmpty()) {
+            // For the successor, traverse the rest of the plan below it and
+            // search the predecessors of its successors for any that are splitters.
+            Set<TezOperator> allSuccs = new HashSet<>();
+            getAllSuccessors(plan, successor, allSuccs);
+            processedPredecessors.clear();
+            processedPredecessors.add(successor);
+            for (TezOperator succ : allSuccs) {
+                fetchSplitterPredecessors(plan, succ, processedPredecessors, splitters2);
+            }
+            // Find the common ones
+            splitters1.retainAll(splitters2);
         }
+        return splitters1;
+    }
 
-        public TezPlanContainerNode getPlanContainerNode() {
-            return planContainerNode;
+    private void fetchSplitterPredecessors(TezOperPlan plan, TezOperator tezOp,
+            Set<TezOperator> processedPredecessors, Set<TezOperator> splitters) {
+        List<TezOperator> predecessors = plan.getPredecessors(tezOp);
+        if (predecessors != null) {
+            for (TezOperator pred : predecessors) {
+                // Skip already processed predecessors to avoid loops
+                if (processedPredecessors.contains(pred)) {
+                    continue;
+                }
+                if (pred.isSplitter()) {
+                    splitters.add(pred);
+                } else if (!pred.needSegmentBelow()) {
+                    processedPredecessors.add(pred);
+                    fetchSplitterPredecessors(plan, pred, processedPredecessors, splitters);
+                }
+            }
         }
+    }
 
-        @Override
-        public void visitTezPlanContainerNode(
-                TezPlanContainerNode tezPlanContainerNode)
-                throws VisitorException {
-            if (tezPlanContainerNode.getTezOperPlan().getOperatorKey(operatorToFind) != null) {
-                planContainerNode = tezPlanContainerNode;
+    private void getAllSuccessors(TezOperPlan plan, TezOperator tezOp, Set<TezOperator> allSuccs) {
+        List<TezOperator> successors = plan.getSuccessors(tezOp);
+        if (successors != null) {
+            for (TezOperator succ : successors) {
+                if (!allSuccs.contains(succ)) {
+                    allSuccs.add(succ);
+                    getAllSuccessors(plan, succ, allSuccs);
+                }
             }
         }
-
     }
 
     private synchronized OperatorKey generateNodeOperatorKey() {
@@ -267,6 +350,21 @@ public class TezPlanContainer extends Op
         scopeId = 0;
     }
 
+    private POLoad getTmpLoad(String scope, FileSpec fileSpec){
+        POLoad ld = new POLoad(new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+        ld.setPc(pigContext);
+        ld.setIsTmpLoad(true);
+        ld.setLFile(fileSpec);
+        return ld;
+    }
+
+    private POStore getTmpStore(String scope, FileSpec fileSpec){
+        POStore st = new POStore(new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+        st.setIsTmpStore(true);
+        st.setSFile(fileSpec);
+        return new POStoreTez(st);
+    }
+
     @Override
     public String toString() {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();

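For readers following the segmenting change above: hasPredecessor() is a plain transitive-ancestor walk over the Tez DAG. Below is a minimal standalone sketch of the same test over a toy String-keyed adjacency map instead of Pig's TezOperPlan; the class and names are illustrative, not part of the patch.

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Standalone sketch (not Pig code): the transitive-predecessor test that
// FirstLevelSegmentOperatorsFinder.hasPredecessor() performs, over a toy DAG.
// "preds" maps each vertex to its immediate predecessors.
final class PredecessorCheckSketch {

    static boolean hasPredecessor(Map<String, List<String>> preds,
                                  String vertex, Set<String> candidates) {
        List<String> ps = preds.get(vertex);
        if (ps == null) {
            return false;
        }
        for (String p : ps) {
            // A match can be an immediate predecessor or multiple levels up.
            if (candidates.contains(p) || hasPredecessor(preds, p, candidates)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, List<String>> preds = new HashMap<>();
        preds.put("b", Arrays.asList("a"));
        preds.put("c", Arrays.asList("b"));
        // "c" depends on "a" two levels up, so it would not be segmented
        // in the same first-level pass as "a".
        System.out.println(hasPredecessor(preds, "c",
                new HashSet<>(Collections.singletonList("a")))); // true
    }
}
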
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java Wed Feb 22 09:43:41 2017
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
@@ -80,6 +81,9 @@ public class TezPrinter extends TezOpPla
                     printer.setVerbose(isVerbose);
                     printer.visit();
                     mStream.println();
+                } else if (edgeDesc.needsDistinctCombiner()) {
+                    mStream.println("# Combine plan on edge <" + inEdge + ">");
+                    mStream.println(DistinctCombiner.Combine.class.getName());
                 }
             }
         }

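With this change, explain output for an edge that needs the distinct combiner prints the combiner class on that edge, along the lines of the following (the edge name scope-12 is hypothetical):

# Combine plan on edge <scope-12>
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner$Combine
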
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java?rev=1783988&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java Wed Feb 22 09:43:41 2017
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.apache.hadoop.util.bloom.BloomFilter;
+import org.apache.hadoop.util.bloom.Key;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
+import org.apache.pig.builtin.BuildBloomBase;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+public class BloomPackager extends Packager {
+
+    private static final long serialVersionUID = 1L;
+
+    private boolean bloomCreatedInMap;
+    private int vectorSizeBytes;
+    private int numHash;
+    private int hashType;
+    private byte bloomKeyType;
+    private boolean isCombiner;
+
+    private transient ByteArrayOutputStream baos;
+    private transient Iterator<Object> distinctKeyIter;
+
+    public BloomPackager(boolean bloomCreatedInMap, int vectorSizeBytes,
+            int numHash, int hashType) {
+        super();
+        this.bloomCreatedInMap = bloomCreatedInMap;
+        this.vectorSizeBytes = vectorSizeBytes;
+        this.numHash = numHash;
+        this.hashType = hashType;
+    }
+
+    public void setBloomKeyType(byte keyType) {
+        bloomKeyType = keyType;
+    }
+
+    public void setCombiner(boolean isCombiner) {
+        this.isCombiner = isCombiner;
+    }
+
+    @Override
+    public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+            throws ExecException {
+        this.key = key;
+        this.bags = bags;
+        this.readOnce = readOnce;
+        // Bag can be read directly and need not be materialized again
+    }
+
+    @Override
+    public Result getNext() throws ExecException {
+        try {
+            if (bloomCreatedInMap) {
+                if (bags == null) {
+                    return new Result(POStatus.STATUS_EOP, null);
+                }
+                // Same function for combiner and reducer
+                return combineBloomFilters();
+            } else {
+                if (isCombiner) {
+                    return getDistinctBloomKeys();
+                } else {
+                    if (bags == null) {
+                        return new Result(POStatus.STATUS_EOP, null);
+                    }
+                    return createBloomFilter();
+                }
+            }
+        } catch (IOException e) {
+            throw new ExecException("Error while constructing final bloom filter", e);
+        }
+    }
+
+    private Result combineBloomFilters() throws IOException {
+        // We get a bag of bloom filters. Combine them into one.
+        Iterator<Tuple> iter = bags[0].iterator();
+        Tuple tup = iter.next();
+        DataByteArray bloomBytes = (DataByteArray) tup.get(0);
+        BloomFilter bloomFilter = BuildBloomBase.bloomIn(bloomBytes);
+        while (iter.hasNext()) {
+            tup = iter.next();
+            bloomFilter.or(BuildBloomBase.bloomIn((DataByteArray) tup.get(0)));
+        }
+
+        Object partition = key;
+        detachInput(); // Free up the key and bags reference
+
+        return getSerializedBloomFilter(partition, bloomFilter, bloomBytes.get().length);
+    }
+
+    private Result createBloomFilter() throws IOException {
+        // We get a bag of keys. Create a bloom filter from them
+        // First dedupe the keys. Not using DistinctBag, as memory should not be a problem.
+        HashSet<Object> bloomKeys = new HashSet<>();
+        Iterator<Tuple> iter = bags[0].iterator();
+        while (iter.hasNext()) {
+            bloomKeys.add(iter.next().get(0));
+        }
+
+        Object partition = key;
+        detachInput(); // Free up the key and bags reference
+
+        BloomFilter bloomFilter = new BloomFilter(vectorSizeBytes * 8, numHash, hashType);
+        for (Object bloomKey: bloomKeys) {
+            Key k = new Key(DataType.toBytes(bloomKey, bloomKeyType));
+            bloomFilter.add(k);
+        }
+        bloomKeys = null;
+        return getSerializedBloomFilter(partition, bloomFilter, vectorSizeBytes + 64);
+
+    }
+
+    private Result getSerializedBloomFilter(Object partition,
+            BloomFilter bloomFilter, int serializedSize) throws ExecException,
+            IOException {
+        if (baos == null) {
+            baos = new ByteArrayOutputStream(serializedSize);
+        }
+        baos.reset();
+        DataOutputStream dos = new DataOutputStream(baos);
+        bloomFilter.write(dos);
+        dos.flush();
+
+        Tuple res = mTupleFactory.newTuple(2);
+        res.set(0, partition);
+        res.set(1, new DataByteArray(baos.toByteArray()));
+
+        Result r = new Result();
+        r.result = res;
+        r.returnStatus = POStatus.STATUS_OK;
+        return r;
+    }
+
+    private Result getDistinctBloomKeys() throws ExecException {
+        if (distinctKeyIter == null) {
+            HashSet<Object> bloomKeys = new HashSet<>();
+            Iterator<Tuple> iter = bags[0].iterator();
+            while (iter.hasNext()) {
+                bloomKeys.add(iter.next().get(0));
+            }
+            distinctKeyIter = bloomKeys.iterator();
+        }
+        while (distinctKeyIter.hasNext()) {
+            Tuple res = mTupleFactory.newTuple(2);
+            res.set(0, key);
+            res.set(1, distinctKeyIter.next());
+
+            Result r = new Result();
+            r.result = res;
+            r.returnStatus = POStatus.STATUS_OK;
+            return r;
+        }
+        distinctKeyIter = null;
+        return new Result(POStatus.STATUS_EOP, null);
+    }
+}

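The combineBloomFilters() path above reduces a bag of serialized filters into one with or(). Here is a minimal sketch of that round-trip against the Hadoop bloom API alone, with toy sizes; BloomOrSketch and its helpers are illustrative, not part of the patch.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

// Sketch (not patch code): build two filters, serialize them the way the
// packager receives them, then OR them into a single combined filter.
public class BloomOrSketch {

    static byte[] serialize(BloomFilter f) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        f.write(new DataOutputStream(baos));
        return baos.toByteArray();
    }

    static BloomFilter deserialize(byte[] bytes) throws IOException {
        BloomFilter f = new BloomFilter();
        f.readFields(new DataInputStream(new ByteArrayInputStream(bytes)));
        return f;
    }

    public static void main(String[] args) throws IOException {
        // Vector size is in bits, as in the patch's vectorSizeBytes * 8.
        int vectorSizeBits = 1024 * 8, numHash = 3;
        BloomFilter a = new BloomFilter(vectorSizeBits, numHash, Hash.MURMUR_HASH);
        BloomFilter b = new BloomFilter(vectorSizeBits, numHash, Hash.MURMUR_HASH);
        a.add(new Key("k1".getBytes(StandardCharsets.UTF_8)));
        b.add(new Key("k2".getBytes(StandardCharsets.UTF_8)));

        // Round-trip through bytes as the packager does, then OR into one.
        BloomFilter combined = deserialize(serialize(a));
        combined.or(deserialize(serialize(b)));

        System.out.println(combined.membershipTest(new Key("k1".getBytes(StandardCharsets.UTF_8)))); // true
        System.out.println(combined.membershipTest(new Key("k2".getBytes(StandardCharsets.UTF_8)))); // true
    }
}
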
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java?rev=1783988&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java Wed Feb 22 09:43:41 2017
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.bloom.BloomFilter;
+import org.apache.hadoop.util.bloom.Key;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.ObjectCache;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
+import org.apache.pig.builtin.BuildBloomBase;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class POBloomFilterRearrangeTez extends POLocalRearrangeTez implements TezInput {
+    private static final long serialVersionUID = 1L;
+
+    private static final Log LOG = LogFactory.getLog(POBloomFilterRearrangeTez.class);
+    private String inputKey;
+    private transient KeyValueReader reader;
+    private transient String cacheKey;
+    private int numBloomFilters;
+    private transient BloomFilter[] bloomFilters;
+
+    public POBloomFilterRearrangeTez(POLocalRearrangeTez lr, int numBloomFilters) {
+        super(lr);
+        this.numBloomFilters = numBloomFilters;
+    }
+
+    public void setInputKey(String inputKey) {
+        this.inputKey = inputKey;
+    }
+
+    @Override
+    public String[] getTezInputs() {
+        return new String[] { inputKey };
+    }
+
+    @Override
+    public void replaceInput(String oldInputKey, String newInputKey) {
+        if (oldInputKey.equals(inputKey)) {
+            inputKey = newInputKey;
+        }
+    }
+
+    @Override
+    public void addInputsToSkip(Set<String> inputsToSkip) {
+        cacheKey = "bloom-" + inputKey;
+        Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+        if (cacheValue != null) {
+            inputsToSkip.add(inputKey);
+        }
+    }
+
+    @Override
+    public void attachInputs(Map<String, LogicalInput> inputs,
+            Configuration conf) throws ExecException {
+        Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+        if (cacheValue != null) {
+            bloomFilters = (BloomFilter[]) cacheValue;
+            return;
+        }
+        LogicalInput input = inputs.get(inputKey);
+        if (input == null) {
+            throw new ExecException("Input from vertex " + inputKey + " is missing");
+        }
+        try {
+            reader = (KeyValueReader) input.getReader();
+            LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + reader);
+            while (reader.next()) {
+                if (bloomFilters == null) {
+                    bloomFilters = new BloomFilter[numBloomFilters];
+                }
+                Tuple val = (Tuple) reader.getCurrentValue();
+                int index = (int) val.get(0);
+                bloomFilters[index] = BuildBloomBase.bloomIn((DataByteArray) val.get(1));
+            }
+            ObjectCache.getInstance().cache(cacheKey, bloomFilters);
+        } catch (Exception e) {
+            throw new ExecException(e);
+        }
+    }
+
+    @Override
+    public Result getNextTuple() throws ExecException {
+
+        // If there is no bloom filter, the right input was empty.
+        // Skip processing
+        if (bloomFilters == null) {
+            return RESULT_EOP;
+        }
+
+        while (true) {
+            res = super.getRearrangedTuple();
+            try {
+                switch (res.returnStatus) {
+                case POStatus.STATUS_OK:
+                    if (illustrator == null) {
+                        Tuple result = (Tuple) res.result;
+                        Byte index = (Byte) result.get(0);
+
+                        // Skip the record if key is not in the bloom filter
+                        if (!isKeyInBloomFilter(result.get(1))) {
+                            continue;
+                        }
+                        PigNullableWritable key = HDataType.getWritableComparableTypes(result.get(1), keyType);
+                        NullableTuple val = new NullableTuple((Tuple)result.get(2));
+                        key.setIndex(index);
+                        val.setIndex(index);
+                        writer.write(key, val);
+                    } else {
+                        illustratorMarkup(res.result, res.result, 0);
+                    }
+                    continue;
+                case POStatus.STATUS_NULL:
+                    continue;
+                case POStatus.STATUS_EOP:
+                case POStatus.STATUS_ERR:
+                default:
+                    return res;
+                }
+            } catch (IOException ioe) {
+                int errCode = 2135;
+                String msg = "Received error from POBloomFilterRearrage function." + ioe.getMessage();
+                throw new ExecException(msg, errCode, ioe);
+            }
+        }
+    }
+
+    private boolean isKeyInBloomFilter(Object key) throws ExecException {
+        if (key == null) {
+            // Null values are dropped in an inner join, and in the case of an
+            // outer join POBloomFilterRearrangeTez is only in the plan on the
+            // non-outer relation. So just skip them
+            return false;
+        }
+        if (bloomFilters.length == 1) {
+            // Skip computing hashcode
+            Key k = new Key(DataType.toBytes(key, keyType));
+            return bloomFilters[0].membershipTest(k);
+        } else {
+            int partition = (key.hashCode() & Integer.MAX_VALUE) % numBloomFilters;
+            BloomFilter filter = bloomFilters[partition];
+            if (filter != null) {
+                Key k = new Key(DataType.toBytes(key, keyType));
+                return filter.membershipTest(k);
+            }
+            return false;
+        }
+    }
+
+    @Override
+    public POBloomFilterRearrangeTez clone() throws CloneNotSupportedException {
+        return (POBloomFilterRearrangeTez) super.clone();
+    }
+
+    @Override
+    public String name() {
+        return getAliasString() + "BloomFilter Rearrange" + "["
+                + DataType.findTypeName(resultType) + "]" + "{"
+                + DataType.findTypeName(keyType) + "}" + "(" + mIsDistinct
+                + ") - " + mKey.toString() + "\t<-\t " + inputKey + "\t->\t " + outputKey;
+    }
+
+}

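isKeyInBloomFilter() above must hash-partition a probe key exactly the way the build side partitioned the join keys, or probes would hit the wrong filter. A small sketch of that invariant follows; the class and method names are illustrative, not part of the patch.

import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;

// Sketch (not patch code): probing hash-partitioned bloom filters. The
// non-negative hash (hashCode & Integer.MAX_VALUE) modulo the filter count
// must match the partitioning used when the filters were built.
final class PartitionedBloomProbeSketch {

    static boolean mightContain(BloomFilter[] filters, Object key, byte[] keyBytes) {
        int partition = (key.hashCode() & Integer.MAX_VALUE) % filters.length;
        BloomFilter filter = filters[partition];
        // A null slot means no build-side key hashed to this partition,
        // so the probe key cannot be present.
        return filter != null && filter.membershipTest(new Key(keyBytes));
    }
}
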
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java?rev=1783988&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java Wed Feb 22 09:43:41 2017
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.bloom.BloomFilter;
+import org.apache.hadoop.util.bloom.Key;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableIntWritable;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+/**
+ * This operator writes out the key and value for the hash join reduce operation, similar to POLocalRearrangeTez.
+ * In addition, it also writes out the bloom filter constructed from the join keys
+ * in the case of the bloomjoin map strategy, or the join keys themselves in the case of the reduce strategy.
+ *
+ * Using multiple bloom filters partitioned by the hash of the key allows for parallelism.
+ * It also allows us to have lower false positives with smaller vector sizes.
+ *
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class POBuildBloomRearrangeTez extends POLocalRearrangeTez {
+    private static final long serialVersionUID = 1L;
+    private static final Log LOG = LogFactory.getLog(POBuildBloomRearrangeTez.class);
+
+    public static final String DEFAULT_BLOOM_STRATEGY = "map";
+    public static final int DEFAULT_NUM_BLOOM_FILTERS_REDUCE = 11;
+    public static final int DEFAULT_NUM_BLOOM_HASH_FUNCTIONS = 3;
+    public static final String DEFAULT_BLOOM_HASH_TYPE = "murmur";
+    public static final int DEFAULT_BLOOM_VECTOR_SIZE_BYTES = 1024 * 1024;
+
+    private String bloomOutputKey;
+    private boolean skipNullKeys = false;
+    private boolean createBloomInMap;
+    private int numBloomFilters;
+    private int vectorSizeBytes;
+    private int numHash;
+    private int hashType;
+
+    private transient BloomFilter[] bloomFilters;
+    private transient KeyValueWriter bloomWriter;
+    private transient PigNullableWritable nullKey;
+    private transient Tuple bloomValue;
+    private transient NullableTuple bloomNullableTuple;
+
+    public POBuildBloomRearrangeTez(POLocalRearrangeTez lr,
+            boolean createBloomInMap, int numBloomFilters, int vectorSizeBytes,
+            int numHash, int hashType) {
+        super(lr);
+        this.createBloomInMap = createBloomInMap;
+        this.numBloomFilters = numBloomFilters;
+        this.vectorSizeBytes = vectorSizeBytes;
+        this.numHash = numHash;
+        this.hashType = hashType;
+    }
+
+    public static int getNumBloomFilters(Configuration conf) {
+        if ("map".equals(conf.get(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, DEFAULT_BLOOM_STRATEGY))) {
+            return conf.getInt(PigConfiguration.PIG_BLOOMJOIN_NUM_FILTERS, 1);
+        } else {
+            return conf.getInt(PigConfiguration.PIG_BLOOMJOIN_NUM_FILTERS, DEFAULT_NUM_BLOOM_FILTERS_REDUCE);
+        }
+    }
+
+    public void setSkipNullKeys(boolean skipNullKeys) {
+        this.skipNullKeys = skipNullKeys;
+    }
+
+    public void setBloomOutputKey(String bloomOutputKey) {
+        this.bloomOutputKey = bloomOutputKey;
+    }
+
+    @Override
+    public boolean containsOutputKey(String key) {
+        if(super.containsOutputKey(key)) {
+            return true;
+        }
+        return bloomOutputKey.equals(key);
+    }
+
+    @Override
+    public String[] getTezOutputs() {
+        return new String[] { outputKey, bloomOutputKey };
+    }
+
+    @Override
+    public void replaceOutput(String oldOutputKey, String newOutputKey) {
+        if (oldOutputKey.equals(outputKey)) {
+            outputKey = newOutputKey;
+        } else if (oldOutputKey.equals(bloomOutputKey)) {
+            bloomOutputKey = newOutputKey;
+        }
+    }
+
+    @Override
+    public void attachOutputs(Map<String, LogicalOutput> outputs,
+            Configuration conf) throws ExecException {
+        super.attachOutputs(outputs, conf);
+        LogicalOutput output = outputs.get(bloomOutputKey);
+        if (output == null) {
+            throw new ExecException("Output to vertex " + bloomOutputKey + " is missing");
+        }
+        try {
+            bloomWriter = (KeyValueWriter) output.getWriter();
+            LOG.info("Attached output to vertex " + bloomOutputKey + " : output=" + output + ", writer=" + bloomWriter);
+        } catch (Exception e) {
+            throw new ExecException(e);
+        }
+        bloomFilters = new BloomFilter[numBloomFilters];
+        bloomValue = mTupleFactory.newTuple(1);
+        bloomNullableTuple = new NullableTuple(bloomValue);
+    }
+
+    @Override
+    public Result getNextTuple() throws ExecException {
+
+        PigNullableWritable key;
+        while (true) {
+            res = super.getRearrangedTuple();
+            try {
+                switch (res.returnStatus) {
+                case POStatus.STATUS_OK:
+                    if (illustrator == null) {
+                        Tuple result = (Tuple) res.result;
+                        Byte index = (Byte) result.get(0);
+
+                        Object keyObj = result.get(1);
+                        if (keyObj != null) {
+                            key = HDataType.getWritableComparableTypes(keyObj, keyType);
+                            // Null keys cannot be part of the bloom filter.
+                            // Since they are also dropped during the join, we can skip them.
+                            if (createBloomInMap) {
+                                addKeyToBloomFilter(keyObj);
+                            } else {
+                                writeJoinKeyForBloom(keyObj);
+                            }
+                        } else if (skipNullKeys) {
+                            // Inner join, so don't bother writing the null key
+                            continue;
+                        } else {
+                            if (nullKey == null) {
+                                nullKey = HDataType.getWritableComparableTypes(keyObj, keyType);
+                            }
+                            key = nullKey;
+                        }
+
+                        NullableTuple val = new NullableTuple((Tuple)result.get(2));
+                        key.setIndex(index);
+                        val.setIndex(index);
+                        writer.write(key, val);
+                    } else {
+                        illustratorMarkup(res.result, res.result, 0);
+                    }
+                    continue;
+                case POStatus.STATUS_NULL:
+                    continue;
+                case POStatus.STATUS_EOP:
+                    if (this.parentPlan.endOfAllInput && createBloomInMap) {
+                        // In the case of Split, we will get EOP after every record,
+                        // so check for endOfAllInput
+                        writeBloomFilters();
+                    }
+                case POStatus.STATUS_ERR:
+                default:
+                    return res;
+                }
+            } catch (IOException ioe) {
+                int errCode = 2135;
+                String msg = "Received error from POBuildBloomRearrage function." + ioe.getMessage();
+                throw new ExecException(msg, errCode, ioe);
+            }
+        }
+    }
+
+    private void addKeyToBloomFilter(Object key) throws ExecException {
+        Key k = new Key(DataType.toBytes(key, keyType));
+        if (bloomFilters.length == 1) {
+            if (bloomFilters[0] == null) {
+                bloomFilters[0] = new BloomFilter(vectorSizeBytes * 8, numHash, hashType);
+            }
+            bloomFilters[0].add(k);
+        } else {
+            int partition = (key.hashCode() & Integer.MAX_VALUE) % numBloomFilters;
+            BloomFilter filter = bloomFilters[partition];
+            if (filter == null) {
+                filter = new BloomFilter(vectorSizeBytes * 8, numHash, hashType);
+                bloomFilters[partition] = filter;
+            }
+            filter.add(k);
+        }
+    }
+
+    private void writeJoinKeyForBloom(Object key) throws IOException {
+        int partition = (key.hashCode() & Integer.MAX_VALUE) % numBloomFilters;
+        bloomValue.set(0, key);
+        bloomWriter.write(new NullableIntWritable(partition), bloomNullableTuple);
+    }
+
+    private void writeBloomFilters() throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(vectorSizeBytes + 64);
+        for (int i = 0; i < bloomFilters.length; i++) {
+            if (bloomFilters[i] != null) {
+                DataOutputStream dos = new DataOutputStream(baos);
+                bloomFilters[i].write(dos);
+                dos.flush();
+                bloomValue.set(0, new DataByteArray(baos.toByteArray()));
+                bloomWriter.write(new NullableIntWritable(i), bloomNullableTuple);
+                baos.reset();
+            }
+        }
+    }
+
+    @Override
+    public POBuildBloomRearrangeTez clone() throws CloneNotSupportedException {
+        return (POBuildBloomRearrangeTez) super.clone();
+    }
+
+    @Override
+    public String name() {
+        return getAliasString() + "BuildBloom Rearrange" + "["
+                + DataType.findTypeName(resultType) + "]" + "{"
+                + DataType.findTypeName(keyType) + "}" + "(" + mIsDistinct
+                + ") - " + mKey.toString() + "\t->\t[ " + outputKey + ", " + bloomOutputKey +"]";
+    }
+
+}

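getNumBloomFilters() above resolves the filter count from the configured strategy: one filter for the default "map" strategy, DEFAULT_NUM_BLOOM_FILTERS_REDUCE (11) for "reduce", unless PigConfiguration.PIG_BLOOMJOIN_NUM_FILTERS overrides it. A small sketch, assuming this patch is applied and its classes are on the classpath:

import org.apache.hadoop.conf.Configuration;
import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBuildBloomRearrangeTez;

// Sketch: how the filter count falls out of the configured strategy,
// per getNumBloomFilters() above.
public class BloomFilterCountDemo {
    public static void main(String[] args) {
        Configuration conf = new Configuration(false);
        // Default strategy is "map" -> a single bloom filter per task.
        System.out.println(POBuildBloomRearrangeTez.getNumBloomFilters(conf)); // 1

        // The "reduce" strategy defaults to 11 hash-partitioned filters.
        conf.set(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
        System.out.println(POBuildBloomRearrangeTez.getNumBloomFilters(conf)); // 11
    }
}
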

