From commits-return-8305-archive-asf-public=cust-asf.ponee.io@pig.apache.org Thu Oct 4 00:39:14 2018
Mailing-List: contact commits-help@pig.apache.org; run by ezmlm
Reply-To: dev@pig.apache.org
Subject: svn commit: r1842768 - in /pig/trunk: ./
src/docs/src/documentation/content/xdocs/ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/
src/org/apache/pig/backend/hadoop/executionengine/tez/plan/
src/org/apache/pi...
Date: Wed, 03 Oct 2018 22:39:09 -0000
To: commits@pig.apache.org
From: rohini@apache.org
X-Mailer: svnmailer-1.0.9
Message-Id: <20181003223910.BF9863A0047@svn01-us-west.apache.org>
Author: rohini
Date: Wed Oct 3 22:39:09 2018
New Revision: 1842768
URL: http://svn.apache.org/viewvc?rev=1842768&view=rev
Log:
PIG-5342: Add setting to turn off bloom join combiner (satishsaley via rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml
pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml
pig/trunk/src/org/apache/pig/PigConfiguration.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
pig/trunk/test/e2e/pig/tests/join.conf
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6-KeyToReducer.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7-KeyToReducer.gld
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1842768&r1=1842767&r2=1842768&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Oct 3 22:39:09 2018
@@ -26,6 +26,8 @@ PIG-5282: Upgade to Java 8 (satishsaley
IMPROVEMENTS
+PIG-5342: Add setting to turn off bloom join combiner (satishsaley via rohini)
+
PIG-5349: Log stderr output when shell command fail (knoguchi)
PIG-3038: Support for Credentials for UDF,Loader and Storer (satishsaley via rohini)
Modified: pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml
URL: http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml?rev=1842768&r1=1842767&r2=1842768&view=diff
==============================================================================
--- pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml (original)
+++ pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml Wed Oct 3 22:39:09 2018
@@ -1210,7 +1210,8 @@ gets 1 GB of memory. Please share your o
used to filter records of the other relations before doing a regular hash join.
The amount of data sent to the reducers will be a lot less depending up on the numbers of records that are filtered on the map side.
Bloom join is very useful in cases where the number of matching records between relations in a join are comparatively less
-compared to the total records allowing many to be filtered before the join.
+compared to the total records allowing many to be filtered before the join. Bloom join is also ideal in cases of right outer join
+with smaller dataset on the right which is not supported by replicated join.
Before bloom join was added as a type of join, same functionality was achieved by users by using
the builtin bloom udfs which is not as efficient and required more lines of code as well.
Currently bloom join is only implemented in Tez execution mode. Builtin bloom udfs have to be used for other execution modes.
@@ -1287,9 +1288,10 @@ In this case size of keys sent to the re
pig.bloomjoin.num.filters - The number of bloom filters that will be created. Default is 1 for map strategy and 11 for reduce strategy.
pig.bloomjoin.vectorsize.bytes - The size in bytes of the bit vector to be used for the bloom filter.
A bigger vector size will be needed when the number of distinct keys is higher. Default value is 1048576 (1MB).
-
pig.bloomjoin.hash.functions - The type of hash function to use. Valid values are 'jenkins' and 'murmur'. Default is murmur.
-
pig.bloomjoin.hash.types - The number of hash functions to be used in bloom computation. It determines the probability of false positives.
+
pig.bloomjoin.hash.type - The type of hash function to use. Valid values are 'jenkins' and 'murmur'. Default is murmur.
+
pig.bloomjoin.hash.functions - The number of hash functions to be used in bloom computation. It determines the probability of false positives.
Higher the number lower the false positives. Too high a value can increase the cpu time. Default value is 3.
+
pig.bloomjoin.nocombiner - To turn off combiner when most of the keys are unique. Default is false.
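As a rough illustration of the vector size and hash function settings described in the perf.xml text above, here is a minimal Bloom filter sketch. It is not Pig's or Hadoop's implementation: the class name BloomSketch, the double-hashing scheme, and the use of String.hashCode are assumptions made for brevity (the commit itself relies on org.apache.hadoop.util.bloom.BloomFilter with 'jenkins' or 'murmur' hashing).

```java
import java.util.BitSet;

/** Illustrative Bloom filter (hypothetical class, not Pig's or Hadoop's code). */
public class BloomSketch {
    private final BitSet bits;
    private final int numBits;
    private final int numHash;

    /**
     * @param vectorSizeBytes size of the bit vector in bytes (cf. pig.bloomjoin.vectorsize.bytes)
     * @param numHash         number of hash functions (cf. pig.bloomjoin.hash.functions)
     */
    public BloomSketch(int vectorSizeBytes, int numHash) {
        this.numBits = vectorSizeBytes * 8;
        this.bits = new BitSet(numBits);
        this.numHash = numHash;
    }

    // Assumption: derive the i-th bit position from String.hashCode via double hashing.
    private int position(String key, int i) {
        int h1 = key.hashCode();
        int h2 = (h1 >>> 16) | 1; // odd step so successive probes differ
        return Math.floorMod(h1 + i * h2, numBits);
    }

    /** Sets numHash bits for the key. */
    public void add(String key) {
        for (int i = 0; i < numHash; i++) {
            bits.set(position(key, i));
        }
    }

    /** Returns false only if the key was definitely never added. */
    public boolean mightContain(String key) {
        for (int i = 0; i < numHash; i++) {
            if (!bits.get(position(key, i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        BloomSketch filter = new BloomSketch(1024, 3);
        filter.add("user-1");
        filter.add("user-2");
        // An added key is always reported as possibly present (no false negatives).
        System.out.println(filter.mightContain("user-1"));
        // A key that was never added is usually, but not always, rejected.
        System.out.println(filter.mightContain("user-999"));
    }
}
```

In this sketch, every added key sets numHash bits, so a vector that is too small for the number of distinct keys fills up and lets more non-matching keys through, which is why the documentation suggests a bigger vector when the number of distinct keys is higher.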
Modified: pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml
URL: http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml?rev=1842768&r1=1842767&r2=1842768&view=diff
==============================================================================
--- pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml (original)
+++ pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml Wed Oct 3 22:39:09 2018
@@ -186,7 +186,7 @@ public class COUNT extends EvalFunc<L
public Tuple exec(Tuple input) throws IOException {return TupleFactory.getInstance().newTuple(sum(input));}
}
static public class Final extends EvalFunc<Long> {
- public Tuple exec(Tuple input) throws IOException {return sum(input);}
+ public Long exec(Tuple input) throws IOException {return sum(input);}
}
static protected Long count(Tuple input) throws ExecException {
Object values = input.get(0);
Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1842768&r1=1842767&r2=1842768&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Wed Oct 3 22:39:09 2018
@@ -204,6 +204,10 @@ public class PigConfiguration {
public static final String PIG_BLOOMJOIN_HASH_FUNCTIONS = "pig.bloomjoin.hash.functions";
/**
+ * To turn off combiner when most of the keys are unique.
+ */
+ public static final String PIG_BLOOMJOIN_NOCOMBINER = "pig.bloomjoin.nocombiner";
+ /**
* This key used to control the maximum size loaded into
* the distributed cache when doing fragment-replicated join
*/
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java?rev=1842768&r1=1842767&r2=1842768&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java Wed Oct 3 22:39:09 2018
@@ -26,10 +26,12 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBuildBloomRearrangeTez;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;
@@ -132,6 +134,12 @@ public class EndOfAllInputSetter extends
super.visitLocalRearrange(lr);
}
+ @Override
+ public void visitPackage(POPackage pkg) throws VisitorException {
+ if (pkg.getPkgr() instanceof BloomPackager) {
+ endOfAllInputFlag = true;
+ }
+ }
/**
* @return if end of all input is present
*/
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1842768&r1=1842767&r2=1842768&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Wed Oct 3 22:39:09 2018
@@ -49,6 +49,7 @@ import org.apache.pig.backend.executione
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigTupleWritableComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSecondaryKeyComparator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigWritableComparators;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
@@ -109,6 +110,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.IsFirstReduceOfKeyTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.PartitionSkewedKeysTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.BloomFilterPartitioner;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.SkewedPartitionerTez;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.WeightedRangePartitionerTez;
import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
@@ -119,6 +121,7 @@ import org.apache.pig.impl.builtin.Parti
import org.apache.pig.impl.builtin.TezIndexableLoader;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.NullableBytesWritable;
import org.apache.pig.impl.io.NullableIntWritable;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -134,6 +137,7 @@ import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
import org.apache.tez.runtime.library.input.UnorderedKVInput;
import org.apache.tez.runtime.library.output.UnorderedKVOutput;
@@ -1452,46 +1456,45 @@ public class TezCompiler extends PhyPlan
POPackage pkg = new POPackage(OperatorKey.genOpKey(scope));
pkg.setNumInps(1);
- BloomPackager pkgr = new BloomPackager(createBloomInMap, vectorSizeBytes, numHash, hashType);;
- pkgr.setKeyType(DataType.INTEGER);
+ BloomPackager pkgr = new BloomPackager(createBloomInMap, numBloomFilters, vectorSizeBytes, numHash, hashType);
pkg.setPkgr(pkgr);
POValueOutputTez combineBloomOutput = new POValueOutputTez(OperatorKey.genOpKey(scope));
combineBloomOp.plan.addAsLeaf(pkg);
combineBloomOp.plan.addAsLeaf(combineBloomOutput);
- edge.setIntermediateOutputKeyClass(NullableIntWritable.class.getName());
- edge.setIntermediateOutputKeyComparatorClass(PigWritableComparators.PigIntRawBytesComparator.class.getName());
-
- // Add combiner as well.
- POPackage pkg_c = new POPackage(OperatorKey.genOpKey(scope));
- BloomPackager combinerPkgr = new BloomPackager(createBloomInMap, vectorSizeBytes, numHash, hashType);
- combinerPkgr.setCombiner(true);
- combinerPkgr.setKeyType(DataType.INTEGER);
- pkg_c.setPkgr(combinerPkgr);
- pkg_c.setNumInps(1);
- edge.combinePlan.addAsLeaf(pkg_c);
- POProject prjKey = new POProject(OperatorKey.genOpKey(scope));
- prjKey.setResultType(DataType.INTEGER);
- List<PhysicalPlan> clrInps = new ArrayList<PhysicalPlan>();
- PhysicalPlan pp = new PhysicalPlan();
- pp.add(prjKey);
- clrInps.add(pp);
- POLocalRearrangeTez clr = localRearrangeFactory.create(0, LocalRearrangeType.WITHPLAN, clrInps, DataType.INTEGER);
- clr.setOutputKey(combineBloomOpKey);
- edge.combinePlan.addAsLeaf(clr);
-
if (createBloomInMap) {
+ pkgr.setKeyType(DataType.INTEGER);
+ edge.setIntermediateOutputKeyClass(NullableIntWritable.class.getName());
+ edge.setIntermediateOutputKeyComparatorClass(
+ PigWritableComparators.PigIntRawBytesComparator.class.getName());
+ // Add combiner as well. Each of the bloom filter is 1 MB by default. When there are
+ // 100s of mappers producing bloom filter, it is better to have combiner
+ // on the reduce side.
+ POPackage pkg_c = new POPackage(OperatorKey.genOpKey(scope));
+ pkg_c.setPkgr(new BloomPackager(createBloomInMap, numBloomFilters, vectorSizeBytes, numHash, hashType));
+ pkg_c.getPkgr().setKeyType(DataType.INTEGER);
+ pkg_c.setNumInps(1);
+ edge.combinePlan.addAsLeaf(pkg_c);
+ POProject prjKey = new POProject(OperatorKey.genOpKey(scope));
+ prjKey.setResultType(DataType.INTEGER);
+ List<PhysicalPlan> clrInps = new ArrayList<PhysicalPlan>();
+ PhysicalPlan pp = new PhysicalPlan();
+ pp.add(prjKey);
+ clrInps.add(pp);
+ POLocalRearrangeTez clr = localRearrangeFactory.create(0, LocalRearrangeType.WITHPLAN, clrInps, DataType.INTEGER);
+ clr.setOutputKey(combineBloomOpKey);
+ edge.combinePlan.addAsLeaf(clr);
// No combiner needed on map as there will be only one bloom filter per map for each partition
// In the reducer, the bloom filters will be combined with same logic of reduce in BloomPackager
edge.setCombinerInMap(false);
edge.setCombinerInReducer(true);
} else {
- pkgr.setBloomKeyType(op.getPkgr().getKeyType());
+ pkgr.setKeyType(DataType.BYTEARRAY);
+ edge.setIntermediateOutputKeyClass(NullableBytesWritable.class.getName());
+ edge.setIntermediateOutputKeyComparatorClass(PigWritableComparators.PigBytesRawBytesComparator.class.getName());
+ edge.partitionerClass = BloomFilterPartitioner.class;
// Do distinct of the keys on the map side to reduce data sent to reducers.
- // In case of reduce, not adding a combiner and doing the distinct during reduce itself.
- // If needed one can be added later
- edge.setCombinerInMap(true);
- edge.setCombinerInReducer(false);
+ edge.setNeedsDistinctCombiner(!conf.getBoolean(PigConfiguration.PIG_BLOOMJOIN_NOCOMBINER, false));
}
// Broadcast the final bloom filter to other inputs
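The reduce-strategy branch in the TezCompiler hunk above enables the distinct combiner unless pig.bloomjoin.nocombiner is set to true. A minimal sketch of that negation follows, assuming java.util.Properties as a stand-in for the configuration object used in TezCompiler; the class NoCombinerSketch and its method are hypothetical and exist only for illustration.

```java
import java.util.Properties;

/** Hypothetical stand-in showing how the nocombiner flag maps to the combiner decision. */
public class NoCombinerSketch {
    // Same property name as PigConfiguration.PIG_BLOOMJOIN_NOCOMBINER in the commit.
    static final String PIG_BLOOMJOIN_NOCOMBINER = "pig.bloomjoin.nocombiner";

    /** The distinct combiner is wanted unless the property is explicitly "true". */
    static boolean needsDistinctCombiner(Properties conf) {
        boolean noCombiner =
                Boolean.parseBoolean(conf.getProperty(PIG_BLOOMJOIN_NOCOMBINER, "false"));
        return !noCombiner;
    }

    public static void main(String[] args) {
        Properties defaults = new Properties();
        // Property unset: the default is false, so the combiner stays on.
        System.out.println(needsDistinctCombiner(defaults));

        Properties mostlyUniqueKeys = new Properties();
        mostlyUniqueKeys.setProperty(PIG_BLOOMJOIN_NOCOMBINER, "true");
        // Property set to true: the combiner is turned off.
        System.out.println(needsDistinctCombiner(mostlyUniqueKeys));
    }
}
```

Turning the combiner off is aimed at the case named in the documentation change, where most keys are unique and a distinct pass on the map side would remove very few of them.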
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java?rev=1842768&r1=1842767&r2=1842768&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java Wed Oct 3 22:39:09 2018
@@ -20,7 +20,6 @@ package org.apache.pig.backend.hadoop.ex
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.util.HashSet;
import java.util.Iterator;
import org.apache.hadoop.util.bloom.BloomFilter;
@@ -38,32 +37,27 @@ import org.apache.pig.data.Tuple;
public class BloomPackager extends Packager {
private static final long serialVersionUID = 1L;
+ private static final Result RESULT_EMPTY = new Result(POStatus.STATUS_NULL, null);
+ private static final Result RESULT_EOP = new Result(POStatus.STATUS_EOP, null);
private boolean bloomCreatedInMap;
private int vectorSizeBytes;
+ private int numBloomFilters;
private int numHash;
private int hashType;
- private byte bloomKeyType;
- private boolean isCombiner;
private transient ByteArrayOutputStream baos;
- private transient Iterator